Skip to content

Commit a75ec2e

Browse files
[Remote Store] Fix relocation failure due to transport receive timeout (#10761)
* [Remote Store] Fix relocation failure due to transport receive timeout Signed-off-by: Ashish Singh <ssashish@amazon.com> * Fix existing extended shardIdle for remote backed shards Signed-off-by: Ashish Singh <ssashish@amazon.com> * Incorporate PR review comments Signed-off-by: Ashish Singh <ssashish@amazon.com> --------- Signed-off-by: Ashish Singh <ssashish@amazon.com> (cherry picked from commit a1fde65) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent eed9d99 commit a75ec2e

10 files changed

Lines changed: 75 additions & 13 deletions

File tree

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,4 +509,27 @@ public void testRestoreSnapshotToIndexWithSameNameDifferentUUID() throws Excepti
509509
assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50);
510510
});
511511
}
512+
513+
public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, InterruptedException {
514+
internalCluster().startClusterManagerOnlyNode();
515+
String primaryShardNode = internalCluster().startDataOnlyNodes(1).get(0);
516+
517+
createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
518+
ensureGreen(INDEX_NAME);
519+
IndexShard indexShard = getIndexShard(primaryShardNode);
520+
assertFalse(indexShard.isSearchIdleSupported());
521+
522+
String replicaShardNode = internalCluster().startDataOnlyNodes(1).get(0);
523+
assertAcked(
524+
client().admin()
525+
.indices()
526+
.prepareUpdateSettings(INDEX_NAME)
527+
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
528+
);
529+
ensureGreen(INDEX_NAME);
530+
assertFalse(indexShard.isSearchIdleSupported());
531+
532+
indexShard = getIndexShard(replicaShardNode);
533+
assertFalse(indexShard.isSearchIdleSupported());
534+
}
512535
}

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,6 +1023,9 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
10231023
}
10241024

