Skip to content

Commit 3ad33d2

Browse files
[BUG] Fix remote shards balancer when filtering throttled nodes (#11724)
* fix remote shards balancer

  Signed-off-by: panguixin <panguixin@bytedance.com>

* add change log

  Signed-off-by: panguixin <panguixin@bytedance.com>

---------

Signed-off-by: panguixin <panguixin@bytedance.com>
Signed-off-by: Andrew Ross <andrross@amazon.com>
Co-authored-by: Andrew Ross <andrross@amazon.com>
(cherry picked from commit 9f649e0)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 13a1038 commit 3ad33d2

3 files changed

Lines changed: 22 additions & 6 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -151,6 +151,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
151151
- Fix typo in API annotation check message ([#11836](https://github.com/opensearch-project/OpenSearch/pull/11836))
152152
- Fix memory leak issue in ReorganizingLongHash ([#11953](https://github.com/opensearch-project/OpenSearch/issues/11953))
153153
- Prevent setting remote_snapshot store type on index creation ([#11867](https://github.com/opensearch-project/OpenSearch/pull/11867))
154+
- [BUG] Fix remote shards balancer when filtering throttled nodes ([#11724](https://github.com/opensearch-project/OpenSearch/pull/11724))
154155

155156
### Security
156157

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public final class RemoteShardsBalancer extends ShardsBalancer {
4343
private final Logger logger;
4444
private final RoutingAllocation allocation;
4545
private final RoutingNodes routingNodes;
46+
// indicates if there are any nodes being throttled for allocating any unassigned shards
47+
private boolean anyNodesThrottled = false;
4648

4749
public RemoteShardsBalancer(Logger logger, RoutingAllocation allocation) {
4850
this.logger = logger;
@@ -358,12 +360,16 @@ private void allocateUnassignedReplicas(Queue<RoutingNode> nodeQueue, Map<String
358360
}
359361

360362
private void ignoreRemainingShards(Map<String, UnassignedIndexShards> unassignedShardMap) {
363+
// If any nodes are throttled during allocation, mark all remaining unassigned shards as THROTTLED
364+
final UnassignedInfo.AllocationStatus status = anyNodesThrottled
365+
? UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED
366+
: UnassignedInfo.AllocationStatus.DECIDERS_NO;
361367
for (UnassignedIndexShards indexShards : unassignedShardMap.values()) {
362368
for (ShardRouting shard : indexShards.getPrimaries()) {
363-
routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes());
369+
routingNodes.unassigned().ignoreShard(shard, status, allocation.changes());
364370
}
365371
for (ShardRouting shard : indexShards.getReplicas()) {
366-
routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes());
372+
routingNodes.unassigned().ignoreShard(shard, status, allocation.changes());
367373
}
368374
}
369375
}
@@ -424,11 +430,11 @@ private void allocateUnassignedShards(
424430
private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouting shard) {
425431
boolean allocated = false;
426432
boolean throttled = false;
427-
Set<String> nodesCheckedForShard = new HashSet<>();
433+
int numNodesToCheck = nodeQueue.size();
428434
while (nodeQueue.isEmpty() == false) {
429435
RoutingNode node = nodeQueue.poll();
436+
--numNodesToCheck;
430437
Decision allocateDecision = allocation.deciders().canAllocate(shard, node, allocation);
431-
nodesCheckedForShard.add(node.nodeId());
432438
if (allocateDecision.type() == Decision.Type.YES) {
433439
if (logger.isTraceEnabled()) {
434440
logger.trace("Assigned shard [{}] to [{}]", shardShortSummary(shard), node.nodeId());
@@ -467,6 +473,10 @@ private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouti
467473
}
468474
nodeQueue.offer(node);
469475
} else {
476+
if (nodeLevelDecision.type() == Decision.Type.THROTTLE) {
477+
anyNodesThrottled = true;
478+
}
479+
470480
if (logger.isTraceEnabled()) {
471481
logger.trace(
472482
"Cannot allocate any shard to node: [{}]. Removing from queue. Node level decisions: [{}],[{}]",
@@ -478,14 +488,14 @@ private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouti
478488
}
479489

480490
// Break out if all nodes in the queue have been checked for this shard
481-
if (nodeQueue.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()))) {
491+
if (numNodesToCheck == 0) {
482492
break;
483493
}
484494
}
485495
}
486496

487497
if (allocated == false) {
488-
UnassignedInfo.AllocationStatus status = throttled
498+
UnassignedInfo.AllocationStatus status = (throttled || anyNodesThrottled)
489499
? UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED
490500
: UnassignedInfo.AllocationStatus.DECIDERS_NO;
491501
routingNodes.unassigned().ignoreShard(shard, status, allocation.changes());

server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
229229
return Decision.ALWAYS;
230230
}
231231
}
232+
233+
@Override
234+
public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) {
235+
return throttle ? Decision.THROTTLE : Decision.YES;
236+
}
232237
});
233238
Collections.shuffle(deciders, random());
234239
return new AllocationDeciders(deciders);

0 commit comments

Comments
 (0)