Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -493,15 +493,23 @@ protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener
.execute(ActionListener.map(wrappedListener, r -> null));
}

/**
* Handles releasing or blocking read access to indices based on disk threshold status.
*/
private void handleReadBlocks(ClusterState state, Set<String> indicesToBlockRead, ActionListener<Void> listener) {
final Set<String> indicesToReleaseReadBlock = StreamSupport.stream(
Spliterators.spliterator(state.routingTable().indicesRouting().entrySet(), 0),
false
)
.map(Map.Entry::getKey)
.filter(index -> indicesToBlockRead.contains(index) == false)
.filter(index -> state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_BLOCK))
.collect(Collectors.toSet());
final Set<String> indicesToReleaseReadBlock;
if (diskThresholdSettings.isReadBlockAutoReleaseEnabled()) {
indicesToReleaseReadBlock = StreamSupport.stream(
Spliterators.spliterator(state.routingTable().indicesRouting().entrySet(), 0),
false
)
.map(Map.Entry::getKey)
.filter(index -> indicesToBlockRead.contains(index) == false)
.filter(index -> state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_BLOCK))
.collect(Collectors.toSet());
} else {
indicesToReleaseReadBlock = Set.of();
}

if (indicesToReleaseReadBlock.isEmpty() == false) {
updateIndicesReadBlock(indicesToReleaseReadBlock, listener, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ public class DiskThresholdSettings {
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
/**
* Controls whether read blocks ({@code IndexMetadata.INDEX_READ_BLOCK}) on indices are
* released automatically by the disk threshold monitor once those indices are no longer
* selected for read-blocking. Note that this concerns read blocks, not read-only blocks.
* Defaults to {@code true}; the setting is dynamic and node-scoped.
*/
public static final Setting<Boolean> CLUSTER_READ_BLOCK_AUTO_RELEASE = Setting.boolSetting(
"cluster.blocks.read.auto_release",
true,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private volatile String lowWatermarkRaw;
private volatile String highWatermarkRaw;
Expand All @@ -123,6 +133,7 @@ public class DiskThresholdSettings {
private volatile ByteSizeValue freeBytesThresholdHigh;
private volatile boolean includeRelocations;
private volatile boolean createIndexBlockAutoReleaseEnabled;
private volatile boolean readBlockAutoReleaseEnabled;
private volatile boolean enabled;
private volatile boolean warmThresholdEnabled;
private volatile TimeValue rerouteInterval;
Expand Down Expand Up @@ -153,6 +164,7 @@ public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings)
this.enabled = CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
this.warmThresholdEnabled = CLUSTER_ROUTING_ALLOCATION_WARM_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
this.createIndexBlockAutoReleaseEnabled = CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.get(settings);
this.readBlockAutoReleaseEnabled = CLUSTER_READ_BLOCK_AUTO_RELEASE.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, this::setLowWatermark);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, this::setHighWatermark);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING, this::setFloodStage);
Expand All @@ -164,6 +176,7 @@ public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings)
this::setWarmThresholdEnabled
);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE, this::setCreateIndexBlockAutoReleaseEnabled);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_READ_BLOCK_AUTO_RELEASE, this::setReadBlockAutoReleaseEnabled);
}

/**
Expand Down Expand Up @@ -365,6 +378,10 @@ private void setCreateIndexBlockAutoReleaseEnabled(boolean createIndexBlockAutoR
this.createIndexBlockAutoReleaseEnabled = createIndexBlockAutoReleaseEnabled;
}

/**
* Stores a new value for the read-block auto-release flag. Registered as the
* settings-update consumer for {@link #CLUSTER_READ_BLOCK_AUTO_RELEASE}.
*
* @param readBlockAutoReleaseEnabled the updated value of the setting
*/
private void setReadBlockAutoReleaseEnabled(boolean readBlockAutoReleaseEnabled) {
this.readBlockAutoReleaseEnabled = readBlockAutoReleaseEnabled;
}

/**
* Gets the raw (uninterpreted) low watermark value as found in the settings.
*/
Expand Down Expand Up @@ -423,6 +440,13 @@ public boolean isCreateIndexBlockAutoReleaseEnabled() {
return createIndexBlockAutoReleaseEnabled;
}

