Skip to content

Commit 8bfef5c

Browse files
dreamer-89brusic
authored andcommitted
[Segment Replication] Refactor RemoteStoreReplicationSource (opensearch-project#8767)
* [Segment Replication] Refactor remote replication source Signed-off-by: Suraj Singh <surajrider@gmail.com> * Unit test updates Signed-off-by: Suraj Singh <surajrider@gmail.com> * Self review Signed-off-by: Suraj Singh <surajrider@gmail.com> * Self review Signed-off-by: Suraj Singh <surajrider@gmail.com> * Segregate shard level tests for node to node and remote store segment replication Signed-off-by: Suraj Singh <surajrider@gmail.com> * Fix failing unit tests Signed-off-by: Suraj Singh <surajrider@gmail.com> * Fix failing UT Signed-off-by: Suraj Singh <surajrider@gmail.com> * Fix failing UT Signed-off-by: Suraj Singh <surajrider@gmail.com> * Address review comments Signed-off-by: Suraj Singh <surajrider@gmail.com> * Fix more unit tests Signed-off-by: Suraj Singh <surajrider@gmail.com> * Improve RemoteStoreReplicationSourceTests, remove unnecessary mocks and use actual failures for failure/exception use cases Signed-off-by: Suraj Singh <surajrider@gmail.com> * Spotless check fix Signed-off-by: Suraj Singh <surajrider@gmail.com> * Address review comments Signed-off-by: Suraj Singh <surajrider@gmail.com> * Ignore files already in store while computing segment file diff with primary Signed-off-by: Suraj Singh <surajrider@gmail.com> * Spotless fix Signed-off-by: Suraj Singh <surajrider@gmail.com> * Fix failing UT Signed-off-by: Suraj Singh <surajrider@gmail.com> * Spotless fix Signed-off-by: Suraj Singh <surajrider@gmail.com> * Move read/writes from IndexInput/Output to RemoteSegmentMetadata Signed-off-by: Suraj Singh <surajrider@gmail.com> * Address review commnt Signed-off-by: Suraj Singh <surajrider@gmail.com> * Update recovery flow to perform commits during recovery Signed-off-by: Suraj Singh <surajrider@gmail.com> * Remove un-necessary char Signed-off-by: Suraj Singh <surajrider@gmail.com> * Address review comments Signed-off-by: Suraj Singh <surajrider@gmail.com> * Update comment nit-pick Signed-off-by: Suraj Singh <surajrider@gmail.com> * Remove deletion logic causing read issues due to deleted segments_N Signed-off-by: Suraj Singh <surajrider@gmail.com> * Spotless fix Signed-off-by: Suraj Singh <surajrider@gmail.com> * Fix unit tests Signed-off-by: Suraj Singh <surajrider@gmail.com> --------- Signed-off-by: Suraj Singh <surajrider@gmail.com> Signed-off-by: Ivan Brusic <ivan.brusic@flocksafety.com>
1 parent 9ba8eb6 commit 8bfef5c

24 files changed

Lines changed: 1291 additions & 1128 deletions

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
package org.opensearch.indices.replication;
1010

11-
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
1211
import org.opensearch.action.search.SearchResponse;
1312
import org.opensearch.cluster.ClusterState;
1413
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -24,7 +23,6 @@
2423
import org.opensearch.core.index.shard.ShardId;
2524
import org.opensearch.index.IndexModule;
2625
import org.opensearch.index.IndexService;
27-
import org.opensearch.index.SegmentReplicationPerGroupStats;
2826
import org.opensearch.index.SegmentReplicationShardStats;
2927
import org.opensearch.index.engine.Engine;
3028
import org.opensearch.index.shard.IndexShard;
@@ -134,24 +132,6 @@ protected void waitForSearchableDocs(long docCount, String... nodes) throws Exce
134132
waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList()));
135133
}
136134

