Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,13 @@ public void beforeRefresh() throws IOException {}
@Override
public void afterRefresh(boolean didRefresh) {

if (didRefresh) {
if (didRefresh || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) {
updateLocalRefreshTimeAndSeqNo();
}

try {
indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get();
} catch (InterruptedException | ExecutionException e) {
logger.info("Exception occurred while scheduling syncSegments", e);
try {
indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get();
} catch (InterruptedException | ExecutionException e) {
logger.info("Exception occurred while scheduling syncSegments", e);
}
}
}

Expand Down Expand Up @@ -244,9 +243,7 @@ private synchronized void syncSegments(boolean isRetry) {
segmentInfos.getGeneration()
);
clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
onSuccessfulSegmentsSync(refreshTimeMs, refreshSeqNo);
indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
checkpointPublisher.publish(indexShard, checkpoint);
onSuccessfulSegmentsSync(refreshTimeMs, refreshSeqNo, lastRefreshedCheckpoint, checkpoint);
// At this point since we have uploaded new segments, segment infos and segment metadata file,
// along with marking minSeqNoToKeep, upload has succeeded completely.
shouldRetry = false;
Expand Down Expand Up @@ -298,7 +295,12 @@ private void beforeSegmentsSync(boolean isRetry) {
segmentTracker.incrementTotalUploadsStarted();
}

private void onSuccessfulSegmentsSync(long refreshTimeMs, long refreshSeqNo) {
private void onSuccessfulSegmentsSync(
long refreshTimeMs,
long refreshSeqNo,
long lastRefreshedCheckpoint,
ReplicationCheckpoint checkpoint
) {
// Update latest uploaded segment files name in segment tracker
segmentTracker.setLatestUploadedFiles(latestFileNameSizeOnLocalMap.keySet());
// Update the remote refresh time and refresh seq no
Expand All @@ -307,6 +309,10 @@ private void onSuccessfulSegmentsSync(long refreshTimeMs, long refreshSeqNo) {
resetBackOffDelayIterator();
// Cancel the scheduled cancellable retry if possible and set it to null
cancelAndResetScheduledCancellableRetry();
// Set the minimum sequence number for keeping translog
indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
// Publishing the new checkpoint which is used for remote store + segrep indexes
checkpointPublisher.publish(indexShard, checkpoint);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void testNoTranslogHistoryTransferred() throws Exception {
shards.flush();
List<DocIdSeqNoAndSource> docIdAndSeqNosAfterFlush = getDocIdAndSeqNos(primary);
int moreDocs = shards.indexDocs(randomIntBetween(20, 100));
assertEquals(moreDocs, getTranslog(primary).totalOperations());
assertEquals(numDocs + moreDocs, getTranslog(primary).totalOperations());

// Step 2 - Start replica, recovery happens, check docs recovered till last flush
final IndexShard replica = shards.addReplica();
Expand Down