diff --git a/server/src/main/java/org/opensearch/action/search/SearchPhase.java b/server/src/main/java/org/opensearch/action/search/SearchPhase.java
index 351c23fec3d80..4e611ff85a56a 100644
--- a/server/src/main/java/org/opensearch/action/search/SearchPhase.java
+++ b/server/src/main/java/org/opensearch/action/search/SearchPhase.java
@@ -83,7 +83,7 @@ public SearchPhaseName getSearchPhaseName() {
     /**
      * Returns an Optional of the SearchPhase name as {@link SearchPhaseName}. If there's not a matching SearchPhaseName,
      * returns an empty Optional.
-     * @return {@link Optional}
+     * @return {@link Optional} of {@link SearchPhaseName}
      */
     public Optional<SearchPhaseName> getSearchPhaseNameOptional() {
         try {
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
index 0e41f89d22358..51ebe80a47848 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
@@ -178,7 +178,12 @@ public void startReplication(ActionListener<Void> listener) {
         checkpointInfoListener.whenComplete(checkpointInfo -> {
             ReplicationCheckpoint getMetadataCheckpoint = checkpointInfo.getCheckpoint();
-            if (indexShard.indexSettings().isSegRepLocalEnabled() && checkpoint.isAheadOf(getMetadataCheckpoint)) {
+            // Only enforce strict checkpoint validation during normal replication, not during recovery.
+            // During recovery (shard is INITIALIZING or RELOCATING), the replica may have a stale checkpoint
+            // from before a restart, and should accept the primary's current state even if it appears older.
+            // See: https://github.com/opensearch-project/OpenSearch/issues/19234
+            boolean isRecovering = indexShard.routingEntry().initializing() || indexShard.routingEntry().relocating();
+            if (indexShard.indexSettings().isSegRepLocalEnabled() && checkpoint.isAheadOf(getMetadataCheckpoint) && !isRecovering) {
                 // Fixes https://github.com/opensearch-project/OpenSearch/issues/18490
                 listener.onFailure(
                     new ReplicationFailedException(
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
index 3804230942430..bc85d92ef05fc 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
@@ -27,6 +27,7 @@
 import org.opensearch.ExceptionsHelper;
 import org.opensearch.OpenSearchCorruptionException;
 import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.routing.ShardRouting;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.index.shard.ShardId;
@@ -540,6 +541,146 @@ private static void deleteContent(Directory directory) throws IOException {
         ExceptionsHelper.rethrowAndSuppress(exceptions);
     }
 
+    /**
+     * Test that stale checkpoint is rejected during normal replication when shard is active.
+     * Addresses: https://github.com/opensearch-project/OpenSearch/issues/18490
+     */
+    public void testStaleCheckpointRejected_duringNormalReplication() throws IOException {
+        // Create a newer checkpoint (higher segmentInfosVersion)
+        ReplicationCheckpoint newerCheckpoint = new ReplicationCheckpoint(
+            repCheckpoint.getShardId(),
+            repCheckpoint.getPrimaryTerm(),
+            repCheckpoint.getSegmentsGen(),
+            200L, // higher segmentInfosVersion
+            repCheckpoint.getCodec()
+        );
+
+        // Source returns an older checkpoint
+        SegmentReplicationSource segrepSource = new TestReplicationSource() {
+            @Override
+            public void getCheckpointMetadata(
+                long replicationId,
+                ReplicationCheckpoint checkpoint,
+                ActionListener<CheckpointInfoResponse> listener
+            ) {
+                // Return the older checkpoint (repCheckpoint with version 100)
+                listener.onResponse(new CheckpointInfoResponse(repCheckpoint, SI_SNAPSHOT, buffer.toArrayCopy()));
+            }
+
+            @Override
+            public void getSegmentFiles(
+                long replicationId,
+                ReplicationCheckpoint checkpoint,
+                List<StoreFileMetadata> filesToFetch,
+                IndexShard indexShard,
+                BiConsumer<String, Long> fileProgressTracker,
+                ActionListener<GetSegmentFilesResponse> listener
+            ) {
+                listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
+            }
+        };
+
+        SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
+            SegmentReplicationTargetService.SegmentReplicationListener.class
+        );
+
+        // Mock ShardRouting as active (not initializing, not relocating)
+        ShardRouting mockRouting = mock(ShardRouting.class);
+        when(mockRouting.initializing()).thenReturn(false);
+        when(mockRouting.relocating()).thenReturn(false);
+        when(spyIndexShard.routingEntry()).thenReturn(mockRouting);
+
+        // Create target with newer checkpoint
+        segrepTarget = new SegmentReplicationTarget(spyIndexShard, newerCheckpoint, segrepSource, segRepListener);
+
+        // Start replication - should fail due to stale checkpoint
+        segrepTarget.startReplication(new ActionListener<Void>() {
+            @Override
+            public void onResponse(Void replicationResponse) {
+                Assert.fail("Expected replication to fail due to stale checkpoint");
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                assertTrue(e instanceof ReplicationFailedException);
+                assertTrue(e.getMessage().contains("Rejecting stale metadata checkpoint"));
+                segrepTarget.fail(new ReplicationFailedException(e), false);
+            }
+        });
+    }
+
+    /**
+     * Test that stale checkpoint is accepted during recovery when shard is initializing.
+     * Addresses: https://github.com/opensearch-project/OpenSearch/issues/19234
+     */
+    public void testStaleCheckpointAccepted_duringRecovery() throws IOException {
+        // Create a newer checkpoint (higher segmentInfosVersion)
+        ReplicationCheckpoint newerCheckpoint = new ReplicationCheckpoint(
+            repCheckpoint.getShardId(),
+            repCheckpoint.getPrimaryTerm(),
+            repCheckpoint.getSegmentsGen(),
+            200L, // higher segmentInfosVersion
+            repCheckpoint.getCodec()
+        );
+
+        // Source returns an older checkpoint
+        SegmentReplicationSource segrepSource = new TestReplicationSource() {
+            @Override
+            public void getCheckpointMetadata(
+                long replicationId,
+                ReplicationCheckpoint checkpoint,
+                ActionListener<CheckpointInfoResponse> listener
+            ) {
+                // Return the older checkpoint (repCheckpoint with version 100)
+                listener.onResponse(new CheckpointInfoResponse(repCheckpoint, SI_SNAPSHOT, buffer.toArrayCopy()));
+            }
+
+            @Override
+            public void getSegmentFiles(
+                long replicationId,
+                ReplicationCheckpoint checkpoint,
+                List<StoreFileMetadata> filesToFetch,
+                IndexShard indexShard,
+                BiConsumer<String, Long> fileProgressTracker,
+                ActionListener<GetSegmentFilesResponse> listener
+            ) {
+                listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
+            }
+        };
+
+        SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
+            SegmentReplicationTargetService.SegmentReplicationListener.class
+        );
+
+        // Mock ShardRouting as initializing (recovering)
+        ShardRouting mockRouting = mock(ShardRouting.class);
+        when(mockRouting.initializing()).thenReturn(true);
+        when(mockRouting.relocating()).thenReturn(false);
+        when(spyIndexShard.routingEntry()).thenReturn(mockRouting);
+
+        // Create target with newer checkpoint
+        segrepTarget = new SegmentReplicationTarget(spyIndexShard, newerCheckpoint, segrepSource, segRepListener);
+
+        // Start replication - should succeed despite stale checkpoint
+        segrepTarget.startReplication(new ActionListener<Void>() {
+            @Override
+            public void onResponse(Void replicationResponse) {
+                // Success - stale checkpoint was accepted during recovery
+                try {
+                    verify(spyIndexShard, times(1)).finalizeReplication(any());
+                    segrepTarget.markAsDone();
+                } catch (IOException ex) {
+                    Assert.fail("Unexpected IOException: " + ex.getMessage());
+                }
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                Assert.fail("Replication should succeed during recovery despite stale checkpoint: " + e.getMessage());
+            }
+        });
+    }
+
     @Override
     public void tearDown() throws Exception {
         super.tearDown();