Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ public void testSendingShardFailure() throws Exception {

// fail a random shard
ShardRouting failedShard = randomFrom(
clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED)
clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED).toList()
);
ShardStateAction service = internalCluster().getInstance(ShardStateAction.class, nonMasterNode);
CountDownLatch latch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,12 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toCollection;

/**
* A {@link RoutingNode} represents a cluster node associated with a single {@link DiscoveryNode} including all shards
* that are hosted on that nodes. Each {@link RoutingNode} has a unique node id that can be used to identify the node.
Expand Down Expand Up @@ -212,6 +209,20 @@ void remove(ShardRouting shard) {
assert invariant();
}

private static final ShardRouting[] EMPTY_SHARD_ROUTING_ARRAY = new ShardRouting[0];

public ShardRouting[] initializing() {
return initializingShards.toArray(EMPTY_SHARD_ROUTING_ARRAY);
}

public ShardRouting[] relocating() {
return relocatingShards.toArray(EMPTY_SHARD_ROUTING_ARRAY);
}

public ShardRouting[] started() {
return startedShards.toArray(EMPTY_SHARD_ROUTING_ARRAY);
}
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those are not changed, only reordered methods


/**
* Determine the number of shards with a specific state
* @param state which should be counted
Expand All @@ -226,22 +237,8 @@ public int numberOfShardsWithState(ShardRoutingState state) {
* @param state state which should be listed
* @return List of shards
*/
public List<ShardRouting> shardsWithState(ShardRoutingState state) {
return new ArrayList<>(internalGetShardsWithState(state));
}

private static final ShardRouting[] EMPTY_SHARD_ROUTING_ARRAY = new ShardRouting[0];

public ShardRouting[] initializing() {
return initializingShards.toArray(EMPTY_SHARD_ROUTING_ARRAY);
}

public ShardRouting[] relocating() {
return relocatingShards.toArray(EMPTY_SHARD_ROUTING_ARRAY);
}

