Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ private static Set<String> getPinnedTimestampLockedFiles(
for (Long pinnedTimestamp : pinnedTimestampSet) {
String cachedFile = metadataFilePinnedTimestampMap.get(pinnedTimestamp);
if (cachedFile != null) {
assert metadataFiles.contains(cachedFile) : "Metadata files should contain [" + cachedFile + "]";
implicitLockedFiles.add(cachedFile);
} else {
newPinnedTimestamps.add(pinnedTimestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ public void onResponse(List<BlobMetadata> blobMetadata) {

// Update cache to keep only those metadata files that are not getting deleted
oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted);
oldFormatMetadataFilePrimaryTermMap.keySet().retainAll(metadataFilesNotToBeDeleted);
// Delete stale primary terms
deleteStaleRemotePrimaryTerms(metadataFilesNotToBeDeleted);
} else {
Expand Down Expand Up @@ -408,9 +409,9 @@ protected Tuple<Long, Long> getMinMaxTranslogGenerationFromMetadataFile(
}
}

private void deleteStaleRemotePrimaryTerms(List<String> metadataFiles) {
private void deleteStaleRemotePrimaryTerms(List<String> metadataFilesNotToBeDeleted) {
deleteStaleRemotePrimaryTerms(
metadataFiles,
metadataFilesNotToBeDeleted,
translogTransferManager,
oldFormatMetadataFilePrimaryTermMap,
minPrimaryTermInRemote,
Expand All @@ -425,7 +426,7 @@ private void deleteStaleRemotePrimaryTerms(List<String> metadataFiles) {
* This will also delete all stale translog metadata files from remote except the latest basis the metadata file comparator.
*/
protected static void deleteStaleRemotePrimaryTerms(
List<String> metadataFiles,
List<String> metadataFilesNotToBeDeleted,
TranslogTransferManager translogTransferManager,
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap,
AtomicLong minPrimaryTermInRemoteAtomicLong,
Expand All @@ -434,15 +435,15 @@ protected static void deleteStaleRemotePrimaryTerms(
// The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there
// are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part
// of older primary term.
if (metadataFiles.isEmpty()) {
if (metadataFilesNotToBeDeleted.isEmpty()) {
logger.trace("No metadata is uploaded yet, returning from deleteStaleRemotePrimaryTerms");
return;
}
Optional<Long> minPrimaryTermFromMetadataFiles = metadataFiles.stream().map(file -> {
Optional<Long> minPrimaryTermFromMetadataFiles = metadataFilesNotToBeDeleted.stream().map(file -> {
try {
return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap).v1();
} catch (IOException e) {
return Long.MAX_VALUE;
return Long.MIN_VALUE;
}
}).min(Long::compareTo);
// First we delete all stale primary terms folders from remote store
Expand All @@ -459,7 +460,7 @@ protected static void deleteStaleRemotePrimaryTerms(
}
}

private static Long getMinPrimaryTermInRemote(
protected static Long getMinPrimaryTermInRemote(
AtomicLong minPrimaryTermInRemote,
TranslogTransferManager translogTransferManager,
Logger logger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1083,4 +1083,101 @@ public void testIsPinnedTimestampStateStaleFeatureEnabled() {
setupRemotePinnedTimestampFeature(true);
assertTrue(RemoteStoreUtils.isPinnedTimestampStateStale());
}

public void testGetPinnedTimestampLockedFilesWithCache() {
setupRemotePinnedTimestampFeature(true);

Map<Long, String> metadataFilePinnedTimestampCache = new HashMap<>();

// Pinned timestamps 800, 900, 1000, 2000
// Metadata with timestamp 990, 995, 1000, 1001
// Metadata timestamp 1000 <= Pinned Timestamp 1000
// Metadata timestamp 1001 <= Pinned Timestamp 2000
Tuple<Map<Long, String>, Set<String>> metadataAndLocks = testGetPinnedTimestampLockedFilesWithPinnedTimestamps(
List.of(990L, 995L, 1000L, 1001L),
Set.of(800L, 900L, 1000L, 2000L),
metadataFilePinnedTimestampCache
);
Map<Long, String> metadataFiles = metadataAndLocks.v1();
Set<String> implicitLockedFiles = metadataAndLocks.v2();

assertEquals(2, implicitLockedFiles.size());
assertTrue(implicitLockedFiles.contains(metadataFiles.get(1000L)));
assertTrue(implicitLockedFiles.contains(metadataFiles.get(1001L)));
// Now we cache all the matches except the last one.
assertEquals(1, metadataFilePinnedTimestampCache.size());
assertEquals(metadataFiles.get(1000L), metadataFilePinnedTimestampCache.get(1000L));

metadataAndLocks = testGetPinnedTimestampLockedFilesWithPinnedTimestamps(
List.of(990L, 995L, 1000L, 1001L, 2000L, 2200L),
Set.of(800L, 900L, 1000L, 2000L, 3000L),
metadataFilePinnedTimestampCache
);
metadataFiles = metadataAndLocks.v1();
implicitLockedFiles = metadataAndLocks.v2();
assertEquals(3, implicitLockedFiles.size());
assertTrue(implicitLockedFiles.contains(metadataFiles.get(1000L)));
assertTrue(implicitLockedFiles.contains(metadataFiles.get(2000L)));
assertTrue(implicitLockedFiles.contains(metadataFiles.get(2200L)));
assertEquals(2, metadataFilePinnedTimestampCache.size());
assertEquals(metadataFiles.get(1000L), metadataFilePinnedTimestampCache.get(1000L));
assertEquals(metadataFiles.get(2000L), metadataFilePinnedTimestampCache.get(2000L));

metadataAndLocks = testGetPinnedTimestampLockedFilesWithPinnedTimestamps(
List.of(990L, 995L, 1000L, 1001L, 2000L, 2200L, 2500L),
Set.of(2000L, 3000L),
metadataFilePinnedTimestampCache
);
metadataFiles = metadataAndLocks.v1();
implicitLockedFiles = metadataAndLocks.v2();
assertEquals(2, implicitLockedFiles.size());
assertTrue(implicitLockedFiles.contains(metadataFiles.get(2000L)));
assertTrue(implicitLockedFiles.contains(metadataFiles.get(2500L)));
assertEquals(1, metadataFilePinnedTimestampCache.size());
assertEquals(metadataFiles.get(2000L), metadataFilePinnedTimestampCache.get(2000L));

metadataAndLocks = testGetPinnedTimestampLockedFilesWithPinnedTimestamps(
List.of(2000L, 2200L, 2500L, 3001L, 4200L, 4600L, 5010L),
Set.of(3000L, 4000L, 5000L, 6000L),
metadataFilePinnedTimestampCache
);
metadataFiles = metadataAndLocks.v1();
implicitLockedFiles = metadataAndLocks.v2();
assertEquals(4, implicitLockedFiles.size());
assertTrue(implicitLockedFiles.contains(metadataFiles.get(2500L)));
assertTrue(implicitLockedFiles.contains(metadataFiles.get(3001L)));
assertTrue(implicitLockedFiles.contains(metadataFiles.get(4600L)));
assertTrue(implicitLockedFiles.contains(metadataFiles.get(5010L)));
assertEquals(3, metadataFilePinnedTimestampCache.size());
assertEquals(metadataFiles.get(2500L), metadataFilePinnedTimestampCache.get(3000L));
assertEquals(metadataFiles.get(3001L), metadataFilePinnedTimestampCache.get(4000L));
assertEquals(metadataFiles.get(4600L), metadataFilePinnedTimestampCache.get(5000L));

metadataAndLocks = testGetPinnedTimestampLockedFilesWithPinnedTimestamps(
List.of(),
Set.of(3000L, 4000L, 5000L, 6000L),
metadataFilePinnedTimestampCache
);
implicitLockedFiles = metadataAndLocks.v2();
assertEquals(0, implicitLockedFiles.size());
assertEquals(3, metadataFilePinnedTimestampCache.size());

assertThrows(
AssertionError.class,
() -> testGetPinnedTimestampLockedFilesWithPinnedTimestamps(
List.of(2000L, 2200L, 3001L, 4200L, 4600L, 5010L),
Set.of(3000L, 4000L, 5000L, 6000L),
metadataFilePinnedTimestampCache
)
);

metadataAndLocks = testGetPinnedTimestampLockedFilesWithPinnedTimestamps(
List.of(2000L, 2200L, 2500L, 3001L, 4200L, 4600L, 5010L),
Set.of(),
metadataFilePinnedTimestampCache
);
implicitLockedFiles = metadataAndLocks.v2();
assertEquals(0, implicitLockedFiles.size());
assertEquals(0, metadataFilePinnedTimestampCache.size());
}
}
Loading