Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fixed the ignore_malformed setting to also ignore objects ([#4494](https://github.com/opensearch-project/OpenSearch/pull/4494))
- [Segment Replication] Ignore lock file when testing cleanupAndPreserveLatestCommitPoint ([#4544](https://github.com/opensearch-project/OpenSearch/pull/4544))
- Updated jackson to 2.13.4 and snakeyml to 1.32 ([#4556](https://github.com/opensearch-project/OpenSearch/pull/4556))
- [Segment Replication] Adding check to make sure checkpoint is not processed when a shard's shard routing is primary ([#4630](https://github.com/opensearch-project/OpenSearch/pull/4630))
- [Bug]: Fixed invalid location of JDK dependency for arm64 architecture([#4613](https://github.com/opensearch-project/OpenSearch/pull/4613))
- [Bug]: Alias filter lost after rollover ([#4499](https://github.com/opensearch-project/OpenSearch/pull/4499))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1441,6 +1441,10 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints.");
return false;
}
if (this.routingEntry().primary()) {
logger.warn("Ignoring new replication checkpoint - primary shard cannot receive any checkpoints.");
return false;
}
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
logger.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;

public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase {

Expand Down Expand Up @@ -208,6 +209,36 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException {
closeShards(shard);
}

/**
* here we are starting a new primary shard and testing that we don't process a checkpoint on a shard when it's shard routing is primary.
*/
public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
IndexShard primaryShard = newStartedShard(true);
SegmentReplicationTargetService sut;
sut = prepareForReplication(primaryShard);
SegmentReplicationTargetService spy = spy(sut);

// Starting a new shard in PrimaryMode and shard routing primary.
IndexShard spyShard = spy(primaryShard);
String id = primaryShard.routingEntry().allocationId().getId();

// Starting relocation handoff
primaryShard.getReplicationTracker().startRelocationHandoff(id);

// Completing relocation handoff.
primaryShard.getReplicationTracker().completeRelocationHandoff();

// Assert that primary shard is no longer in Primary Mode and shard routing is still Primary
assertEquals(false, primaryShard.getReplicationTracker().isPrimaryMode());
assertEquals(true, primaryShard.routingEntry().primary());

spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, 0L), spyShard);

// Verify that checkpoint is not processed as shard routing is primary.
verify(spy, times(0)).startReplication(any(), any(), any());
closeShards(primaryShard);
}

public void testReplicaReceivesGenIncrease() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
shards.startAll();
Expand Down