/**
* Returns whether automatic release of index read blocks is enabled.
*
* @return the current value of the read-block auto-release flag
*/
public boolean isReadBlockAutoReleaseEnabled() {
return readBlockAutoReleaseEnabled;
}

String describeLowThreshold() {
return freeBytesThresholdLow.equals(ByteSizeValue.ZERO)
? Strings.format1Decimals(100.0 - freeDiskThresholdLow, "%")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ public void apply(Settings value, Settings current, Settings previous) {
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_WARM_DISK_THRESHOLD_ENABLED_SETTING,
DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE,
DiskThresholdSettings.CLUSTER_READ_BLOCK_AUTO_RELEASE,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING,
FileCacheThresholdSettings.CLUSTER_FILECACHE_ACTIVEUSAGE_THRESHOLD_ENABLED_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,79 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC
assertEquals(countUnblockBlocksCalled.get(), 1);
}

/**
 * Checks that the monitor does not ask for any read block to be released when
 * {@code cluster.blocks.read.auto_release} is {@code false}, even though the index
 * already carries a read block and the reported disk usage is not meant to trigger one.
 */
public void testReadBlockAutoReleaseDisabled() {
    final String indexName = "test_read_block";
    final String nodeId = "warm_node";

    // Record the index sets passed to updateIndicesReadBlock, one holder per direction.
    final AtomicReference<Set<String>> indicesToBlockRead = new AtomicReference<>();
    final AtomicReference<Set<String>> indicesToReleaseReadBlock = new AtomicReference<>();

    final AllocationService allocationService = createAllocationService(
        Settings.builder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build()
    );

    // Single-shard index created with the read block already set.
    final IndexMetadata readBlockedIndex = IndexMetadata.builder(indexName)
        .settings(settings(Version.CURRENT).put(IndexMetadata.INDEX_BLOCKS_READ_SETTING.getKey(), true))
        .numberOfShards(1)
        .numberOfReplicas(0)
        .build();

    final Metadata clusterMetadata = Metadata.builder().put(readBlockedIndex, false).build();
    final RoutingTable initialRouting = RoutingTable.builder().addAsNew(clusterMetadata.index(indexName)).build();

    final DiscoveryNode warmNode = newNode(nodeId, Collections.singleton(DiscoveryNodeRole.WARM_ROLE));

    final ClusterState initialState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
        .metadata(clusterMetadata)
        .routingTable(initialRouting)
        .nodes(DiscoveryNodes.builder().add(warmNode))
        .blocks(ClusterBlocks.builder().addBlocks(readBlockedIndex).build())
        .build();
    final ClusterState startedState = applyStartedShardsUntilNoChange(initialState, allocationService);

    // Precondition: the index is read-blocked before the monitor runs.
    assertTrue(startedState.blocks().indexBlocked(ClusterBlockLevel.READ, indexName));

    final Settings monitorSettings = Settings.builder().put("cluster.blocks.read.auto_release", false).build();

    final DiskThresholdMonitor monitor = new DiskThresholdMonitor(
        monitorSettings,
        () -> startedState,
        new ClusterSettings(monitorSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
        null,
        () -> 0L,
        (reason, priority, rerouteListener) -> rerouteListener.onResponse(startedState),
        () -> 2.0
    ) {
        @Override
        protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
            listener.onResponse(null);
        }

        @Override
        protected void updateIndicesReadBlock(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readBlock) {
            // Route the captured set to the holder matching the requested direction.
            (readBlock ? indicesToBlockRead : indicesToReleaseReadBlock).set(indicesToUpdate);
            listener.onResponse(null);
        }

        @Override
        protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
            listener.onResponse(null);
        }
    };

    // Disk usage the test treats as healthy for the warm node (200 total, 40 free).
    final Map<String, DiskUsage> diskUsages = new HashMap<>();
    diskUsages.put(nodeId, new DiskUsage(nodeId, nodeId, "/foo/bar", 200, 40));

    monitor.onNewInfo(clusterInfo(diskUsages));

    // With auto-release disabled, no release request should have been captured.
    assertNull(indicesToReleaseReadBlock.get());
}

public void testWarmNodeLowStageWatermarkBreach() {
AllocationService allocation = createAllocationService(
Settings.builder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build()
Expand Down
Loading