Skip to content

Commit e0c5bf3

Browse files
committed
Add restore level safeguards to prevent file cache oversubscription (opensearch-project#8606)
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com> (cherry picked from commit a3aab67)
1 parent e4a7662 commit e0c5bf3

13 files changed

Lines changed: 300 additions & 19 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
88
- Add server version as REST response header [#6583](https://github.com/opensearch-project/OpenSearch/issues/6583)
99
- Start replication checkpointTimers on primary before segments upload to remote store. ([#8221]()https://github.com/opensearch-project/OpenSearch/pull/8221)
1010
- Introduce new static cluster setting to control slice computation for concurrent segment search. ([#8847](https://github.com/opensearch-project/OpenSearch/pull/8884))
11+
- Add configuration for file cache size to max remote data ratio to prevent oversubscription of file cache ([#8606](https://github.com/opensearch-project/OpenSearch/pull/8606))
1112

1213
### Dependencies
1314
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ public String snapshotUuid() {
532532
/**
533533
* Sets the storage type for this request.
534534
*/
535-
RestoreSnapshotRequest storageType(StorageType storageType) {
535+
public RestoreSnapshotRequest storageType(StorageType storageType) {
536536
this.storageType = storageType;
537537
return this;
538538
}

server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,16 @@ public ShardsIterator allShardsIncludingRelocationTargets(String[] indices) {
295295
return allShardsSatisfyingPredicate(indices, shardRouting -> true, true);
296296
}
297297

298+
/**
299+
* All the shards on the node which match the predicate
300+
* @param predicate condition to match
301+
* @return iterator over shards matching the predicate
302+
*/
303+
public ShardsIterator allShardsSatisfyingPredicate(Predicate<ShardRouting> predicate) {
304+
String[] indices = indicesRouting.keySet().toArray(new String[0]);
305+
return allShardsSatisfyingPredicate(indices, predicate, false);
306+
}
307+
298308
private ShardsIterator allShardsSatisfyingPredicate(
299309
String[] indices,
300310
Predicate<ShardRouting> predicate,

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
import static org.opensearch.cluster.routing.RoutingPool.getShardPool;
6969
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
7070
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
71-
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO;
71+
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;
7272

7373
/**
7474
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
@@ -199,8 +199,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
199199
final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
200200
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
201201
final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;
202-
203-
if (totalNodeRemoteShardSize > DATA_TO_FILE_CACHE_SIZE_RATIO * nodeCacheSize) {
202+
final double dataToFileCacheSizeRatio = DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(allocation.metadata().settings());
203+
if (dataToFileCacheSizeRatio > 0.0f && totalNodeRemoteShardSize > dataToFileCacheSizeRatio * nodeCacheSize) {
204204
return allocation.decision(
205205
Decision.NO,
206206
NAME,

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.opensearch.index.ShardIndexingPressureSettings;
4747
import org.opensearch.index.ShardIndexingPressureStore;
4848
import org.opensearch.search.SearchBootstrapSettings;
49+
import org.opensearch.index.store.remote.filecache.FileCache;
4950
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
5051
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
5152
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
@@ -646,6 +647,7 @@ public void apply(Settings value, Settings current, Settings previous) {
646647

647648
// Settings related to Searchable Snapshots
648649
Node.NODE_SEARCH_CACHE_SIZE_SETTING,
650+
FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING,
649651

650652
// Settings related to Remote Refresh Segment Pressure
651653
RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED,

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.lucene.store.IndexInput;
1212
import org.opensearch.common.breaker.CircuitBreaker;
1313
import org.opensearch.common.breaker.CircuitBreakingException;
14+
import org.opensearch.common.settings.Setting;
1415
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
1516
import org.opensearch.index.store.remote.utils.cache.RefCountedCache;
1617
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
@@ -49,8 +50,20 @@ public class FileCache implements RefCountedCache<Path, CachedIndexInput> {
4950

5051
private final CircuitBreaker circuitBreaker;
5152

52-
// TODO: Convert the constant into an integer setting
53-
public static final int DATA_TO_FILE_CACHE_SIZE_RATIO = 5;
53+
/**
54+
* Defines a limit of how much total remote data can be referenced as a ratio of the size of the disk reserved for
55+
* the file cache. For example, if 100GB disk space is configured for use as a file cache and the
56+
* remote_data_ratio of 5 is defined, then a total of 500GB of remote data can be loaded as searchable snapshots.
57+
* This is designed to be a safeguard to prevent oversubscribing a cluster.
58+
* Specify a value of zero for no limit, which is the default for compatibility reasons.
59+
*/
60+
public static final Setting<Double> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.doubleSetting(
61+
"cluster.filecache.remote_data_ratio",
62+
0.0,
63+
0.0,
64+
Setting.Property.NodeScope,
65+
Setting.Property.Dynamic
66+
);
5467

5568
public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
5669
this.theCache = cache;

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -945,8 +945,9 @@ protected Node(
945945
clusterModule.getAllocationService(),
946946
metadataCreateIndexService,
947947
metadataIndexUpgradeService,
948-
clusterService.getClusterSettings(),
949-
shardLimitValidator
948+
shardLimitValidator,
949+
indicesService,
950+
clusterInfoService::getClusterInfo
950951
);
951952

952953
final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(

server/src/main/java/org/opensearch/snapshots/RestoreService.java

Lines changed: 73 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
4343
import org.opensearch.action.support.IndicesOptions;
4444
import org.opensearch.cluster.ClusterChangedEvent;
45+
import org.opensearch.cluster.ClusterInfo;
4546
import org.opensearch.cluster.ClusterState;
4647
import org.opensearch.cluster.ClusterStateApplier;
4748
import org.opensearch.cluster.ClusterStateTaskConfig;
@@ -70,6 +71,7 @@
7071
import org.opensearch.cluster.routing.RoutingChangesObserver;
7172
import org.opensearch.cluster.routing.RoutingTable;
7273
import org.opensearch.cluster.routing.ShardRouting;
74+
import org.opensearch.cluster.routing.ShardsIterator;
7375
import org.opensearch.cluster.routing.UnassignedInfo;
7476
import org.opensearch.cluster.routing.allocation.AllocationService;
7577
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
@@ -88,6 +90,9 @@
8890
import org.opensearch.index.IndexSettings;
8991
import org.opensearch.index.shard.IndexShard;
9092
import org.opensearch.core.index.shard.ShardId;
93+
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
94+
import org.opensearch.index.store.remote.filecache.FileCacheStats;
95+
import org.opensearch.indices.IndicesService;
9196
import org.opensearch.indices.ShardLimitValidator;
9297
import org.opensearch.repositories.IndexId;
9398
import org.opensearch.repositories.RepositoriesService;
@@ -105,6 +110,7 @@
105110
import java.util.Set;
106111
import java.util.function.Function;
107112
import java.util.function.Predicate;
113+
import java.util.function.Supplier;
108114
import java.util.stream.Collectors;
109115

110116
import static java.util.Collections.unmodifiableSet;
@@ -120,6 +126,8 @@
120126
import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY;
121127
import static org.opensearch.common.util.set.Sets.newHashSet;
122128
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectory.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION;
129+
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;
130+
import static org.opensearch.node.Node.NODE_SEARCH_CACHE_SIZE_SETTING;
123131
import static org.opensearch.snapshots.SnapshotUtils.filterIndices;
124132

125133
/**
@@ -178,6 +186,10 @@ public class RestoreService implements ClusterStateApplier {
178186

179187
private final ClusterSettings clusterSettings;
180188

189+
private final IndicesService indicesService;
190+
191+
private final Supplier<ClusterInfo> clusterInfoSupplier;
192+
181193
private final ClusterManagerTaskThrottler.ThrottlingKey restoreSnapshotTaskKey;
182194

183195
private static final CleanRestoreStateTaskExecutor cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor();
@@ -188,8 +200,9 @@ public RestoreService(
188200
AllocationService allocationService,
189201
MetadataCreateIndexService createIndexService,
190202
MetadataIndexUpgradeService metadataIndexUpgradeService,
191-
ClusterSettings clusterSettings,
192-
ShardLimitValidator shardLimitValidator
203+
ShardLimitValidator shardLimitValidator,
204+
IndicesService indicesService,
205+
Supplier<ClusterInfo> clusterInfoSupplier
193206
) {
194207
this.clusterService = clusterService;
195208
this.repositoriesService = repositoriesService;
@@ -201,6 +214,8 @@ public RestoreService(
201214
}
202215
this.clusterSettings = clusterService.getClusterSettings();
203216
this.shardLimitValidator = shardLimitValidator;
217+
this.indicesService = indicesService;
218+
this.clusterInfoSupplier = clusterInfoSupplier;
204219

205220
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
206221
restoreSnapshotTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.RESTORE_SNAPSHOT_KEY, true);
@@ -447,7 +462,9 @@ public ClusterState execute(ClusterState currentState) {
447462
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
448463
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
449464
final Map<ShardId, RestoreInProgress.ShardRestoreStatus> shards;
465+
final boolean isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(request.storageType().toString());
450466
Set<String> aliases = new HashSet<>();
467+
long totalRestorableRemoteIndexesSize = 0;
451468

452469
if (indices.isEmpty() == false) {
453470
// We have some indices to restore
@@ -458,17 +475,14 @@ public ClusterState execute(ClusterState currentState) {
458475
String index = indexEntry.getValue();
459476
boolean partial = checkPartial(index);
460477

478+
IndexId snapshotIndexId = repositoryData.resolveIndexId(index);
461479
IndexMetadata snapshotIndexMetadata = updateIndexSettings(
462480
metadata.index(index),
463481
request.indexSettings(),
464482
request.ignoreIndexSettings()
465483
);
466-
if (IndexModule.Type.REMOTE_SNAPSHOT.match(request.storageType().toString())) {
467-
snapshotIndexMetadata = addSnapshotToIndexSettings(
468-
snapshotIndexMetadata,
469-
snapshot,
470-
repositoryData.resolveIndexId(index)
471-
);
484+
if (isRemoteSnapshot) {
485+
snapshotIndexMetadata = addSnapshotToIndexSettings(snapshotIndexMetadata, snapshot, snapshotIndexId);
472486
}
473487
final boolean isSearchableSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(
474488
snapshotIndexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey())
@@ -494,7 +508,7 @@ public ClusterState execute(ClusterState currentState) {
494508
restoreUUID,
495509
snapshot,
496510
snapshotInfo.version(),
497-
repositoryData.resolveIndexId(index),
511+
snapshotIndexId,
498512
isSearchableSnapshot,
499513
isRemoteStoreShallowCopy,
500514
request.getSourceRemoteStoreRepository()
@@ -618,6 +632,14 @@ public ClusterState execute(ClusterState currentState) {
618632
}
619633

620634
for (int shard = 0; shard < snapshotIndexMetadata.getNumberOfShards(); shard++) {
635+
if (isRemoteSnapshot) {
636+
IndexShardSnapshotStatus.Copy shardStatus = repository.getShardSnapshotStatus(
637+
snapshotInfo.snapshotId(),
638+
snapshotIndexId,
639+
new ShardId(metadata.index(index).getIndex(), shard)
640+
).asCopy();
641+
totalRestorableRemoteIndexesSize += shardStatus.getTotalSize();
642+
}
621643
if (!ignoreShards.contains(shard)) {
622644
shardsBuilder.put(
623645
new ShardId(renamedIndex, shard),
@@ -654,6 +676,9 @@ public ClusterState execute(ClusterState currentState) {
654676
}
655677

656678
checkAliasNameConflicts(indices, aliases);
679+
if (isRemoteSnapshot) {
680+
validateSearchableSnapshotRestorable(totalRestorableRemoteIndexesSize);
681+
}
657682

658683
Map<String, DataStream> updatedDataStreams = new HashMap<>(currentState.metadata().dataStreams());
659684
updatedDataStreams.putAll(
@@ -853,6 +878,45 @@ private IndexMetadata updateIndexSettings(
853878
return builder.settings(settingsBuilder).build();
854879
}
855880

881+
private void validateSearchableSnapshotRestorable(long totalRestorableRemoteIndexesSize) {
882+
ClusterInfo clusterInfo = clusterInfoSupplier.get();
883+
double remoteDataToFileCacheRatio = DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(clusterService.getSettings());
884+
Map<String, FileCacheStats> nodeFileCacheStats = clusterInfo.getNodeFileCacheStats();
885+
if (nodeFileCacheStats.isEmpty() || remoteDataToFileCacheRatio <= 0.01f) {
886+
return;
887+
}
888+
889+
long totalNodeFileCacheSize = clusterInfo.getNodeFileCacheStats()
890+
.values()
891+
.stream()
892+
.map(fileCacheStats -> fileCacheStats.getTotal().getBytes())
893+
.mapToLong(Long::longValue)
894+
.sum();
895+
896+
Predicate<ShardRouting> isRemoteSnapshotShard = shardRouting -> shardRouting.primary()
897+
&& indicesService.indexService(shardRouting.index()).getIndexSettings().isRemoteSnapshot();
898+
899+
ShardsIterator shardsIterator = clusterService.state()
900+
.routingTable()
901+
.allShardsSatisfyingPredicate(isRemoteSnapshotShard);
902+
903+
long totalRestoredRemoteIndexesSize = shardsIterator.getShardRoutings()
904+
.stream()
905+
.map(clusterInfo::getShardSize)
906+
.mapToLong(Long::longValue)
907+
.sum();
908+
909+
if (totalRestoredRemoteIndexesSize + totalRestorableRemoteIndexesSize > remoteDataToFileCacheRatio
910+
* totalNodeFileCacheSize) {
911+
throw new SnapshotRestoreException(
912+
snapshot,
913+
"Size of the indexes to be restored exceeds the file cache bounds. Increase the file cache capacity on the cluster nodes using "
914+
+ NODE_SEARCH_CACHE_SIZE_SETTING.getKey()
915+
+ " setting."
916+
);
917+
}
918+
}
919+
856920
@Override
857921
public void onFailure(String source, Exception e) {
858922
logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot", snapshotId), e);

server/src/test/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequestTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ private RestoreSnapshotRequest randomState(RestoreSnapshotRequest instance) {
112112
instance.snapshotUuid(randomBoolean() ? null : randomAlphaOfLength(10));
113113
}
114114

115+
instance.storageType(
116+
randomBoolean() ? RestoreSnapshotRequest.StorageType.LOCAL : RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT
117+
);
118+
115119
if (randomBoolean()) {
116120
instance.setSourceRemoteStoreRepository(randomAlphaOfLengthBetween(5, 10));
117121
}

0 commit comments

Comments
 (0)