diff --git a/CHANGELOG.md b/CHANGELOG.md index 184fa6f125f5d..7d41b19fcc8ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,6 +72,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Added - Add GeoTile and GeoHash Grid aggregations on GeoShapes. ([#5589](https://github.com/opensearch-project/OpenSearch/pull/5589)) - Disallow multiple data paths for search nodes ([#6427](https://github.com/opensearch-project/OpenSearch/pull/6427)) +- [Segment Replication] Allocation and rebalancing based on average primary shard count per index ([#6422](https://github.com/opensearch-project/OpenSearch/pull/6422)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490)) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java new file mode 100644 index 0000000000000..9286ab9a74c14 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java @@ -0,0 +1,216 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.replication; + +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.IndexModule; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +import org.opensearch.cluster.OpenSearchAllocationTestCase.ShardAllocations; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationAllocationIT extends SegmentReplicationBaseIT { + + private void createIndex(String idxName, int shardCount, int replicaCount, boolean isSegRep) { + Settings.Builder builder = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shardCount) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replicaCount); + if (isSegRep) { + builder = builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + } else { + builder = builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT); + } + prepareCreate(idxName, builder).get(); + } + + public void 
enablePreferPrimaryBalance() { + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings( + Settings.builder().put(BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey(), "true") + ) + ); + } + + /** + * This test verifies the happy path where primary shard allocation is balanced when multiple indices are created. + * + * This test in general passes without primary shard balance as well due to nature of allocation algorithm which + * assigns all primary shards first followed by replica copies. + */ + public void testBalancedPrimaryAllocation() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final int maxReplicaCount = 2; + final int maxShardCount = 5; + final int nodeCount = randomIntBetween(maxReplicaCount + 1, 10); + final int numberOfIndices = randomIntBetween(5, 10); + + final List nodeNames = new ArrayList<>(); + logger.info("--> Creating {} nodes", nodeCount); + for (int i = 0; i < nodeCount; i++) { + nodeNames.add(internalCluster().startNode()); + } + enablePreferPrimaryBalance(); + int shardCount, replicaCount; + ClusterState state; + for (int i = 0; i < numberOfIndices; i++) { + shardCount = randomIntBetween(1, maxShardCount); + replicaCount = randomIntBetween(0, maxReplicaCount); + createIndex("test" + i, shardCount, replicaCount, i % 2 == 0); + logger.info("--> Creating index {} with shard count {} and replica count {}", "test" + i, shardCount, replicaCount); + ensureGreen(TimeValue.timeValueSeconds(60)); + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + logger.info(ShardAllocations.printShardDistribution(state)); + } + verifyPerIndexPrimaryBalance(); + } + + /** + * This test verifies balanced primary shard allocation for a single index with large shard count in event of node + * going down and a new node joining the cluster. 
This results in shard distribution skewness and re-balancing logic + * ensures the primary shard distribution is balanced. + * + */ + public void testSingleIndexShardAllocation() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final int maxReplicaCount = 1; + final int maxShardCount = 50; + final int nodeCount = 5; + + final List nodeNames = new ArrayList<>(); + logger.info("--> Creating {} nodes", nodeCount); + for (int i = 0; i < nodeCount; i++) { + nodeNames.add(internalCluster().startNode()); + } + enablePreferPrimaryBalance(); + + ClusterState state; + createIndex("test", maxShardCount, maxReplicaCount, true); + ensureGreen(TimeValue.timeValueSeconds(60)); + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + logger.info(ShardAllocations.printShardDistribution(state)); + verifyPerIndexPrimaryBalance(); + + // Remove a node + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeNames.get(0))); + ensureGreen(TimeValue.timeValueSeconds(60)); + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + logger.info(ShardAllocations.printShardDistribution(state)); + verifyPerIndexPrimaryBalance(); + + // Add a new node + internalCluster().startDataOnlyNode(); + ensureGreen(TimeValue.timeValueSeconds(60)); + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + logger.info(ShardAllocations.printShardDistribution(state)); + verifyPerIndexPrimaryBalance(); + } + + /** + * Similar to testSingleIndexShardAllocation test but creates multiple indices, with multiple nodes being added and + * removed. The test asserts after each such event that primary shard distribution is balanced across each index. 
+ */ + public void testAllocationWithDisruption() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final int maxReplicaCount = 2; + final int maxShardCount = 5; + final int nodeCount = randomIntBetween(maxReplicaCount + 1, 10); + final int numberOfIndices = randomIntBetween(1, 10); + + logger.info("--> Creating {} nodes", nodeCount); + final List nodeNames = new ArrayList<>(); + for (int i = 0; i < nodeCount; i++) { + nodeNames.add(internalCluster().startNode()); + } + enablePreferPrimaryBalance(); + + int shardCount, replicaCount, totalShardCount = 0, totalReplicaCount = 0; + ClusterState state; + for (int i = 0; i < numberOfIndices; i++) { + shardCount = randomIntBetween(1, maxShardCount); + totalShardCount += shardCount; + replicaCount = randomIntBetween(1, maxReplicaCount); + totalReplicaCount += replicaCount; + logger.info("--> Creating index test{} with primary {} and replica {}", i, shardCount, replicaCount); + createIndex("test" + i, shardCount, replicaCount, i % 2 == 0); + ensureGreen(TimeValue.timeValueSeconds(60)); + if (logger.isTraceEnabled()) { + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + logger.info(ShardAllocations.printShardDistribution(state)); + } + } + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + logger.info(ShardAllocations.printShardDistribution(state)); + verifyPerIndexPrimaryBalance(); + + final int additionalNodeCount = randomIntBetween(1, 5); + logger.info("--> Adding {} nodes", additionalNodeCount); + + internalCluster().startNodes(additionalNodeCount); + ensureGreen(TimeValue.timeValueSeconds(60)); + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + logger.info(ShardAllocations.printShardDistribution(state)); + verifyPerIndexPrimaryBalance(); + + logger.info("--> Stop one third nodes"); + for (int i = 0; i < nodeCount; i += 3) { + 
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeNames.get(i))); + // give replica a chance to promote as primary before terminating node containing the replica + ensureGreen(TimeValue.timeValueSeconds(60)); + } + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + logger.info(ShardAllocations.printShardDistribution(state)); + verifyPerIndexPrimaryBalance(); + } + + /** + * Utility method which ensures cluster has balanced primary shard distribution across a single index. + * @throws Exception + */ + private void verifyPerIndexPrimaryBalance() throws Exception { + assertBusy(() -> { + final ClusterState currentState = client().admin().cluster().prepareState().execute().actionGet().getState(); + RoutingNodes nodes = currentState.getRoutingNodes(); + for (ObjectObjectCursor index : currentState.getRoutingTable().indicesRouting()) { + final int totalPrimaryShards = index.value.primaryShardsActive(); + final int avgPrimaryShardsPerNode = (int) Math.ceil(totalPrimaryShards * 1f / currentState.getRoutingNodes().size()); + for (RoutingNode node : nodes) { + final int primaryCount = node.shardsWithState(index.key, STARTED) + .stream() + .filter(ShardRouting::primary) + .collect(Collectors.toList()) + .size(); + assertTrue(primaryCount <= avgPrimaryShardsPerNode); + } + } + }, 60, TimeUnit.SECONDS); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java index 3d9847ca35931..228ac5b504abc 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java @@ -8,58 +8,48 @@ import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer; -import 
java.util.ArrayList; -import java.util.List; +import java.util.HashMap; +import java.util.Map; import java.util.function.Predicate; +import static org.opensearch.cluster.routing.allocation.RebalanceConstraints.PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID; +import static org.opensearch.cluster.routing.allocation.RebalanceConstraints.isPrimaryShardsPerIndexPerNodeBreached; + /** * Allocation constraints specify conditions which, if breached, reduce the - * priority of a node for receiving shard allocations. + * priority of a node for receiving unassigned shard allocations. * * @opensearch.internal */ public class AllocationConstraints { - public final long CONSTRAINT_WEIGHT = 1000000L; - private List> constraintPredicates; + + /** + * + * This constraint is only applied for unassigned shards to avoid overloading a newly added node. + * Weight calculation in other scenarios like shard movement and re-balancing remain unaffected by this constraint. + */ + public final static String INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID = "index.shard.breach.constraint"; + private Map constraints; public AllocationConstraints() { - this.constraintPredicates = new ArrayList<>(1); - this.constraintPredicates.add(isIndexShardsPerNodeBreached()); + this.constraints = new HashMap<>(); + this.constraints.putIfAbsent( + INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, + new Constraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, isIndexShardsPerNodeBreached()) + ); + this.constraints.putIfAbsent( + PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, + new Constraint(PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, isPrimaryShardsPerIndexPerNodeBreached()) + ); } - class ConstraintParams { - private ShardsBalancer balancer; - private BalancedShardsAllocator.ModelNode node; - private String index; - - ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) { - this.balancer = balancer; - this.node = node; - this.index = index; - } + public void 
updateAllocationConstraint(String constraint, boolean enable) { + this.constraints.get(constraint).setEnable(enable); } - /** - * Evaluates configured allocation constraint predicates for given node - index - * combination; and returns a weight value based on the number of breached - * constraints. - * - * Constraint weight should be added to the weight calculated via weight - * function, to reduce priority of allocating on nodes with breached - * constraints. - * - * This weight function is used only in case of unassigned shards to avoid overloading a newly added node. - * Weight calculation in other scenarios like shard movement and re-balancing remain unaffected by this function. - */ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) { - int constraintsBreached = 0; - ConstraintParams params = new ConstraintParams(balancer, node, index); - for (Predicate predicate : constraintPredicates) { - if (predicate.test(params)) { - constraintsBreached++; - } - } - return constraintsBreached * CONSTRAINT_WEIGHT; + Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index); + return params.weight(constraints); } /** @@ -76,12 +66,11 @@ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode no * This constraint is breached when balancer attempts to allocate more than * average shards per index per node. 
*/ - private Predicate isIndexShardsPerNodeBreached() { + public static Predicate isIndexShardsPerNodeBreached() { return (params) -> { - int currIndexShardsOnNode = params.node.numShards(params.index); - int allowedIndexShardsPerNode = (int) Math.ceil(params.balancer.avgShardsPerNode(params.index)); + int currIndexShardsOnNode = params.getNode().numShards(params.getIndex()); + int allowedIndexShardsPerNode = (int) Math.ceil(params.getBalancer().avgShardsPerNode(params.getIndex())); return (currIndexShardsOnNode >= allowedIndexShardsPerNode); }; } - } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java new file mode 100644 index 0000000000000..68be7dc770ca1 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster.routing.allocation; + +import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer; + +import java.util.Map; +import java.util.Objects; +import java.util.function.Predicate; + +/** + * Defines a constraint useful to de-prioritize certain nodes as target of unassigned shards used in {@link AllocationConstraints} or + * re-balancing target used in {@link RebalanceConstraints} + * + * @opensearch.internal + */ +public class Constraint implements Predicate { + + public final static long CONSTRAINT_WEIGHT = 1000000L; + + private String name; + + private boolean enable; + private Predicate predicate; + + public Constraint(String name, Predicate constraintPredicate) { + this.name = name; + this.predicate = constraintPredicate; + this.enable = false; + } + + @Override + public boolean test(ConstraintParams constraintParams) { + return this.enable && predicate.test(constraintParams); + } + + public String getName() { + return name; + } + + public void setEnable(boolean enable) { + this.enable = enable; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Constraint that = (Constraint) o; + return name.equals(that.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + + static class ConstraintParams { + private ShardsBalancer balancer; + private BalancedShardsAllocator.ModelNode node; + private String index; + + ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) { + this.balancer = balancer; + this.node = node; + this.index = index; + } + + public ShardsBalancer getBalancer() { + return balancer; + } + + public BalancedShardsAllocator.ModelNode getNode() { + return node; + } + + public String getIndex() { + return index; + } + + /** + * Evaluates configured allocation 
constraint predicates for given node - index + * combination; and returns a weight value based on the number of breached + * constraints. + *

+ * Constraint weight should be added to the weight calculated via weight + * function, to reduce priority of allocating on nodes with breached + * constraints. + *

+ */ + public long weight(Map constraints) { + long totalConstraintWeight = 0; + for (Constraint constraint : constraints.values()) { + if (constraint.test(this)) { + totalConstraintWeight += CONSTRAINT_WEIGHT; + } + } + return totalConstraintWeight; + } + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java new file mode 100644 index 0000000000000..b619eb993894a --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Predicate; + +import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE; + +/** + * Constraints applied during rebalancing round; specify conditions which, if breached, reduce the + * priority of a node for receiving shard relocations. 
+ * + * @opensearch.internal + */ +public class RebalanceConstraints { + public final static String PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID = PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey(); + private Map constraints; + + public RebalanceConstraints() { + this.constraints = new HashMap<>(); + this.constraints.putIfAbsent( + PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, + new Constraint(PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, isPrimaryShardsPerIndexPerNodeBreached()) + ); + } + + public void updateRebalanceConstraint(String constraint, boolean enable) { + this.constraints.get(constraint).setEnable(enable); + } + + public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) { + Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index); + return params.weight(constraints); + } + + /** + * When primary balance is preferred, add node constraint of average primary shards per node to give the node a + * higher weight resulting in lesser chances of being target of unassigned shard allocation or rebalancing target node + */ + public static Predicate isPrimaryShardsPerIndexPerNodeBreached() { + return (params) -> { + int currPrimaryShardsOnNode = params.getNode().numPrimaryShards(params.getIndex()); + int allowedPrimaryShardsPerNode = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode(params.getIndex())); + return currPrimaryShardsOnNode > allowedPrimaryShardsPerNode; + }; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index d8761e9b1a78e..a59035dea9d7b 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -43,6 +43,7 @@ import 
org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.AllocationConstraints; import org.opensearch.cluster.routing.allocation.MoveDecision; +import org.opensearch.cluster.routing.allocation.RebalanceConstraints; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.ShardAllocationDecision; import org.opensearch.common.inject.Inject; @@ -58,6 +59,8 @@ import java.util.Map; import java.util.Set; +import static org.opensearch.cluster.routing.allocation.AllocationConstraints.INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID; + /** * The {@link BalancedShardsAllocator} re-balances the nodes allocations * within an cluster based on a {@link WeightFunction}. The clusters balance is defined by four parameters which can be set @@ -93,6 +96,10 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.Dynamic, Property.NodeScope ); + + /** + * Move primary shards first from node for shard movement when shards can not stay on node anymore. {@link LocalShardsBalancer#moveShards()} + */ public static final Setting SHARD_MOVE_PRIMARY_FIRST_SETTING = Setting.boolSetting( "cluster.routing.allocation.move.primary_first", false, @@ -107,7 +114,22 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); + /** + * Prefer per index primary shard balance by using {@link RebalanceConstraints#isPrimaryShardsPerIndexPerNodeBreached()} + * constraint which is used during unassigned shard allocation {@link LocalShardsBalancer#allocateUnassigned()} and + * shard re-balance/relocation to a different node {@link LocalShardsBalancer#balance()}. 
+ */ + + public static final Setting PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE = Setting.boolSetting( + "cluster.routing.allocation.balance.prefer_primary", + false, + Property.Dynamic, + Property.NodeScope + ); + private volatile boolean movePrimaryFirst; + + private volatile boolean preferPrimaryShardBalance; private volatile WeightFunction weightFunction; private volatile float threshold; @@ -119,6 +141,8 @@ public BalancedShardsAllocator(Settings settings) { public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) { setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings)); setThreshold(THRESHOLD_SETTING.get(settings)); + setPreferPrimaryShardBalance(PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.get(settings)); + clusterSettings.addSettingsUpdateConsumer(PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); @@ -132,6 +156,12 @@ private void setWeightFunction(float indexBalance, float shardBalanceFactor) { weightFunction = new WeightFunction(indexBalance, shardBalanceFactor); } + private void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) { + this.preferPrimaryShardBalance = preferPrimaryShardBalance; + this.weightFunction.updateAllocationConstraint(PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey(), preferPrimaryShardBalance); + this.weightFunction.updateRebalanceConstraint(PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey(), preferPrimaryShardBalance); + } + private void setThreshold(float threshold) { this.threshold = threshold; } @@ -142,7 +172,14 @@ public void allocate(RoutingAllocation allocation) { failAllocationOfNewPrimaries(allocation); return; } - 
final ShardsBalancer localShardsBalancer = new LocalShardsBalancer(logger, allocation, movePrimaryFirst, weightFunction, threshold); + final ShardsBalancer localShardsBalancer = new LocalShardsBalancer( + logger, + allocation, + movePrimaryFirst, + weightFunction, + threshold, + preferPrimaryShardBalance + ); localShardsBalancer.allocateUnassigned(); localShardsBalancer.moveShards(); localShardsBalancer.balance(); @@ -157,7 +194,14 @@ public void allocate(RoutingAllocation allocation) { @Override public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, final RoutingAllocation allocation) { - ShardsBalancer localShardsBalancer = new LocalShardsBalancer(logger, allocation, movePrimaryFirst, weightFunction, threshold); + ShardsBalancer localShardsBalancer = new LocalShardsBalancer( + logger, + allocation, + movePrimaryFirst, + weightFunction, + threshold, + preferPrimaryShardBalance + ); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; if (shard.unassigned()) { @@ -220,6 +264,13 @@ public float getShardBalance() { return weightFunction.shardBalance; } + /** + * Returns preferPrimaryShardBalance. + */ + public boolean getPreferPrimaryBalance() { + return preferPrimaryShardBalance; + } + /** * This class is the primary weight function used to create balanced over nodes and shards in the cluster. 
* Currently this function has 3 properties: @@ -253,6 +304,7 @@ static class WeightFunction { private final float theta0; private final float theta1; private AllocationConstraints constraints; + private RebalanceConstraints rebalanceConstraints; WeightFunction(float indexBalance, float shardBalance) { float sum = indexBalance + shardBalance; @@ -264,6 +316,9 @@ static class WeightFunction { this.indexBalance = indexBalance; this.shardBalance = shardBalance; this.constraints = new AllocationConstraints(); + this.rebalanceConstraints = new RebalanceConstraints(); + // Enable index shard per node breach constraint + updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true); } public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode node, String index) { @@ -271,11 +326,24 @@ public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode return balancerWeight + constraints.weight(balancer, node, index); } + public float weightWithRebalanceConstraints(ShardsBalancer balancer, ModelNode node, String index) { + float balancerWeight = weight(balancer, node, index); + return balancerWeight + rebalanceConstraints.weight(balancer, node, index); + } + float weight(ShardsBalancer balancer, ModelNode node, String index) { final float weightShard = node.numShards() - balancer.avgShardsPerNode(); final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index); return theta0 * weightShard + theta1 * weightIndex; } + + void updateAllocationConstraint(String constraint, boolean enable) { + this.constraints.updateAllocationConstraint(constraint, enable); + } + + void updateRebalanceConstraint(String constraint, boolean add) { + this.rebalanceConstraints.updateRebalanceConstraint(constraint, add); + } } /** @@ -313,6 +381,11 @@ public int numShards(String idx) { return index == null ? 
0 : index.numShards(); } + public int numPrimaryShards(String idx) { + ModelIndex index = indices.get(idx); + return index == null ? 0 : index.numPrimaryShards(); + } + public int highestPrimary(String index) { ModelIndex idx = indices.get(index); if (idx != null) { @@ -374,9 +447,10 @@ public Balancer( RoutingAllocation allocation, boolean movePrimaryFirst, BalancedShardsAllocator.WeightFunction weight, - float threshold + float threshold, + boolean preferPrimaryBalance ) { - super(logger, allocation, movePrimaryFirst, weight, threshold); + super(logger, allocation, movePrimaryFirst, weight, threshold, preferPrimaryBalance); } } @@ -388,12 +462,17 @@ public Balancer( static final class ModelIndex implements Iterable { private final String id; private final Set shards = new HashSet<>(4); // expect few shards of same index to be allocated on same node + private final Set primaryShards = new HashSet<>(); private int highestPrimary = -1; ModelIndex(String id) { this.id = id; } + public int numPrimaryShards() { + return primaryShards.size(); + } + public int highestPrimary() { if (highestPrimary == -1) { int maxId = -1; @@ -423,12 +502,20 @@ public Iterator iterator() { public void removeShard(ShardRouting shard) { highestPrimary = -1; assert shards.contains(shard) : "Shard not allocated on current node: " + shard; + if (shard.primary()) { + assert primaryShards.contains(shard) : "Primary shard not allocated on current node: " + shard; + primaryShards.remove(shard); + } shards.remove(shard); } public void addShard(ShardRouting shard) { highestPrimary = -1; - assert !shards.contains(shard) : "Shard already allocated on current node: " + shard; + assert shards.contains(shard) == false : "Shard already allocated on current node: " + shard; + if (shard.primary()) { + assert primaryShards.contains(shard) == false : "Primary shard already allocated on current node: " + shard; + primaryShards.add(shard); + } shards.add(shard); } @@ -476,7 +563,7 @@ public void reset(String 
index) { } public float weight(ModelNode node) { - return function.weight(balancer, node, index); + return function.weightWithRebalanceConstraints(balancer, node, index); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 8570a16fd690c..ae6b8fd39978e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -42,6 +42,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.stream.StreamSupport; import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING; @@ -58,6 +59,8 @@ public class LocalShardsBalancer extends ShardsBalancer { private final RoutingAllocation allocation; private final RoutingNodes routingNodes; private final boolean movePrimaryFirst; + + private final boolean preferPrimaryBalance; private final BalancedShardsAllocator.WeightFunction weight; private final float threshold; @@ -71,7 +74,8 @@ public LocalShardsBalancer( RoutingAllocation allocation, boolean movePrimaryFirst, BalancedShardsAllocator.WeightFunction weight, - float threshold + float threshold, + boolean preferPrimaryBalance ) { this.logger = logger; this.allocation = allocation; @@ -84,6 +88,7 @@ public LocalShardsBalancer( nodes = Collections.unmodifiableMap(buildModelFromAssigned()); sorter = newNodeSorter(); inEligibleTargetNode = new HashSet<>(); + this.preferPrimaryBalance = preferPrimaryBalance; } /** @@ -101,6 +106,11 @@ public float avgShardsPerNode(String index) { return ((float) metadata.index(index).getTotalNumberOfShards()) / nodes.size(); } + @Override + public float avgPrimaryShardsPerNode(String index) { + return ((float) 
metadata.index(index).getNumberOfShards()) / nodes.size(); + } + /** * Returns the global average of shards per node */ @@ -223,7 +233,7 @@ MoveDecision decideRebalance(final ShardRouting shard) { // this is a comparison of the number of shards on this node to the number of shards // that should be on each node on average (both taking the cluster as a whole into account // as well as shards per index) - final float nodeWeight = weight.weight(this, node, idxName); + final float nodeWeight = weight.weightWithRebalanceConstraints(this, node, idxName); // if the node we are examining has a worse (higher) weight than the node the shard is // assigned to, then there is no way moving the shard to the node with the worse weight // can make the balance of the cluster better, so we check for that here @@ -959,6 +969,7 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { } private static final Comparator BY_DESCENDING_SHARD_ID = Comparator.comparing(ShardRouting::shardId).reversed(); + private static final Comparator PRIMARY_FIRST = Comparator.comparing(ShardRouting::primary).reversed(); /** * Tries to find a relocation from the max node to the minimal node for an arbitrary shard of the given index on the @@ -969,12 +980,16 @@ private boolean tryRelocateShard(BalancedShardsAllocator.ModelNode minNode, Bala final BalancedShardsAllocator.ModelIndex index = maxNode.getIndex(idx); if (index != null) { logger.trace("Try relocating shard of [{}] from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId()); - final Iterable shardRoutings = StreamSupport.stream(index.spliterator(), false) + Stream routingStream = StreamSupport.stream(index.spliterator(), false) .filter(ShardRouting::started) // cannot rebalance unassigned, initializing or relocating shards anyway - .filter(maxNode::containsShard) - .sorted(BY_DESCENDING_SHARD_ID) // check in descending order of shard id so that the decision is deterministic - ::iterator; + 
.filter(maxNode::containsShard) // check shards which are present on heaviest node + .sorted(BY_DESCENDING_SHARD_ID); // check in descending order of shard id so that the decision is deterministic + // If primary balance is preferred then prioritize moving primaries first + if (preferPrimaryBalance == true) { + routingStream = routingStream.sorted(PRIMARY_FIRST); + } + final Iterable shardRoutings = routingStream::iterator; final AllocationDeciders deciders = allocation.deciders(); for (ShardRouting shard : shardRoutings) { final Decision rebalanceDecision = deciders.canRebalance(shard, allocation); @@ -985,9 +1000,15 @@ private boolean tryRelocateShard(BalancedShardsAllocator.ModelNode minNode, Bala if (allocationDecision.type() == Decision.Type.NO) { continue; } + // This is a safety net which prevents un-necessary primary shard relocations from maxNode to minNode when + // doing such relocation wouldn't help in primary balance. + if (preferPrimaryBalance == true + && shard.primary() + && maxNode.numPrimaryShards(shard.getIndexName()) - minNode.numPrimaryShards(shard.getIndexName()) < 2) { + continue; + } final Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision); - maxNode.removeShard(shard); long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java index 593e6998141fb..b74393e1eec4c 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java @@ -72,4 +72,12 @@ public float avgShardsPerNode() { public float avgShardsPerNode(String index) { return Float.MAX_VALUE; } + + /** + * Returns the average of primary shards per node for the given 
index + */ + public float avgPrimaryShardsPerNode(String index) { + return Float.MAX_VALUE; + } + } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index c616d272acc98..497bd752b40d1 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -232,6 +232,7 @@ public void apply(Settings value, Settings current, Settings previous) { AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING, BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, + BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE, BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java index ae10a92a5104e..b2e4521811569 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java @@ -18,6 +18,9 @@ import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.cluster.routing.allocation.AllocationConstraints.INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID; +import static org.opensearch.cluster.routing.allocation.Constraint.CONSTRAINT_WEIGHT; +import static org.opensearch.cluster.routing.allocation.RebalanceConstraints.PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID; public class AllocationConstraintsTests extends OpenSearchAllocationTestCase { @@ -33,13 +36,18 @@ public void 
testSettings() { settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalanceFactor); settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), threshold); + settings.put(BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey(), true); service.applySettings(settings.build()); assertEquals(indexBalanceFactor, allocator.getIndexBalance(), 0.01); assertEquals(shardBalance, allocator.getShardBalance(), 0.01); assertEquals(threshold, allocator.getThreshold(), 0.01); + assertEquals(true, allocator.getPreferPrimaryBalance()); + settings.put(BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey(), false); + service.applySettings(settings.build()); + assertEquals(false, allocator.getPreferPrimaryBalance()); } /** @@ -50,6 +58,7 @@ public void testIndexShardsPerNodeConstraint() { ShardsBalancer balancer = mock(LocalShardsBalancer.class); BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); AllocationConstraints constraints = new AllocationConstraints(); + constraints.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true); int shardCount = randomIntBetween(1, 500); float avgShardsPerNode = 1.0f + (random().nextFloat()) * 999.0f; @@ -58,9 +67,63 @@ public void testIndexShardsPerNodeConstraint() { when(node.numShards(anyString())).thenReturn(shardCount); when(node.getNodeId()).thenReturn("test-node"); - long expectedWeight = (shardCount >= avgShardsPerNode) ? constraints.CONSTRAINT_WEIGHT : 0; + long expectedWeight = (shardCount >= avgShardsPerNode) ? CONSTRAINT_WEIGHT : 0; assertEquals(expectedWeight, constraints.weight(balancer, node, "index")); } + /** + * Test constraint evaluation logic for the primary shard balance constraint + * when it is satisfied, breached, and disabled. 
+ */ + public void testIndexPrimaryShardsPerNodeConstraint() { + ShardsBalancer balancer = mock(LocalShardsBalancer.class); + BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); + AllocationConstraints constraints = new AllocationConstraints(); + constraints.updateAllocationConstraint(PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, true); + + int primaryShardCount = 1; + float avgPrimaryShardsPerNode = 2f; + + when(balancer.avgPrimaryShardsPerNode(anyString())).thenReturn(avgPrimaryShardsPerNode); + when(node.numPrimaryShards(anyString())).thenReturn(primaryShardCount); + when(node.getNodeId()).thenReturn("test-node"); + + assertEquals(0, constraints.weight(balancer, node, "index")); + + primaryShardCount = 3; + when(node.numPrimaryShards(anyString())).thenReturn(primaryShardCount); + assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, "index")); + + constraints.updateAllocationConstraint(PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, false); + assertEquals(0, constraints.weight(balancer, node, "index")); + } + + /** + * Test constraint evaluation logic when both the IndexShardPerNode and primary shard balance + * constraints are enabled, with each one either satisfied or breached. 
+ */ + public void testAllConstraint() { + ShardsBalancer balancer = mock(LocalShardsBalancer.class); + BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); + AllocationConstraints constraints = new AllocationConstraints(); + constraints.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true); + constraints.updateAllocationConstraint(PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, true); + + int shardCount = randomIntBetween(1, 500); + int primaryShardCount = randomIntBetween(1, shardCount); + float avgShardsPerNode = 1.0f + (random().nextFloat()) * 999.0f; + float avgPrimaryShardsPerNode = (random().nextFloat()) * avgShardsPerNode; + + when(balancer.avgPrimaryShardsPerNode(anyString())).thenReturn(avgPrimaryShardsPerNode); + when(node.numPrimaryShards(anyString())).thenReturn(primaryShardCount); + when(balancer.avgShardsPerNode(anyString())).thenReturn(avgShardsPerNode); + when(node.numShards(anyString())).thenReturn(shardCount); + when(node.getNodeId()).thenReturn("test-node"); + + long expectedWeight = (shardCount >= avgShardsPerNode) ? CONSTRAINT_WEIGHT : 0; + expectedWeight += primaryShardCount > avgPrimaryShardsPerNode ? 
CONSTRAINT_WEIGHT : 0; + assertEquals(expectedWeight, constraints.weight(balancer, node, "index")); + } + } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java index 1ba69694eaec1..b472c043b2843 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -33,10 +33,12 @@ package org.opensearch.cluster.routing.allocation; import com.carrotsearch.hppc.cursors.ObjectCursor; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.ArrayUtil; import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.OpenSearchAllocationTestCase; import org.opensearch.cluster.EmptyClusterInfoService; @@ -44,22 +46,37 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.opensearch.cluster.routing.allocation.allocator.ShardsAllocator; import 
org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.snapshots.EmptySnapshotsInfoService; import org.opensearch.test.gateway.TestGatewayAllocator; import org.hamcrest.Matchers; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.stream.Collectors; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED; import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED; @@ -75,7 +92,7 @@ public class BalanceConfigurationTests extends OpenSearchAllocationTestCase { public void testIndexBalance() { /* Tests balance over indices only */ final float indexBalance = 1.0f; - final float replicaBalance = 0.0f; + final float shardBalance = 0.0f; final float balanceThreshold = 1.0f; Settings.Builder settings = Settings.builder(); @@ -84,7 +101,7 @@ public void testIndexBalance() { ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString() ); settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); - settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance); + settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); AllocationService strategy = createAllocationService(settings.build(), new 
TestGatewayAllocator()); @@ -123,10 +140,389 @@ public void testIndexBalance() { ); } - public void testReplicaBalance() { + private Settings.Builder getSettingsBuilderForPrimaryBalance() { + return getSettingsBuilderForPrimaryBalance(true); + } + + private Settings.Builder getSettingsBuilderForPrimaryBalance(boolean preferPrimaryBalance) { + final float indexBalance = 0.55f; + final float shardBalance = 0.45f; + final float balanceThreshold = 1.0f; + + Settings.Builder settings = Settings.builder(); + settings.put( + ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), + ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString() + ); + settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); + settings.put(BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey(), preferPrimaryBalance); + settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); + settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); + return settings; + } + + private IndexMetadata getIndexMetadata(String indexName, int shardCount, int replicaCount) { + return IndexMetadata.builder(indexName) + .settings( + Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, shardCount) + .put(SETTING_NUMBER_OF_REPLICAS, replicaCount) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + ) + .build(); + } + + /** + * This test verifies that with only primary shard balance, the primary shard distribution per index is balanced. 
+ */ + public void testPrimaryBalance() { + AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryBalance().build(), new TestGatewayAllocator()); + + ClusterState clusterState = initCluster(strategy); + verifyPerIndexPrimaryBalance(clusterState); + + clusterState = addNode(clusterState, strategy); + verifyPerIndexPrimaryBalance(clusterState); + + clusterState = removeNodes(clusterState, strategy); + verifyPerIndexPrimaryBalance(clusterState); + } + + /** + * This test verifies primary shard balance is not attained without PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE setting. + */ + public void testPrimaryBalanceWithoutPreferPrimaryBalanceSetting() { + final int numberOfNodes = 5; + final int numberOfIndices = 5; + final int numberOfShards = 25; + final int numberOfReplicas = 1; + + final int numberOfRuns = 5; + int balanceFailed = 0; + + AllocationService strategy = createAllocationService( + getSettingsBuilderForPrimaryBalance(false).build(), + new TestGatewayAllocator() + ); + for (int i = 0; i < numberOfRuns; i++) { + ClusterState clusterState = initCluster(strategy, true, numberOfIndices, numberOfNodes, numberOfShards, numberOfReplicas); + clusterState = removeOneNode(clusterState, strategy); + logger.info(ShardAllocations.printShardDistribution(clusterState)); + try { + verifyPerIndexPrimaryBalance(clusterState); + } catch (AssertionError e) { + balanceFailed++; + logger.info("Expected assertion failure"); + } + } + assertTrue(balanceFailed >= 4); + } + + /** + * This test verifies primary shard balance is attained with PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE setting. 
+ */ + public void testPrimaryBalanceWithPreferPrimaryBalanceSetting() { + final int numberOfNodes = 5; + final int numberOfIndices = 5; + final int numberOfShards = 25; + final int numberOfReplicas = 1; + final int numberOfRuns = 5; + int balanceFailed = 0; + + AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryBalance().build(), new TestGatewayAllocator()); + for (int i = 0; i < numberOfRuns; i++) { + ClusterState clusterState = initCluster(strategy, true, numberOfIndices, numberOfNodes, numberOfShards, numberOfReplicas); + clusterState = removeOneNode(clusterState, strategy); + logger.info(ShardAllocations.printShardDistribution(clusterState)); + try { + verifyPerIndexPrimaryBalance(clusterState); + } catch (AssertionError e) { + balanceFailed++; + logger.info("Unexpected assertion failure"); + } + } + assertTrue(balanceFailed <= 1); + } + + /** + * This test verifies the allocation logic when nodes breach multiple constraints and ensures that the node breaching the + * fewest constraints is chosen for allocation. + * + * This test mimics a cluster state containing four nodes, where one node breaches two constraints while one breaches + * only one. In order to have nodes breach constraints, the test excludes two nodes (node2, node3) from allocation so + * that the other two nodes (node0, node1) receive all shard assignments, resulting in constraint breaches. The test asserts that + * the new primary shard assignment lands on the node breaching one constraint (node1), while the replica lands on the other + * (node0). Final shard allocation state: 
+ * + routing_nodes: + -----node_id[node2][V] + -----node_id[node3][V] + -----node_id[node0][V] + --------[test][1], node[node0], [P], s[STARTED], a[id=7B4dVsrjSoC6imBHO60mrQ] + --------[test][0], node[node0], [P], s[STARTED], a[id=0HySaPcyRhiKrH6QLA3evw] + --------[test][2], node[node0], [R], s[STARTED], a[id=pB3iuLKZSC--2yNS0trbgA] + -----node_id[node1][V] + --------[test][2], node[node1], [P], s[STARTED], a[id=QWN_T6xpQiWGSD8GJnX-bQ] + --------[test][1], node[node1], [R], s[STARTED], a[id=ChWdQiOdSdKrTPxwceIu1w] + --------[test][0], node[node1], [R], s[STARTED], a[id=5Adc5JteQ8-lY2xfsHUg-Q] + * + */ + public void testPrimaryBalanceWithContrainstBreaching() { + // Mark node2, node 3 excluded (FilterAllocationDecider) so that all allocations land on node 0, node 1 + Settings.Builder settingsBuilder = getSettingsBuilderForPrimaryBalance(); + settingsBuilder.put("cluster.routing.allocation.exclude._id", "node2,node3"); + + AllocationService strategy = createAllocationService(settingsBuilder.build(), new TestGatewayAllocator()); + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + List nodesList = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + final DiscoveryNode node = newNode("node" + i); + discoBuilder = discoBuilder.add(node); + nodesList.add(node.getId()); + } + discoBuilder.localNodeId(newNode("node_0").getId()); + discoBuilder.clusterManagerNodeId(newNode("node_0").getId()); + + Metadata.Builder metadata = Metadata.builder(); + metadata.persistentSettings(settingsBuilder.build()); + RoutingTable.Builder routingTable = RoutingTable.builder(); + // build index metadata + IndexMetadata indexMetadata = getIndexMetadata("test", 3, 1); + // Build index routing table + IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()); + ShardId shardId_0 = new ShardId(indexMetadata.getIndex(), 0); + ShardId shardId_1 = new ShardId(indexMetadata.getIndex(), 1); + ShardId shardId_2 = new 
ShardId(indexMetadata.getIndex(), 2); + UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null); + IndexShardRoutingTable.Builder indexShardRoutingBuilder_0 = new IndexShardRoutingTable.Builder(shardId_0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder_1 = new IndexShardRoutingTable.Builder(shardId_1); + IndexShardRoutingTable.Builder indexShardRoutingBuilder_2 = new IndexShardRoutingTable.Builder(shardId_2); + indexShardRoutingBuilder_0.addShard(TestShardRouting.newShardRouting(shardId_0, nodesList.get(0), true, ShardRoutingState.STARTED)); + indexShardRoutingBuilder_1.addShard(TestShardRouting.newShardRouting(shardId_1, nodesList.get(0), true, ShardRoutingState.STARTED)); + indexShardRoutingBuilder_0.addShard( + TestShardRouting.newShardRouting(shardId_0, nodesList.get(1), false, ShardRoutingState.STARTED) + ); + indexShardRoutingBuilder_1.addShard( + TestShardRouting.newShardRouting(shardId_1, nodesList.get(1), false, ShardRoutingState.STARTED) + ); + indexShardRoutingBuilder_2.addShard( + TestShardRouting.newShardRouting( + shardId_2, + null, + null, + true, + UNASSIGNED, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, + unassignedInfo + ) + ); + indexShardRoutingBuilder_2.addShard( + TestShardRouting.newShardRouting( + shardId_2, + null, + null, + false, + UNASSIGNED, + RecoverySource.PeerRecoverySource.INSTANCE, + unassignedInfo + ) + ); + + IndexShardRoutingTable indexShardRoutingTable_0 = indexShardRoutingBuilder_0.build(); + IndexShardRoutingTable indexShardRoutingTable_1 = indexShardRoutingBuilder_1.build(); + + indexRoutingTable.addIndexShard(indexShardRoutingBuilder_0.build()); + indexRoutingTable.addIndexShard(indexShardRoutingBuilder_1.build()); + indexRoutingTable.addIndexShard(indexShardRoutingBuilder_2.build()); + routingTable.add(indexRoutingTable); + + IndexMetadata.Builder indexMetaDataBuilder = IndexMetadata.builder(indexMetadata); + indexMetaDataBuilder.putInSyncAllocationIds(0, 
indexShardRoutingTable_0.getAllAllocationIds()); + indexMetaDataBuilder.putInSyncAllocationIds(1, indexShardRoutingTable_1.getAllAllocationIds()); + metadata.put(indexMetaDataBuilder.build(), false); + + ClusterState.Builder stateBuilder = ClusterState.builder(new ClusterName("test")); + stateBuilder.nodes(discoBuilder); + stateBuilder.metadata(metadata.generateClusterUuidIfNeeded().build()); + stateBuilder.routingTable(routingTable.build()); + ClusterState clusterState = stateBuilder.build(); + clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = applyAllocationUntilNoChange(clusterState, strategy); + + logger.info(ShardAllocations.printShardDistribution(clusterState)); + ShardRouting primaryShard = clusterState.routingTable().shardRoutingTable(shardId_2).primaryShard(); + List replicaShards = clusterState.routingTable().shardRoutingTable(shardId_2).replicaShards(); + + assertTrue(primaryShard.started()); + assertEquals("node1", primaryShard.currentNodeId()); + + assertEquals(1, replicaShards.size()); + assertTrue(replicaShards.get(0).started()); + assertEquals("node0", replicaShards.get(0).currentNodeId()); + } + + /** + * This test mimics a cluster state which can not be rebalanced due to + * {@link org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider} + * allocation decider which prevents shard relocation, leaving cluster unbalanced on primaries. + * + * There are two nodes (N1, N2) where all primaries land on N1 while replicas on N2. 
+ * N1 N2 + * ------ -------- + * P1 R1 + * P2 R2 + * + * -----node_id[node_0][V] + * --------[test][1], node[node_0], [P], s[STARTED], a[id=xqfZSToVSQaff2xvuxh_yA] + * --------[test][0], node[node_0], [P], s[STARTED], a[id=VGjOeBGdSmu3pJR6T7v29A] + * -----node_id[node_1][V] + * --------[test][1], node[node_1], [R], s[STARTED], a[id=zZI0R8FBQkWMNndEZt9d8w] + * --------[test][0], node[node_1], [R], s[STARTED], a[id=8IpwEMQ2QEuj5rQOxBagSA] + */ + public void testPrimaryBalance_NotSolved_1() { + AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryBalance().build(), new TestGatewayAllocator()); + + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + Set nodes = new HashSet<>(); + for (int i = 0; i < 2; i++) { + final DiscoveryNode node = newNode("node_" + i); + discoBuilder = discoBuilder.add(node); + nodes.add(node.getId()); + } + discoBuilder.localNodeId(newNode("node_0").getId()); + discoBuilder.clusterManagerNodeId(newNode("node_0").getId()); + Metadata.Builder metadata = Metadata.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + List nodesList = new ArrayList<>(nodes); + // build index metadata + IndexMetadata indexMetadata = getIndexMetadata("test", 2, 1); + IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()); + ShardId shardId_0 = new ShardId(indexMetadata.getIndex(), 0); + ShardId shardId_1 = new ShardId(indexMetadata.getIndex(), 1); + IndexShardRoutingTable.Builder indexShardRoutingBuilder_0 = new IndexShardRoutingTable.Builder(shardId_0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder_1 = new IndexShardRoutingTable.Builder(shardId_1); + indexShardRoutingBuilder_0.addShard(TestShardRouting.newShardRouting(shardId_0, nodesList.get(0), true, ShardRoutingState.STARTED)); + indexShardRoutingBuilder_1.addShard(TestShardRouting.newShardRouting(shardId_1, nodesList.get(0), true, ShardRoutingState.STARTED)); + 
indexShardRoutingBuilder_0.addShard( + TestShardRouting.newShardRouting(shardId_0, nodesList.get(1), false, ShardRoutingState.STARTED) + ); + indexShardRoutingBuilder_1.addShard( + TestShardRouting.newShardRouting(shardId_1, nodesList.get(1), false, ShardRoutingState.STARTED) + ); + indexRoutingTable.addIndexShard(indexShardRoutingBuilder_0.build()); + indexRoutingTable.addIndexShard(indexShardRoutingBuilder_1.build()); + metadata.put(indexMetadata, false); + routingTable.add(indexRoutingTable); + + ClusterState.Builder stateBuilder = ClusterState.builder(new ClusterName("test")); + stateBuilder.nodes(discoBuilder); + stateBuilder.metadata(metadata.generateClusterUuidIfNeeded().build()); + stateBuilder.routingTable(routingTable.build()); + ClusterState clusterState = stateBuilder.build(); + + clusterState = strategy.reroute(clusterState, "reroute"); + boolean balanced = true; + logger.info(ShardAllocations.printShardDistribution(clusterState)); + try { + verifyPerIndexPrimaryBalance(clusterState); + } catch (AssertionError e) { + balanced = false; + } + assertFalse(balanced); + } + + /** + * This test mimics a cluster state where re-balancing is not possible due to an existing limitation of the re-balancing + * logic, which applies at the index level, i.e. it balances the shards of a single index across all nodes. This will be solved when + * a constraint on primary shard count across indices is added. 
+ * + * Please note, P1, P2 belongs to different index + * + * N1 N2 + * ------ -------- + * P1 R1 + * P2 R2 + * + * -----node_id[node_0][V] + * --------[test1][0], node[node_0], [P], s[STARTED], a[id=u7qtyy5AR42hgEa-JpeArg] + * --------[test0][0], node[node_0], [P], s[STARTED], a[id=BQrLSo6sQyGlcLdVvGgqLQ] + * -----node_id[node_1][V] + * --------[test1][0], node[node_1], [R], s[STARTED], a[id=TDqbfvAfSFK6lnv3aOU9bA] + * --------[test0][0], node[node_1], [R], s[STARTED], a[id=E85-jhiEQwuB43u5Wq1mAw] + * + */ + public void testPrimaryBalance_NotSolved_2() { + AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryBalance().build(), new TestGatewayAllocator()); + + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + Set nodes = new HashSet<>(); + for (int i = 0; i < 2; i++) { + final DiscoveryNode node = newNode("node_" + i); + discoBuilder = discoBuilder.add(node); + nodes.add(node.getId()); + } + discoBuilder.localNodeId(newNode("node_0").getId()); + discoBuilder.clusterManagerNodeId(newNode("node_0").getId()); + Metadata.Builder metadata = Metadata.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + List nodesList = new ArrayList<>(nodes); + // build index metadata + IndexMetadata indexMetadata_0 = getIndexMetadata("test0", 1, 1); + IndexMetadata indexMetadata_1 = getIndexMetadata("test1", 1, 1); + IndexRoutingTable.Builder indexRoutingTable_0 = IndexRoutingTable.builder(indexMetadata_0.getIndex()); + IndexRoutingTable.Builder indexRoutingTable_1 = IndexRoutingTable.builder(indexMetadata_1.getIndex()); + ShardId shardId_0 = new ShardId(indexMetadata_0.getIndex(), 0); + ShardId shardId_1 = new ShardId(indexMetadata_1.getIndex(), 0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder_0 = new IndexShardRoutingTable.Builder(shardId_0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder_1 = new IndexShardRoutingTable.Builder(shardId_1); + 
indexShardRoutingBuilder_0.addShard(TestShardRouting.newShardRouting(shardId_0, nodesList.get(0), true, ShardRoutingState.STARTED)); + indexShardRoutingBuilder_1.addShard(TestShardRouting.newShardRouting(shardId_1, nodesList.get(0), true, ShardRoutingState.STARTED)); + indexShardRoutingBuilder_0.addShard( + TestShardRouting.newShardRouting(shardId_0, nodesList.get(1), false, ShardRoutingState.STARTED) + ); + indexShardRoutingBuilder_1.addShard( + TestShardRouting.newShardRouting(shardId_1, nodesList.get(1), false, ShardRoutingState.STARTED) + ); + indexRoutingTable_0.addIndexShard(indexShardRoutingBuilder_0.build()); + indexRoutingTable_1.addIndexShard(indexShardRoutingBuilder_1.build()); + metadata.put(indexMetadata_0, false); + metadata.put(indexMetadata_1, false); + routingTable.add(indexRoutingTable_0); + routingTable.add(indexRoutingTable_1); + ClusterState.Builder stateBuilder = ClusterState.builder(new ClusterName("test")); + stateBuilder.nodes(discoBuilder); + stateBuilder.metadata(metadata.generateClusterUuidIfNeeded().build()); + stateBuilder.routingTable(routingTable.build()); + ClusterState clusterState = stateBuilder.build(); + + clusterState = strategy.reroute(clusterState, "reroute"); + logger.info(ShardAllocations.printShardDistribution(clusterState)); + // The cluster is balanced when considering indices individually not balanced when considering global state + verifyPerIndexPrimaryBalance(clusterState); + } + + public void verifyPerIndexPrimaryBalance(ClusterState currentState) { + RoutingNodes nodes = currentState.getRoutingNodes(); + for (ObjectObjectCursor index : currentState.getRoutingTable().indicesRouting()) { + final int totalPrimaryShards = index.value.primaryShardsActive(); + final int avgPrimaryShardsPerNode = (int) Math.ceil(totalPrimaryShards * 1f / currentState.getRoutingNodes().size()); + + for (RoutingNode node : nodes) { + final int primaryCount = node.shardsWithState(index.key, STARTED) + .stream() + 
.filter(ShardRouting::primary) + .collect(Collectors.toList()) + .size(); + assertTrue(primaryCount <= avgPrimaryShardsPerNode); + } + } + } + + public void testShardBalance() { /* Tests balance over replicas only */ final float indexBalance = 0.0f; - final float replicaBalance = 1.0f; + final float shardBalance = 1.0f; final float balanceThreshold = 1.0f; Settings.Builder settings = Settings.builder(); @@ -135,13 +531,13 @@ public void testReplicaBalance() { ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString() ); settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); - settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance); + settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator()); ClusterState clusterState = initCluster(strategy); - assertReplicaBalance( + assertShardBalance( clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices, @@ -151,7 +547,7 @@ public void testReplicaBalance() { ); clusterState = addNode(clusterState, strategy); - assertReplicaBalance( + assertShardBalance( clusterState.getRoutingNodes(), numberOfNodes + 1, numberOfIndices, @@ -161,7 +557,7 @@ public void testReplicaBalance() { ); clusterState = removeNodes(clusterState, strategy); - assertReplicaBalance( + assertShardBalance( clusterState.getRoutingNodes(), numberOfNodes + 1 - (numberOfNodes + 1) / 2, numberOfIndices, @@ -172,12 +568,27 @@ public void testReplicaBalance() { } private ClusterState initCluster(AllocationService strategy) { + return initCluster(strategy, false, numberOfIndices, numberOfNodes, numberOfShards, numberOfReplicas); + } + + private ClusterState initCluster( + AllocationService strategy, + boolean segrep, + int numberOfIndices, + int numberOfNodes, + 
int numberOfShards, + int numberOfReplicas + ) { Metadata.Builder metadataBuilder = Metadata.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); for (int i = 0; i < numberOfIndices; i++) { + Settings.Builder settingsBuilder = settings(Version.CURRENT); + if (segrep) { + settingsBuilder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + } IndexMetadata.Builder index = IndexMetadata.builder("test" + i) - .settings(settings(Version.CURRENT)) + .settings(settingsBuilder) .numberOfShards(numberOfShards) .numberOfReplicas(numberOfReplicas); metadataBuilder = metadataBuilder.put(index); @@ -202,21 +613,13 @@ private ClusterState initCluster(AllocationService strategy) { .routingTable(initialRoutingTable) .build(); clusterState = strategy.reroute(clusterState, "reroute"); - - logger.info("restart all the primary shards, replicas will start initializing"); - clusterState = startInitializingShardsAndReroute(strategy, clusterState); - - logger.info("start the replica shards"); - clusterState = startInitializingShardsAndReroute(strategy, clusterState); - - logger.info("complete rebalancing"); - return applyStartedShardsUntilNoChange(clusterState, strategy); + return applyAllocationUntilNoChange(clusterState, strategy); } private ClusterState addNode(ClusterState clusterState, AllocationService strategy) { logger.info("now, start 1 more node, check that rebalancing will happen because we set it to always"); clusterState = ClusterState.builder(clusterState) - .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node" + numberOfNodes))) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node" + clusterState.getRoutingNodes().size()))) .build(); RoutingTable routingTable = strategy.reroute(clusterState, "reroute").routingTable(); @@ -226,7 +629,16 @@ private ClusterState addNode(ClusterState clusterState, AllocationService strate return applyStartedShardsUntilNoChange(clusterState, strategy); } + private 
ClusterState removeOneNode(ClusterState clusterState, AllocationService strategy) { + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()); + nodes.remove("node0"); + clusterState = ClusterState.builder(clusterState).nodes(nodes.build()).build(); + clusterState = strategy.disassociateDeadNodes(clusterState, randomBoolean(), "removed nodes"); + return applyAllocationUntilNoChange(clusterState, strategy); + } + private ClusterState removeNodes(ClusterState clusterState, AllocationService strategy) { + int numberOfNodes = clusterState.getRoutingNodes().size(); logger.info("Removing half the nodes (" + (numberOfNodes + 1) / 2 + ")"); DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()); @@ -240,7 +652,10 @@ private ClusterState removeNodes(ClusterState clusterState, AllocationService st if (removed) { clusterState = strategy.disassociateDeadNodes(clusterState, randomBoolean(), "removed nodes"); } + return applyAllocationUntilNoChange(clusterState, strategy); + } + private ClusterState applyAllocationUntilNoChange(ClusterState clusterState, AllocationService strategy) { logger.info("start all the primary shards, replicas will start initializing"); clusterState = startInitializingShardsAndReroute(strategy, clusterState); @@ -254,7 +669,7 @@ private ClusterState removeNodes(ClusterState clusterState, AllocationService st return applyStartedShardsUntilNoChange(clusterState, strategy); } - private void assertReplicaBalance( + private void assertShardBalance( RoutingNodes nodes, int numberOfNodes, int numberOfIndices, @@ -494,5 +909,4 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing } } } - } diff --git a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java index 1d527140dc038..ff0ad0c69e4e5 100644 --- 
a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java
@@ -60,11 +60,14 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Formatter;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.TreeMap;
 
 import static java.util.Collections.emptyMap;
 import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
@@ -306,6 +309,132 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat
         }
     }
 
+    /**
+     * Utility class that renders how primary and replica shards are distributed across nodes.
+     * <p>
+     * All counters live in static fields that are cleared and refilled on every call to
+     * {@link #printShardDistribution(ClusterState)}; the class performs no synchronization.
+     */
+    public static class ShardAllocations {
+
+        private static final String SEPARATOR = "===================================================";
+        private static final String ONE_LINE_RETURN = "\n";
+        private static final String TWO_LINE_RETURN = "\n\n";
+
+        /**
+         * Primary/replica shard counts per node.
+         * String: NodeId
+         * int[]: tuple storing primary shard count in 0th index and replica's in 1st
+         */
+        static TreeMap<String, int[]> nodeToShardCountMap = new TreeMap<>();
+
+        /**
+         * Helper map containing NodeName to NodeId
+         */
+        static TreeMap<String, String> nameToNodeId = new TreeMap<>();
+
+        /**
+         * Unassigned shard counts: primaries at index 0, replicas at index 1
+         */
+        static int[] unassigned = new int[2];
+
+        /**
+         * Total shard counts, assigned or not: primaries at index 0, replicas at index 1
+         */
+        static int[] totalShards = new int[2];
+
+        /**
+         * Formats one pair of counters as two rows labelled "P" and "R".
+         *
+         * @param shardCount primary count at index 0, replica count at index 1
+         * @return the two formatted rows, each terminated by a newline
+         */
+        private static String printShardAllocationWithHeader(int[] shardCount) {
+            StringBuilder sb = new StringBuilder();
+            Formatter formatter = new Formatter(sb, Locale.ROOT);
+            formatter.format("%-20s %-20s\n", "P", shardCount[0]);
+            formatter.format("%-20s %-20s\n", "R", shardCount[1]);
+            return sb.toString();
+        }
+
+        /** Clears both maps and zeroes the unassigned and total counters. */
+        private static void reset() {
+            nodeToShardCountMap.clear();
+            nameToNodeId.clear();
+            totalShards[0] = totalShards[1] = 0;
+            unassigned[0] = unassigned[1] = 0;
+        }
+
+        /**
+         * Resets all counters and recomputes them from the given cluster state.
+         *
+         * @param inputState cluster state whose routing nodes and routing table are counted
+         */
+        private static void buildMap(ClusterState inputState) {
+            reset();
+            for (RoutingNode node : inputState.getRoutingNodes()) {
+                final String nodeName = node.node().getName();
+                // Nodes without a usable name are listed under their id
+                final String displayName = nodeName != null && nodeName.isEmpty() == false ? nodeName : node.nodeId();
+                nameToNodeId.putIfAbsent(displayName, node.nodeId());
+                nodeToShardCountMap.putIfAbsent(node.nodeId(), new int[] { 0, 0 });
+            }
+            for (ShardRouting shardRouting : inputState.routingTable().allShards()) {
+                updateMap(nodeToShardCountMap, shardRouting);
+            }
+        }
+
+        /**
+         * Counts one shard against its node, or against {@link #unassigned} when it has no node,
+         * and against {@link #totalShards}.
+         *
+         * @param mapToUpdate per-node counters keyed by node id
+         * @param shardRouting shard to count
+         */
+        private static void updateMap(TreeMap<String, int[]> mapToUpdate, ShardRouting shardRouting) {
+            // Counter arrays are mutated in place, so nothing needs to be put back into the map.
+            // computeIfAbsent guards against an assigned shard whose node has no entry yet (would be an NPE).
+            final int[] counts = shardRouting.assignedToNode()
+                ? mapToUpdate.computeIfAbsent(shardRouting.currentNodeId(), nodeId -> new int[] { 0, 0 })
+                : unassigned;
+            final int slot = shardRouting.primary() ? 0 : 1;
+            counts[slot]++;
+            totalShards[slot]++;
+        }
+
+        /**
+         * Renders the counters gathered by {@link #buildMap(ClusterState)}: one section per node
+         * in node-name order, followed by the unassigned and total rows.
+         *
+         * @return the formatted report
+         */
+        private static String allocation() {
+            StringBuilder sb = new StringBuilder();
+            sb.append(TWO_LINE_RETURN).append(SEPARATOR).append(ONE_LINE_RETURN);
+            Formatter formatter = new Formatter(sb, Locale.ROOT);
+            for (Map.Entry<String, String> entry : nameToNodeId.entrySet()) {
+                formatter.format("%-20s\n", entry.getKey().toUpperCase(Locale.ROOT));
+                // The entry already carries the node id; no second map lookup is needed
+                sb.append(printShardAllocationWithHeader(nodeToShardCountMap.get(entry.getValue())));
+            }
+            sb.append(ONE_LINE_RETURN);
+            formatter.format("%-20s (P)%-5s (R)%-5s\n\n", "Unassigned ", unassigned[0], unassigned[1]);
+            formatter.format("%-20s (P)%-5s (R)%-5s\n\n", "Total Shards", totalShards[0], totalShards[1]);
+            return sb.toString();
+        }
+
+        /**
+         * Builds a printable summary of primary and replica shard counts per node.
+         *
+         * @param state cluster state to summarize
+         * @return the formatted report, suitable for logging
+         */
+        public static String printShardDistribution(ClusterState state) {
+            buildMap(state);
+            return allocation();
+        }
+    }
+
     /** A lock {@link AllocationService} allowing tests to override time */
     protected static class MockAllocationService extends AllocationService {
 
diff --git a/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java b/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java
index 29207cbc4ada1..76325b6a0035b 100644
--- a/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java
+++ b/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java
@@ -205,6 +205,33 @@ public static ShardRouting newShardRouting(
         );
     }
 
+    /**
+     * Creates a {@link ShardRouting} from explicitly supplied routing fields, including the
+     * recovery source and unassigned info. The allocation id is obtained from
+     * {@code buildAllocationId(state)}.
+     */
+    public static ShardRouting newShardRouting(
+        ShardId shardId,
+        String currentNodeId,
+        String relocatingNodeId,
+        boolean primary,
+        ShardRoutingState state,
+        RecoverySource recoverySource,
+        UnassignedInfo unassignedInfo
+    ) {
+        return new ShardRouting(
+            shardId,
+            currentNodeId,
+            relocatingNodeId,
+            primary,
+            state,
+            recoverySource,
+            unassignedInfo,
+            buildAllocationId(state),
+            -1 // NOTE(review): presumably "expected shard size unavailable" - confirm against the ShardRouting constructor
+        );
+    }
+
     public static ShardRouting relocate(ShardRouting shardRouting, String relocatingNodeId, long expectedShardSize) {
         return shardRouting.relocate(relocatingNodeId, expectedShardSize);
     }