Skip to content

Commit 1f3818e

Browse files
[Remote Store] Fix stats reporting for multistream downloads. (#10402)
* Fix stats reporting for multistream downloads. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * rename tracker to fileTransferTracker. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> --------- Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> (cherry picked from commit a3f8432) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent fea095d commit 1f3818e

4 files changed

Lines changed: 45 additions & 14 deletions

File tree

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@
162162
import org.opensearch.index.seqno.SequenceNumbers;
163163
import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
164164
import org.opensearch.index.similarity.SimilarityService;
165+
import org.opensearch.index.store.DirectoryFileTransferTracker;
165166
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
166167
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
167168
import org.opensearch.index.store.Store;
@@ -4938,9 +4939,10 @@ private void downloadSegments(
49384939
final Runnable onFileSync
49394940
) throws IOException {
49404941
final Path indexPath = store.shardPath() == null ? null : store.shardPath().resolveIndex();
4942+
final DirectoryFileTransferTracker fileTransferTracker = store.getDirectoryFileTransferTracker();
49414943
for (String segment : toDownloadSegments) {
49424944
final PlainActionFuture<String> segmentListener = PlainActionFuture.newFuture();
4943-
sourceRemoteDirectory.copyTo(segment, storeDirectory, indexPath, segmentListener);
4945+
sourceRemoteDirectory.copyTo(segment, storeDirectory, indexPath, fileTransferTracker, segmentListener);
49444946
segmentListener.actionGet();
49454947
onFileSync.run();
49464948
if (targetRemoteDirectory != null) {

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -487,30 +487,51 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen
487487
* @param source The source file name
488488
* @param destinationDirectory The destination directory (if multipart is not supported)
489489
* @param destinationPath The destination path (if multipart is supported)
490+
* @param fileTransferTracker Tracker used for file transfer stats
490491
* @param fileCompletionListener The listener to notify of completion
491492
*/
492-
public void copyTo(String source, Directory destinationDirectory, Path destinationPath, ActionListener<String> fileCompletionListener) {
493+
public void copyTo(
494+
String source,
495+
Directory destinationDirectory,
496+
Path destinationPath,
497+
DirectoryFileTransferTracker fileTransferTracker,
498+
ActionListener<String> fileCompletionListener
499+
) {
493500
final String blobName = getExistingRemoteFilename(source);
494501
if (destinationPath != null && remoteDataDirectory.getBlobContainer() instanceof AsyncMultiStreamBlobContainer) {
502+
long length = 0L;
503+
try {
504+
length = fileLength(source);
505+
} catch (IOException ex) {
506+
logger.error("Unable to fetch segment length for stats tracking", ex);
507+
}
508+
final long fileLength = length;
509+
final long startTime = System.currentTimeMillis();
510+
fileTransferTracker.addTransferredBytesStarted(fileLength);
495511
final AsyncMultiStreamBlobContainer blobContainer = (AsyncMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer();
496512
final Path destinationFilePath = destinationPath.resolve(source);
513+
final ActionListener<String> completionListener = ActionListener.wrap(response -> {
514+
fileTransferTracker.addTransferredBytesSucceeded(fileLength, startTime);
515+
fileCompletionListener.onResponse(response);
516+
}, e -> {
517+
fileTransferTracker.addTransferredBytesFailed(fileLength, startTime);
518+
fileCompletionListener.onFailure(e);
519+
});
497520
final ReadContextListener readContextListener = new ReadContextListener(
498521
blobName,
499522
destinationFilePath,
500-
fileCompletionListener,
523+
completionListener,
501524
threadPool,
502525
remoteDataDirectory.getDownloadRateLimiter(),
503526
recoverySettings.getMaxConcurrentRemoteStoreStreams()
504527
);
505528
blobContainer.readBlobAsync(blobName, readContextListener);
506529
} else {
507530
// Fallback to older mechanism of downloading the file
508-
try {
531+
ActionListener.completeWith(fileCompletionListener, () -> {
509532
destinationDirectory.copyFrom(this, source, source, IOContext.DEFAULT);
510-
fileCompletionListener.onResponse(source);
511-
} catch (IOException e) {
512-
fileCompletionListener.onFailure(e);
513-
}
533+
return source;
534+
});
514535
}
515536
}
516537

server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.opensearch.index.shard.IndexShard;
2121
import org.opensearch.index.shard.IndexShardState;
2222
import org.opensearch.index.shard.ShardPath;
23+
import org.opensearch.index.store.DirectoryFileTransferTracker;
2324
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
2425
import org.opensearch.index.store.Store;
2526
import org.opensearch.index.store.StoreFileMetadata;
@@ -121,7 +122,8 @@ public void getSegmentFiles(
121122
assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
122123
toDownloadSegments.add(fileMetadata);
123124
}
124-
downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, listener);
125+
final DirectoryFileTransferTracker fileTransferTracker = indexShard.store().getDirectoryFileTransferTracker();
126+
downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, fileTransferTracker, listener);
125127
logger.debug("Downloaded segment files from remote store {}", toDownloadSegments);
126128
} finally {
127129
indexShard.store().decRef();
@@ -138,12 +140,13 @@ private void downloadSegments(
138140
RemoteSegmentStoreDirectory remoteStoreDirectory,
139141
List<StoreFileMetadata> toDownloadSegments,
140142
ShardPath shardPath,
143+
DirectoryFileTransferTracker fileTransferTracker,
141144
ActionListener<GetSegmentFilesResponse> completionListener
142145
) {
143146
final Path indexPath = shardPath == null ? null : shardPath.resolveIndex();
144147
for (StoreFileMetadata storeFileMetadata : toDownloadSegments) {
145148
final PlainActionFuture<String> segmentListener = PlainActionFuture.newFuture();
146-
remoteStoreDirectory.copyTo(storeFileMetadata.name(), storeDirectory, indexPath, segmentListener);
149+
remoteStoreDirectory.copyTo(storeFileMetadata.name(), storeDirectory, indexPath, fileTransferTracker, segmentListener);
147150
segmentListener.actionGet();
148151
}
149152
completionListener.onResponse(new GetSegmentFilesResponse(toDownloadSegments));

server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -596,10 +596,15 @@ public void onResponse(String unused) {
596596
public void onFailure(Exception e) {}
597597
};
598598
Path path = createTempDir();
599-
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener);
599+
DirectoryFileTransferTracker directoryFileTransferTracker = new DirectoryFileTransferTracker();
600+
long sourceFileLengthInBytes = remoteSegmentStoreDirectory.fileLength(filename);
601+
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, directoryFileTransferTracker, completionListener);
600602
assertTrue(downloadLatch.await(5000, TimeUnit.SECONDS));
601603
verify(blobContainer, times(1)).readBlobAsync(contains(filename), any());
602604
verify(storeDirectory, times(0)).copyFrom(any(), any(), any(), any());
605+
606+
// Verify stats are updated to DirectoryFileTransferTracker
607+
assertEquals(sourceFileLengthInBytes, directoryFileTransferTracker.getTransferredBytesSucceeded());
603608
}
604609

605610
public void testCopyFilesTo() throws Exception {
@@ -619,7 +624,7 @@ public void onResponse(String unused) {
619624
public void onFailure(Exception e) {}
620625
};
621626
Path path = createTempDir();
622-
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener);
627+
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener);
623628
assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS));
624629
verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT));
625630
}
@@ -643,7 +648,7 @@ public void onResponse(String unused) {
643648
@Override
644649
public void onFailure(Exception e) {}
645650
};
646-
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, null, completionListener);
651+
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, null, new DirectoryFileTransferTracker(), completionListener);
647652
assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS));
648653
verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT));
649654
}
@@ -670,7 +675,7 @@ public void onFailure(Exception e) {
670675
}
671676
};
672677
Path path = createTempDir();
673-
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener);
678+
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener);
674679
assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS));
675680
verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT));
676681
}

0 commit comments

Comments
 (0)