Skip to content

Commit 72f6115

Browse files
sachinpkalemch2
authored andcommitted
[Remote Store] Get SeqNo from Segment Info and integ tests for restore flow (opensearch-project#6067)
* Get SeqNo from Segment Info and add integ tests for restore flow from remote store Signed-off-by: Sachin Kale <kalsac@amazon.com>
1 parent e278611 commit 72f6115

4 files changed

Lines changed: 194 additions & 13 deletions

File tree

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.remotestore;
10+
11+
import org.junit.After;
12+
import org.junit.Before;
13+
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
14+
import org.opensearch.action.index.IndexResponse;
15+
import org.opensearch.action.support.PlainActionFuture;
16+
import org.opensearch.cluster.metadata.IndexMetadata;
17+
import org.opensearch.common.UUIDs;
18+
import org.opensearch.common.settings.Settings;
19+
import org.opensearch.common.util.FeatureFlags;
20+
import org.opensearch.index.IndexModule;
21+
import org.opensearch.indices.replication.common.ReplicationType;
22+
import org.opensearch.plugins.Plugin;
23+
import org.opensearch.test.InternalTestCluster;
24+
import org.opensearch.test.OpenSearchIntegTestCase;
25+
import org.opensearch.test.transport.MockTransportService;
26+
27+
import java.io.IOException;
28+
import java.nio.file.Path;
29+
import java.util.Arrays;
30+
import java.util.Collection;
31+
import java.util.HashMap;
32+
import java.util.Map;
33+
34+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
35+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
36+
37+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
38+
public class RemoteStoreIT extends OpenSearchIntegTestCase {
39+
40+
private static final String REPOSITORY_NAME = "test-remore-store-repo";
41+
private static final String INDEX_NAME = "remote-store-test-idx-1";
42+
private static final String TOTAL_OPERATIONS = "total-operations";
43+
private static final String REFRESHED_OR_FLUSHED_OPERATIONS = "refreshed-or-flushed-operations";
44+
private static final String MAX_SEQ_NO_TOTAL = "max-seq-no-total";
45+
private static final String MAX_SEQ_NO_REFRESHED_OR_FLUSHED = "max-seq-no-refreshed-or-flushed";
46+
47+
@Override
48+
protected Collection<Class<? extends Plugin>> nodePlugins() {
49+
return Arrays.asList(MockTransportService.TestPlugin.class);
50+
}
51+
52+
@Override
53+
public Settings indexSettings() {
54+
return remoteStoreIndexSettings(0);
55+
}
56+
57+
private Settings remoteStoreIndexSettings(int numberOfReplicas) {
58+
return Settings.builder()
59+
.put(super.indexSettings())
60+
.put("index.refresh_interval", "300s")
61+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
62+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
63+
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
64+
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
65+
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
66+
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
67+
.build();
68+
}
69+
70+
private Settings remoteTranslogIndexSettings(int numberOfReplicas) {
71+
return Settings.builder()
72+
.put(remoteStoreIndexSettings(numberOfReplicas))
73+
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
74+
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
75+
.build();
76+
}
77+
78+
@Override
79+
protected boolean addMockInternalEngine() {
80+
return false;
81+
}
82+
83+
@Override
84+
protected Settings featureFlagSettings() {
85+
return Settings.builder()
86+
.put(super.featureFlagSettings())
87+
.put(FeatureFlags.REPLICATION_TYPE, "true")
88+
.put(FeatureFlags.REMOTE_STORE, "true")
89+
.build();
90+
}
91+
92+
@Before
93+
public void setup() {
94+
Path absolutePath = randomRepoPath().toAbsolutePath();
95+
assertAcked(
96+
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
97+
);
98+
}
99+
100+
@After
101+
public void teardown() {
102+
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
103+
}
104+
105+
private IndexResponse indexSingleDoc() {
106+
return client().prepareIndex(INDEX_NAME)
107+
.setId(UUIDs.randomBase64UUID())
108+
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
109+
.get();
110+
}
111+
112+
private Map<String, Long> indexData() {
113+
long totalOperations = 0;
114+
long refreshedOrFlushedOperations = 0;
115+
long maxSeqNo = 0;
116+
long maxSeqNoRefreshedOrFlushed = 0;
117+
for (int i = 0; i < randomIntBetween(1, 10); i++) {
118+
if (randomBoolean()) {
119+
flush(INDEX_NAME);
120+
} else {
121+
refresh(INDEX_NAME);
122+
}
123+
maxSeqNoRefreshedOrFlushed = maxSeqNo;
124+
refreshedOrFlushedOperations = totalOperations;
125+
int numberOfOperations = randomIntBetween(20, 50);
126+
for (int j = 0; j < numberOfOperations; j++) {
127+
IndexResponse response = indexSingleDoc();
128+
maxSeqNo = response.getSeqNo();
129+
}
130+
totalOperations += numberOfOperations;
131+
}
132+
Map<String, Long> indexingStats = new HashMap<>();
133+
indexingStats.put(TOTAL_OPERATIONS, totalOperations);
134+
indexingStats.put(REFRESHED_OR_FLUSHED_OPERATIONS, refreshedOrFlushedOperations);
135+
indexingStats.put(MAX_SEQ_NO_TOTAL, maxSeqNo);
136+
indexingStats.put(MAX_SEQ_NO_REFRESHED_OR_FLUSHED, maxSeqNoRefreshedOrFlushed);
137+
return indexingStats;
138+
}
139+
140+
private void verifyRestoredData(Map<String, Long> indexStats, boolean checkTotal) {
141+
String statsGranularity = checkTotal ? TOTAL_OPERATIONS : REFRESHED_OR_FLUSHED_OPERATIONS;
142+
String maxSeqNoGranularity = checkTotal ? MAX_SEQ_NO_TOTAL : MAX_SEQ_NO_REFRESHED_OR_FLUSHED;
143+
ensureYellowAndNoInitializingShards(INDEX_NAME);
144+
ensureGreen(INDEX_NAME);
145+
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(statsGranularity));
146+
IndexResponse response = indexSingleDoc();
147+
assertEquals(indexStats.get(maxSeqNoGranularity) + 1, response.getSeqNo());
148+
refresh(INDEX_NAME);
149+
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(statsGranularity) + 1);
150+
}
151+
152+
public void testRemoteStoreRestoreFromRemoteSegmentStore() throws IOException {
153+
internalCluster().startNodes(3);
154+
createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
155+
ensureYellowAndNoInitializingShards(INDEX_NAME);
156+
ensureGreen(INDEX_NAME);
157+
158+
Map<String, Long> indexStats = indexData();
159+
160+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
161+
assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
162+
163+
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
164+
165+
verifyRestoredData(indexStats, false);
166+
}
167+
168+
public void testRemoteTranslogRestore() throws IOException {
169+
internalCluster().startNodes(3);
170+
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
171+
ensureYellowAndNoInitializingShards(INDEX_NAME);
172+
ensureGreen(INDEX_NAME);
173+
174+
Map<String, Long> indexStats = indexData();
175+
176+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
177+
assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
178+
179+
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
180+
181+
verifyRestoredData(indexStats, true);
182+
}
183+
}

