Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954))
- Detect breaking changes on pull requests ([#9044](https://github.com/opensearch-project/OpenSearch/pull/9044))
- Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656))
- [Remote Store] Make translog transfer timeout configurable ([#12704](https://github.com/opensearch-project/OpenSearch/pull/12704))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,8 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING,

RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4969,7 +4969,7 @@ public void deleteTranslogFilesFromRemoteTranslog() throws IOException {
TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
RemoteFsTranslog.cleanup(repository, shardId, getThreadPool(), indexSettings.getRemoteStorePathStrategy());
RemoteFsTranslog.cleanup(repository, shardId, getThreadPool(), indexSettings.getRemoteStorePathStrategy(), remoteStoreSettings);
}

/*
Expand All @@ -4992,6 +4992,7 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
getThreadPool(),
shardPath().resolveTranslog(),
indexSettings.getRemoteStorePathStrategy(),
remoteStoreSettings,
logger
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.translog;

import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
Expand All @@ -34,11 +35,14 @@ public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory {

private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;

private final RemoteStoreSettings remoteStoreSettings;

public RemoteBlobStoreInternalTranslogFactory(
Supplier<RepositoriesService> repositoriesServiceSupplier,
ThreadPool threadPool,
String repositoryName,
RemoteTranslogTransferTracker remoteTranslogTransferTracker
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings
) {
Repository repository;
try {
Expand All @@ -49,6 +53,7 @@ public RemoteBlobStoreInternalTranslogFactory(
this.repository = repository;
this.threadPool = threadPool;
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
this.remoteStoreSettings = remoteStoreSettings;
}

@Override
Expand All @@ -74,7 +79,8 @@ public Translog newTranslog(
blobStoreRepository,
threadPool,
startedPrimarySupplier,
remoteTranslogTransferTracker
remoteTranslogTransferTracker,
remoteStoreSettings
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -100,7 +101,8 @@ public RemoteFsTranslog(
BlobStoreRepository blobStoreRepository,
ThreadPool threadPool,
BooleanSupplier startedPrimarySupplier,
RemoteTranslogTransferTracker remoteTranslogTransferTracker
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings
) throws IOException {
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
logger = Loggers.getLogger(getClass(), shardId);
Expand All @@ -113,7 +115,8 @@ public RemoteFsTranslog(
shardId,
fileTransferTracker,
remoteTranslogTransferTracker,
indexSettings().getRemoteStorePathStrategy()
indexSettings().getRemoteStorePathStrategy(),
remoteStoreSettings
);
try {
download(translogTransferManager, location, logger);
Expand Down Expand Up @@ -163,6 +166,7 @@ public static void download(
ThreadPool threadPool,
Path location,
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings,
Logger logger
) throws IOException {
assert repository instanceof BlobStoreRepository : String.format(
Expand All @@ -181,7 +185,8 @@ public static void download(
shardId,
fileTransferTracker,
remoteTranslogTransferTracker,
pathStrategy
pathStrategy,
remoteStoreSettings
);
RemoteFsTranslog.download(translogTransferManager, location, logger);
logger.trace(remoteTranslogTransferTracker.toString());
Expand Down Expand Up @@ -259,7 +264,8 @@ public static TranslogTransferManager buildTranslogTransferManager(
ShardId shardId,
FileTransferTracker fileTransferTracker,
RemoteTranslogTransferTracker tracker,
RemoteStorePathStrategy pathStrategy
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings
) {
assert Objects.nonNull(pathStrategy);
String indexUUID = shardId.getIndex().getUUID();
Expand All @@ -281,7 +287,7 @@ public static TranslogTransferManager buildTranslogTransferManager(
.build();
BlobPath mdPath = pathStrategy.generatePath(mdPathInput);
BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool);
return new TranslogTransferManager(shardId, transferService, dataPath, mdPath, fileTransferTracker, tracker);
return new TranslogTransferManager(shardId, transferService, dataPath, mdPath, fileTransferTracker, tracker, remoteStoreSettings);
}

@Override
Expand Down Expand Up @@ -553,8 +559,13 @@ private void deleteStaleRemotePrimaryTerms() {
}
}

public static void cleanup(Repository repository, ShardId shardId, ThreadPool threadPool, RemoteStorePathStrategy pathStrategy)
throws IOException {
public static void cleanup(
Repository repository,
ShardId shardId,
ThreadPool threadPool,
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings
) throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
// We use a dummy stats tracker to ensure the flow doesn't break.
Expand All @@ -567,7 +578,8 @@ public static void cleanup(Repository repository, ShardId shardId, ThreadPool th
shardId,
fileTransferTracker,
remoteTranslogTransferTracker,
pathStrategy
pathStrategy,
remoteStoreSettings
);
// clean up all remote translog files
translogTransferManager.deleteTranslogFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -60,9 +61,7 @@ public class TranslogTransferManager {
private final BlobPath remoteMetadataTransferPath;
private final FileTransferTracker fileTransferTracker;
private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;

private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000;

private final RemoteStoreSettings remoteStoreSettings;
private static final int METADATA_FILES_TO_FETCH = 10;

private final Logger logger;
Expand All @@ -79,7 +78,8 @@ public TranslogTransferManager(
BlobPath remoteDataTransferPath,
BlobPath remoteMetadataTransferPath,
FileTransferTracker fileTransferTracker,
RemoteTranslogTransferTracker remoteTranslogTransferTracker
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings
) {
this.shardId = shardId;
this.transferService = transferService;
Expand All @@ -88,6 +88,7 @@ public TranslogTransferManager(
this.fileTransferTracker = fileTransferTracker;
this.logger = Loggers.getLogger(getClass(), shardId);
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
this.remoteStoreSettings = remoteStoreSettings;
}

public RemoteTranslogTransferTracker getRemoteTranslogTransferTracker() {
Expand Down Expand Up @@ -151,7 +152,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
transferService.uploadBlobs(toUpload, blobPathMap, latchedActionListener, WritePriority.HIGH);

try {
if (latch.await(TRANSFER_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS) == false) {
if (latch.await(remoteStoreSettings.getClusterRemoteTranslogTransferTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
Exception ex = new TranslogUploadFailedException(
"Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,8 @@ protected void closeInternal() {
repositoriesServiceSupplier,
threadPool,
remoteStoreStatsTrackerFactory,
settings
settings,
remoteStoreSettings
);
this.searchRequestStats = searchRequestStats;
this.clusterDefaultRefreshInterval = CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.get(clusterService.getSettings());
Expand Down Expand Up @@ -528,22 +529,25 @@ private static BiFunction<IndexSettings, ShardRouting, TranslogFactory> getTrans
Supplier<RepositoriesService> repositoriesServiceSupplier,
ThreadPool threadPool,
RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
Settings settings
Settings settings,
RemoteStoreSettings remoteStoreSettings
) {
return (indexSettings, shardRouting) -> {
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
return new RemoteBlobStoreInternalTranslogFactory(
repositoriesServiceSupplier,
threadPool,
indexSettings.getRemoteStoreTranslogRepository(),
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId())
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()),
remoteStoreSettings
);
} else if (isRemoteDataAttributePresent(settings) && shardRouting.primary()) {
return new RemoteBlobStoreInternalTranslogFactory(
repositoriesServiceSupplier,
threadPool,
RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo(indexSettings.getNodeSettings()),
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId())
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()),
remoteStoreSettings
);
}
return new InternalTranslogFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,20 @@ public class RemoteStoreSettings {
Property.Dynamic
);

/**
* Controls timeout value while uploading translog and checkpoint files to remote translog
*/
public static final Setting<TimeValue> CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.translog.transfer_timeout",
TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30),
Property.NodeScope,
Property.Dynamic
);

