diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java index 912c48d7bbdfe..57ded47fd64c6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java @@ -690,28 +690,18 @@ private List startTestNodes(int nodeCount, Settings additionalSettings) * Helper method to simulate disk pressure for both hot and warm indices */ private void simulateDiskPressure(MockInternalClusterInfoService clusterInfoService) { - boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings); - if (isWarmIndex) { - clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> TOTAL_SPACE_BYTES - WATERMARK_BYTES + 10); - } else { - clusterInfoService.setDiskUsageFunctionAndRefresh( - (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, WATERMARK_BYTES - 1) - ); - } + clusterInfoService.setDiskUsageFunctionAndRefresh( + (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, WATERMARK_BYTES - 1) + ); } /** * Helper method to release disk pressure for both hot and warm indices */ private void releaseDiskPressure(MockInternalClusterInfoService clusterInfoService) { - boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings); - if (isWarmIndex) { - clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 100L); - } else { - clusterInfoService.setDiskUsageFunctionAndRefresh( - (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES) - ); - } + clusterInfoService.setDiskUsageFunctionAndRefresh( + (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES) + ); } /** diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java index 0b6b96f1f8c0a..fddba66833243 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -53,7 +53,6 @@ import org.opensearch.common.util.set.Sets; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; -import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats; import org.opensearch.transport.client.Client; import java.util.ArrayList; @@ -70,7 +69,6 @@ import java.util.stream.StreamSupport; import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE; -import static org.opensearch.cluster.routing.RoutingPool.getIndexPool; import static org.opensearch.cluster.routing.RoutingPool.getNodePool; /** @@ -86,7 +84,6 @@ public class DiskThresholdMonitor { private final DiskThresholdSettings diskThresholdSettings; private final Client client; private final Supplier clusterStateSupplier; - private final Supplier dataToFileCacheSizeRatioSupplier; private final LongSupplier currentTimeMillisSupplier; private final RerouteService rerouteService; private final NodeDiskEvaluator nodeDiskEvaluator; @@ -126,7 +123,6 @@ public DiskThresholdMonitor( this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings); this.client = client; this.nodeDiskEvaluator = new NodeDiskEvaluator(diskThresholdSettings, dataToFileCacheSizeRatioSupplier); - this.dataToFileCacheSizeRatioSupplier = dataToFileCacheSizeRatioSupplier; } private void checkFinished() { @@ -181,10 +177,6 @@ public void onNewInfo(ClusterInfo info) { // Only for Dedicated Warm Nodes final boolean isWarmNode = REMOTE_CAPABLE.equals(getNodePool(routingNode)); nodeDiskEvaluator.setNodeType(isWarmNode); - if (isWarmNode) { - // Create DiskUsage for Warm Nodes based on total Addressable Space - usage = getWarmDiskUsage(usage, info, routingNode, state); - } if (nodeDiskEvaluator.isNodeExceedingFloodStageWatermark(usage)) { @@ -431,29 +423,6 @@ long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, Cluste ); } - private DiskUsage getWarmDiskUsage(DiskUsage diskUsage, ClusterInfo info, RoutingNode node, ClusterState state) { - double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get(); - AggregateFileCacheStats fileCacheStats = info.getNodeFileCacheStats().getOrDefault(diskUsage.getNodeId(), null); - final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0; - long totalAddressableSpace = (long) dataToFileCacheSizeRatio * nodeCacheSize; - final List remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false) - .filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getIndexPool(state.metadata().getIndexSafe(shard.index())))) - .collect(Collectors.toList()); - - long remoteShardSize = 0L; - for (ShardRouting shard : remoteShardsOnNode) { - remoteShardSize += DiskThresholdDecider.getExpectedShardSize(shard, 0L, info, null, state.metadata(), state.getRoutingTable()); - } - final DiskUsage warmDiskUsage = new DiskUsage( - diskUsage.getNodeId(), - diskUsage.getNodeName(), - diskUsage.getPath(), - totalAddressableSpace, - Math.max(0, totalAddressableSpace - remoteShardSize) - ); - return warmDiskUsage; - } - private void markNodesMissingUsageIneligibleForRelease( RoutingNodes routingNodes, Map usages, diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDecider.java index 99dea397e805c..650a227a5e817 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDecider.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.Version; import org.opensearch.cluster.ClusterInfo; +import org.opensearch.cluster.DiskUsage; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.DiskThresholdEvaluator; @@ -49,6 +50,7 @@ import org.opensearch.index.store.remote.filecache.FileCacheSettings; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -111,11 +113,14 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing return Decision.ALWAYS; } - final Decision decision = earlyTerminate(node, allocation); + ClusterInfo clusterInfo = allocation.clusterInfo(); + Map usages = clusterInfo.getNodeMostAvailableDiskUsages(); + final Decision decision = earlyTerminate(node, allocation, usages); if (decision != null) { return decision; } + DiskUsage usage = usages.get(node.nodeId()); final long shardSize = DiskThresholdDecider.getExpectedShardSize( shardRouting, 0L, @@ -125,18 +130,21 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing allocation.routingTable() ); - final long totalAddressableSpace = calculateTotalAddressableSpace(node, allocation); - final long currentNodeRemoteShardSize = calculateCurrentNodeRemoteShardSize(node, allocation, false); - final long freeSpace = Math.max(totalAddressableSpace - currentNodeRemoteShardSize, 0); - final long freeSpaceAfterAllocation = Math.max(freeSpace - shardSize, 0); - final long freeSpaceLowThreshold = diskThresholdEvaluator.getFreeSpaceLowThreshold(totalAddressableSpace); + final DiskUsage usageAfterShardAssigned = new DiskUsage( + usage.getNodeId(), + usage.getNodeName(), + usage.getPath(), + usage.getTotalBytes(), + Math.max(0, usage.getFreeBytes() - shardSize) + ); + final long freeSpaceLowThreshold = diskThresholdEvaluator.getFreeSpaceLowThreshold(usage.getTotalBytes()); final ByteSizeValue freeSpaceLowThresholdInByteSize = new ByteSizeValue(freeSpaceLowThreshold); - final ByteSizeValue freeSpaceInByteSize = new ByteSizeValue(freeSpace); - final ByteSizeValue freeSpaceAfterAllocationInByteSize = new ByteSizeValue(freeSpaceAfterAllocation); + final ByteSizeValue freeSpaceInByteSize = new ByteSizeValue(usage.getFreeBytes()); + final ByteSizeValue freeSpaceAfterAllocationInByteSize = new ByteSizeValue(usageAfterShardAssigned.getFreeBytes()); final ByteSizeValue shardSizeInByteSize = new ByteSizeValue(shardSize); - if (freeSpaceAfterAllocation < freeSpaceLowThreshold) { + if (diskThresholdEvaluator.isNodeExceedingLowWatermark(usageAfterShardAssigned)) { logger.warn( "after allocating [{}] node [{}] would have less than the required threshold of " + "{} free (currently {} free, estimated shard size is {}), preventing allocation", @@ -180,21 +188,29 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl return Decision.ALWAYS; } - final Decision decision = earlyTerminate(node, allocation); + ClusterInfo clusterInfo = allocation.clusterInfo(); + Map usages = clusterInfo.getNodeMostAvailableDiskUsages(); + final Decision decision = earlyTerminate(node, allocation, usages); if (decision != null) { return decision; } - final long totalAddressableSpace = calculateTotalAddressableSpace(node, allocation); - final long currentNodeRemoteShardSize = calculateCurrentNodeRemoteShardSize(node, allocation, true); - final long freeSpace = Math.max(totalAddressableSpace - currentNodeRemoteShardSize, 0); + final long leavingRemoteShardSize = calculateCurrentNodeLeavingRemoteShardSize(node, allocation); + final DiskUsage usage = usages.get(node.nodeId()); + final DiskUsage usageAfterSubtractingLeavingShard = new DiskUsage( + usage.getNodeId(), + usage.getNodeName(), + usage.getPath(), + usage.getTotalBytes(), + Math.min(usage.getFreeBytes() + leavingRemoteShardSize, usage.getTotalBytes()) + ); - final long freeSpaceHighThreshold = diskThresholdEvaluator.getFreeSpaceHighThreshold(totalAddressableSpace); + final long freeSpaceHighThreshold = diskThresholdEvaluator.getFreeSpaceHighThreshold(usage.getTotalBytes()); final ByteSizeValue freeSpaceHighThresholdInByteSize = new ByteSizeValue(freeSpaceHighThreshold); - final ByteSizeValue freeSpaceInByteSize = new ByteSizeValue(freeSpace); + final ByteSizeValue freeSpaceInByteSize = new ByteSizeValue(usageAfterSubtractingLeavingShard.getFreeBytes()); - if (freeSpace < freeSpaceHighThreshold) { + if (diskThresholdEvaluator.isNodeExceedingHighWatermark(usageAfterSubtractingLeavingShard)) { logger.warn( "less than the required {} of free remote addressable space threshold left ({} free) on node [{}], shard cannot remain", freeSpaceHighThresholdInByteSize, @@ -220,18 +236,14 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl ); } - private long calculateCurrentNodeRemoteShardSize(RoutingNode node, RoutingAllocation allocation, boolean subtractLeavingShards) { - final List remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false) - .filter( - shard -> shard.primary() - && REMOTE_CAPABLE.equals(getShardPool(shard, allocation)) - && (subtractLeavingShards == false || shard.relocating() == false) - ) + private long calculateCurrentNodeLeavingRemoteShardSize(RoutingNode node, RoutingAllocation allocation) { + final List leavingRemoteShardsOnNode = StreamSupport.stream(node.spliterator(), false) + .filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getShardPool(shard, allocation)) && (shard.relocating() == true)) .collect(Collectors.toList()); - var remoteShardSize = 0L; - for (ShardRouting shard : remoteShardsOnNode) { - remoteShardSize += DiskThresholdDecider.getExpectedShardSize( + var leavingRemoteShardSize = 0L; + for (ShardRouting shard : leavingRemoteShardsOnNode) { + leavingRemoteShardSize += DiskThresholdDecider.getExpectedShardSize( shard, 0L, allocation.clusterInfo(), @@ -241,19 +253,10 @@ private long calculateCurrentNodeRemoteShardSize(RoutingNode node, RoutingAlloca ); } - return remoteShardSize; - } - - private long calculateTotalAddressableSpace(RoutingNode node, RoutingAllocation allocation) { - ClusterInfo clusterInfo = allocation.clusterInfo(); - // TODO: Change the default value to 5 instead of 0 - final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio(); - final AggregateFileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null); - final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0; - return (long) dataToFileCacheSizeRatio * nodeCacheSize; + return leavingRemoteShardSize; } - private Decision earlyTerminate(RoutingNode node, RoutingAllocation allocation) { + private Decision earlyTerminate(RoutingNode node, RoutingAllocation allocation, final Map usages) { // Always allow allocation if the decider is disabled if (diskThresholdSettings.isWarmThresholdEnabled() == false) { return allocation.decision(Decision.YES, NAME, "the warm disk threshold decider is disabled"); @@ -285,9 +288,12 @@ private Decision earlyTerminate(RoutingNode node, RoutingAllocation allocation) return allocation.decision(Decision.YES, NAME, "File Cache Stat is unavailable"); } - double remoteDataRatio = fileCacheSettings.getRemoteDataRatio(); - if (remoteDataRatio == 0) { - return allocation.decision(Decision.YES, NAME, "Remote data ratio is set to 0, no limit on allocation"); + // Fail open if there are no disk usages available + if (usages.isEmpty() || usages.containsKey(node.nodeId()) == false) { + if (logger.isTraceEnabled()) { + logger.trace("unable to determine disk usages for disk-aware allocation, allowing allocation"); + } + return allocation.decision(Decision.YES, NAME, "disk usages are unavailable"); } return null; diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index f50a9c53ced86..652611d98859f 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -901,7 +901,7 @@ protected void setIndexCreateBlock(ActionListener listener, boolean indexC // High stage threshold (50%) = 200 * 0.15 = 30 // Free space = 28 < 30, so should exceed low stage Map diskUsages = new HashMap<>(); - diskUsages.put("warm_node", new DiskUsage("warm_node", "warm_node", "/foo/bar", 200, 8)); + diskUsages.put("warm_node", new DiskUsage("warm_node", "warm_node", "/foo/bar", 200, 28)); Map fileCacheStats = new HashMap<>(); fileCacheStats.put("warm_node", createAggregateFileCacheStats(100)); @@ -982,7 +982,7 @@ protected void setIndexCreateBlock(ActionListener listener, boolean indexC // High stage threshold (10%) = 200 * 0.1 = 20 // Free space = 18 < 20, so should exceed high stage Map diskUsages = new HashMap<>(); - diskUsages.put("warm_node", new DiskUsage("warm_node", "warm_node", "/foo/bar", 200, 8)); + diskUsages.put("warm_node", new DiskUsage("warm_node", "warm_node", "/foo/bar", 200, 18)); Map fileCacheStats = new HashMap<>(); fileCacheStats.put("warm_node", createAggregateFileCacheStats(100)); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDeciderTests.java index c4a0b43b40b3f..8fdea89865aa7 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDeciderTests.java @@ -94,30 +94,12 @@ public void testCanAllocateSufficientFreeSpace() { shardSizes.put("[test2][0][p]", 1000L); // 1000 bytes shardSizes.put("[test2][0][r]", 1000L); - Map fileCacheStatsMap = new HashMap<>(); - fileCacheStatsMap.put( - "node1", - new AggregateFileCacheStats( - randomNonNegativeInt(), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.OVER_ALL_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.FULL_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.BLOCK_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.PINNED_FILE_STATS) - ) - ); - fileCacheStatsMap.put( - "node2", - new AggregateFileCacheStats( - randomNonNegativeInt(), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.OVER_ALL_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.FULL_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.BLOCK_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.PINNED_FILE_STATS) - - ) - ); + Map fileCacheStatsMap = createFileCacheStatsMap(1000L, "node1", "node2"); final Map usages = new HashMap<>(); + // With file cache of 1000 bytes and ratio of 5.0, total addressable space = 1000 * 5 = 5000 bytes + usages.put("node1", createDiskUsage("node1", 5000, 5000)); + usages.put("node2", createDiskUsage("node2", 5000, 5000)); final ClusterInfo clusterInfo = new DiskThresholdDeciderTests.DevNullClusterInfo(usages, usages, shardSizes, fileCacheStatsMap); ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); @@ -180,31 +162,14 @@ public void testCanAllocateInSufficientFreeSpace() { shardSizes.put("[test2][0][p]", 1000L); // 1000 bytes shardSizes.put("[test2][0][r]", 1000L); - Map fileCacheStatsMap = new HashMap<>(); - fileCacheStatsMap.put( - "node1", - new AggregateFileCacheStats( - randomNonNegativeInt(), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.OVER_ALL_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.FULL_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.BLOCK_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.PINNED_FILE_STATS) - - ) - ); - fileCacheStatsMap.put( - "node2", - new AggregateFileCacheStats( - randomNonNegativeInt(), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.OVER_ALL_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.FULL_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.BLOCK_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.PINNED_FILE_STATS) - - ) - ); + Map fileCacheStatsMap = createFileCacheStatsMap(1000L, "node1", "node2"); final Map usages = new HashMap<>(); + // With file cache of 1000 bytes and ratio of 5.0, total addressable space = 1000 * 5 = 5000 bytes + // For 70% low watermark, we need at least 30% free = 1500 bytes free + // Test shard is 5500 bytes, which exceeds the total addressable space + usages.put("node1", createDiskUsage("node1", 5000, 5000)); + usages.put("node2", createDiskUsage("node2", 5000, 5000)); final ClusterInfo clusterInfo = new DiskThresholdDeciderTests.DevNullClusterInfo(usages, usages, shardSizes, fileCacheStatsMap); ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); @@ -273,31 +238,12 @@ public void testCanRemainSufficientSpace() { shardSizes.put("[test2][0][p]", 1000L); // 1000 bytes shardSizes.put("[test2][0][r]", 1000L); - Map fileCacheStatsMap = new HashMap<>(); - fileCacheStatsMap.put( - "node1", - new AggregateFileCacheStats( - randomNonNegativeInt(), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.OVER_ALL_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.FULL_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.BLOCK_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.PINNED_FILE_STATS) - - ) - ); - fileCacheStatsMap.put( - "node2", - new AggregateFileCacheStats( - randomNonNegativeInt(), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.OVER_ALL_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.FULL_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.BLOCK_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.PINNED_FILE_STATS) - - ) - ); + Map fileCacheStatsMap = createFileCacheStatsMap(1000L, "node1", "node2"); final Map usages = new HashMap<>(); + // With file cache of 1000 bytes and ratio of 5.0, total addressable space = 1000 * 5 = 5000 bytes + usages.put("node1", createDiskUsage("node1", 5000, 3000)); + usages.put("node2", createDiskUsage("node2", 5000, 4000)); final ClusterInfo clusterInfo = new DiskThresholdDeciderTests.DevNullClusterInfo(usages, usages, shardSizes, fileCacheStatsMap); Metadata metadata = Metadata.builder() @@ -359,31 +305,13 @@ public void testCanRemainInsufficientSpace() { shardSizes.put("[test2][0][p]", 1000L); // 1000 bytes shardSizes.put("[test2][0][r]", 1000L); - Map fileCacheStatsMap = new HashMap<>(); - fileCacheStatsMap.put( - "node1", - new AggregateFileCacheStats( - randomNonNegativeInt(), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.OVER_ALL_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.FULL_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.BLOCK_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.PINNED_FILE_STATS) - - ) - ); - fileCacheStatsMap.put( - "node2", - new AggregateFileCacheStats( - randomNonNegativeInt(), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.OVER_ALL_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.FULL_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.BLOCK_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.PINNED_FILE_STATS) - - ) - ); + Map fileCacheStatsMap = createFileCacheStatsMap(1000L, "node1", "node2"); final Map usages = new HashMap<>(); + // With file cache of 1000 bytes and ratio of 5.0, total addressable space = 1000 * 5 = 5000 bytes + // For high watermark of 250b, we set free space < 250b to trigger NO decision + usages.put("node1", createDiskUsage("node1", 5000, 0)); + usages.put("node2", createDiskUsage("node2", 5000, 4000)); final ClusterInfo clusterInfo = new DiskThresholdDeciderTests.DevNullClusterInfo(usages, usages, shardSizes, fileCacheStatsMap); Metadata metadata = Metadata.builder() @@ -439,36 +367,20 @@ public void testCanRemainSufficientSpaceAfterRelocation() { WarmDiskThresholdDecider decider = new WarmDiskThresholdDecider(settings, clusterSettings); final Map shardSizes = new HashMap<>(); - shardSizes.put("[test][0][p]", 3000L); // 4000 bytes + shardSizes.put("[test][0][p]", 3000L); // 3000 bytes shardSizes.put("[test][0][r]", 3000L); shardSizes.put("[test2][0][p]", 1000L); // 1000 bytes shardSizes.put("[test2][0][r]", 1000L); shardSizes.put("[test3][0][p]", 1500L); - Map fileCacheStatsMap = new HashMap<>(); - fileCacheStatsMap.put( - "node1", - new AggregateFileCacheStats( - randomNonNegativeInt(), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.OVER_ALL_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.FULL_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.BLOCK_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.PINNED_FILE_STATS) - ) - ); - fileCacheStatsMap.put( - "node2", - new AggregateFileCacheStats( - randomNonNegativeInt(), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.OVER_ALL_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.FULL_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.BLOCK_FILE_STATS), - new FileCacheStats(0, 1000, 0, 0, 0, 0, 0, FileCacheStatsType.PINNED_FILE_STATS) - - ) - ); + Map fileCacheStatsMap = createFileCacheStatsMap(1000L, "node1", "node2"); final Map usages = new HashMap<>(); + // With file cache of 1000 bytes and ratio of 5.0, total addressable space = 1000 * 5 = 5000 bytes + // We need enough free space to handle relocation - with [test3][0][p] relocating away, + // node1 will gain 1500b of free space + usages.put("node1", createDiskUsage("node1", 5000, 500)); + usages.put("node2", createDiskUsage("node2", 5000, 4000)); final ClusterInfo clusterInfo = new DiskThresholdDeciderTests.DevNullClusterInfo(usages, usages, shardSizes, fileCacheStatsMap); Metadata metadata = Metadata.builder() @@ -538,4 +450,35 @@ public void logShardStates(ClusterState state) { rn.shardsWithState(ShardRoutingState.STARTED) ); } + + /** + * Creates a standard FileCacheStats map for testing with warm nodes. + * @param fileCacheSize the size of the file cache + * @param nodes the node IDs to create stats for + * @return a map of node ID to AggregateFileCacheStats + */ + private Map createFileCacheStatsMap(long fileCacheSize, String... nodes) { + Map fileCacheStatsMap = new HashMap<>(); + for (String node : nodes) { + fileCacheStatsMap.put( + node, + new AggregateFileCacheStats( + randomNonNegativeInt(), + new FileCacheStats(0, fileCacheSize, 0, 0, 0, 0, 0, FileCacheStatsType.OVER_ALL_STATS), + new FileCacheStats(0, fileCacheSize, 0, 0, 0, 0, 0, FileCacheStatsType.FULL_FILE_STATS), + new FileCacheStats(0, fileCacheSize, 0, 0, 0, 0, 0, FileCacheStatsType.BLOCK_FILE_STATS), + new FileCacheStats(0, fileCacheSize, 0, 0, 0, 0, 0, FileCacheStatsType.PINNED_FILE_STATS) + ) + ); + } + return fileCacheStatsMap; + } + + /** + * Creates a DiskUsage with the specified free bytes and total bytes. + */ + private DiskUsage createDiskUsage(String nodeId, long totalBytes, long freeBytes) { + return new DiskUsage(nodeId, nodeId, "/dev/null", totalBytes, freeBytes); + } + }