Skip to content

Commit c3ea2ca

Browse files
committed
[Semgnet Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test
Signed-off-by: Suraj Singh <surajrider@gmail.com>
1 parent 4331d2a commit c3ea2ca

3 files changed

Lines changed: 14 additions & 9 deletions

File tree

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,9 @@ public void startReplication(ActionListener<Void> listener) {
160160
final StepListener<GetSegmentFilesResponse> getFilesListener = new StepListener<>();
161161
final StepListener<Void> finalizeListener = new StepListener<>();
162162

163+
cancellableThreads.checkForCancel();
163164
logger.trace("[shardId {}] Replica starting replication [id {}]", shardId().getId(), getId());
164165
// Get list of files to copy from this checkpoint.
165-
cancellableThreads.checkForCancel();
166166
state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
167167
source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);
168168

server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.cluster.metadata.IndexMetadata;
1616
import org.opensearch.common.settings.ClusterSettings;
1717
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.common.util.CancellableThreads;
1819
import org.opensearch.index.engine.NRTReplicationEngineFactory;
1920
import org.opensearch.index.shard.IndexShard;
2021
import org.opensearch.index.shard.IndexShardTestCase;
@@ -29,6 +30,7 @@
2930
import java.util.concurrent.TimeUnit;
3031

3132
import static org.mockito.ArgumentMatchers.any;
33+
import static org.mockito.Mockito.doReturn;
3234
import static org.mockito.Mockito.mock;
3335
import static org.mockito.Mockito.when;
3436
import static org.mockito.Mockito.doAnswer;
@@ -37,6 +39,7 @@
3739
import static org.mockito.Mockito.times;
3840
import static org.mockito.Mockito.spy;
3941
import static org.mockito.Mockito.eq;
42+
import static org.opensearch.indices.replication.SegmentReplicationState.Stage.CANCELLED;
4043

4144
public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {
4245

@@ -215,24 +218,25 @@ public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws I
215218
// Mocking response when startReplication is called on targetSpy we send a new checkpoint to serviceSpy and later reduce countdown
216219
// of latch.
217220
doAnswer(invocation -> {
218-
final ActionListener<Void> listener = invocation.getArgument(0);
221+
// short circuit loop on new checkpoint request
222+
doReturn(null).when(serviceSpy).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any());
219223
// a new checkpoint arrives before we've completed.
220224
serviceSpy.onNewCheckpoint(newPrimaryCheckpoint, replicaShard);
221-
listener.onResponse(null);
222-
latch.countDown();
225+
try {
226+
invocation.callRealMethod();
227+
} catch (CancellableThreads.ExecutionCancelledException e) {
228+
latch.countDown();
229+
}
223230
return null;
224231
}).when(targetSpy).startReplication(any());
225-
doNothing().when(targetSpy).onDone();
226232

227233
// start replication. This adds the target to on-ongoing replication collection
228234
serviceSpy.startReplication(targetSpy);
229-
235+
latch.await();
230236
// wait for the new checkpoint to arrive, before the listener completes.
231-
latch.await(5, TimeUnit.SECONDS);
232-
doNothing().when(targetSpy).startReplication(any());
237+
assertEquals(CANCELLED, targetSpy.state().getStage());
233238
verify(targetSpy, times(1)).cancel("Cancelling stuck target after new primary");
234239
verify(serviceSpy, times(1)).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any());
235-
closeShards(replicaShard);
236240
}
237241

238242
public void testNewCheckpointBehindCurrentCheckpoint() {

test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1207,6 +1207,7 @@ public void getCheckpointMetadata(
12071207
copyState.getPendingDeleteFiles()
12081208
)
12091209
);
1210+
copyState.decRef();
12101211
} catch (IOException e) {
12111212
logger.error("Unexpected error computing CopyState", e);
12121213
Assert.fail("Failed to compute copyState");

0 commit comments

Comments
 (0)