Skip to content

Commit ebed94f

Browse files
mch2 authored and dreamer-89 committed
Segment Replication - Fix NoSuchFileException errors caused when computing metadata snapshot on primary shards. (opensearch-project#4366)
* Segment Replication - Fix NoSuchFileException errors caused when computing metadata snapshot on primary shards. This change fixes the errors that occur when computing metadata snapshots on primary shards from the latest in-memory SegmentInfos. The error occurs when a segments_N file that is referenced by the in-memory infos is deleted as part of a concurrent commit. The segments themselves are incref'd by IndexWriter.incRefDeleter but the commit file (Segments_N) is not. This change resolves this by ignoring the segments_N file when computing metadata for CopyState and only sending incref'd segment files to replicas. Signed-off-by: Marc Handalian <handalm@amazon.com> * Fix spotless. Signed-off-by: Marc Handalian <handalm@amazon.com> * Update StoreTests.testCleanupAndPreserveLatestCommitPoint to assert additional segments are deleted. Signed-off-by: Marc Handalian <handalm@amazon.com> * Rename snapshot to metadataMap in CheckpointInfoResponse. Signed-off-by: Marc Handalian <handalm@amazon.com> * Refactor segmentReplicationDiff method to compute off two maps instead of MetadataSnapshots. Signed-off-by: Marc Handalian <handalm@amazon.com> * Fix spotless. Signed-off-by: Marc Handalian <handalm@amazon.com> * Revert catchall in SegmentReplicationSourceService. Signed-off-by: Marc Handalian <handalm@amazon.com> * Revert log lvl change. Signed-off-by: Marc Handalian <handalm@amazon.com> * Fix SegmentReplicationTargetTests Signed-off-by: Marc Handalian <handalm@amazon.com> * Cleanup unused logger. Signed-off-by: Marc Handalian <handalm@amazon.com> Signed-off-by: Marc Handalian <handalm@amazon.com> Co-authored-by: Suraj Singh <surajrider@gmail.com>
1 parent 5985955 commit ebed94f

16 files changed

Lines changed: 301 additions & 230 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
4747
- Restore using the class ClusterInfoRequest and ClusterInfoRequestBuilder from package 'org.opensearch.action.support.master.info' for subclasses ([#4324](https://github.com/opensearch-project/OpenSearch/pull/4324))
4848
- Fixed cancellation of segment replication events ([#4225](https://github.com/opensearch-project/OpenSearch/pull/4225))
4949
- [Segment Replication] Bump segment infos counter before commit during replica promotion ([#4365](https://github.com/opensearch-project/OpenSearch/pull/4365))
50+
- [Segment Replication] Fix NoSuchFileExceptions with segment replication when computing primary metadata snapshots ([#4366](https://github.com/opensearch-project/OpenSearch/pull/4366))
5051

5152
### Security
5253

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.opensearch.indices.replication;
1010

1111
import com.carrotsearch.randomizedtesting.RandomizedTest;
12-
import org.apache.lucene.index.SegmentInfos;
1312
import org.junit.BeforeClass;
1413
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
1514
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
@@ -458,13 +457,56 @@ private void assertSegmentStats(int numberOfReplicas) throws IOException {
458457
ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState();
459458
final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId());
460459
IndexShard indexShard = getIndexShard(replicaNode.getName());
461-
final String lastCommitSegmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(indexShard.store().directory());
462460
// calls to readCommit will fail if a valid commit point and all its segments are not in the store.
463-
SegmentInfos.readCommit(indexShard.store().directory(), lastCommitSegmentsFileName);
461+
indexShard.store().readLastCommittedSegmentsInfo();
464462
}
465463
}
466464
}
467465

466+
public void testDropPrimaryDuringReplication() throws Exception {
467+
final Settings settings = Settings.builder()
468+
.put(indexSettings())
469+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6)
470+
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
471+
.build();
472+
final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
473+
final String primaryNode = internalCluster().startDataOnlyNode(Settings.EMPTY);
474+
createIndex(INDEX_NAME, settings);
475+
internalCluster().startDataOnlyNodes(6);
476+
ensureGreen(INDEX_NAME);
477+
478+
int initialDocCount = scaledRandomIntBetween(100, 200);
479+
try (
480+
BackgroundIndexer indexer = new BackgroundIndexer(
481+
INDEX_NAME,
482+
"_doc",
483+
client(),
484+
-1,
485+
RandomizedTest.scaledRandomIntBetween(2, 5),
486+
false,
487+
random()
488+
)
489+
) {
490+
indexer.start(initialDocCount);
491+
waitForDocs(initialDocCount, indexer);
492+
refresh(INDEX_NAME);
493+
// don't wait for replication to complete, stop the primary immediately.
494+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
495+
ensureYellow(INDEX_NAME);
496+
497+
// start another replica.
498+
internalCluster().startDataOnlyNode();
499+
ensureGreen(INDEX_NAME);
500+
501+
// index another doc and refresh - without this the new replica won't catch up.
502+
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get();
503+
504+
flushAndRefresh(INDEX_NAME);
505+
waitForReplicaUpdate();
506+
assertSegmentStats(6);
507+
}
508+
}
509+
468510
/**
469511
* Waits until the replica is caught up to the latest primary segments gen.
470512
* @throws Exception
@@ -483,10 +525,12 @@ private void waitForReplicaUpdate() throws Exception {
483525
final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);
484526
// if we don't have any segments yet, proceed.
485527
final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
528+
logger.debug("Primary Segments: {}", primaryShardSegments.getSegments());
486529
if (primaryShardSegments.getSegments().isEmpty() == false) {
487530
final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
488531
final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
489532
for (ShardSegments shardSegments : replicaShardSegments) {
533+
logger.debug("Replica {} Segments: {}", shardSegments.getShardRouting(), shardSegments.getSegments());
490534
final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
491535
.stream()
492536
.anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);

server/src/main/java/org/opensearch/index/store/Store.java

Lines changed: 68 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
import java.nio.file.NoSuchFileException;
106106
import java.nio.file.Path;
107107
import java.util.ArrayList;
108+
import java.util.Collection;
108109
import java.util.Collections;
109110
import java.util.HashMap;
110111
import java.util.Iterator;
@@ -122,6 +123,7 @@
122123
import static java.util.Collections.emptyMap;
123124
import static java.util.Collections.unmodifiableMap;
124125
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
126+
import static org.opensearch.index.store.Store.MetadataSnapshot.loadMetadata;
125127

126128
/**
127129
* A Store provides plain access to files written by an opensearch index shard. Each shard
@@ -334,6 +336,51 @@ public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOExceptio
334336
return new MetadataSnapshot(segmentInfos, directory, logger);
335337
}
336338

339+
/**
340+
* Segment Replication method - Fetch a map of StoreFileMetadata for segments, ignoring Segment_N files.
341+
* @param segmentInfos {@link SegmentInfos} from which to compute metadata.
342+
* @return {@link Map} map file name to {@link StoreFileMetadata}.
343+
*/
344+
public Map<String, StoreFileMetadata> getSegmentMetadataMap(SegmentInfos segmentInfos) throws IOException {
345+
assert indexSettings.isSegRepEnabled();
346+
return loadMetadata(segmentInfos, directory, logger, true).fileMetadata;
347+
}
348+
349+
/**
350+
* Segment Replication method
351+
* Returns a diff between the Maps of StoreFileMetadata that can be used for getting list of files to copy over to a replica for segment replication. The returned diff will hold a list of files that are:
352+
* <ul>
353+
* <li>identical: they exist in both maps and they can be considered the same ie. they don't need to be recovered</li>
354+
* <li>different: they exist in both maps but their they are not identical</li>
355+
* <li>missing: files that exist in the source but not in the target</li>
356+
* </ul>
357+
*/
358+
public static RecoveryDiff segmentReplicationDiff(Map<String, StoreFileMetadata> source, Map<String, StoreFileMetadata> target) {
359+
final List<StoreFileMetadata> identical = new ArrayList<>();
360+
final List<StoreFileMetadata> different = new ArrayList<>();
361+
final List<StoreFileMetadata> missing = new ArrayList<>();
362+
for (StoreFileMetadata value : source.values()) {
363+
if (value.name().startsWith(IndexFileNames.SEGMENTS)) {
364+
continue;
365+
}
366+
if (target.containsKey(value.name()) == false) {
367+
missing.add(value);
368+
} else {
369+
final StoreFileMetadata fileMetadata = target.get(value.name());
370+
if (fileMetadata.isSame(value)) {
371+
identical.add(value);
372+
} else {
373+
different.add(value);
374+
}
375+
}
376+
}
377+
return new RecoveryDiff(
378+
Collections.unmodifiableList(identical),
379+
Collections.unmodifiableList(different),
380+
Collections.unmodifiableList(missing)
381+
);
382+
}
383+
337384
/**
338385
* Renames all the given files from the key of the map to the
339386
* value of the map. All successfully renamed files are removed from the map in-place.
@@ -709,31 +756,34 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
709756
}
710757

711758
/**
712-
* This method deletes every file in this store that is not contained in either the remote or local metadata snapshots.
759+
* Segment Replication method -
760+
* This method deletes every file in this store that is not referenced by the passed in SegmentInfos or
761+
* part of the latest on-disk commit point.
713762
* This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file.
714763
* In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk.
715764
* @param reason the reason for this cleanup operation logged for each deleted file
716-
* @param localSnapshot The local snapshot from in memory SegmentInfos.
765+
* @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present.
717766
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
718767
*/
719-
public void cleanupAndPreserveLatestCommitPoint(String reason, MetadataSnapshot localSnapshot) throws IOException {
768+
public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException {
769+
assert indexSettings.isSegRepEnabled();
720770
// fetch a snapshot from the latest on disk Segments_N file. This can be behind
721771
// the passed in local in memory snapshot, so we want to ensure files it references are not removed.
722772
metadataLock.writeLock().lock();
723773
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
724-
cleanupFiles(reason, localSnapshot, getMetadata(readLastCommittedSegmentsInfo()));
774+
cleanupFiles(reason, getMetadata(readLastCommittedSegmentsInfo()), infos.files(true));
725775
} finally {
726776
metadataLock.writeLock().unlock();
727777
}
728778
}
729779

730-
private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable MetadataSnapshot additionalSnapshot)
780+
private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable Collection<String> additionalFiles)
731781
throws IOException {
732782
assert metadataLock.isWriteLockedByCurrentThread();
733783
for (String existingFile : directory.listAll()) {
734784
if (Store.isAutogenerated(existingFile)
735785
|| localSnapshot.contains(existingFile)
736-
|| (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) {
786+
|| (additionalFiles != null && additionalFiles.contains(existingFile))) {
737787
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
738788
// checksum)
739789
continue;
@@ -825,17 +875,9 @@ public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, l
825875
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
826876
latestSegmentInfos.setUserData(userData, true);
827877
latestSegmentInfos.commit(directory());
828-
829-
// similar to TrimUnsafeCommits, create a commit with an appending IW, this will delete old commits and ensure all files
830-
// associated with the SegmentInfos.commit are fsynced.
831-
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(directory);
832-
assert existingCommits.isEmpty() == false : "Expected at least one commit but none found";
833-
final IndexCommit lastIndexCommit = existingCommits.get(existingCommits.size() - 1);
834-
assert latestSegmentInfos.getSegmentsFileName().equals(lastIndexCommit.getSegmentsFileName());
835-
try (IndexWriter writer = newAppendingIndexWriter(directory, lastIndexCommit)) {
836-
writer.setLiveCommitData(lastIndexCommit.getUserData().entrySet());
837-
writer.commit();
838-
}
878+
directory.sync(latestSegmentInfos.files(true));
879+
directory.syncMetaData();
880+
cleanupAndPreserveLatestCommitPoint("After commit", latestSegmentInfos);
839881
} finally {
840882
metadataLock.writeLock().unlock();
841883
}
@@ -1033,6 +1075,11 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
10331075
}
10341076

10351077
static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
1078+
return loadMetadata(segmentInfos, directory, logger, false);
1079+
}
1080+
1081+
static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger, boolean ignoreSegmentsFile)
1082+
throws IOException {
10361083
long numDocs = Lucene.getNumDocs(segmentInfos);
10371084
Map<String, String> commitUserDataBuilder = new HashMap<>();
10381085
commitUserDataBuilder.putAll(segmentInfos.getUserData());
@@ -1067,8 +1114,10 @@ static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory director
10671114
if (maxVersion == null) {
10681115
maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
10691116
}
1070-
final String segmentsFile = segmentInfos.getSegmentsFileName();
1071-
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
1117+
if (ignoreSegmentsFile == false) {
1118+
final String segmentsFile = segmentInfos.getSegmentsFileName();
1119+
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
1120+
}
10721121
return new LoadedMetadata(unmodifiableMap(builder), unmodifiableMap(commitUserDataBuilder), numDocs);
10731122
}
10741123

@@ -1148,7 +1197,6 @@ public Map<String, StoreFileMetadata> asMap() {
11481197
* Helper method used to group store files according to segment and commit.
11491198
*
11501199
* @see MetadataSnapshot#recoveryDiff(MetadataSnapshot)
1151-
* @see MetadataSnapshot#segmentReplicationDiff(MetadataSnapshot)
11521200
*/
11531201
private Iterable<List<StoreFileMetadata>> getGroupedFilesIterable() {
11541202
final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
@@ -1241,51 +1289,6 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
12411289
return recoveryDiff;
12421290
}
12431291

1244-
/**
1245-
* Segment Replication method
1246-
* Returns a diff between the two snapshots that can be used for getting list of files to copy over to a replica for segment replication. The given snapshot is treated as the
1247-
* target and this snapshot as the source. The returned diff will hold a list of files that are:
1248-
* <ul>
1249-
* <li>identical: they exist in both snapshots and they can be considered the same ie. they don't need to be recovered</li>
1250-
* <li>different: they exist in both snapshots but their they are not identical</li>
1251-
* <li>missing: files that exist in the source but not in the target</li>
1252-
* </ul>
1253-
*/
1254-
public RecoveryDiff segmentReplicationDiff(MetadataSnapshot recoveryTargetSnapshot) {
1255-
final List<StoreFileMetadata> identical = new ArrayList<>();
1256-
final List<StoreFileMetadata> different = new ArrayList<>();
1257-
final List<StoreFileMetadata> missing = new ArrayList<>();
1258-
final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
1259-
for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) {
1260-
identicalFiles.clear();
1261-
boolean consistent = true;
1262-
for (StoreFileMetadata meta : segmentFiles) {
1263-
StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());
1264-
if (storeFileMetadata == null) {
1265-
// Do not consider missing files as inconsistent in SegRep as replicas may lag while primary updates
1266-
// documents and generate new files specific to a segment
1267-
missing.add(meta);
1268-
} else if (storeFileMetadata.isSame(meta) == false) {
1269-
consistent = false;
1270-
different.add(meta);
1271-
} else {
1272-
identicalFiles.add(meta);
1273-
}
1274-
}
1275-
if (consistent) {
1276-
identical.addAll(identicalFiles);
1277-
} else {
1278-
different.addAll(identicalFiles);
1279-
}
1280-
}
1281-
RecoveryDiff recoveryDiff = new RecoveryDiff(
1282-
Collections.unmodifiableList(identical),
1283-
Collections.unmodifiableList(different),
1284-
Collections.unmodifiableList(missing)
1285-
);
1286-
return recoveryDiff;
1287-
}
1288-
12891292
/**
12901293
* Returns the number of files in this snapshot
12911294
*/

server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,12 @@
1010

1111
import org.opensearch.common.io.stream.StreamInput;
1212
import org.opensearch.common.io.stream.StreamOutput;
13-
import org.opensearch.index.store.Store;
1413
import org.opensearch.index.store.StoreFileMetadata;
1514
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
1615
import org.opensearch.transport.TransportResponse;
1716

1817
import java.io.IOException;
19-
import java.util.Set;
18+
import java.util.Map;
2019

2120
/**
2221
* Response returned from a {@link SegmentReplicationSource} that includes the file metadata, and SegmentInfos
@@ -28,52 +27,41 @@
2827
public class CheckpointInfoResponse extends TransportResponse {
2928

3029
private final ReplicationCheckpoint checkpoint;
31-
private final Store.MetadataSnapshot snapshot;
30+
private final Map<String, StoreFileMetadata> metadataMap;
3231
private final byte[] infosBytes;
33-
// pendingDeleteFiles are segments that have been merged away in the latest in memory SegmentInfos
34-
// but are still referenced by the latest commit point (Segments_N).
35-
private final Set<StoreFileMetadata> pendingDeleteFiles;
3632

3733
public CheckpointInfoResponse(
3834
final ReplicationCheckpoint checkpoint,
39-
final Store.MetadataSnapshot snapshot,
40-
final byte[] infosBytes,
41-
final Set<StoreFileMetadata> additionalFiles
35+
final Map<String, StoreFileMetadata> metadataMap,
36+
final byte[] infosBytes
4237
) {
4338
this.checkpoint = checkpoint;
44-
this.snapshot = snapshot;
39+
this.metadataMap = metadataMap;
4540
this.infosBytes = infosBytes;
46-
this.pendingDeleteFiles = additionalFiles;
4741
}
4842

4943
public CheckpointInfoResponse(StreamInput in) throws IOException {
5044
this.checkpoint = new ReplicationCheckpoint(in);
51-
this.snapshot = new Store.MetadataSnapshot(in);
45+
this.metadataMap = in.readMap(StreamInput::readString, StoreFileMetadata::new);
5246
this.infosBytes = in.readByteArray();
53-
this.pendingDeleteFiles = in.readSet(StoreFileMetadata::new);
5447
}
5548

5649
@Override
5750
public void writeTo(StreamOutput out) throws IOException {
5851
checkpoint.writeTo(out);
59-
snapshot.writeTo(out);
52+
out.writeMap(metadataMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
6053
out.writeByteArray(infosBytes);
61-
out.writeCollection(pendingDeleteFiles);
6254
}
6355

6456
public ReplicationCheckpoint getCheckpoint() {
6557
return checkpoint;
6658
}
6759

68-
public Store.MetadataSnapshot getSnapshot() {
69-
return snapshot;
60+
public Map<String, StoreFileMetadata> getMetadataMap() {
61+
return metadataMap;
7062
}
7163

7264
public byte[] getInfosBytes() {
7365
return infosBytes;
7466
}
75-
76-
public Set<StoreFileMetadata> getPendingDeleteFiles() {
77-
return pendingDeleteFiles;
78-
}
7967
}

0 commit comments

Comments (0)