10251025
private void setSearchIdleAfter(TimeValue searchIdleAfter) {
1026+
if (this.isRemoteStoreEnabled) {
1027+
logger.warn("Search idle is not supported for remote backed indices");
1028+
}
10261029
if (this.replicationType == ReplicationType.SEGMENT && this.getNumberOfReplicas() > 0) {
10271030
logger.warn("Search idle is not supported for indices with replicas using 'replication.type: SEGMENT'");
10281031
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4436,7 +4436,6 @@ public final boolean isSearchIdle() {
44364436
}
44374437

44384438
/**
4439-
*
44404439
* Returns true if this shard supports search idle.
44414440
* <p>
44424441
* Indices using Segment Replication will ignore search idle unless there are no replicas.
@@ -4445,6 +4444,11 @@ public final boolean isSearchIdle() {
44454444
* a new set of segments.
44464445
*/
44474446
public final boolean isSearchIdleSupported() {
4447+
// If the index is remote store backed, then search idle is not supported. This is to ensure that async refresh
4448+
// task continues to upload to remote store periodically.
4449+
if (isRemoteTranslogEnabled()) {
4450+
return false;
4451+
}
44484452
return indexSettings.isSegRepEnabled() == false || indexSettings.getNumberOfReplicas() == 0;
44494453
}
44504454

server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -429,10 +429,10 @@ public String getTranslogUUID() {
429429
* @return if the translog should be flushed
430430
*/
431431
public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) {
432-
final long translogGenerationOfLastCommit = translog.getMinGenerationForSeqNo(
433-
localCheckpointOfLastCommit + 1
434-
).translogFileGeneration;
435-
if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
432+
// This is the minimum seqNo that is referred in translog and considered for calculating translog size
433+
long minTranslogRefSeqNo = translog.getMinUnreferencedSeqNoInSegments(localCheckpointOfLastCommit + 1);
434+
final long minReferencedTranslogGeneration = translog.getMinGenerationForSeqNo(minTranslogRefSeqNo).translogFileGeneration;
435+
if (translog.sizeInBytesByMinGen(minReferencedTranslogGeneration) < flushThreshold) {
436436
return false;
437437
}
438438
/*
@@ -453,7 +453,7 @@ public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long fl
453453
final long translogGenerationOfNewCommit = translog.getMinGenerationForSeqNo(
454454
localCheckpointTrackerSupplier.get().getProcessedCheckpoint() + 1
455455
).translogFileGeneration;
456-
return translogGenerationOfLastCommit < translogGenerationOfNewCommit
456+
return minReferencedTranslogGeneration < translogGenerationOfNewCommit
457457
|| localCheckpointTrackerSupplier.get().getProcessedCheckpoint() == localCheckpointTrackerSupplier.get().getMaxSeqNo();
458458
}
459459

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,4 +544,9 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro
544544
}
545545
}
546546
}
547+
548+
@Override
549+
public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
550+
return minSeqNoToKeep;
551+
}
547552
}

server/src/main/java/org/opensearch/index/translog/Translog.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2034,4 +2034,8 @@ public static String createEmptyTranslog(
20342034
writer.close();
20352035
return uuid;
20362036
}
2037+
2038+
public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
2039+
return minUnrefCheckpointInLastCommit;
2040+
}
20372041
}

server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,8 @@ private Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> createRecovery
376376
transportService,
377377
request.targetNode(),
378378
recoverySettings,
379-
throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime)
379+
throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime),
380+
shard.isRemoteTranslogEnabled()
380381
);
381382
handler = RecoverySourceHandlerFactory.create(shard, recoveryTarget, request, recoverySettings);
382383
return Tuple.tuple(handler, recoveryTarget);

server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,16 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
7575
private final AtomicLong requestSeqNoGenerator = new AtomicLong(0);
7676
private final RetryableTransportClient retryableTransportClient;
7777
private final RemoteSegmentFileChunkWriter fileChunkWriter;
78+
private final boolean remoteStoreEnabled;
7879

7980
public RemoteRecoveryTargetHandler(
8081
long recoveryId,
8182
ShardId shardId,
8283
TransportService transportService,
8384
DiscoveryNode targetNode,
8485
RecoverySettings recoverySettings,
85-
Consumer<Long> onSourceThrottle
86+
Consumer<Long> onSourceThrottle,
87+
boolean remoteStoreEnabled
8688
) {
8789
this.transportService = transportService;
8890
// It is safe to pass the retry timeout value here because RemoteRecoveryTargetHandler
@@ -111,6 +113,7 @@ public RemoteRecoveryTargetHandler(
111113
requestSeqNoGenerator,
112114
onSourceThrottle
113115
);
116+
this.remoteStoreEnabled = remoteStoreEnabled;
114117
}
115118

116119
public DiscoveryNode targetNode() {
@@ -129,7 +132,13 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Vo
129132
);
130133
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
131134
final ActionListener<TransportResponse.Empty> responseListener = ActionListener.map(listener, r -> null);
132-
retryableTransportClient.executeRetryableAction(action, request, responseListener, reader);
135+
if (remoteStoreEnabled) {
136+
// If remote store is enabled, during the prepare_translog phase, translog is also downloaded on the
137+
// target host along with incremental segments download.
138+
retryableTransportClient.executeRetryableAction(action, request, translogOpsRequestOptions, responseListener, reader);
139+
} else {
140+
retryableTransportClient.executeRetryableAction(action, request, responseListener, reader);
141+
}
133142
}
134143

135144
@Override

server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,15 @@ public void onReplicationFailure(
471471
}
472472
}
473473

474+
@Override
475+
protected void validateShardIdleWithNoReplicas(IndexShard primary) {
476+
// ensure search idle conditions are met.
477+
assertFalse(primary.isSearchIdleSupported());
478+
assertTrue(primary.isSearchIdle());
479+
assertTrue(primary.scheduledRefresh());
480+
assertFalse(primary.hasRefreshPending());
481+
}
482+
474483
private void assertSingleSegmentFile(IndexShard shard, String fileName) throws IOException {
475484
final Set<String> segmentsFileNames = Arrays.stream(shard.store().directory().listAll())
476485
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))

server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -436,13 +436,17 @@ public void testShardIdleWithNoReplicas() throws Exception {
436436
shards.startAll();
437437
final IndexShard primary = shards.getPrimary();
438438
shards.indexDocs(randomIntBetween(1, 10));
439-
// ensure search idle conditions are met.
440-
assertTrue(primary.isSearchIdle());
441-
assertFalse(primary.scheduledRefresh());
442-
assertTrue(primary.hasRefreshPending());
439+
validateShardIdleWithNoReplicas(primary);
443440
}
444441
}
445442

443+
protected void validateShardIdleWithNoReplicas(IndexShard primary) {
444+
// ensure search idle conditions are met.
445+
assertTrue(primary.isSearchIdle());
446+
assertFalse(primary.scheduledRefresh());
447+
assertTrue(primary.hasRefreshPending());
448+
}
449+
446450
/**
447451
* here we are starting a new primary shard in PrimaryMode and testing if the shard publishes checkpoint after refresh.
448452
*/

0 commit comments

Comments
 (0)