Skip to content

Commit 71d89ca

Browse files
[Remote Store] Sync segments in refresh listener on refresh after commit (#10830) (#10849)
* [Remote Store] Sync segments in refresh listener on refresh after commit * Add Integration Tests * Add comments and java doc --------- (cherry picked from commit 7453daa) Signed-off-by: Ashish Singh <ssashish@amazon.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent fe51ca8 commit 71d89ca

5 files changed

Lines changed: 215 additions & 18 deletions

File tree

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
1616
import org.opensearch.action.support.PlainActionFuture;
1717
import org.opensearch.cluster.ClusterState;
18+
import org.opensearch.cluster.coordination.FollowersChecker;
19+
import org.opensearch.cluster.coordination.LeaderChecker;
1820
import org.opensearch.cluster.node.DiscoveryNode;
1921
import org.opensearch.cluster.routing.ShardRouting;
2022
import org.opensearch.cluster.routing.ShardRoutingState;
@@ -23,15 +25,20 @@
2325
import org.opensearch.index.IndexSettings;
2426
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
2527
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
28+
import org.opensearch.plugins.Plugin;
2629
import org.opensearch.test.InternalTestCluster;
2730
import org.opensearch.test.OpenSearchIntegTestCase;
28-
import org.junit.Before;
31+
import org.opensearch.test.disruption.NetworkDisruption;
32+
import org.opensearch.test.transport.MockTransportService;
2933

3034
import java.io.IOException;
3135
import java.util.ArrayList;
3236
import java.util.Arrays;
37+
import java.util.Collection;
38+
import java.util.HashSet;
3339
import java.util.List;
3440
import java.util.Locale;
41+
import java.util.Set;
3542
import java.util.concurrent.TimeUnit;
3643
import java.util.stream.Collectors;
3744
import java.util.stream.Stream;
@@ -45,12 +52,17 @@ public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase {
4552

4653
private static final String INDEX_NAME = "remote-store-test-idx-1";
4754

48-
@Before
55+
@Override
56+
protected Collection<Class<? extends Plugin>> nodePlugins() {
57+
return Arrays.asList(MockTransportService.TestPlugin.class);
58+
}
59+
4960
// Starts a three-node cluster. This is invoked explicitly at the top of the tests that need the default
// topology; testStatsCorrectnessOnFailover starts its own nodes instead and does not call it.
public void setup() {
5061
internalCluster().startNodes(3);
5162
}
5263

5364
public void testStatsResponseFromAllNodes() {
65+
setup();
5466

5567
// Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes
5668
// during this time frame. This ensures that the segment upload has started.
@@ -121,6 +133,7 @@ public void testStatsResponseFromAllNodes() {
121133
}
122134

123135
public void testStatsResponseAllShards() {
136+
setup();
124137

125138
// Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes
126139
// during this time frame. This ensures that the segment upload has started.
@@ -181,6 +194,7 @@ public void testStatsResponseAllShards() {
181194
}
182195

183196
public void testStatsResponseFromLocalNode() {
197+
setup();
184198

185199
// Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes
186200
// during this time frame. This ensures that the segment upload has started.
@@ -244,6 +258,7 @@ public void testStatsResponseFromLocalNode() {
244258
}
245259

246260
public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exception {
261+
setup();
247262
// Scenario:
248263
// - Create index with single primary and single replica shard
249264
// - Disable Refresh Interval for the index
@@ -333,6 +348,7 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce
333348
}
334349

335350
public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() throws Exception {
351+
setup();
336352
// Scenario:
337353
// - Create index with single primary and N-1 replica shards (N = no of data nodes)
338354
// - Disable Refresh Interval for the index
@@ -424,6 +440,7 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr
424440
}
425441

426442
public void testStatsOnShardRelocation() {
443+
setup();
427444
// Scenario:
428445
// - Create index with single primary and single replica shard
429446
// - Index documents
@@ -479,6 +496,7 @@ public void testStatsOnShardRelocation() {
479496
}
480497

481498
public void testStatsOnShardUnassigned() throws IOException {
499+
setup();
482500
// Scenario:
483501
// - Create index with single primary and two replica shard
484502
// - Index documents
@@ -505,6 +523,7 @@ public void testStatsOnShardUnassigned() throws IOException {
505523
}
506524

507525
public void testStatsOnRemoteStoreRestore() throws IOException {
526+
setup();
508527
// Creating an index with primary shard count == total nodes in cluster and 0 replicas
509528
int dataNodeCount = client().admin().cluster().prepareHealth().get().getNumberOfDataNodes();
510529
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, dataNodeCount));
@@ -552,6 +571,7 @@ public void testStatsOnRemoteStoreRestore() throws IOException {
552571
}
553572

554573
public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exception {
574+
setup();
555575
// Create an index with one primary and one replica shard
556576
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1));
557577
ensureGreen(INDEX_NAME);
@@ -589,6 +609,58 @@ public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exce
589609
}, 5, TimeUnit.SECONDS);
590610
}
591611

612+
public void testStatsCorrectnessOnFailover() {
613+
Settings clusterSettings = Settings.builder()
614+
.put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "100ms")
615+
.put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "500ms")
616+
.put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
617+
.put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "100ms")
618+
.put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "500ms")
619+
.put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
620+
.put(nodeSettings(0))
621+
.build();
622+
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(clusterSettings);
623+
internalCluster().startDataOnlyNodes(2, clusterSettings);
624+
625+
// Create an index with one primary and one replica shard
626+
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1));
627+
ensureGreen(INDEX_NAME);
628+
629+
// Index some docs and refresh
630+
indexDocs();
631+
refresh(INDEX_NAME);
632+
633+
String primaryNode = primaryNodeName(INDEX_NAME);
634+
String replicaNode = replicaNodeName(INDEX_NAME);
635+
636+
// Start network disruption - primary node will be isolated
637+
Set<String> nodesInOneSide = Stream.of(clusterManagerNode, replicaNode).collect(Collectors.toCollection(HashSet::new));
638+
Set<String> nodesInOtherSide = Stream.of(primaryNode).collect(Collectors.toCollection(HashSet::new));
639+
NetworkDisruption networkDisruption = new NetworkDisruption(
640+
new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide),
641+
NetworkDisruption.DISCONNECT
642+
);
643+
internalCluster().setDisruptionScheme(networkDisruption);
644+
logger.info("--> network disruption is started");
645+
networkDisruption.startDisrupting();
646+
ensureStableCluster(2, clusterManagerNode);
647+
648+
RemoteStoreStatsResponse response = client(clusterManagerNode).admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get();
649+
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, "0");
650+
List<RemoteStoreStats> matches = Arrays.stream(response.getRemoteStoreStats())
651+
.filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString()))
652+
.collect(Collectors.toList());
653+
assertEquals(1, matches.size());
654+
RemoteSegmentTransferTracker.Stats segmentStats = matches.get(0).getSegmentStats();
655+
assertEquals(0, segmentStats.refreshTimeLagMs);
656+
657+
networkDisruption.stopDisrupting();
658+
internalCluster().clearDisruptionScheme();
659+
ensureStableCluster(3, clusterManagerNode);
660+
ensureGreen(INDEX_NAME);
661+
logger.info("Test completed");
662+
}
663+
592664
private void indexDocs() {
593665
for (int i = 0; i < randomIntBetween(5, 10); i++) {
594666
if (randomBoolean()) {

server/src/main/java/org/opensearch/index/remote/RemoteTranslogTransferTracker.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,63 @@ public RemoteTranslogTransferTracker.Stats stats() {
232232
);
233233
}
234234

235+
@Override
236+
public String toString() {
237+
return "RemoteTranslogTransferStats{"
238+
+ "lastSuccessfulUploadTimestamp="
239+
+ lastSuccessfulUploadTimestamp.get()
240+
+ ","
241+
+ "totalUploadsStarted="
242+
+ totalUploadsStarted.get()
243+
+ ","
244+
+ "totalUploadsSucceeded="
245+
+ totalUploadsSucceeded.get()
246+
+ ","
247+
+ "totalUploadsFailed="
248+
+ totalUploadsFailed.get()
249+
+ ","
250+
+ "uploadBytesStarted="
251+
+ uploadBytesStarted.get()
252+
+ ","
253+
+ "uploadBytesFailed="
254+
+ uploadBytesFailed.get()
255+
+ ","
256+
+ "totalUploadTimeInMillis="
257+
+ totalUploadTimeInMillis.get()
258+
+ ","
259+
+ "uploadBytesMovingAverage="
260+
+ uploadBytesMovingAverageReference.get().getAverage()
261+
+ ","
262+
+ "uploadBytesPerSecMovingAverage="
263+
+ uploadBytesPerSecMovingAverageReference.get().getAverage()
264+
+ ","
265+
+ "uploadTimeMovingAverage="
266+
+ uploadTimeMsMovingAverageReference.get().getAverage()
267+
+ ","
268+
+ "lastSuccessfulDownloadTimestamp="
269+
+ lastSuccessfulDownloadTimestamp.get()
270+
+ ","
271+
+ "totalDownloadsSucceeded="
272+
+ totalDownloadsSucceeded.get()
273+
+ ","
274+
+ "downloadBytesSucceeded="
275+
+ downloadBytesSucceeded.get()
276+
+ ","
277+
+ "totalDownloadTimeInMillis="
278+
+ totalDownloadTimeInMillis.get()
279+
+ ","
280+
+ "downloadBytesMovingAverage="
281+
+ downloadBytesMovingAverageReference.get().getAverage()
282+
+ ","
283+
+ "downloadBytesPerSecMovingAverage="
284+
+ downloadBytesPerSecMovingAverageReference.get().getAverage()
285+
+ ","
286+
+ "downloadTimeMovingAverage="
287+
+ downloadTimeMsMovingAverageReference.get().getAverage()
288+
+ ","
289+
+ "}";
290+
}
291+
235292
/**
236293
* Represents the tracker's state as seen in the stats API.
237294
*

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4783,6 +4783,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE
47834783
* @throws IOException if exception occurs while reading segments from remote store.
47844784
*/
47854785
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
4786+
boolean syncSegmentSuccess = false;
4787+
long startTimeMs = System.currentTimeMillis();
47864788
assert indexSettings.isRemoteStoreEnabled();
47874789
logger.trace("Downloading segments from remote segment store");
47884790
RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory();
@@ -4832,9 +4834,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
48324834
: "There should not be any segments file in the dir";
48334835
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
48344836
}
4837+
syncSegmentSuccess = true;
48354838
} catch (IOException e) {
48364839
throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e);
48374840
} finally {
4841+
logger.trace(
4842+
"syncSegmentsFromRemoteSegmentStore success={} elapsedTime={}",
4843+
syncSegmentSuccess,
4844+
(System.currentTimeMillis() - startTimeMs)
4845+
);
48384846
store.decRef();
48394847
remoteStore.decRef();
48404848
}

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