public ShardRouting[] started() {
return startedShards.toArray(EMPTY_SHARD_ROUTING_ARRAY);
public Stream<ShardRouting> shardsWithState(ShardRoutingState state) {
return internalGetShardsWithState(state).stream();
}

/**
Expand All @@ -250,18 +247,12 @@ public ShardRouting[] started() {
* @param states set of states which should be listed
* @return a list of shards
*/
public List<ShardRouting> shardsWithState(String index, ShardRoutingState... states) {
return Stream.of(states).flatMap(state -> shardsWithState(index, state).stream()).collect(toCollection(ArrayList::new));
public Stream<ShardRouting> shardsWithState(String index, ShardRoutingState... states) {
return Stream.of(states).flatMap(state -> shardsWithState(index, state));
}

public List<ShardRouting> shardsWithState(String index, ShardRoutingState state) {
var shards = new ArrayList<ShardRouting>();
for (ShardRouting shardEntry : internalGetShardsWithState(state)) {
if (shardEntry.getIndexName().equals(index)) {
shards.add(shardEntry);
}
}
return shards;
public Stream<ShardRouting> shardsWithState(String index, ShardRoutingState state) {
return shardsWithState(state).filter(shardRouting -> Objects.equals(shardRouting.getIndexName(), index));
}

private LinkedHashSet<ShardRouting> internalGetShardsWithState(ShardRoutingState state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,16 @@ public void testNumberOfShardsWithState() {
}

public void testShardsWithState() {
assertThat(routingNode.shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1));
assertThat(routingNode.shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1));
assertThat(routingNode.shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(routingNode.shardsWithState(ShardRoutingState.STARTED).count(), equalTo(1L));
assertThat(routingNode.shardsWithState(ShardRoutingState.RELOCATING).count(), equalTo(1L));
assertThat(routingNode.shardsWithState(ShardRoutingState.INITIALIZING).count(), equalTo(1L));
}

public void testShardsWithStateInIndex() {
assertThat(routingNode.shardsWithState("test", ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED).size(), equalTo(2));
assertThat(routingNode.shardsWithState("test", ShardRoutingState.STARTED).size(), equalTo(1));
assertThat(routingNode.shardsWithState("test", ShardRoutingState.RELOCATING).size(), equalTo(1));
assertThat(routingNode.shardsWithState("test", ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(routingNode.shardsWithState("test", ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED).count(), equalTo(2L));
assertThat(routingNode.shardsWithState("test", ShardRoutingState.STARTED).count(), equalTo(1L));
assertThat(routingNode.shardsWithState("test", ShardRoutingState.RELOCATING).count(), equalTo(1L));
assertThat(routingNode.shardsWithState("test", ShardRoutingState.INITIALIZING).count(), equalTo(1L));
}

public void testNumberOfOwningShards() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;

import java.util.ArrayList;
import java.util.Collections;

import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;

public class AddIncrementallyTests extends ESAllocationTestCase {
Expand All @@ -58,28 +59,28 @@ public void testAddNodesAndIndices() {
clusterState = addNodes(clusterState, service, 1, nodeOffset++);
assertNumIndexShardsPerNode(clusterState, equalTo(2));
clusterState = addNodes(clusterState, service, 1, nodeOffset++);
assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2));
assertNumIndexShardsPerNode(clusterState, lessThanOrEqualTo(2));
assertAtLeastOneIndexShardPerNode(clusterState);
clusterState = removeNodes(clusterState, service, 1);
assertNumIndexShardsPerNode(clusterState, equalTo(2));

clusterState = addIndex(clusterState, service, 3, 2, 3);
assertThat(clusterState.getRoutingNodes().unassigned().size(), equalTo(2));
assertNumIndexShardsPerNode(clusterState, "test3", equalTo(2));
assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2));
assertNumIndexShardsPerNode(clusterState, lessThanOrEqualTo(2));

clusterState = addIndex(clusterState, service, 4, 2, 3);
assertThat(clusterState.getRoutingNodes().unassigned().size(), equalTo(4));
assertNumIndexShardsPerNode(clusterState, "test4", equalTo(2));
assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2));
assertNumIndexShardsPerNode(clusterState, lessThanOrEqualTo(2));
clusterState = addNodes(clusterState, service, 1, nodeOffset++);
assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2));
assertNumIndexShardsPerNode(clusterState, lessThanOrEqualTo(2));
assertThat(clusterState.getRoutingNodes().unassigned().size(), equalTo(0));
clusterState = removeNodes(clusterState, service, 1);
assertThat(clusterState.getRoutingNodes().unassigned().size(), equalTo(4));
assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2));
assertNumIndexShardsPerNode(clusterState, lessThanOrEqualTo(2));
clusterState = addNodes(clusterState, service, 1, nodeOffset++);
assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2));
assertNumIndexShardsPerNode(clusterState, lessThanOrEqualTo(2));
assertThat(clusterState.getRoutingNodes().unassigned().size(), equalTo(0));
logger.debug("ClusterState: {}", clusterState.getRoutingNodes());
}
Expand Down Expand Up @@ -220,15 +221,15 @@ private void assertNumIndexShardsPerNode(ClusterState state, Matcher<Integer> ma

private void assertNumIndexShardsPerNode(ClusterState state, String index, Matcher<Integer> matcher) {
for (RoutingNode node : state.getRoutingNodes()) {
assertThat(node.shardsWithState(index, STARTED).size(), matcher);
assertThat(Math.toIntExact(node.shardsWithState(index, STARTED).count()), matcher);
}
}

private void assertAtLeastOneIndexShardPerNode(ClusterState state) {
for (String index : state.routingTable().indicesRouting().keySet()) {

for (RoutingNode node : state.getRoutingNodes()) {
assertThat(node.shardsWithState(index, STARTED).size(), Matchers.greaterThanOrEqualTo(1));
assertThat(node.shardsWithState(index, STARTED).count(), greaterThanOrEqualTo(1L));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ public void testCancelCommand() {
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(0));
assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState(STARTED).iterator().next().primary(), equalTo(true));
assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState(STARTED).findFirst().get().primary(), equalTo(true));
assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(0));
} else {
logger.info("--> cancel the move of the replica shard");
Expand Down Expand Up @@ -669,7 +669,10 @@ public void testCancelCommand() {
assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(0));
assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node3").numberOfShardsWithState(INITIALIZING), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState(INITIALIZING).get(0).relocatingNodeId(), nullValue());
assertThat(
clusterState.getRoutingNodes().node("node3").shardsWithState(INITIALIZING).findFirst().get().relocatingNodeId(),
nullValue()
);

logger.info("--> start the former target replica shard");
clusterState = startInitializingShardsAndReroute(allocation, clusterState);
Expand All @@ -689,7 +692,7 @@ public void testCancelCommand() {
).clusterState();
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState(STARTED).iterator().next().primary(), equalTo(true));
assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState(STARTED).findFirst().get().primary(), equalTo(true));
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(0));
assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(0));
}
Expand Down Expand Up @@ -1058,7 +1061,7 @@ public void testConflictingCommandsInSingleRequest() {
clusterState = startInitializingShardsAndReroute(allocation, clusterState);

final ClusterState updatedClusterState = clusterState;
assertThat(updatedClusterState.getRoutingNodes().node(node1).shardsWithState(STARTED).size(), equalTo(1));
assertThat(updatedClusterState.getRoutingNodes().node(node1).numberOfShardsWithState(STARTED), equalTo(1));

logger.info("--> subsequent replica allocation fails as all configured replicas have been allocated");
assertThat(expectThrows(IllegalArgumentException.class, () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ private void assertIndexBalance(

for (String index : routingTable.indicesRouting().keySet()) {
for (RoutingNode node : nodes) {
assertThat(node.shardsWithState(index, STARTED).size(), greaterThanOrEqualTo(minAvgNumberOfShards));
assertThat(node.shardsWithState(index, STARTED).size(), lessThanOrEqualTo(maxAvgNumberOfShards));
assertThat(Math.toIntExact(node.shardsWithState(index, STARTED).count()), greaterThanOrEqualTo(minAvgNumberOfShards));
assertThat(Math.toIntExact(node.shardsWithState(index, STARTED).count()), lessThanOrEqualTo(maxAvgNumberOfShards));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
allocator.allocate(routingAllocation);
ShardRouting shardToRebalance = null;
for (RoutingNode routingNode : routingAllocation.routingNodes()) {
List<ShardRouting> relocatingShards = routingNode.shardsWithState(ShardRoutingState.RELOCATING);
List<ShardRouting> relocatingShards = routingNode.shardsWithState(ShardRoutingState.RELOCATING).toList();
if (relocatingShards.size() > 0) {
shardToRebalance = randomFrom(relocatingShards);
break;
Expand Down Expand Up @@ -276,7 +276,7 @@ public void testNodeDecisionsRanking() {
if (node.numberOfShardsWithState(ShardRoutingState.STARTED) == 2) {
nodesWithTwoShards.add(node.nodeId());
if (shardToRebalance == null) {
shardToRebalance = node.shardsWithState(ShardRoutingState.STARTED).get(0);
shardToRebalance = node.shardsWithState(ShardRoutingState.STARTED).findFirst().get();
}
} else {
assertEquals(3, node.numberOfShardsWithState(ShardRoutingState.STARTED));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,13 @@ public void testBalanceAllNodesStarted() {
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(4));
assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(4));

assertThat(routingNodes.node("node1").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node2").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node3").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node1").shardsWithState("test", STARTED).count(), equalTo(2L));
assertThat(routingNodes.node("node2").shardsWithState("test", STARTED).count(), equalTo(2L));
assertThat(routingNodes.node("node3").shardsWithState("test", STARTED).count(), equalTo(2L));

assertThat(routingNodes.node("node1").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node1").shardsWithState("test1", STARTED).count(), equalTo(2L));
assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).count(), equalTo(2L));
assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).count(), equalTo(2L));
}

public void testBalanceIncrementallyStartNodes() {
Expand Down Expand Up @@ -265,13 +265,13 @@ public void testBalanceIncrementallyStartNodes() {
assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(STARTED), equalTo(4));
assertThat(clusterState.getRoutingNodes().node("node3").numberOfShardsWithState(STARTED), equalTo(4));

assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState("test", STARTED).count(), equalTo(2L));
assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState("test", STARTED).count(), equalTo(2L));
assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState("test", STARTED).count(), equalTo(2L));

assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState("test1", STARTED).count(), equalTo(2L));
assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState("test1", STARTED).count(), equalTo(2L));
assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState("test1", STARTED).count(), equalTo(2L));
}

