-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[Segment Replication] Allocation and rebalancing based on average primary shard count per index #6422
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
dreamer-89
merged 15 commits into
opensearch-project:main
from
dreamer-89:primary_allocation_with_constraint
Mar 3, 2023
Merged
[Segment Replication] Allocation and rebalancing based on average primary shard count per index #6422
Changes from all commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
3d670eb
[Segment Replication] Move primary shard first during rebalancing
dreamer-89 e23b7c7
Address review comments
dreamer-89 7244985
Add primary weight constraint
dreamer-89 b1d4327
Add average primary shard count constraint for allocation and rebalan…
dreamer-89 d207c62
Spotless fix and javadocs
dreamer-89 2abaf28
Add failing tests
dreamer-89 e5e3fe2
Fix unit tests
dreamer-89 baf225f
Add unit test for nodes breaching multiple constraints
dreamer-89 d253109
Add comments and update unit test
dreamer-89 bf8b4ad
Address review comments
dreamer-89 fab15bf
Remove auto-expand replicas integration test
dreamer-89 57754a5
Address review comments
dreamer-89 b28fe2d
Update comments for tests
dreamer-89 f1ef7fc
Add changelog entry
dreamer-89 12e5021
Remove extra changelog entry
dreamer-89 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
216 changes: 216 additions & 0 deletions
216
...alClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,216 @@ | ||
| /* | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| */ | ||
|
|
||
| package org.opensearch.indices.replication; | ||
|
|
||
| import com.carrotsearch.hppc.cursors.ObjectObjectCursor; | ||
| import org.opensearch.cluster.ClusterState; | ||
| import org.opensearch.cluster.metadata.IndexMetadata; | ||
| import org.opensearch.cluster.routing.IndexRoutingTable; | ||
| import org.opensearch.cluster.routing.RoutingNode; | ||
| import org.opensearch.cluster.routing.RoutingNodes; | ||
| import org.opensearch.cluster.routing.ShardRouting; | ||
| import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; | ||
| import org.opensearch.common.settings.Settings; | ||
| import org.opensearch.common.unit.TimeValue; | ||
| import org.opensearch.index.IndexModule; | ||
| import org.opensearch.indices.replication.common.ReplicationType; | ||
| import org.opensearch.test.InternalTestCluster; | ||
| import org.opensearch.test.OpenSearchIntegTestCase; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; | ||
| import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; | ||
|
|
||
| import org.opensearch.cluster.OpenSearchAllocationTestCase.ShardAllocations; | ||
|
|
||
| @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) | ||
| public class SegmentReplicationAllocationIT extends SegmentReplicationBaseIT { | ||
|
|
||
| private void createIndex(String idxName, int shardCount, int replicaCount, boolean isSegRep) { | ||
| Settings.Builder builder = Settings.builder() | ||
| .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shardCount) | ||
| .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) | ||
| .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replicaCount); | ||
| if (isSegRep) { | ||
| builder = builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); | ||
| } else { | ||
| builder = builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT); | ||
| } | ||
| prepareCreate(idxName, builder).get(); | ||
| } | ||
|
|
||
| public void enablePreferPrimaryBalance() { | ||
| assertAcked( | ||
| client().admin() | ||
| .cluster() | ||
| .prepareUpdateSettings() | ||
| .setPersistentSettings( | ||
| Settings.builder().put(BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey(), "true") | ||
| ) | ||
| ); | ||
| } | ||
|
|
||
| /** | ||
| * This test verifies the happy path where primary shard allocation is balanced when multiple indices are created. | ||
| * | ||
| * This test in general passes without primary shard balance as well due to nature of allocation algorithm which | ||
| * assigns all primary shards first followed by replica copies. | ||
| */ | ||
| public void testBalancedPrimaryAllocation() throws Exception { | ||
| internalCluster().startClusterManagerOnlyNode(); | ||
| final int maxReplicaCount = 2; | ||
| final int maxShardCount = 5; | ||
| final int nodeCount = randomIntBetween(maxReplicaCount + 1, 10); | ||
| final int numberOfIndices = randomIntBetween(5, 10); | ||
|
|
||
| final List<String> nodeNames = new ArrayList<>(); | ||
| logger.info("--> Creating {} nodes", nodeCount); | ||
| for (int i = 0; i < nodeCount; i++) { | ||
| nodeNames.add(internalCluster().startNode()); | ||
| } | ||
| enablePreferPrimaryBalance(); | ||
| int shardCount, replicaCount; | ||
| ClusterState state; | ||
| for (int i = 0; i < numberOfIndices; i++) { | ||
| shardCount = randomIntBetween(1, maxShardCount); | ||
| replicaCount = randomIntBetween(0, maxReplicaCount); | ||
| createIndex("test" + i, shardCount, replicaCount, i % 2 == 0); | ||
| logger.info("--> Creating index {} with shard count {} and replica count {}", "test" + i, shardCount, replicaCount); | ||
| ensureGreen(TimeValue.timeValueSeconds(60)); | ||
| state = client().admin().cluster().prepareState().execute().actionGet().getState(); | ||
| logger.info(ShardAllocations.printShardDistribution(state)); | ||
| } | ||
| verifyPerIndexPrimaryBalance(); | ||
| } | ||
|
|
||
| /** | ||
| * This test verifies balanced primary shard allocation for a single index with large shard count in event of node | ||
| * going down and a new node joining the cluster. The results in shard distribution skewness and re-balancing logic | ||
| * ensures the primary shard distribution is balanced. | ||
| * | ||
| */ | ||
| public void testSingleIndexShardAllocation() throws Exception { | ||
| internalCluster().startClusterManagerOnlyNode(); | ||
| final int maxReplicaCount = 1; | ||
| final int maxShardCount = 50; | ||
| final int nodeCount = 5; | ||
|
|
||
| final List<String> nodeNames = new ArrayList<>(); | ||
| logger.info("--> Creating {} nodes", nodeCount); | ||
| for (int i = 0; i < nodeCount; i++) { | ||
| nodeNames.add(internalCluster().startNode()); | ||
| } | ||
| enablePreferPrimaryBalance(); | ||
|
|
||
| ClusterState state; | ||
| createIndex("test", maxShardCount, maxReplicaCount, true); | ||
| ensureGreen(TimeValue.timeValueSeconds(60)); | ||
| state = client().admin().cluster().prepareState().execute().actionGet().getState(); | ||
| logger.info(ShardAllocations.printShardDistribution(state)); | ||
| verifyPerIndexPrimaryBalance(); | ||
|
|
||
| // Remove a node | ||
| internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeNames.get(0))); | ||
| ensureGreen(TimeValue.timeValueSeconds(60)); | ||
| state = client().admin().cluster().prepareState().execute().actionGet().getState(); | ||
| logger.info(ShardAllocations.printShardDistribution(state)); | ||
| verifyPerIndexPrimaryBalance(); | ||
|
|
||
| // Add a new node | ||
| internalCluster().startDataOnlyNode(); | ||
| ensureGreen(TimeValue.timeValueSeconds(60)); | ||
| state = client().admin().cluster().prepareState().execute().actionGet().getState(); | ||
| logger.info(ShardAllocations.printShardDistribution(state)); | ||
| verifyPerIndexPrimaryBalance(); | ||
| } | ||
|
|
||
| /** | ||
| * Similar to testSingleIndexShardAllocation test but creates multiple indices, multiple node adding in and getting | ||
| * removed. The test asserts post each such event that primary shard distribution is balanced across single index. | ||
| */ | ||
| public void testAllocationWithDisruption() throws Exception { | ||
| internalCluster().startClusterManagerOnlyNode(); | ||
| final int maxReplicaCount = 2; | ||
| final int maxShardCount = 5; | ||
| final int nodeCount = randomIntBetween(maxReplicaCount + 1, 10); | ||
| final int numberOfIndices = randomIntBetween(1, 10); | ||
|
|
||
| logger.info("--> Creating {} nodes", nodeCount); | ||
| final List<String> nodeNames = new ArrayList<>(); | ||
| for (int i = 0; i < nodeCount; i++) { | ||
| nodeNames.add(internalCluster().startNode()); | ||
| } | ||
| enablePreferPrimaryBalance(); | ||
|
|
||
| int shardCount, replicaCount, totalShardCount = 0, totalReplicaCount = 0; | ||
| ClusterState state; | ||
| for (int i = 0; i < numberOfIndices; i++) { | ||
| shardCount = randomIntBetween(1, maxShardCount); | ||
| totalShardCount += shardCount; | ||
| replicaCount = randomIntBetween(1, maxReplicaCount); | ||
| totalReplicaCount += replicaCount; | ||
| logger.info("--> Creating index test{} with primary {} and replica {}", i, shardCount, replicaCount); | ||
| createIndex("test" + i, shardCount, replicaCount, i % 2 == 0); | ||
| ensureGreen(TimeValue.timeValueSeconds(60)); | ||
| if (logger.isTraceEnabled()) { | ||
| state = client().admin().cluster().prepareState().execute().actionGet().getState(); | ||
| logger.info(ShardAllocations.printShardDistribution(state)); | ||
| } | ||
| } | ||
| state = client().admin().cluster().prepareState().execute().actionGet().getState(); | ||
| logger.info(ShardAllocations.printShardDistribution(state)); | ||
| verifyPerIndexPrimaryBalance(); | ||
|
|
||
| final int additionalNodeCount = randomIntBetween(1, 5); | ||
| logger.info("--> Adding {} nodes", additionalNodeCount); | ||
|
|
||
| internalCluster().startNodes(additionalNodeCount); | ||
| ensureGreen(TimeValue.timeValueSeconds(60)); | ||
| state = client().admin().cluster().prepareState().execute().actionGet().getState(); | ||
| logger.info(ShardAllocations.printShardDistribution(state)); | ||
| verifyPerIndexPrimaryBalance(); | ||
|
|
||
| logger.info("--> Stop one third nodes"); | ||
| for (int i = 0; i < nodeCount; i += 3) { | ||
| internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeNames.get(i))); | ||
| // give replica a chance to promote as primary before terminating node containing the replica | ||
| ensureGreen(TimeValue.timeValueSeconds(60)); | ||
| } | ||
| state = client().admin().cluster().prepareState().execute().actionGet().getState(); | ||
| logger.info(ShardAllocations.printShardDistribution(state)); | ||
| verifyPerIndexPrimaryBalance(); | ||
| } | ||
|
|
||
| /** | ||
| * Utility method which ensures cluster has balanced primary shard distribution across a single index. | ||
| * @throws Exception | ||
| */ | ||
| private void verifyPerIndexPrimaryBalance() throws Exception { | ||
| assertBusy(() -> { | ||
| final ClusterState currentState = client().admin().cluster().prepareState().execute().actionGet().getState(); | ||
| RoutingNodes nodes = currentState.getRoutingNodes(); | ||
| for (ObjectObjectCursor<String, IndexRoutingTable> index : currentState.getRoutingTable().indicesRouting()) { | ||
| final int totalPrimaryShards = index.value.primaryShardsActive(); | ||
| final int avgPrimaryShardsPerNode = (int) Math.ceil(totalPrimaryShards * 1f / currentState.getRoutingNodes().size()); | ||
| for (RoutingNode node : nodes) { | ||
| final int primaryCount = node.shardsWithState(index.key, STARTED) | ||
| .stream() | ||
| .filter(ShardRouting::primary) | ||
| .collect(Collectors.toList()) | ||
| .size(); | ||
| assertTrue(primaryCount <= avgPrimaryShardsPerNode); | ||
| } | ||
| } | ||
| }, 60, TimeUnit.SECONDS); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.