Lines changed: 69 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -123,14 +123,13 @@ public void beforeRefresh() throws IOException {}
123123

124124
@Override
125125
protected void runAfterRefreshExactlyOnce(boolean didRefresh) {
126-
if (shouldSync(didRefresh)) {
126+
// We have 2 separate methods to check if sync needs to be done or not. This is required since we use the return boolean
127+
// from isReadyForUpload to schedule refresh retries as the index shard or the primary mode are not in complete
128+
// ready state.
129+
if (shouldSync(didRefresh) && isReadyForUpload()) {
127130
segmentTracker.updateLocalRefreshTimeAndSeqNo();
128131
try {
129-
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
130-
logger.debug("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm());
131-
this.primaryTerm = indexShard.getOperationPrimaryTerm();
132-
this.remoteDirectory.init();
133-
}
132+
initializeRemoteDirectoryOnTermUpdate();
134133
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
135134
Collection<String> localSegmentsPostRefresh = segmentInfosGatedCloseable.get().files(true);
136135
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
@@ -160,20 +159,20 @@ protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
160159
}
161160

162161
private boolean shouldSync(boolean didRefresh) {
163-
// The third condition exists for uploading the zero state segments where the refresh has not changed the reader reference, but it
164-
// is important to upload the zero state segments so that the restore does not break.
165162
return this.primaryTerm != indexShard.getOperationPrimaryTerm()
163+
// If the readers change, didRefresh is always true.
166164
|| didRefresh
167-
|| remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty();
165+
// The third condition exists for uploading the zero state segments where the refresh has not changed the reader
166+
// reference, but it is important to upload the zero state segments so that the restore does not break.
167+
|| remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()
168+
// When the shouldSync is called the first time, then 1st condition on primary term is true. But after that
169+
// we update the primary term and the same condition would not evaluate to true again in syncSegments.
170+
// Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call.
171+
|| isRefreshAfterCommitSafe();
168172
}
169173

170174
private boolean syncSegments() {
171-
if (indexShard.getReplicationTracker().isPrimaryMode() == false || indexShard.state() == IndexShardState.CLOSED) {
172-
logger.debug(
173-
"Skipped syncing segments with primaryMode={} indexShardState={}",
174-
indexShard.getReplicationTracker().isPrimaryMode(),
175-
indexShard.state()
176-
);
175+
if (isReadyForUpload() == false) {
177176
// Following check is required to enable retry and make sure that we do not lose this refresh event
178177
// When primary shard is restored from remote store, the recovery happens first followed by changing
179178
// primaryMode to true. Due to this, the refresh that is triggered post replay of translog will not go through
@@ -323,6 +322,19 @@ private boolean isRefreshAfterCommit() throws IOException {
323322
&& !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
324323
}
325324

325+
/**
326+
* Returns if the current refresh has happened after a commit.
327+
* @return true if this refresh has happened on account of a commit. If otherwise or exception, returns false.
328+
*/
329+
private boolean isRefreshAfterCommitSafe() {
330+
try {
331+
return isRefreshAfterCommit();
332+
} catch (Exception e) {
333+
logger.info("Exception occurred in isRefreshAfterCommitSafe", e);
334+
}
335+
return false;
336+
}
337+
326338
void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint)
327339
throws IOException {
328340
final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint();
@@ -440,6 +452,48 @@ private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesB
440452
}
441453
}
442454

455+
/**
456+
* On primary term update, we (re)initialise the remote segment directory to reflect the latest metadata file that
457+
* has been uploaded to remote store successfully. This method also updates the segment tracker about the latest
458+
* uploaded segment files onto remote store.
459+
*/
460+
private void initializeRemoteDirectoryOnTermUpdate() throws IOException {
461+
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
462+
logger.trace("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm());
463+
this.primaryTerm = indexShard.getOperationPrimaryTerm();
464+
RemoteSegmentMetadata uploadedMetadata = this.remoteDirectory.init();
465+
466+
// During failover, the uploaded metadata would have names of files that have been uploaded to remote store.
467+
// Here we update the tracker with latest remote uploaded files.
468+
if (uploadedMetadata != null) {
469+
segmentTracker.setLatestUploadedFiles(uploadedMetadata.getMetadata().keySet());
470+
}
471+
}
472+
}
473+
474+
/**
475+
* This checks for readiness of the index shard and primary mode. This has separated from shouldSync since we use the
476+
* returned value of this method for scheduling retries in syncSegments method.
477+
* @return true iff primaryMode is true and index shard is not in closed state.
478+
*/
479+
private boolean isReadyForUpload() {
480+
boolean isReady = indexShard.getReplicationTracker().isPrimaryMode() && indexShard.state() != IndexShardState.CLOSED;
481+
if (isReady == false) {
482+
StringBuilder sb = new StringBuilder("Skipped syncing segments with");
483+
if (indexShard.getReplicationTracker() != null) {
484+
sb.append(" primaryMode=").append(indexShard.getReplicationTracker().isPrimaryMode());
485+
}
486+
if (indexShard.state() != null) {
487+
sb.append(" indexShardState=").append(indexShard.state());
488+
}
489+
if (indexShard.getEngineOrNull() != null) {
490+
sb.append(" engineType=").append(indexShard.getEngine().getClass().getSimpleName());
491+
}
492+
logger.trace(sb.toString());
493+
}
494+
return isReady;
495+
}
496+
443497
/**
444498
* Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events
445499
*/

0 commit comments

Comments
 (0)