public void testBalanceAllNodesStartedAddIndex() {
Expand Down Expand Up @@ -360,9 +360,9 @@ public void testBalanceAllNodesStartedAddIndex() {
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(2));
assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(2));

assertThat(routingNodes.node("node1").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node2").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node3").shardsWithState("test", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node1").shardsWithState("test", STARTED).count(), equalTo(2L));
assertThat(routingNodes.node("node2").shardsWithState("test", STARTED).count(), equalTo(2L));
assertThat(routingNodes.node("node3").shardsWithState("test", STARTED).count(), equalTo(2L));

logger.info("Add new index 3 shards 1 replica");

Expand Down Expand Up @@ -432,8 +432,8 @@ public void testBalanceAllNodesStartedAddIndex() {
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(4));
assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(4));

assertThat(routingNodes.node("node1").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).size(), equalTo(2));
assertThat(routingNodes.node("node1").shardsWithState("test1", STARTED).count(), equalTo(2L));
assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).count(), equalTo(2L));
assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).count(), equalTo(2L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ public void testRemovingInitializingReplicasIfPrimariesFails() {
assertThat(shardsWithState(routingNodes, STARTED).size(), equalTo(1));
assertThat(shardsWithState(routingNodes, INITIALIZING).size(), equalTo(0));
assertThat(shardsWithState(routingNodes, UNASSIGNED).size(), equalTo(3)); // 2 replicas and one primary
assertThat(routingNodes.node(nodeIdRemaining).shardsWithState(STARTED).get(0).primary(), equalTo(true));
assertThat(routingNodes.node(nodeIdRemaining).shardsWithState(STARTED).findFirst().get().primary(), equalTo(true));
assertThat(clusterState.metadata().index("test").primaryTerm(0), equalTo(2L));

}
}
Loading