Skip to content

Commit acf8aa1

Browse files
AdiiigoAditi Goyal
andauthored
Changes for the Upload Workflow for merged segments (opensearch-project#18610)
* Changes for the Upload Workflow for merged segments Signed-off-by: Aditi Goyal <adgoyal@amazon.com> * Addressed Todos Signed-off-by: Aditi Goyal <adgoyal@amazon.com> * Added Test Cases Signed-off-by: Aditi Goyal <adgoyal@amazon.com> * Removed the usage of Forbidden APIs Signed-off-by: Aditi Goyal <adgoyal@amazon.com> * Updated the code to include priority upload as paramter Signed-off-by: Aditi Goyal <adgoyal@amazon.com> * Updated the remote listener to use remote upploader interface Signed-off-by: Aditi Goyal <adgoyal@amazon.com> * Added JavaDocs across the changes Signed-off-by: Aditi Goyal <adgoyal@amazon.com> * Addressed spotless check Signed-off-by: Aditi Goyal <adgoyal@amazon.com> * Dummy commit(can be reverted) Signed-off-by: Aditi Goyal <adgoyal@amazon.com> * Revert "Dummy commit(can be reverted)" This reverts commit e542b99. Signed-off-by: Aditi Goyal <adgoyal@amazon.com> --------- Signed-off-by: Aditi Goyal <adgoyal@amazon.com> Co-authored-by: Aditi Goyal <adgoyal@amazon.com>
1 parent d6c857f commit acf8aa1

4 files changed

Lines changed: 625 additions & 43 deletions

File tree

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

Lines changed: 29 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,14 @@
99
package org.opensearch.index.shard;
1010

1111
import org.apache.logging.log4j.Logger;
12-
import org.apache.logging.log4j.message.ParameterizedMessage;
1312
import org.apache.lucene.codecs.CodecUtil;
14-
import org.apache.lucene.index.CorruptIndexException;
1513
import org.apache.lucene.index.SegmentInfos;
1614
import org.apache.lucene.store.Directory;
1715
import org.apache.lucene.store.FilterDirectory;
1816
import org.apache.lucene.store.IOContext;
1917
import org.apache.lucene.store.IndexInput;
2018
import org.opensearch.action.LatchedActionListener;
2119
import org.opensearch.action.bulk.BackoffPolicy;
22-
import org.opensearch.action.support.GroupedActionListener;
2320
import org.opensearch.cluster.routing.RecoverySource;
2421
import org.opensearch.common.concurrent.GatedCloseable;
2522
import org.opensearch.common.logging.Loggers;
@@ -30,7 +27,6 @@
3027
import org.opensearch.index.engine.InternalEngine;
3128
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
3229
import org.opensearch.index.seqno.SequenceNumbers;
33-
import org.opensearch.index.store.CompositeDirectory;
3430
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
3531
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
3632
import org.opensearch.index.translog.Translog;
@@ -49,6 +45,7 @@
4945
import java.util.concurrent.CountDownLatch;
5046
import java.util.concurrent.TimeUnit;
5147
import java.util.concurrent.atomic.AtomicBoolean;
48+
import java.util.function.Function;
5249
import java.util.stream.Collectors;
5350

5451
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
@@ -93,6 +90,7 @@ public final class RemoteStoreRefreshListener extends ReleasableRetryableRefresh
9390
private volatile Iterator<TimeValue> backoffDelayIterator;
9491
private final SegmentReplicationCheckpointPublisher checkpointPublisher;
9592
private final RemoteStoreSettings remoteStoreSettings;
93+
private final RemoteStoreUploader remoteStoreUploader;
9694

9795
public RemoteStoreRefreshListener(
9896
IndexShard indexShard,
@@ -106,6 +104,7 @@ public RemoteStoreRefreshListener(
106104
this.storeDirectory = indexShard.store().directory();
107105
this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
108106
.getDelegate()).getDelegate();
107+
remoteStoreUploader = new RemoteStoreUploaderService(indexShard, storeDirectory, remoteDirectory);
109108
localSegmentChecksumMap = new HashMap<>();
110109
RemoteSegmentMetadata remoteSegmentMetadata = null;
111110
if (indexShard.routingEntry().primary()) {
@@ -324,6 +323,32 @@ public void onFailure(Exception e) {
324323
return successful.get();
325324
}
326325

326+
/**
327+
* Uploads new segment files to the remote store.
328+
*
329+
* @param localSegmentsPostRefresh collection of segment files present after refresh
330+
* @param localSegmentsSizeMap map of segment file names to their sizes
331+
* @param segmentUploadsCompletedListener listener to be notified when upload completes
332+
*/
333+
private void uploadNewSegments(
334+
Collection<String> localSegmentsPostRefresh,
335+
Map<String, Long> localSegmentsSizeMap,
336+
ActionListener<Void> segmentUploadsCompletedListener
337+
) {
338+
Collection<String> filteredFiles = localSegmentsPostRefresh.stream().filter(file -> !skipUpload(file)).collect(Collectors.toList());
339+
Function<Map<String, Long>, UploadListener> uploadListenerFunction = (Map<String, Long> sizeMap) -> createUploadListener(
340+
localSegmentsSizeMap
341+
);
342+
343+
remoteStoreUploader.uploadSegments(
344+
filteredFiles,
345+
localSegmentsSizeMap,
346+
segmentUploadsCompletedListener,
347+
uploadListenerFunction,
348+
isLowPriorityUpload()
349+
);
350+
}
351+
327352
/**
328353
* Clears the stale files from the latest local segment checksum map.
329354
*
@@ -424,45 +449,6 @@ void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos se
424449
}
425450
}
426451

427-
private void uploadNewSegments(
428-
Collection<String> localSegmentsPostRefresh,
429-
Map<String, Long> localSegmentsSizeMap,
430-
ActionListener<Void> listener
431-
) {
432-
Collection<String> filteredFiles = localSegmentsPostRefresh.stream().filter(file -> !skipUpload(file)).collect(Collectors.toList());
433-
if (filteredFiles.size() == 0) {
434-
logger.debug("No new segments to upload in uploadNewSegments");
435-
listener.onResponse(null);
436-
return;
437-
}
438-
439-
logger.debug("Effective new segments files to upload {}", filteredFiles);
440-
ActionListener<Collection<Void>> mappedListener = ActionListener.map(listener, resp -> null);
441-
GroupedActionListener<Void> batchUploadListener = new GroupedActionListener<>(mappedListener, filteredFiles.size());
442-
Directory directory = ((FilterDirectory) (((FilterDirectory) storeDirectory).getDelegate())).getDelegate();
443-
444-
for (String src : filteredFiles) {
445-
// Initializing listener here to ensure that the stats increment operations are thread-safe
446-
UploadListener statsListener = createUploadListener(localSegmentsSizeMap);
447-
ActionListener<Void> aggregatedListener = ActionListener.wrap(resp -> {
448-
statsListener.onSuccess(src);
449-
batchUploadListener.onResponse(resp);
450-
if (directory instanceof CompositeDirectory) {
451-
((CompositeDirectory) directory).afterSyncToRemote(src);
452-
}
453-
}, ex -> {
454-
logger.warn(() -> new ParameterizedMessage("Exception: [{}] while uploading segment files", ex), ex);
455-
if (ex instanceof CorruptIndexException) {
456-
indexShard.failShard(ex.getMessage(), ex);
457-
}
458-
statsListener.onFailure(src);
459-
batchUploadListener.onFailure(ex);
460-
});
461-
statsListener.beforeUpload(src);
462-
remoteDirectory.copyFrom(storeDirectory, src, IOContext.DEFAULT, aggregatedListener, isLowPriorityUpload());
463-
}
464-
}
465-
466452
boolean isLowPriorityUpload() {
467453
return isLocalOrSnapshotRecoveryOrSeeding();
468454
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.shard;
10+
11+
import org.opensearch.common.util.UploadListener;
12+
import org.opensearch.core.action.ActionListener;
13+
14+
import java.util.Collection;
15+
import java.util.Map;
16+
import java.util.function.Function;
17+
18+
/**
19+
* Interface to handle the functionality for uploading data in the remote store
20+
*/
21+
public interface RemoteStoreUploader {
22+
23+
void uploadSegments(
24+
Collection<String> localSegments,
25+
Map<String, Long> localSegmentsSizeMap,
26+
ActionListener<Void> listener,
27+
Function<Map<String, Long>, UploadListener> uploadListenerFunction,
28+
boolean isLowPriorityUpload
29+
);
30+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.shard;
10+
11+
import org.apache.logging.log4j.Logger;
12+
import org.apache.logging.log4j.message.ParameterizedMessage;
13+
import org.apache.lucene.index.CorruptIndexException;
14+
import org.apache.lucene.store.Directory;
15+
import org.apache.lucene.store.FilterDirectory;
16+
import org.apache.lucene.store.IOContext;
17+
import org.opensearch.action.support.GroupedActionListener;
18+
import org.opensearch.common.logging.Loggers;
19+
import org.opensearch.common.util.UploadListener;
20+
import org.opensearch.core.action.ActionListener;
21+
import org.opensearch.index.store.CompositeDirectory;
22+
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
23+
24+
import java.util.Collection;
25+
import java.util.Map;
26+
import java.util.function.Function;
27+
28+
/**
29+
* The service essentially acts as a bridge between local segment storage and remote storage,
30+
* ensuring efficient and reliable segment synchronization while providing comprehensive monitoring and error handling.
31+
*/
32+
public class RemoteStoreUploaderService implements RemoteStoreUploader {
33+
34+
private final Logger logger;
35+
36+
private final IndexShard indexShard;
37+
private final Directory storeDirectory;
38+
private final RemoteSegmentStoreDirectory remoteDirectory;
39+
40+
public RemoteStoreUploaderService(IndexShard indexShard, Directory storeDirectory, RemoteSegmentStoreDirectory remoteDirectory) {
41+
logger = Loggers.getLogger(getClass(), indexShard.shardId());
42+
this.indexShard = indexShard;
43+
this.storeDirectory = storeDirectory;
44+
this.remoteDirectory = remoteDirectory;
45+
}
46+
47+
@Override
48+
public void uploadSegments(
49+
Collection<String> localSegments,
50+
Map<String, Long> localSegmentsSizeMap,
51+
ActionListener<Void> listener,
52+
Function<Map<String, Long>, UploadListener> uploadListenerFunction,
53+
boolean isLowPriorityUpload
54+
) {
55+
if (localSegments.isEmpty()) {
56+
logger.debug("No new segments to upload in uploadNewSegments");
57+
listener.onResponse(null);
58+
return;
59+
}
60+
61+
logger.debug("Effective new segments files to upload {}", localSegments);
62+
ActionListener<Collection<Void>> mappedListener = ActionListener.map(listener, resp -> null);
63+
GroupedActionListener<Void> batchUploadListener = new GroupedActionListener<>(mappedListener, localSegments.size());
64+
Directory directory = ((FilterDirectory) (((FilterDirectory) storeDirectory).getDelegate())).getDelegate();
65+
66+
for (String localSegment : localSegments) {
67+
// Initializing listener here to ensure that the stats increment operations are thread-safe
68+
UploadListener statsListener = uploadListenerFunction.apply(localSegmentsSizeMap);
69+
ActionListener<Void> aggregatedListener = ActionListener.wrap(resp -> {
70+
statsListener.onSuccess(localSegment);
71+
batchUploadListener.onResponse(resp);
72+
// Once uploaded to Remote, local files become eligible for eviction from FileCache
73+
if (directory instanceof CompositeDirectory) {
74+
((CompositeDirectory) directory).afterSyncToRemote(localSegment);
75+
}
76+
}, ex -> {
77+
logger.warn(() -> new ParameterizedMessage("Exception: [{}] while uploading segment files", ex), ex);
78+
if (ex instanceof CorruptIndexException) {
79+
indexShard.failShard(ex.getMessage(), ex);
80+
}
81+
statsListener.onFailure(localSegment);
82+
batchUploadListener.onFailure(ex);
83+
});
84+
statsListener.beforeUpload(localSegment);
85+
// Place where the actual upload is happening
86+
remoteDirectory.copyFrom(storeDirectory, localSegment, IOContext.DEFAULT, aggregatedListener, isLowPriorityUpload);
87+
}
88+
}
89+
}

0 commit comments

Comments
 (0)