Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
* because the cluster-manager could have failed some of the in-sync shard copies and marked them as stale. That is ok though, as this
* information is conveyed through cluster state updates, and the new primary relocation target will also eventually learn about those.
*/
boolean handoffInProgress;
volatile boolean handoffInProgress;

/**
* Boolean flag that indicates whether a relocation handoff completed (see {@link #completeRelocationHandoff}).
Expand Down Expand Up @@ -846,6 +846,13 @@ public boolean isPrimaryMode() {
return primaryMode;
}

/**
 * Returns whether a primary relocation hand-off is currently in progress on this shard.
 * <p>
 * Reads the {@code volatile} {@code handoffInProgress} flag, so the most recent write is
 * visible across threads without additional synchronization.
 *
 * @return {@code true} if a relocation hand-off is in progress, {@code false} otherwise
 */
public boolean isHandoffInProgress() {
return handoffInProgress;
}

/**
* Returns the current operation primary term.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4380,6 +4380,10 @@ public boolean isStartedPrimary() {
return (getReplicationTracker().isPrimaryMode() && state() == IndexShardState.STARTED);
}

/**
 * Returns whether this shard's replication tracker reports an in-progress primary
 * relocation hand-off.
 *
 * @return {@code true} if a relocation hand-off is in progress, {@code false} otherwise
 */
public boolean isHandoffInProgress() {
    return getReplicationTracker().isHandoffInProgress();
}

/**
 * Returns whether this shard should upload translog operations to the remote store.
 * <p>
 * True when the shard is a started primary, or when it is seeding the remote store and at
 * least one remote segment sync has already happened.
 * NOTE(review): the exact semantics of {@code shouldSeedRemoteStore} and
 * {@code hasOneRemoteSegmentSyncHappened} are defined elsewhere — confirm against their Javadoc.
 */
