Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public SearchPhaseName getSearchPhaseName() {
/**
* Returns an Optional of the SearchPhase name as {@link SearchPhaseName}. If there's not a matching SearchPhaseName,
* returns an empty Optional.
* @return {@link Optional<SearchPhaseName>}
* @return {@link Optional} of {@link SearchPhaseName}
*/
public Optional<SearchPhaseName> getSearchPhaseNameOptional() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,12 @@ public void startReplication(ActionListener<Void> listener) {

checkpointInfoListener.whenComplete(checkpointInfo -> {
ReplicationCheckpoint getMetadataCheckpoint = checkpointInfo.getCheckpoint();
if (indexShard.indexSettings().isSegRepLocalEnabled() && checkpoint.isAheadOf(getMetadataCheckpoint)) {
// Only enforce strict checkpoint validation during normal replication, not during recovery.
// During recovery (shard is INITIALIZING or RELOCATING), the replica may have a stale checkpoint
// from before a restart, and should accept the primary's current state even if it appears older.
// See: https://github.com/opensearch-project/OpenSearch/issues/19234
boolean isRecovering = indexShard.routingEntry().initializing() || indexShard.routingEntry().relocating();
if (indexShard.indexSettings().isSegRepLocalEnabled() && checkpoint.isAheadOf(getMetadataCheckpoint) && !isRecovering) {
// Fixes https://github.com/opensearch-project/OpenSearch/issues/18490
listener.onFailure(
new ReplicationFailedException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
Expand Down Expand Up @@ -540,6 +541,146 @@ private static void deleteContent(Directory directory) throws IOException {
ExceptionsHelper.rethrowAndSuppress(exceptions);
}

/**
 * Verifies that a metadata checkpoint older than the target's own checkpoint is rejected when the
 * shard's routing entry reports neither initializing nor relocating.
 * Addresses: https://github.com/opensearch-project/OpenSearch/issues/18490
 */
public void testStaleCheckpointRejected_duringNormalReplication() throws IOException {
    // The target's checkpoint copies repCheckpoint's fields but uses segmentInfosVersion 200,
    // which is intended to place it ahead of repCheckpoint.
    final long aheadSegmentInfosVersion = 200L;
    final ReplicationCheckpoint targetCheckpoint = new ReplicationCheckpoint(
        repCheckpoint.getShardId(),
        repCheckpoint.getPrimaryTerm(),
        repCheckpoint.getSegmentsGen(),
        aheadSegmentInfosVersion,
        repCheckpoint.getCodec()
    );

    // Replication source stub that always answers with repCheckpoint, i.e. the older checkpoint.
    final SegmentReplicationSource staleSource = new TestReplicationSource() {
        @Override
        public void getCheckpointMetadata(
            long replicationId,
            ReplicationCheckpoint requested,
            ActionListener<CheckpointInfoResponse> responseListener
        ) {
            final CheckpointInfoResponse staleResponse = new CheckpointInfoResponse(repCheckpoint, SI_SNAPSHOT, buffer.toArrayCopy());
            responseListener.onResponse(staleResponse);
        }

        @Override
        public void getSegmentFiles(
            long replicationId,
            ReplicationCheckpoint requested,
            List<StoreFileMetadata> filesToFetch,
            IndexShard shard,
            BiConsumer<String, Long> progressTracker,
            ActionListener<GetSegmentFilesResponse> responseListener
        ) {
            // Echo the requested files back as the response.
            responseListener.onResponse(new GetSegmentFilesResponse(filesToFetch));
        }
    };

    final SegmentReplicationTargetService.SegmentReplicationListener replicationListener = mock(
        SegmentReplicationTargetService.SegmentReplicationListener.class
    );

    // Routing entry stub for an active shard: initializing() and relocating() both return false.
    final ShardRouting activeRouting = mock(ShardRouting.class);
    when(activeRouting.initializing()).thenReturn(false);
    when(activeRouting.relocating()).thenReturn(false);
    when(spyIndexShard.routingEntry()).thenReturn(activeRouting);

    // Build the target around the checkpoint that is ahead of what the source will report.
    segrepTarget = new SegmentReplicationTarget(spyIndexShard, targetCheckpoint, staleSource, replicationListener);

    // The only acceptable outcome is a failure carrying the stale-checkpoint rejection message.
    final ActionListener<Void> expectRejection = new ActionListener<Void>() {
        @Override
        public void onResponse(Void unused) {
            Assert.fail("Expected replication to fail due to stale checkpoint");
        }

        @Override
        public void onFailure(Exception failure) {
            final boolean isReplicationFailure = failure instanceof ReplicationFailedException;
            assertTrue(isReplicationFailure);
            final String message = failure.getMessage();
            assertTrue(message.contains("Rejecting stale metadata checkpoint"));
            segrepTarget.fail(new ReplicationFailedException(failure), false);
        }
    };
    segrepTarget.startReplication(expectRejection);
}

/**
 * Verifies that a metadata checkpoint older than the target's own checkpoint is accepted when the
 * shard's routing entry reports that the shard is initializing.
 * Addresses: https://github.com/opensearch-project/OpenSearch/issues/19234
 */
public void testStaleCheckpointAccepted_duringRecovery() throws IOException {
    // The target's checkpoint copies repCheckpoint's fields but uses segmentInfosVersion 200,
    // which is intended to place it ahead of repCheckpoint.
    final long aheadSegmentInfosVersion = 200L;
    final ReplicationCheckpoint targetCheckpoint = new ReplicationCheckpoint(
        repCheckpoint.getShardId(),
        repCheckpoint.getPrimaryTerm(),
        repCheckpoint.getSegmentsGen(),
        aheadSegmentInfosVersion,
        repCheckpoint.getCodec()
    );

    // Replication source stub that always answers with repCheckpoint, i.e. the older checkpoint.
    final SegmentReplicationSource staleSource = new TestReplicationSource() {
        @Override
        public void getCheckpointMetadata(
            long replicationId,
            ReplicationCheckpoint requested,
            ActionListener<CheckpointInfoResponse> responseListener
        ) {
            final CheckpointInfoResponse staleResponse = new CheckpointInfoResponse(repCheckpoint, SI_SNAPSHOT, buffer.toArrayCopy());
            responseListener.onResponse(staleResponse);
        }

        @Override
        public void getSegmentFiles(
            long replicationId,
            ReplicationCheckpoint requested,
            List<StoreFileMetadata> filesToFetch,
            IndexShard shard,
            BiConsumer<String, Long> progressTracker,
            ActionListener<GetSegmentFilesResponse> responseListener
        ) {
            // Echo the requested files back as the response.
            responseListener.onResponse(new GetSegmentFilesResponse(filesToFetch));
        }
    };

    final SegmentReplicationTargetService.SegmentReplicationListener replicationListener = mock(
        SegmentReplicationTargetService.SegmentReplicationListener.class
    );

    // Routing entry stub for a recovering shard: initializing() is true, relocating() is false.
    final ShardRouting recoveringRouting = mock(ShardRouting.class);
    when(recoveringRouting.initializing()).thenReturn(true);
    when(recoveringRouting.relocating()).thenReturn(false);
    when(spyIndexShard.routingEntry()).thenReturn(recoveringRouting);

    // Build the target around the checkpoint that is ahead of what the source will report.
    segrepTarget = new SegmentReplicationTarget(spyIndexShard, targetCheckpoint, staleSource, replicationListener);

    // Replication must complete normally; any failure callback fails the test.
    final ActionListener<Void> expectSuccess = new ActionListener<Void>() {
        @Override
        public void onResponse(Void unused) {
            // Reaching this callback means the older checkpoint was accepted; confirm finalization ran once.
            try {
                verify(spyIndexShard, times(1)).finalizeReplication(any());
                segrepTarget.markAsDone();
            } catch (IOException ioFailure) {
                Assert.fail("Unexpected IOException: " + ioFailure.getMessage());
            }
        }

        @Override
        public void onFailure(Exception failure) {
            Assert.fail("Replication should succeed during recovery despite stale checkpoint: " + failure.getMessage());
        }
    };
    segrepTarget.startReplication(expectSuccess);
}

@Override
public void tearDown() throws Exception {
super.tearDown();
Expand Down
Loading