137-
protected void waitForSegmentReplication(String node) throws Exception {
138-
assertBusy(() -> {
139-
SegmentReplicationStatsResponse segmentReplicationStatsResponse = client(node).admin()
140-
.indices()
141-
.prepareSegmentReplicationStats(INDEX_NAME)
142-
.setDetailed(true)
143-
.execute()
144-
.actionGet();
145-
final SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats()
146-
.get(INDEX_NAME)
147-
.get(0);
148-
assertEquals(
149-
perGroupStats.getReplicaStats().stream().findFirst().get().getCurrentReplicationState().getStage(),
150-
SegmentReplicationState.Stage.DONE
151-
);
152-
}, 1, TimeUnit.MINUTES);
153-
}
154-
155135
protected void verifyStoreContent() throws Exception {
156136
assertBusy(() -> {
157137
final ClusterState clusterState = getClusterState();

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -272,12 +272,12 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce
272272
assertTrue(
273273
replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted > 0
274274
&& primaryStats.uploadBytesStarted
275-
- zeroStatePrimaryStats.uploadBytesStarted == replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted
275+
- zeroStatePrimaryStats.uploadBytesStarted >= replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted
276276
);
277277
assertTrue(
278278
replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded > 0
279279
&& primaryStats.uploadBytesSucceeded
280-
- zeroStatePrimaryStats.uploadBytesSucceeded == replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded
280+
- zeroStatePrimaryStats.uploadBytesSucceeded >= replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded
281281
);
282282
// Assert zero failures
283283
assertEquals(0, primaryStats.uploadBytesFailed);
@@ -369,8 +369,8 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr
369369
assertEquals(0, uploadsFailed);
370370
assertEquals(0, uploadBytesFailed);
371371
for (int j = 0; j < response.getSuccessfulShards() - 1; j++) {
372-
assertEquals(uploadBytesStarted - zeroStatePrimaryStats.uploadBytesStarted, (long) downloadBytesStarted.get(j));
373-
assertEquals(uploadBytesSucceeded - zeroStatePrimaryStats.uploadBytesSucceeded, (long) downloadBytesSucceeded.get(j));
372+
assertTrue(uploadBytesStarted - zeroStatePrimaryStats.uploadBytesStarted > downloadBytesStarted.get(j));
373+
assertTrue(uploadBytesSucceeded - zeroStatePrimaryStats.uploadBytesSucceeded > downloadBytesSucceeded.get(j));
374374
assertEquals(0, (long) downloadBytesFailed.get(j));
375375
}
376376
}, 60, TimeUnit.SECONDS);

server/src/internalClusterTest/java/org/opensearch/snapshots/RemoteIndexSnapshotStatusApiIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
5757
.put(super.nodeSettings(nodeOrdinal))
5858
.put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that check by-timestamp order
5959
.put(FeatureFlags.REMOTE_STORE, "true")
60+
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
6061
.put(remoteStoreClusterSettings("remote-store-repo-name"))
6162
.build();
6263
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 10 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,7 @@
199199
import java.util.ArrayList;
200200
import java.util.Arrays;
201201
import java.util.Collections;
202-
import java.util.Comparator;
203202
import java.util.EnumSet;
204-
import java.util.HashMap;
205203
import java.util.HashSet;
206204
import java.util.List;
207205
import java.util.Locale;
@@ -1988,7 +1986,7 @@ private long recoverLocallyUpToGlobalCheckpoint() {
19881986
final Optional<SequenceNumbers.CommitInfo> safeCommit;
19891987
final long globalCheckpoint;
19901988
try {
1991-
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
1989+
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(TRANSLOG_UUID_KEY);
19921990
globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
19931991
safeCommit = store.findSafeIndexCommit(globalCheckpoint);
19941992
} catch (org.apache.lucene.index.IndexNotFoundException e) {
@@ -2088,7 +2086,7 @@ private long recoverLocallyUptoLastCommit() {
20882086
try {
20892087
seqNo = Long.parseLong(store.readLastCommittedSegmentsInfo().getUserData().get(MAX_SEQ_NO));
20902088
} catch (org.apache.lucene.index.IndexNotFoundException e) {
2091-
logger.error("skip local recovery as no index commit found", e);
2089+
logger.error("skip local recovery as no index commit found");
20922090
return UNASSIGNED_SEQ_NO;
20932091
} catch (Exception e) {
20942092
logger.error("skip local recovery as failed to find the safe commit", e);
@@ -2242,7 +2240,7 @@ private void loadGlobalCheckpointToReplicationTracker() throws IOException {
22422240
// we have to set it before we open an engine and recover from the translog because
22432241
// acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
22442242
// and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
2245-
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
2243+
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(TRANSLOG_UUID_KEY);
22462244
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
22472245
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
22482246
}
@@ -2326,7 +2324,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
23262324
assert currentEngineReference.get() == null : "engine is running";
23272325
verifyNotClosed();
23282326
if (indexSettings.isRemoteStoreEnabled() && syncFromRemote) {
2329-
syncSegmentsFromRemoteSegmentStore(false, true, true);
2327+
syncSegmentsFromRemoteSegmentStore(false, true);
23302328
}
23312329
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
23322330
if (syncFromRemote) {
@@ -4555,7 +4553,7 @@ public void close() throws IOException {
45554553
};
45564554
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
45574555
if (indexSettings.isRemoteStoreEnabled()) {
4558-
syncSegmentsFromRemoteSegmentStore(false, true, true);
4556+
syncSegmentsFromRemoteSegmentStore(false, true);
45594557
}
45604558
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
45614559
syncRemoteTranslogAndUpdateGlobalCheckpoint();
@@ -4616,13 +4614,11 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
46164614
* Downloads segments from remote segment store.
46174615
* @param overrideLocal flag to override local segment files with those in remote store
46184616
* @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise
4619-
* @param shouldCommit if the shard requires committing the changes after sync from remote.
46204617
* @throws IOException if exception occurs while reading segments from remote store
46214618
*/
4622-
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync, boolean shouldCommit)
4623-
throws IOException {
4619+
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException {
46244620
assert indexSettings.isRemoteStoreEnabled();
4625-
logger.info("Downloading segments from remote segment store");
4621+
logger.trace("Downloading segments from remote segment store");
46264622
RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory();
46274623
// We need to call RemoteSegmentStoreDirectory.init() in order to get latest metadata of the files that
46284624
// are uploaded to the remote segment store.
@@ -4647,7 +4643,6 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
46474643
} else {
46484644
storeDirectory = store.directory();
46494645
}
4650-
Set<String> localSegmentFiles = Sets.newHashSet(storeDirectory.listAll());
46514646
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal);
46524647

46534648
if (refreshLevelSegmentSync && remoteSegmentMetadata != null) {
@@ -4661,37 +4656,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
46614656
indexInput,
46624657
remoteSegmentMetadata.getGeneration()
46634658
);
4664-
// Replicas never need a local commit
4665-
if (shouldCommit) {
4666-
if (this.shardRouting.primary()) {
4667-
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
4668-
// Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
4669-
// with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
4670-
// after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
4671-
// policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the
4672-
// latest commit.
4673-
Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
4674-
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
4675-
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
4676-
if (localMaxSegmentInfos.isPresent()
4677-
&& infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get())
4678-
- 1) {
4679-
// If remote translog is not enabled, local translog will be created with different UUID.
4680-
// This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
4681-
// to be same. Following code block make sure to have the same UUID.
4682-
if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
4683-
SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
4684-
Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
4685-
userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
4686-
infosSnapshot.setUserData(userData, false);
4687-
}
4688-
storeDirectory.deleteFile(localMaxSegmentInfos.get());
4689-
}
4690-
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
4691-
}
4692-
} else {
4693-
finalizeReplication(infosSnapshot);
4694-
}
4659+
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
4660+
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
46954661
}
46964662
}
46974663
} catch (IOException e) {
@@ -4716,7 +4682,7 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
47164682
long primaryTerm,
47174683
long commitGeneration
47184684
) throws IOException {
4719-
logger.info("Downloading segments from given remote segment store");
4685+
logger.trace("Downloading segments from given remote segment store");
47204686
RemoteSegmentStoreDirectory remoteDirectory = null;
47214687
if (remoteStore != null) {
47224688
remoteDirectory = getRemoteDirectory();

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ private synchronized boolean syncSegments() {
220220
public void onResponse(Void unused) {
221221
try {
222222
// Start metadata file upload
223-
uploadMetadata(localSegmentsPostRefresh, segmentInfos);
223+
uploadMetadata(localSegmentsPostRefresh, segmentInfos, checkpoint);
224224
clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
225225
onSuccessfulSegmentsSync(
226226
refreshTimeMs,
@@ -327,7 +327,8 @@ private boolean isRefreshAfterCommit() throws IOException {
327327
&& !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
328328
}
329329

330-
void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos) throws IOException {
330+
void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint)
331+
throws IOException {
331332
final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint();
332333
SegmentInfos segmentInfosSnapshot = segmentInfos.clone();
333334
Map<String, String> userData = segmentInfosSnapshot.getUserData();
@@ -344,8 +345,8 @@ void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos se
344345
localSegmentsPostRefresh,
345346
segmentInfosSnapshot,
346347
storeDirectory,
347-
indexShard.getOperationPrimaryTerm(),
348-
translogFileGeneration
348+
translogFileGeneration,
349+
replicationCheckpoint
349350
);
350351
}
351352
}

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
530530
remoteStore.incRef();
531531
try {
532532
// Download segments from remote segment store
533-
indexShard.syncSegmentsFromRemoteSegmentStore(true, true, true);
533+
indexShard.syncSegmentsFromRemoteSegmentStore(true, true);
534534

535535
if (store.directory().listAll().length == 0) {
536536
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
4545
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
4646
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
47+
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
4748
import org.opensearch.threadpool.ThreadPool;
4849

4950
import java.io.FileNotFoundException;
@@ -603,19 +604,20 @@ public boolean containsFile(String localFilename, String checksum) {
603604
* @param segmentFiles segment files that are part of the shard at the time of the latest refresh
604605
* @param segmentInfosSnapshot SegmentInfos bytes to store as part of metadata file
605606
* @param storeDirectory instance of local directory to temporarily create metadata file before upload
606-
* @param primaryTerm primary term to be used in the name of metadata file
607+
* @param translogGeneration translog generation
608+
* @param replicationCheckpoint ReplicationCheckpoint of primary shard
607609
* @throws IOException in case of I/O error while uploading the metadata file
608610
*/
609611
public void uploadMetadata(
610612
Collection<String> segmentFiles,
611613
SegmentInfos segmentInfosSnapshot,
612614
Directory storeDirectory,
613-
long primaryTerm,
614-
long translogGeneration
615+
long translogGeneration,
616+
ReplicationCheckpoint replicationCheckpoint
615617
) throws IOException {
616618
synchronized (this) {
617619
String metadataFilename = MetadataFilenameUtils.getMetadataFilename(
618-
primaryTerm,
620+
replicationCheckpoint.getPrimaryTerm(),
619621
segmentInfosSnapshot.getGeneration(),
620622
translogGeneration,
621623
metadataUploadCounter.incrementAndGet(),
@@ -646,8 +648,7 @@ public void uploadMetadata(
646648
new RemoteSegmentMetadata(
647649
RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments),
648650
segmentInfoSnapshotByteArray,
649-
primaryTerm,
650-
segmentInfosSnapshot.getGeneration()
651+
replicationCheckpoint
651652
)
652653
);
653654
}

server/src/main/java/org/opensearch/index/store/Store.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -845,22 +845,24 @@ private void cleanupFiles(Collection<String> filesToConsiderForCleanup, String r
845845
* @param tmpToFileName Map of temporary replication file to actual file name
846846
* @param infosBytes bytes[] of SegmentInfos supposed to be sent over by primary excluding segment_N file
847847
* @param segmentsGen segment generation number
848-
* @param consumer consumer for generated SegmentInfos
848+
* @param finalizeConsumer consumer for action on passed in SegmentInfos
849+
* @param renameConsumer consumer for action on temporary copied over files
849850
* @throws IOException Exception while reading store and building segment infos
850851
*/
851852
public void buildInfosFromBytes(
852853
Map<String, String> tmpToFileName,
853854
byte[] infosBytes,
854855
long segmentsGen,
855-
CheckedConsumer<SegmentInfos, IOException> consumer
856+
CheckedConsumer<SegmentInfos, IOException> finalizeConsumer,
857+
CheckedConsumer<Map<String, String>, IOException> renameConsumer
856858
) throws IOException {
857859
metadataLock.writeLock().lock();
858860
try {
859861
final List<String> values = new ArrayList<>(tmpToFileName.values());
860862
incRefFileDeleter(values);
861863
try {
862-
renameTempFilesSafe(tmpToFileName);
863-
consumer.accept(buildSegmentInfos(infosBytes, segmentsGen));
864+
renameConsumer.accept(tmpToFileName);
865+
finalizeConsumer.accept(buildSegmentInfos(infosBytes, segmentsGen));
864866
} finally {
865867
decRefFileDeleter(values);
866868
}

0 commit comments

Comments
 (0)