public boolean enableUploadToRemoteTranslog() {
return isStartedPrimary() || (shouldSeedRemoteStore() && hasOneRemoteSegmentSyncHappened());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,17 @@ public abstract class AbstractSegmentReplicationTarget extends ReplicationTarget
protected final SegmentReplicationSource source;
protected final SegmentReplicationState state;
protected final MultiFileWriter multiFileWriter;
protected final boolean isRetry;

public AbstractSegmentReplicationTarget(
String name,
IndexShard indexShard,
ReplicationCheckpoint checkpoint,
SegmentReplicationSource source,
boolean isRetry,
ReplicationListener listener
) {
super(name, indexShard, new ReplicationLuceneIndex(), listener);
this.checkpoint = checkpoint;
this.source = source;
this.isRetry = isRetry;
this.state = new SegmentReplicationState(
indexShard.routingEntry(),
stateIndex,
Expand Down Expand Up @@ -165,28 +162,6 @@ public void startReplication(ActionListener<Void> listener, BiConsumer<Replicati
getCheckpointMetadata(checkpointInfoListener);

checkpointInfoListener.whenComplete(checkpointInfo -> {
ReplicationCheckpoint getMetadataCheckpoint = checkpointInfo.getCheckpoint();
// Only enforce strict checkpoint validation during normal replication, not during recovery.
// During recovery (shard is INITIALIZING or RELOCATING), the replica may have a stale checkpoint
// from before a restart, and should accept the primary's current state even if it appears older.
// See: https://github.com/opensearch-project/OpenSearch/issues/19234
boolean isRecovering = indexShard.routingEntry().initializing() || indexShard.routingEntry().relocating();
if (indexShard.indexSettings().isSegRepLocalEnabled()
&& checkpoint.isAheadOf(getMetadataCheckpoint)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot remove checkpoint.isAheadOf(getMetadataCheckpoint). isHandoffInProgress handles the relocation race but fails against split-brain or Zombie Primaries returning outdated metadata. Without this check, the replica will accept older metadata causing state rollback (deletion of committed segments). The safety check must stay.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering whether an existing test would catch this?

&& false == isRecovering
&& false == isRetry) {
// Fixes https://github.com/opensearch-project/OpenSearch/issues/18490
listener.onFailure(
new ReplicationFailedException(
"Rejecting stale metadata checkpoint ["
+ getMetadataCheckpoint
+ "] since initial checkpoint ["
+ checkpoint
+ "] is ahead of it"
)
);
return;
}
updateCheckpoint(checkpointInfo.getCheckpoint(), checkpointUpdater);
final List<StoreFileMetadata> filesToFetch = getFiles(checkpointInfo);
state.setStage(SegmentReplicationState.Stage.GET_FILES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public MergedSegmentReplicationTarget(
SegmentReplicationSource source,
ReplicationListener listener
) {
super("merged_segment_replication_target", indexShard, checkpoint, source, false, listener);
super("merged_segment_replication_target", indexShard, checkpoint, source, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,23 @@ private class CheckpointInfoRequestHandler implements TransportRequestHandler<Ch
public void messageReceived(CheckpointInfoRequest request, TransportChannel channel, Task task) throws Exception {
final ReplicationTimer timer = new ReplicationTimer();
timer.start();

final ShardId shardId = request.getCheckpoint().getShardId();
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.id());
if (false == indexShard.isPrimaryMode() || IndexShardState.STARTED != indexShard.state() || indexShard.isHandoffInProgress()) {
throw new IllegalStateException(
String.format(
Locale.ROOT,
"%s must be a started primary shard that is not in the hand-off process. However, the current states are isPrimaryMode %s, state %s, isHandoffInProgress %s",
shardId,
indexShard.isPrimaryMode(),
indexShard.state(),
indexShard.isHandoffInProgress()
)
);
}

final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = getRemoteSegmentFileChunkWriter(
SegmentReplicationTargetService.Actions.FILE_CHUNK,
request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,7 @@ public SegmentReplicationTarget(
SegmentReplicationSource source,
ReplicationListener listener
) {
this(indexShard, checkpoint, source, false, listener);
}

public SegmentReplicationTarget(
IndexShard indexShard,
ReplicationCheckpoint checkpoint,
SegmentReplicationSource source,
boolean isRetry,
ReplicationListener listener
) {
super("replication_target", indexShard, checkpoint, source, isRetry, listener);
super("replication_target", indexShard, checkpoint, source, listener);
}

@Override
Expand Down Expand Up @@ -138,6 +128,6 @@ protected void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse

@Override
public SegmentReplicationTarget retryCopy() {
return new SegmentReplicationTarget(indexShard, checkpoint, source, isRetry, listener);
return new SegmentReplicationTarget(indexShard, checkpoint, source, listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -295,23 +295,7 @@ public SegmentReplicationTarget get(ShardId shardId) {
* @param receivedCheckpoint received checkpoint that is checked for processing
* @param replicaShard replica shard on which checkpoint is received
*/
public void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) {
onNewCheckpoint(receivedCheckpoint, replicaShard, false);
}

/**
* Invoked when a new checkpoint is received from a primary shard.
* It checks if a new checkpoint should be processed or not and starts replication if needed.
*
* @param receivedCheckpoint received checkpoint that is checked for processing
* @param replicaShard replica shard on which checkpoint is received
* @param isRetry is it a retry after failure
*/
public synchronized void onNewCheckpoint(
final ReplicationCheckpoint receivedCheckpoint,
final IndexShard replicaShard,
boolean isRetry
) {
public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) {
logger.debug(() -> new ParameterizedMessage("Replica received new replication checkpoint from primary [{}]", receivedCheckpoint));
// if the shard is in any state
if (replicaShard.state().equals(IndexShardState.CLOSED)) {
Expand Down Expand Up @@ -348,7 +332,7 @@ public synchronized void onNewCheckpoint(
}
final Thread thread = Thread.currentThread();
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
startReplication(replicaShard, receivedCheckpoint, isRetry, new SegmentReplicationListener() {
startReplication(replicaShard, receivedCheckpoint, new SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.debug(
Expand Down Expand Up @@ -382,7 +366,7 @@ public void onReplicationFailure(
if (sendShardFailure == true) {
failShard(e, replicaShard);
} else {
processLatestReceivedCheckpoint(replicaShard, thread, true);
processLatestReceivedCheckpoint(replicaShard, thread);
}
}
});
Expand Down Expand Up @@ -497,11 +481,6 @@ private DiscoveryNode getPrimaryNode(ShardRouting primaryShard) {

// visible to tests
protected boolean processLatestReceivedCheckpoint(IndexShard replicaShard, Thread thread) {
return processLatestReceivedCheckpoint(replicaShard, thread, false);
}

// visible to tests
protected boolean processLatestReceivedCheckpoint(IndexShard replicaShard, Thread thread, boolean isRetry) {
final ReplicationCheckpoint latestPublishedCheckpoint = replicator.getPrimaryCheckpoint(replicaShard.shardId());
if (latestPublishedCheckpoint != null) {
logger.trace(
Expand All @@ -515,7 +494,7 @@ protected boolean processLatestReceivedCheckpoint(IndexShard replicaShard, Threa
// if we retry ensure the shard is not in the process of being closed.
// it will be removed from indexService's collection before the shard is actually marked as closed.
if (indicesService.getShardOrNull(replicaShard.shardId()) != null) {
onNewCheckpoint(replicator.getPrimaryCheckpoint(replicaShard.shardId()), replicaShard, isRetry);
onNewCheckpoint(replicator.getPrimaryCheckpoint(replicaShard.shardId()), replicaShard);
}
};
// Checks if we are using same thread and forks if necessary.
Expand Down Expand Up @@ -546,24 +525,7 @@ public SegmentReplicationTarget startReplication(
final ReplicationCheckpoint checkpoint,
final SegmentReplicationListener listener
) {
return startReplication(indexShard, checkpoint, false, listener);
}

/**
* Start a round of replication and sync to at least the given checkpoint.
* @param indexShard - {@link IndexShard} replica shard
* @param checkpoint - {@link ReplicationCheckpoint} checkpoint to sync to
* @param isRetry - is it a retry after failure
* @param listener - {@link ReplicationListener}
* @return {@link SegmentReplicationTarget} target event orchestrating the event.
*/
public SegmentReplicationTarget startReplication(
final IndexShard indexShard,
final ReplicationCheckpoint checkpoint,
final boolean isRetry,
final SegmentReplicationListener listener
) {
return replicator.startReplication(indexShard, checkpoint, sourceFactory.get(indexShard), isRetry, listener);
return replicator.startReplication(indexShard, checkpoint, sourceFactory.get(indexShard), listener);
}

// pkg-private for integration tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public void startReplication(IndexShard shard) {
shard,
shard.getLatestReplicationCheckpoint(),
sourceFactory.get().get(shard),
false,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
Expand All @@ -107,18 +106,16 @@ void setSourceFactory(SegmentReplicationSourceFactory sourceFactory) {
* @param indexShard - {@link IndexShard} replica shard
* @param checkpoint - {@link ReplicationCheckpoint} checkpoint to sync to
* @param source - {@link SegmentReplicationSource} segment replication source
* @param isRetry - is it a retry after failure
* @param listener - {@link ReplicationListener}
* @return {@link SegmentReplicationTarget} target event orchestrating the event.
*/
SegmentReplicationTarget startReplication(
final IndexShard indexShard,
final ReplicationCheckpoint checkpoint,
final SegmentReplicationSource source,
final boolean isRetry,
final SegmentReplicationTargetService.SegmentReplicationListener listener
) {
final SegmentReplicationTarget target = new SegmentReplicationTarget(indexShard, checkpoint, source, isRetry, listener);
final SegmentReplicationTarget target = new SegmentReplicationTarget(indexShard, checkpoint, source, listener);
startReplication(target, indexShard.getRecoverySettings().activityTimeout());
return target;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -258,6 +259,22 @@ public void onFailure(Exception e) {
});
}

public void testCheckpointInfoDuringPrimaryHandOff() {
// Arrange: stub the shard to report an in-progress primary relocation hand-off.
when(mockIndexShard.isHandoffInProgress()).thenReturn(true);
// Act: request checkpoint info from the (mock) primary.
executeGetCheckpointInfo(new ActionListener<>() {
@Override
public void onResponse(CheckpointInfoResponse response) {
// A hand-off-in-progress primary must reject checkpoint-info requests, so a
// successful response means the guard in CheckpointInfoRequestHandler did not fire.
Assert.fail("Expected replication to fail due to primary shard hand off");
}

@Override
public void onFailure(Exception e) {
// The handler's IllegalStateException travels over the (capturing) transport and
// arrives wrapped in a RemoteTransportException; the original message is the cause.
assertTrue(e instanceof RemoteTransportException);
assertTrue(e.getCause().getMessage().contains("must be a started primary shard that is not in the hand-off process"));
}
});
}

public void testPrimaryClearsOutOfSyncIds() {
final ClusterChangedEvent mock = mock(ClusterChangedEvent.class);
when(mock.routingTableChanged()).thenReturn(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@

import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -410,7 +409,6 @@ public void testShardAlreadyReplicating_HigherPrimaryTermReceived() throws Inter
// skip post replication actions so we can assert execution counts. This will continue to process bc replica's pterm is not advanced
// post replication.
doReturn(true).when(serviceSpy).processLatestReceivedCheckpoint(any(), any());
doReturn(true).when(serviceSpy).processLatestReceivedCheckpoint(any(), any(), anyBoolean());
// Create a Mockito spy of target to stub response of few method calls.

CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -478,7 +476,7 @@ public void cancel() {

// ensure the old target is cancelled. and new iteration kicks off.
verify(targetSpy, times(1)).cancel("Cancelling stuck target after new primary");
verify(serviceSpy, times(1)).startReplication(eq(replicaShard), any(), anyBoolean(), any());
verify(serviceSpy, times(1)).startReplication(eq(replicaShard), any(), any());
}

public void testMergedSegmentReplicating_HigherPrimaryTermReceived() throws IOException {
Expand Down Expand Up @@ -630,10 +628,10 @@ public void testStartReplicationListenerSuccess() throws InterruptedException {
SegmentReplicationTargetService spy = spy(sut);
CountDownLatch latch = new CountDownLatch(1);
doAnswer(i -> {
((SegmentReplicationTargetService.SegmentReplicationListener) i.getArgument(3)).onReplicationDone(state);
((SegmentReplicationTargetService.SegmentReplicationListener) i.getArgument(2)).onReplicationDone(state);
latch.countDown();
return null;
}).when(spy).startReplication(any(), any(), anyBoolean(), any());
}).when(spy).startReplication(any(), any(), any());
doNothing().when(spy).updateVisibleCheckpoint(eq(0L), any());
spy.afterIndexShardStarted(replicaShard);

Expand Down Expand Up @@ -677,7 +675,7 @@ public void testProcessLatestCheckpointIfCheckpointAhead() {
doReturn(mock(SegmentReplicationTarget.class)).when(service).startReplication(any(), any(), any());
service.updateLatestReceivedCheckpoint(aheadCheckpoint, replicaShard);
service.processLatestReceivedCheckpoint(replicaShard, null);
verify(service, times(1)).startReplication(eq(replicaShard), eq(aheadCheckpoint), anyBoolean(), any());
verify(service, times(1)).startReplication(eq(replicaShard), eq(aheadCheckpoint), any());
}

public void testOnNewCheckpointInvokedOnClosedShardDoesNothing() throws IOException {
Expand Down
Loading
Loading