From eed5c4644b06142fb99d3fa160ab28d66c8924be Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Mon, 29 Jul 2024 10:22:34 +0530 Subject: [PATCH 1/2] Add setting to ignore throttling nodes for allocation of unassigned primaries in restore in order to speed up remote restore Signed-off-by: Gaurav Bafna --- .../allocator/BalancedShardsAllocator.java | 23 ++- .../allocator/LocalShardsBalancer.java | 17 +- .../common/settings/ClusterSettings.java | 1 + .../allocation/BalancedSingleShardTests.java | 15 -- .../DecideAllocateUnassignedTests.java | 182 ++++++++++++++++++ .../cluster/OpenSearchAllocationTestCase.java | 15 ++ .../cluster/routing/TestShardRouting.java | 26 +++ 7 files changed, 259 insertions(+), 20 deletions(-) create mode 100644 server/src/test/java/org/opensearch/cluster/routing/allocation/DecideAllocateUnassignedTests.java diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index b2443490dd973..a4f3f8258615d 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -154,6 +154,13 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); + public static final Setting IGNORE_THROTTLE_FOR_REMOTE_RESTORE = Setting.boolSetting( + "cluster.routing.allocation.ignore_throttle_for_restore", + true, + Property.Dynamic, + Property.NodeScope + ); + public static final Setting PRIMARY_SHARD_REBALANCE_BUFFER = Setting.floatSetting( "cluster.routing.allocation.rebalance.primary.buffer", 0.10f, @@ -173,6 +180,8 @@ public class BalancedShardsAllocator implements ShardsAllocator { private volatile WeightFunction weightFunction; private volatile float threshold; + private volatile boolean ignoreThrottleInRestore; + 
public BalancedShardsAllocator(Settings settings) { this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); } @@ -182,6 +191,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting setShardBalanceFactor(SHARD_BALANCE_FACTOR_SETTING.get(settings)); setIndexBalanceFactor(INDEX_BALANCE_FACTOR_SETTING.get(settings)); setPreferPrimaryShardRebalanceBuffer(PRIMARY_SHARD_REBALANCE_BUFFER.get(settings)); + setIgnoreThrottleInRestore(IGNORE_THROTTLE_FOR_REMOTE_RESTORE.get(settings)); updateWeightFunction(); setThreshold(THRESHOLD_SETTING.get(settings)); setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); @@ -195,6 +205,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); + clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore); } /** @@ -205,6 +216,10 @@ private void setMovePrimaryFirst(boolean movePrimaryFirst) { setShardMovementStrategy(this.shardMovementStrategy); } + private void setIgnoreThrottleInRestore(boolean ignoreThrottleInRestore) { + this.ignoreThrottleInRestore = ignoreThrottleInRestore; + } + /** * Sets the correct Shard movement strategy to use. * If users are still using deprecated setting `move_primary_first`, we want behavior to remain unchanged. 
@@ -282,7 +297,8 @@ public void allocate(RoutingAllocation allocation) { weightFunction, threshold, preferPrimaryShardBalance, - preferPrimaryShardRebalance + preferPrimaryShardRebalance, + ignoreThrottleInRestore ); localShardsBalancer.allocateUnassigned(); localShardsBalancer.moveShards(); @@ -304,7 +320,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f weightFunction, threshold, preferPrimaryShardBalance, - preferPrimaryShardRebalance + preferPrimaryShardRebalance, + ignoreThrottleInRestore ); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; @@ -558,7 +575,7 @@ public Balancer( float threshold, boolean preferPrimaryBalance ) { - super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false); + super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 00eb79add9f1d..7e4ae58548c55 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -13,6 +13,7 @@ import org.apache.lucene.util.IntroSorter; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.RoutingPool; @@ -60,6 +61,8 @@ public class LocalShardsBalancer extends ShardsBalancer { private final boolean preferPrimaryBalance; private final boolean preferPrimaryRebalance; + + private final 
boolean ignoreThrottleInRestore; private final BalancedShardsAllocator.WeightFunction weight; private final float threshold; @@ -77,7 +80,8 @@ public LocalShardsBalancer( BalancedShardsAllocator.WeightFunction weight, float threshold, boolean preferPrimaryBalance, - boolean preferPrimaryRebalance + boolean preferPrimaryRebalance, + boolean ignoreThrottleInRestore ) { this.logger = logger; this.allocation = allocation; @@ -94,6 +98,7 @@ public LocalShardsBalancer( this.preferPrimaryBalance = preferPrimaryBalance; this.preferPrimaryRebalance = preferPrimaryRebalance; this.shardMovementStrategy = shardMovementStrategy; + this.ignoreThrottleInRestore = ignoreThrottleInRestore; } /** @@ -918,7 +923,15 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { nodeExplanationMap.put(node.getNodeId(), new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0)); nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight)); } - if (currentDecision.type() == Decision.Type.YES || currentDecision.type() == Decision.Type.THROTTLE) { + + // For REMOTE_STORE recoveries, THROTTLE is treated as NO because we want faster recoveries. + // The side effect of this is increased relocations after these allocations. 
+ boolean considerThrottleAsNo = ignoreThrottleInRestore + && shard.recoverySource().getType() == RecoverySource.Type.REMOTE_STORE + && shard.primary(); + + if (currentDecision.type() == Decision.Type.YES + || (currentDecision.type() == Decision.Type.THROTTLE && considerThrottleAsNo == false)) { final boolean updateMinNode; if (currentWeight == minWeight) { /* we have an equal weight tie breaking: diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 2f60c731bc554..a73e5d44b7e02 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -268,6 +268,7 @@ public void apply(Settings value, Settings current, Settings previous) { BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING, BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, + BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE, BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING, BreakerSettings.CIRCUIT_BREAKER_TYPE, diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalancedSingleShardTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalancedSingleShardTests.java index d29249cef0818..11a43019f648e 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalancedSingleShardTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalancedSingleShardTests.java @@ -33,7 +33,6 @@ package org.opensearch.cluster.routing.allocation; import org.opensearch.action.support.replication.ClusterStateCreationUtils; -import org.opensearch.cluster.ClusterInfo; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.OpenSearchAllocationTestCase; import org.opensearch.cluster.node.DiscoveryNode; @@ 
-50,7 +49,6 @@ import org.opensearch.cluster.routing.allocation.decider.Decision.Type; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; -import org.opensearch.snapshots.SnapshotShardSizeInfo; import java.util.Arrays; import java.util.Collections; @@ -398,19 +396,6 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca return Tuple.tuple(clusterState, rebalanceDecision); } - private RoutingAllocation newRoutingAllocation(AllocationDeciders deciders, ClusterState state) { - RoutingAllocation allocation = new RoutingAllocation( - deciders, - new RoutingNodes(state, false), - state, - ClusterInfo.EMPTY, - SnapshotShardSizeInfo.EMPTY, - System.nanoTime() - ); - allocation.debugDecision(true); - return allocation; - } - private void assertAssignedNodeRemainsSame( BalancedShardsAllocator allocator, RoutingAllocation routingAllocation, diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/DecideAllocateUnassignedTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/DecideAllocateUnassignedTests.java new file mode 100644 index 0000000000000..1bb9c00d99bd6 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/DecideAllocateUnassignedTests.java @@ -0,0 +1,182 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster.routing.allocation; + +import org.opensearch.Version; +import org.opensearch.action.support.replication.ClusterStateCreationUtils; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.AllocationId; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.cluster.routing.allocation.decider.AllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.shard.ShardId; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED; +import static 
org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE; + +public class DecideAllocateUnassignedTests extends OpenSearchAllocationTestCase { + public void testSingleShardBalanceProducesSameResultsAsBalanceStep_IgnoreThrottle() { + final String[] indices = { "idx1" }; + // Create a cluster state with 1 indices, each with 1 started primary shard, and only + // one node initially so that all primary shards get allocated to the same node. + // + // When we add 1 more 1 index with 1 started primary shard and 1 more node , if the new node throttles the recovery + // shard should get assigned on the older node if IgnoreThrottle is set to true + ClusterState clusterState = ClusterStateCreationUtils.state(1, indices, 1); + clusterState = addNodesToClusterState(clusterState, 1); + clusterState = addRestoringIndexToClusterState(clusterState, "idx2"); + final Set throttleNodes = new HashSet<>(); + throttleNodes.add("node_1"); + AllocationDecider allocationDecider = new AllocationDecider() { + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + if (throttleNodes.contains(node.nodeId())) { + return Decision.THROTTLE; + } + return Decision.YES; + } + }; + AllocationDecider rebalanceDecider = new AllocationDecider() { + @Override + public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { + return Decision.YES; + } + }; + List allocationDeciders = Arrays.asList(rebalanceDecider, allocationDecider); + RoutingAllocation routingAllocation = newRoutingAllocation(new AllocationDeciders(allocationDeciders), clusterState); + // allocate and get the node that is now relocating + Settings build = Settings.builder().put(IGNORE_THROTTLE_FOR_REMOTE_RESTORE.getKey(), true).build(); + BalancedShardsAllocator allocator = new BalancedShardsAllocator(build); + allocator.allocate(routingAllocation); + 
assertEquals(routingAllocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), "node_0"); + ; + assertFalse(routingAllocation.routingNodes().hasUnassignedPrimaries()); + } + + public void testSingleShardBalanceProducesSameResultsAsBalanceStep_Current() { + final String[] indices = { "idx1" }; + // Create a cluster state with 1 indices, each with 1 started primary shard, and only + // one node initially so that all primary shards get allocated to the same node. + // + // When we add 1 more 1 index with 1 started primary shard and 1 more node , if the new node throttles the recovery + // shard should remain unassigned if IgnoreThrottle is set to false + ClusterState clusterState = ClusterStateCreationUtils.state(1, indices, 1); + clusterState = addNodesToClusterState(clusterState, 1); + clusterState = addRestoringIndexToClusterState(clusterState, "idx2"); + final Set throttleNodes = new HashSet<>(); + throttleNodes.add("node_1"); + AllocationDecider allocationDecider = new AllocationDecider() { + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + if (throttleNodes.contains(node.nodeId())) { + return Decision.THROTTLE; + } + return Decision.YES; + } + }; + AllocationDecider rebalanceDecider = new AllocationDecider() { + @Override + public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { + return Decision.YES; + } + }; + List allocationDeciders = Arrays.asList(rebalanceDecider, allocationDecider); + RoutingAllocation routingAllocation = newRoutingAllocation(new AllocationDeciders(allocationDeciders), clusterState); + // allocate and get the node that is now relocating + Settings build = Settings.builder().put(IGNORE_THROTTLE_FOR_REMOTE_RESTORE.getKey(), false).build(); + BalancedShardsAllocator allocator = new BalancedShardsAllocator(build); + allocator.allocate(routingAllocation); + 
assertEquals(routingAllocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), 0); + ; + assertTrue(routingAllocation.routingNodes().hasUnassignedPrimaries()); + } + + private ClusterState addNodesToClusterState(ClusterState clusterState, int nodeId) { + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterState.nodes()); + DiscoveryNode discoveryNode = newNode("node_" + nodeId); + nodesBuilder.add(discoveryNode); + return ClusterState.builder(clusterState).nodes(nodesBuilder).build(); + } + + private ClusterState addRestoringIndexToClusterState(ClusterState clusterState, String index) { + final int primaryTerm = 1 + randomInt(200); + final ShardId shardId = new ShardId(index, "_na_", 0); + + IndexMetadata indexMetadata = IndexMetadata.builder(index) + .settings( + Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + ) + .primaryTerm(0, primaryTerm) + .build(); + + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + String primaryNode = null; + String relocatingNode = null; + UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.EXISTING_INDEX_RESTORED, null); + indexShardRoutingBuilder.addShard( + TestShardRouting.newShardRoutingRemoteRestore( + index, + shardId, + primaryNode, + relocatingNode, + true, + ShardRoutingState.UNASSIGNED, + unassignedInfo + ) + ); + final IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingBuilder.build(); + + IndexMetadata.Builder indexMetadataBuilder = new IndexMetadata.Builder(indexMetadata); + indexMetadataBuilder.putInSyncAllocationIds( + 0, + indexShardRoutingTable.activeShards() + .stream() + .map(ShardRouting::allocationId) + .map(AllocationId::getId) + .collect(Collectors.toSet()) + ); + ClusterState.Builder state = ClusterState.builder(clusterState); + 
state.metadata(Metadata.builder(clusterState.metadata()).put(indexMetadataBuilder.build(), false).generateClusterUuidIfNeeded()); + state.routingTable( + RoutingTable.builder(clusterState.routingTable()) + .add(IndexRoutingTable.builder(indexMetadata.getIndex()).addIndexShard(indexShardRoutingTable)) + .build() + ); + return state.build(); + } + +} diff --git a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java index f6113860e3907..34b8c58a9c5b2 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java @@ -37,6 +37,7 @@ import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.allocation.AllocationService; @@ -287,6 +288,19 @@ public static ClusterState startShardsAndReroute( return allocationService.reroute(allocationService.applyStartedShards(clusterState, initializingShards), "reroute after starting"); } + protected RoutingAllocation newRoutingAllocation(AllocationDeciders deciders, ClusterState state) { + RoutingAllocation allocation = new RoutingAllocation( + deciders, + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + SnapshotShardSizeInfo.EMPTY, + System.nanoTime() + ); + allocation.debugDecision(true); + return allocation; + } + public static class TestAllocateDecision extends AllocationDecider { private final Decision decision; @@ -465,5 +479,6 @@ public void allocateUnassigned( unassignedAllocationHandler.removeAndIgnore(UnassignedInfo.AllocationStatus.DELAYED_ALLOCATION, allocation.changes()); } 
} + } } diff --git a/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java b/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java index f67108345550f..c7c71f0f569e5 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java +++ b/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java @@ -205,6 +205,32 @@ public static ShardRouting newShardRouting( ); } + public static ShardRouting newShardRoutingRemoteRestore( + String index, + ShardId shardId, + String currentNodeId, + String relocatingNodeId, + boolean primary, + ShardRoutingState state, + UnassignedInfo unassignedInfo + ) { + return new ShardRouting( + shardId, + currentNodeId, + relocatingNodeId, + primary, + state, + new RecoverySource.RemoteStoreRecoverySource( + UUIDs.randomBase64UUID(), + Version.V_EMPTY, + new IndexId(shardId.getIndexName(), shardId.getIndexName()) + ), + unassignedInfo, + buildAllocationId(state), + -1 + ); + } + public static ShardRouting newShardRouting( ShardId shardId, String currentNodeId, From c27772f51777f813997c7719ef98ecaa203ff9fc Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Tue, 30 Jul 2024 13:35:52 +0530 Subject: [PATCH 2/2] Add changelog and address PR comments Signed-off-by: Gaurav Bafna --- CHANGELOG.md | 2 + .../allocator/BalancedShardsAllocator.java | 2 +- .../DecideAllocateUnassignedTests.java | 66 ++++++------------- 3 files changed, 22 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a5355f010a99f..9689e391c6df3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Added - Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs (([#14972](https://github.com/opensearch-project/OpenSearch/pull/14972)) - [Workload Management] Add queryGroupId to Task 
([14708](https://github.com/opensearch-project/OpenSearch/pull/14708)) +- Add setting to ignore throttling nodes for allocation of unassigned primaries in remote restore ([#14991](https://github.com/opensearch-project/OpenSearch/pull/14991)) - Add basic aggregation support for derived fields ([#14618](https://github.com/opensearch-project/OpenSearch/pull/14618)) ### Dependencies @@ -23,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Removed ### Fixed +- Fix constraint bug which allows more primary shards than average primary shards per index ([#14908](https://github.com/opensearch-project/OpenSearch/pull/14908)) - Fix missing value of FieldSort for unsigned_long ([#14963](https://github.com/opensearch-project/OpenSearch/pull/14963)) ### Security diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index a4f3f8258615d..ae173bbf06c4f 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -155,7 +155,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { ); public static final Setting IGNORE_THROTTLE_FOR_REMOTE_RESTORE = Setting.boolSetting( - "cluster.routing.allocation.ignore_throttle_for_restore", + "cluster.routing.allocation.remote_primary.ignore_throttle", true, Property.Dynamic, Property.NodeScope diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/DecideAllocateUnassignedTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/DecideAllocateUnassignedTests.java index 1bb9c00d99bd6..6df2ffc6149d5 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/DecideAllocateUnassignedTests.java +++ 
b/server/src/test/java/org/opensearch/cluster/routing/allocation/DecideAllocateUnassignedTests.java @@ -45,7 +45,7 @@ import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE; public class DecideAllocateUnassignedTests extends OpenSearchAllocationTestCase { - public void testSingleShardBalanceProducesSameResultsAsBalanceStep_IgnoreThrottle() { + public void testAllocateUnassignedRemoteRestore_IgnoreThrottle() { final String[] indices = { "idx1" }; // Create a cluster state with 1 indices, each with 1 started primary shard, and only // one node initially so that all primary shards get allocated to the same node. @@ -55,35 +55,18 @@ public void testSingleShardBalanceProducesSameResultsAsBalanceStep_IgnoreThrottl ClusterState clusterState = ClusterStateCreationUtils.state(1, indices, 1); clusterState = addNodesToClusterState(clusterState, 1); clusterState = addRestoringIndexToClusterState(clusterState, "idx2"); - final Set throttleNodes = new HashSet<>(); - throttleNodes.add("node_1"); - AllocationDecider allocationDecider = new AllocationDecider() { - @Override - public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - if (throttleNodes.contains(node.nodeId())) { - return Decision.THROTTLE; - } - return Decision.YES; - } - }; - AllocationDecider rebalanceDecider = new AllocationDecider() { - @Override - public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { - return Decision.YES; - } - }; - List allocationDeciders = Arrays.asList(rebalanceDecider, allocationDecider); + List allocationDeciders = getAllocationDecidersThrottleOnNode1(); RoutingAllocation routingAllocation = newRoutingAllocation(new AllocationDeciders(allocationDeciders), clusterState); // allocate and get the node that is now relocating Settings build = Settings.builder().put(IGNORE_THROTTLE_FOR_REMOTE_RESTORE.getKey(), true).build(); 
BalancedShardsAllocator allocator = new BalancedShardsAllocator(build); allocator.allocate(routingAllocation); assertEquals(routingAllocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), "node_0"); - ; + assertEquals(routingAllocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).getIndexName(), "idx2"); assertFalse(routingAllocation.routingNodes().hasUnassignedPrimaries()); } - public void testSingleShardBalanceProducesSameResultsAsBalanceStep_Current() { + public void testAllocateUnassignedRemoteRestore() { final String[] indices = { "idx1" }; // Create a cluster state with 1 indices, each with 1 started primary shard, and only // one node initially so that all primary shards get allocated to the same node. @@ -93,6 +76,18 @@ public void testSingleShardBalanceProducesSameResultsAsBalanceStep_Current() { ClusterState clusterState = ClusterStateCreationUtils.state(1, indices, 1); clusterState = addNodesToClusterState(clusterState, 1); clusterState = addRestoringIndexToClusterState(clusterState, "idx2"); + List allocationDeciders = getAllocationDecidersThrottleOnNode1(); + RoutingAllocation routingAllocation = newRoutingAllocation(new AllocationDeciders(allocationDeciders), clusterState); + // allocate and get the node that is now relocating + Settings build = Settings.builder().put(IGNORE_THROTTLE_FOR_REMOTE_RESTORE.getKey(), false).build(); + BalancedShardsAllocator allocator = new BalancedShardsAllocator(build); + allocator.allocate(routingAllocation); + assertEquals(routingAllocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), 0); + assertTrue(routingAllocation.routingNodes().hasUnassignedPrimaries()); + } + + private static List getAllocationDecidersThrottleOnNode1() { + // Allocation Deciders to throttle on `node_1` final Set throttleNodes = new HashSet<>(); throttleNodes.add("node_1"); AllocationDecider allocationDecider = new AllocationDecider() { @@ -104,21 +99,8 @@ 
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing return Decision.YES; } }; - AllocationDecider rebalanceDecider = new AllocationDecider() { - @Override - public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { - return Decision.YES; - } - }; - List allocationDeciders = Arrays.asList(rebalanceDecider, allocationDecider); - RoutingAllocation routingAllocation = newRoutingAllocation(new AllocationDeciders(allocationDeciders), clusterState); - // allocate and get the node that is now relocating - Settings build = Settings.builder().put(IGNORE_THROTTLE_FOR_REMOTE_RESTORE.getKey(), false).build(); - BalancedShardsAllocator allocator = new BalancedShardsAllocator(build); - allocator.allocate(routingAllocation); - assertEquals(routingAllocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), 0); - ; - assertTrue(routingAllocation.routingNodes().hasUnassignedPrimaries()); + List allocationDeciders = Arrays.asList(allocationDecider); + return allocationDeciders; } private ClusterState addNodesToClusterState(ClusterState clusterState, int nodeId) { @@ -144,19 +126,9 @@ private ClusterState addRestoringIndexToClusterState(ClusterState clusterState, .build(); IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); - String primaryNode = null; - String relocatingNode = null; UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.EXISTING_INDEX_RESTORED, null); indexShardRoutingBuilder.addShard( - TestShardRouting.newShardRoutingRemoteRestore( - index, - shardId, - primaryNode, - relocatingNode, - true, - ShardRoutingState.UNASSIGNED, - unassignedInfo - ) + TestShardRouting.newShardRoutingRemoteRestore(index, shardId, null, null, true, ShardRoutingState.UNASSIGNED, unassignedInfo) ); final IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingBuilder.build();