private volatile TimeValue clusterRemoteTranslogBufferInterval;
private volatile int minRemoteSegmentMetadataFiles;
private volatile TimeValue clusterRemoteTranslogTransferTimeout;

public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
Expand All @@ -69,9 +81,14 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
this::setMinRemoteSegmentMetadataFiles
);

this.clusterRemoteTranslogTransferTimeout = CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(
CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
this::setClusterRemoteTranslogTransferTimeout
);
}

// Exclusively for testing, please do not use it elsewhere.
public TimeValue getClusterRemoteTranslogBufferInterval() {
return clusterRemoteTranslogBufferInterval;
}
Expand All @@ -87,4 +104,12 @@ private void setMinRemoteSegmentMetadataFiles(int minRemoteSegmentMetadataFiles)
public int getMinRemoteSegmentMetadataFiles() {
return this.minRemoteSegmentMetadataFiles;
}

public TimeValue getClusterRemoteTranslogTransferTimeout() {
return clusterRemoteTranslogTransferTimeout;
}

private void setClusterRemoteTranslogTransferTimeout(TimeValue clusterRemoteTranslogTransferTimeout) {
this.clusterRemoteTranslogTransferTimeout = clusterRemoteTranslogTransferTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ private IndexService newIndexService(IndexModule module) throws IOException {
repositoriesServiceReference::get,
threadPool,
indexSettings.getRemoteStoreTranslogRepository(),
new RemoteTranslogTransferTracker(shardRouting.shardId(), 10)
new RemoteTranslogTransferTracker(shardRouting.shardId(), 10),
DefaultRemoteStoreSettings.INSTANCE
);
}
return new InternalTranslogFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.index.translog.transfer.TranslogUploadFailedException;
import org.opensearch.indices.DefaultRemoteStoreSettings;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand Down Expand Up @@ -188,7 +189,8 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin
repository,
threadPool,
primaryMode::get,
new RemoteTranslogTransferTracker(shardId, 10)
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE
);
}

Expand Down Expand Up @@ -459,7 +461,8 @@ public void testExtraGenToKeep() throws Exception {
repository,
threadPool,
() -> Boolean.TRUE,
new RemoteTranslogTransferTracker(shardId, 10)
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE
) {
@Override
ChannelFactory getChannelFactory() {
Expand Down Expand Up @@ -1508,7 +1511,8 @@ public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException {
repository,
threadPool,
() -> Boolean.TRUE,
new RemoteTranslogTransferTracker(shardId, 10)
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE
) {
@Override
ChannelFactory getChannelFactory() {
Expand Down Expand Up @@ -1616,7 +1620,8 @@ public void force(boolean metaData) throws IOException {
repository,
threadPool,
() -> Boolean.TRUE,
new RemoteTranslogTransferTracker(shardId, 10)
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE
) {
@Override
ChannelFactory getChannelFactory() {
Expand Down
Loading