server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,8 @@ private IndexMetadata.Builder updateInSyncAllocations(
202202
if (recoverySource == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE) {
203203
allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID;
204204
} else {
205-
assert recoverySource instanceof RecoverySource.SnapshotRecoverySource : recoverySource;
205+
assert (recoverySource instanceof RecoverySource.SnapshotRecoverySource
206+
|| recoverySource instanceof RecoverySource.RemoteStoreRecoverySource) : recoverySource;
206207
allocationId = updates.initializedPrimary.allocationId().getId();
207208
}
208209
// forcing a stale primary resets the in-sync allocations to the singleton set with the stale id

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -180,20 +180,11 @@ private boolean isRefreshAfterCommit() throws IOException {
180180
}
181181

182182
String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos segmentInfosSnapshot) throws IOException {
183-
// We use lastRefreshedCheckpoint as local checkpoint for the SegmentInfosSnapshot. This is better than using
184-
// getProcessedLocalCheckpoint() as processedCheckpoint can advance between reading the value and setting up
185-
// in SegmentInfos.userData. This could lead to data loss as, during recovery, translog will be replayed based on
186-
// LOCAL_CHECKPOINT_KEY.
187-
// lastRefreshedCheckpoint is updated after refresh listeners are executed, this means, InternalEngine.lastRefreshedCheckpoint()
188-
// will return checkpoint of last refresh but that does not impact the correctness as duplicate sequence numbers
189-
// will not be replayed.
190-
assert indexShard.getEngine() instanceof InternalEngine : "Expected shard with InternalEngine, got: "
191-
+ indexShard.getEngine().getClass();
192-
final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
183+
final long maxSeqNoFromSegmentInfos = indexShard.getEngine().getMaxSeqNoFromSegmentInfos(segmentInfosSnapshot);
193184

194185
Map<String, String> userData = segmentInfosSnapshot.getUserData();
195-
userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(lastRefreshedCheckpoint));
196-
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(lastRefreshedCheckpoint));
186+
userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(maxSeqNoFromSegmentInfos));
187+
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNoFromSegmentInfos));
197188
segmentInfosSnapshot.setUserData(userData, false);
198189

199190
long commitGeneration = SegmentInfos.generationFromSegmentsFileName(latestSegmentsNFilename);

test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2470,4 +2470,10 @@ public void manageReplicaSettingForDefaultReplica(boolean apply) {
24702470
updateSettingsRequest.persistentSettings(settings);
24712471
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
24722472
}
2473+
2474+
protected String primaryNodeName(String indexName) {
2475+
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
2476+
String nodeId = clusterState.getRoutingTable().index(indexName).shard(0).primaryShard().currentNodeId();
2477+
return clusterState.getRoutingNodes().node(nodeId).node().getName();
2478+
}
24732479
}

0 commit comments

Comments
 (0)