Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Ensure consistency of system flag on IndexMetadata after diff is applied ([#16644](https://github.com/opensearch-project/OpenSearch/pull/16644))
- Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state ([#16763](https://github.com/opensearch-project/OpenSearch/pull/16763))
- Fix _list/shards API failing when closed indices are present ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606))
- Fix remote shards balance ([#15335](https://github.com/opensearch-project/OpenSearch/pull/15335))
- Always use `constant_score` query for `match_only_text` field ([#16964](https://github.com/opensearch-project/OpenSearch/pull/16964))

### Security
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,17 @@ void balance() {
final Map<String, Integer> nodePrimaryShardCount = calculateNodePrimaryShardCount(remoteRoutingNodes);
int totalPrimaryShardCount = nodePrimaryShardCount.values().stream().reduce(0, Integer::sum);

totalPrimaryShardCount += routingNodes.unassigned().getNumPrimaries();
int avgPrimaryPerNode = (totalPrimaryShardCount + routingNodes.size() - 1) / routingNodes.size();
int unassignedRemotePrimaryShardCount = 0;
for (ShardRouting shard : routingNodes.unassigned()) {
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation)) && shard.primary()) {
unassignedRemotePrimaryShardCount++;
}
}
totalPrimaryShardCount += unassignedRemotePrimaryShardCount;
final int avgPrimaryPerNode = (totalPrimaryShardCount + remoteRoutingNodes.size() - 1) / remoteRoutingNodes.size();

ArrayDeque<RoutingNode> sourceNodes = new ArrayDeque<>();
ArrayDeque<RoutingNode> targetNodes = new ArrayDeque<>();
final ArrayDeque<RoutingNode> sourceNodes = new ArrayDeque<>();
final ArrayDeque<RoutingNode> targetNodes = new ArrayDeque<>();
for (RoutingNode node : remoteRoutingNodes) {
if (nodePrimaryShardCount.get(node.nodeId()) > avgPrimaryPerNode) {
sourceNodes.add(node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public AllocationService createRemoteCapableAllocationService() {
}

public AllocationService createRemoteCapableAllocationService(String excludeNodes) {
Settings settings = Settings.builder().put("cluster.routing.allocation.exclude.node_id", excludeNodes).build();
Settings settings = Settings.builder().put("cluster.routing.allocation.exclude._id", excludeNodes).build();
return new MockAllocationService(
randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random()),
new TestGatewayAllocator(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,51 @@ public class RemoteShardsRebalanceShardsTests extends RemoteShardsBalancerBaseTe
* Post rebalance primaries should be balanced across all the nodes.
*/
public void testShardAllocationAndRebalance() {
int localOnlyNodes = 20;
int remoteCapableNodes = 40;
int localIndices = 40;
int remoteIndices = 80;
final int localOnlyNodes = 20;
final int remoteCapableNodes = 40;
final int halfRemoteCapableNodes = remoteCapableNodes / 2;
final int localIndices = 40;
final int remoteIndices = 80;
ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices);
AllocationService service = this.createRemoteCapableAllocationService();
final StringBuilder excludeNodes = new StringBuilder();
for (int i = 0; i < halfRemoteCapableNodes; i++) {
excludeNodes.append(getNodeId(i, true));
if (i != (remoteCapableNodes / 2 - 1)) {
excludeNodes.append(", ");
}
}
AllocationService service = this.createRemoteCapableAllocationService(excludeNodes.toString());
clusterState = allocateShardsAndBalance(clusterState, service);
RoutingNodes routingNodes = clusterState.getRoutingNodes();
RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes);

final Map<String, Integer> nodePrimariesCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, true);
final Map<String, Integer> nodeReplicaCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, false);
Map<String, Integer> nodePrimariesCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, true);
Map<String, Integer> nodeReplicaCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, false);
int avgPrimariesPerNode = getTotalShardCountAcrossNodes(nodePrimariesCounter) / remoteCapableNodes;

// Primary and replica are balanced post first reroute
// Primary and replica are balanced after first allocating unassigned
for (RoutingNode node : routingNodes) {
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getNodePool(node))) {
if (Integer.parseInt(node.nodeId().split("-")[4]) < halfRemoteCapableNodes) {
assertEquals(0, (int) nodePrimariesCounter.getOrDefault(node.nodeId(), 0));
} else {
assertEquals(avgPrimariesPerNode * 2, (int) nodePrimariesCounter.get(node.nodeId()));
}
assertTrue(nodeReplicaCounter.getOrDefault(node.nodeId(), 0) >= 0);
}
}

// Remove exclude constraint and rebalance
service = this.createRemoteCapableAllocationService();
clusterState = allocateShardsAndBalance(clusterState, service);
routingNodes = clusterState.getRoutingNodes();
allocation = getRoutingAllocation(clusterState, routingNodes);
nodePrimariesCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, true);
nodeReplicaCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, false);
for (RoutingNode node : routingNodes) {
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getNodePool(node))) {
assertInRange(nodePrimariesCounter.get(node.nodeId()), avgPrimariesPerNode, remoteCapableNodes - 1);
assertTrue(nodeReplicaCounter.get(node.nodeId()) >= 0);
assertEquals(avgPrimariesPerNode, (int) nodePrimariesCounter.get(node.nodeId()));
assertTrue(nodeReplicaCounter.getOrDefault(node.nodeId(), 0) >= 0);
}
}
}
Expand Down