diff --git a/server/src/internalClusterTest/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/discovery/ClusterDisruptionIT.java index 6f8f2eb11a802..4a823ed997ea8 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/discovery/ClusterDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/discovery/ClusterDisruptionIT.java @@ -339,7 +339,7 @@ public void testSendingShardFailure() throws Exception { // fail a random shard ShardRouting failedShard = randomFrom( - clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED) + clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED).toList() ); ShardStateAction service = internalCluster().getInstance(ShardStateAction.class, nonMasterNode); CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java index 399618de90e2a..53671458e0cfb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java @@ -20,15 +20,12 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.function.Predicate; import java.util.stream.Stream; -import static java.util.stream.Collectors.toCollection; - /** * A {@link RoutingNode} represents a cluster node associated with a single {@link DiscoveryNode} including all shards * that are hosted on that nodes. Each {@link RoutingNode} has a unique node id that can be used to identify the node. @@ -212,6 +209,20 @@ void remove(ShardRouting shard) { assert invariant(); } + private static final ShardRouting[] EMPTY_SHARD_ROUTING_ARRAY = new ShardRouting[0]; + + public ShardRouting[] initializing() { + return initializingShards.toArray(EMPTY_SHARD_ROUTING_ARRAY); + } + + public ShardRouting[] relocating() { + return relocatingShards.toArray(EMPTY_SHARD_ROUTING_ARRAY); + } + + public ShardRouting[] started() { + return startedShards.toArray(EMPTY_SHARD_ROUTING_ARRAY); + } + /** * Determine the number of shards with a specific state * @param state which should be counted @@ -226,22 +237,8 @@ public int numberOfShardsWithState(ShardRoutingState state) { * @param state state which should be listed * @return List of shards */ - public List shardsWithState(ShardRoutingState state) { - return new ArrayList<>(internalGetShardsWithState(state)); - } - - private static final ShardRouting[] EMPTY_SHARD_ROUTING_ARRAY = new ShardRouting[0]; - - public ShardRouting[] initializing() { - return initializingShards.toArray(EMPTY_SHARD_ROUTING_ARRAY); - } - - public ShardRouting[] relocating() { - return relocatingShards.toArray(EMPTY_SHARD_ROUTING_ARRAY); - } - - public ShardRouting[] started() { - return startedShards.toArray(EMPTY_SHARD_ROUTING_ARRAY); + public Stream shardsWithState(ShardRoutingState state) { + return internalGetShardsWithState(state).stream(); } /** @@ -250,18 +247,12 @@ public ShardRouting[] started() { * @param states set of states which should be listed * @return a list of shards */ - public List shardsWithState(String index, ShardRoutingState... states) { - return Stream.of(states).flatMap(state -> shardsWithState(index, state).stream()).collect(toCollection(ArrayList::new)); + public Stream shardsWithState(String index, ShardRoutingState... states) { + return Stream.of(states).flatMap(state -> shardsWithState(index, state)); } - public List shardsWithState(String index, ShardRoutingState state) { - var shards = new ArrayList(); - for (ShardRouting shardEntry : internalGetShardsWithState(state)) { - if (shardEntry.getIndexName().equals(index)) { - shards.add(shardEntry); - } - } - return shards; + public Stream shardsWithState(String index, ShardRoutingState state) { + return shardsWithState(state).filter(shardRouting -> Objects.equals(shardRouting.getIndexName(), index)); } private LinkedHashSet internalGetShardsWithState(ShardRoutingState state) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java index c40f95f384b28..15bd91d9a561e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java @@ -102,16 +102,16 @@ public void testNumberOfShardsWithState() { } public void testShardsWithState() { - assertThat(routingNode.shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1)); - assertThat(routingNode.shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1)); - assertThat(routingNode.shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(routingNode.shardsWithState(ShardRoutingState.STARTED).count(), equalTo(1L)); + assertThat(routingNode.shardsWithState(ShardRoutingState.RELOCATING).count(), equalTo(1L)); + assertThat(routingNode.shardsWithState(ShardRoutingState.INITIALIZING).count(), equalTo(1L)); } public void testShardsWithStateInIndex() { - assertThat(routingNode.shardsWithState("test", ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED).size(), equalTo(2)); - assertThat(routingNode.shardsWithState("test", ShardRoutingState.STARTED).size(), equalTo(1)); - assertThat(routingNode.shardsWithState("test", ShardRoutingState.RELOCATING).size(), equalTo(1)); - assertThat(routingNode.shardsWithState("test", ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(routingNode.shardsWithState("test", ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED).count(), equalTo(2L)); + assertThat(routingNode.shardsWithState("test", ShardRoutingState.STARTED).count(), equalTo(1L)); + assertThat(routingNode.shardsWithState("test", ShardRoutingState.RELOCATING).count(), equalTo(1L)); + assertThat(routingNode.shardsWithState("test", ShardRoutingState.INITIALIZING).count(), equalTo(1L)); } public void testNumberOfOwningShards() { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AddIncrementallyTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AddIncrementallyTests.java index c78c78e7b5fd5..76f0d141a40a7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AddIncrementallyTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AddIncrementallyTests.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; import org.hamcrest.Matcher; -import org.hamcrest.Matchers; import java.util.ArrayList; import java.util.Collections; @@ -33,6 +32,8 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; public class AddIncrementallyTests extends ESAllocationTestCase { @@ -58,7 +59,7 @@ public void testAddNodesAndIndices() { clusterState = addNodes(clusterState, service, 1, nodeOffset++); assertNumIndexShardsPerNode(clusterState, equalTo(2)); clusterState = addNodes(clusterState, service, 1, nodeOffset++); - assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2)); + assertNumIndexShardsPerNode(clusterState, lessThanOrEqualTo(2)); assertAtLeastOneIndexShardPerNode(clusterState); clusterState = removeNodes(clusterState, service, 1); assertNumIndexShardsPerNode(clusterState, equalTo(2)); @@ -66,20 +67,20 @@ public void testAddNodesAndIndices() { clusterState = addIndex(clusterState, service, 3, 2, 3); assertThat(clusterState.getRoutingNodes().unassigned().size(), equalTo(2)); assertNumIndexShardsPerNode(clusterState, "test3", equalTo(2)); - assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2)); + assertNumIndexShardsPerNode(clusterState, lessThanOrEqualTo(2)); clusterState = addIndex(clusterState, service, 4, 2, 3); assertThat(clusterState.getRoutingNodes().unassigned().size(), equalTo(4)); assertNumIndexShardsPerNode(clusterState, "test4", equalTo(2)); - assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2)); + assertNumIndexShardsPerNode(clusterState, lessThanOrEqualTo(2)); clusterState = addNodes(clusterState, service, 1, nodeOffset++); - assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2)); + assertNumIndexShardsPerNode(clusterState, lessThanOrEqualTo(2)); assertThat(clusterState.getRoutingNodes().unassigned().size(), equalTo(0)); clusterState = removeNodes(clusterState, service, 1); assertThat(clusterState.getRoutingNodes().unassigned().size(), equalTo(4)); - assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2)); + assertNumIndexShardsPerNode(clusterState, lessThanOrEqualTo(2)); clusterState = addNodes(clusterState, service, 1, nodeOffset++); - assertNumIndexShardsPerNode(clusterState, Matchers.lessThanOrEqualTo(2)); + assertNumIndexShardsPerNode(clusterState, lessThanOrEqualTo(2)); assertThat(clusterState.getRoutingNodes().unassigned().size(), equalTo(0)); logger.debug("ClusterState: {}", clusterState.getRoutingNodes()); } @@ -220,7 +221,7 @@ private void assertNumIndexShardsPerNode(ClusterState state, Matcher ma private void assertNumIndexShardsPerNode(ClusterState state, String index, Matcher matcher) { for (RoutingNode node : state.getRoutingNodes()) { - assertThat(node.shardsWithState(index, STARTED).size(), matcher); + assertThat(Math.toIntExact(node.shardsWithState(index, STARTED).count()), matcher); } } @@ -228,7 +229,7 @@ private void assertAtLeastOneIndexShardPerNode(ClusterState state) { for (String index : state.routingTable().indicesRouting().keySet()) { for (RoutingNode node : state.getRoutingNodes()) { - assertThat(node.shardsWithState(index, STARTED).size(), Matchers.greaterThanOrEqualTo(1)); + assertThat(node.shardsWithState(index, STARTED).count(), greaterThanOrEqualTo(1L)); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java index ebac38764fc8e..ea720f635872c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java @@ -622,7 +622,7 @@ public void testCancelCommand() { assertThat(newState, not(equalTo(clusterState))); clusterState = newState; assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(0)); - assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState(STARTED).iterator().next().primary(), equalTo(true)); + assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState(STARTED).findFirst().get().primary(), equalTo(true)); assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(0)); } else { logger.info("--> cancel the move of the replica shard"); @@ -669,7 +669,10 @@ public void testCancelCommand() { assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(0)); assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().node("node3").numberOfShardsWithState(INITIALIZING), equalTo(1)); - assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState(INITIALIZING).get(0).relocatingNodeId(), nullValue()); + assertThat( + clusterState.getRoutingNodes().node("node3").shardsWithState(INITIALIZING).findFirst().get().relocatingNodeId(), + nullValue() + ); logger.info("--> start the former target replica shard"); clusterState = startInitializingShardsAndReroute(allocation, clusterState); @@ -689,7 +692,7 @@ public void testCancelCommand() { ).clusterState(); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; - assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState(STARTED).iterator().next().primary(), equalTo(true)); + assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState(STARTED).findFirst().get().primary(), equalTo(true)); assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(0)); assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(0)); } @@ -1058,7 +1061,7 @@ public void testConflictingCommandsInSingleRequest() { clusterState = startInitializingShardsAndReroute(allocation, clusterState); final ClusterState updatedClusterState = clusterState; - assertThat(updatedClusterState.getRoutingNodes().node(node1).shardsWithState(STARTED).size(), equalTo(1)); + assertThat(updatedClusterState.getRoutingNodes().node(node1).numberOfShardsWithState(STARTED), equalTo(1)); logger.info("--> subsequent replica allocation fails as all configured replicas have been allocated"); assertThat(expectThrows(IllegalArgumentException.class, () -> { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java index fb0c2f0b2fd30..d4c0b2694c2a3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -279,8 +279,8 @@ private void assertIndexBalance( for (String index : routingTable.indicesRouting().keySet()) { for (RoutingNode node : nodes) { - assertThat(node.shardsWithState(index, STARTED).size(), greaterThanOrEqualTo(minAvgNumberOfShards)); - assertThat(node.shardsWithState(index, STARTED).size(), lessThanOrEqualTo(maxAvgNumberOfShards)); + assertThat(Math.toIntExact(node.shardsWithState(index, STARTED).count()), greaterThanOrEqualTo(minAvgNumberOfShards)); + assertThat(Math.toIntExact(node.shardsWithState(index, STARTED).count()), lessThanOrEqualTo(maxAvgNumberOfShards)); } } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalancedSingleShardTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalancedSingleShardTests.java index e1531a38efcb0..7efac54bf3b88 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalancedSingleShardTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalancedSingleShardTests.java @@ -221,7 +221,7 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca allocator.allocate(routingAllocation); ShardRouting shardToRebalance = null; for (RoutingNode routingNode : routingAllocation.routingNodes()) { - List relocatingShards = routingNode.shardsWithState(ShardRoutingState.RELOCATING); + List relocatingShards = routingNode.shardsWithState(ShardRoutingState.RELOCATING).toList(); if (relocatingShards.size() > 0) { shardToRebalance = randomFrom(relocatingShards); break; @@ -276,7 +276,7 @@ public void testNodeDecisionsRanking() { if (node.numberOfShardsWithState(ShardRoutingState.STARTED) == 2) { nodesWithTwoShards.add(node.nodeId()); if (shardToRebalance == null) { - shardToRebalance = node.shardsWithState(ShardRoutingState.STARTED).get(0); + shardToRebalance = node.shardsWithState(ShardRoutingState.STARTED).findFirst().get(); } } else { assertEquals(3, node.numberOfShardsWithState(ShardRoutingState.STARTED)); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/IndexBalanceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/IndexBalanceTests.java index b814aba17c69b..efbcd04a53355 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/IndexBalanceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/IndexBalanceTests.java @@ -142,13 +142,13 @@ public void testBalanceAllNodesStarted() { assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(4)); assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(4)); - assertThat(routingNodes.node("node1").shardsWithState("test", STARTED).size(), equalTo(2)); - assertThat(routingNodes.node("node2").shardsWithState("test", STARTED).size(), equalTo(2)); - assertThat(routingNodes.node("node3").shardsWithState("test", STARTED).size(), equalTo(2)); + assertThat(routingNodes.node("node1").shardsWithState("test", STARTED).count(), equalTo(2L)); + assertThat(routingNodes.node("node2").shardsWithState("test", STARTED).count(), equalTo(2L)); + assertThat(routingNodes.node("node3").shardsWithState("test", STARTED).count(), equalTo(2L)); - assertThat(routingNodes.node("node1").shardsWithState("test1", STARTED).size(), equalTo(2)); - assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).size(), equalTo(2)); - assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).size(), equalTo(2)); + assertThat(routingNodes.node("node1").shardsWithState("test1", STARTED).count(), equalTo(2L)); + assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).count(), equalTo(2L)); + assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).count(), equalTo(2L)); } public void testBalanceIncrementallyStartNodes() { @@ -265,13 +265,13 @@ public void testBalanceIncrementallyStartNodes() { assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(STARTED), equalTo(4)); assertThat(clusterState.getRoutingNodes().node("node3").numberOfShardsWithState(STARTED), equalTo(4)); - assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState("test", STARTED).size(), equalTo(2)); - assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState("test", STARTED).size(), equalTo(2)); - assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState("test", STARTED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState("test", STARTED).count(), equalTo(2L)); + assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState("test", STARTED).count(), equalTo(2L)); + assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState("test", STARTED).count(), equalTo(2L)); - assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState("test1", STARTED).size(), equalTo(2)); - assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState("test1", STARTED).size(), equalTo(2)); - assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState("test1", STARTED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState("test1", STARTED).count(), equalTo(2L)); + assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState("test1", STARTED).count(), equalTo(2L)); + assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState("test1", STARTED).count(), equalTo(2L)); } public void testBalanceAllNodesStartedAddIndex() { @@ -360,9 +360,9 @@ public void testBalanceAllNodesStartedAddIndex() { assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(2)); assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(2)); - assertThat(routingNodes.node("node1").shardsWithState("test", STARTED).size(), equalTo(2)); - assertThat(routingNodes.node("node2").shardsWithState("test", STARTED).size(), equalTo(2)); - assertThat(routingNodes.node("node3").shardsWithState("test", STARTED).size(), equalTo(2)); + assertThat(routingNodes.node("node1").shardsWithState("test", STARTED).count(), equalTo(2L)); + assertThat(routingNodes.node("node2").shardsWithState("test", STARTED).count(), equalTo(2L)); + assertThat(routingNodes.node("node3").shardsWithState("test", STARTED).count(), equalTo(2L)); logger.info("Add new index 3 shards 1 replica"); @@ -432,8 +432,8 @@ public void testBalanceAllNodesStartedAddIndex() { assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(4)); assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(4)); - assertThat(routingNodes.node("node1").shardsWithState("test1", STARTED).size(), equalTo(2)); - assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).size(), equalTo(2)); - assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).size(), equalTo(2)); + assertThat(routingNodes.node("node1").shardsWithState("test1", STARTED).count(), equalTo(2L)); + assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).count(), equalTo(2L)); + assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).count(), equalTo(2L)); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java index 2dbcf8f62cab5..41f3550da4c50 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java @@ -126,8 +126,7 @@ public void testRemovingInitializingReplicasIfPrimariesFails() { assertThat(shardsWithState(routingNodes, STARTED).size(), equalTo(1)); assertThat(shardsWithState(routingNodes, INITIALIZING).size(), equalTo(0)); assertThat(shardsWithState(routingNodes, UNASSIGNED).size(), equalTo(3)); // 2 replicas and one primary - assertThat(routingNodes.node(nodeIdRemaining).shardsWithState(STARTED).get(0).primary(), equalTo(true)); + assertThat(routingNodes.node(nodeIdRemaining).shardsWithState(STARTED).findFirst().get().primary(), equalTo(true)); assertThat(clusterState.metadata().index("test").primaryTerm(0), equalTo(2L)); - } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesIntegrityTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesIntegrityTests.java index b6100e1844a3f..d94016b3aad7b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesIntegrityTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesIntegrityTests.java @@ -168,13 +168,13 @@ public void testBalanceIncrementallyStartNodes() { assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(STARTED), equalTo(4)); assertThat(clusterState.getRoutingNodes().node("node3").numberOfShardsWithState(STARTED), equalTo(4)); - assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState("test", STARTED).size(), equalTo(2)); - assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState("test", STARTED).size(), equalTo(2)); - assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState("test", STARTED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState("test", STARTED).count(), equalTo(2L)); + assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState("test", STARTED).count(), equalTo(2L)); + assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState("test", STARTED).count(), equalTo(2L)); - assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState("test1", STARTED).size(), equalTo(2)); - assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState("test1", STARTED).size(), equalTo(2)); - assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState("test1", STARTED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState("test1", STARTED).count(), equalTo(2L)); + assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState("test1", STARTED).count(), equalTo(2L)); + assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState("test1", STARTED).count(), equalTo(2L)); } public void testBalanceAllNodesStartedAddIndex() { @@ -257,9 +257,9 @@ public void testBalanceAllNodesStartedAddIndex() { assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(2)); assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(2)); - assertThat(routingNodes.node("node1").shardsWithState("test", STARTED).size(), equalTo(2)); - assertThat(routingNodes.node("node2").shardsWithState("test", STARTED).size(), equalTo(2)); - assertThat(routingNodes.node("node3").shardsWithState("test", STARTED).size(), equalTo(2)); + assertThat(routingNodes.node("node1").shardsWithState("test", STARTED).count(), equalTo(2L)); + assertThat(routingNodes.node("node2").shardsWithState("test", STARTED).count(), equalTo(2L)); + assertThat(routingNodes.node("node3").shardsWithState("test", STARTED).count(), equalTo(2L)); logger.info("Add new index 3 shards 1 replica"); @@ -318,9 +318,9 @@ public void testBalanceAllNodesStartedAddIndex() { assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(4)); assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(4)); - assertThat(routingNodes.node("node1").shardsWithState("test1", STARTED).size(), equalTo(2)); - assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).size(), equalTo(2)); - assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).size(), equalTo(2)); + assertThat(routingNodes.node("node1").shardsWithState("test1", STARTED).count(), equalTo(2L)); + assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).count(), equalTo(2L)); + assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).count(), equalTo(2L)); logger.info("kill one node"); IndexShardRoutingTable indexShardRoutingTable = clusterState.routingTable().index("test").shard(0); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java index d646ef7750c60..cb100b5768e57 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java @@ -182,7 +182,9 @@ public void testSingleIndexShardFailed() { RoutingNodes routingNodes = clusterState.getRoutingNodes(); newState = strategy.applyFailedShards( clusterState, - List.of(new FailedShard(routingNodes.node("node1").shardsWithState(INITIALIZING).get(0), null, null, randomBoolean())), + List.of( + new FailedShard(routingNodes.node("node1").shardsWithState(INITIALIZING).findFirst().get(), null, null, randomBoolean()) + ), List.of() ); assertThat(newState, not(equalTo(clusterState))); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index 38ac3dfb64224..deab959aa17a6 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -276,7 +276,7 @@ public static ClusterState startInitializingShardsAndReroute( ClusterState clusterState, RoutingNode routingNode ) { - return startShardsAndReroute(allocationService, clusterState, routingNode.shardsWithState(INITIALIZING)); + return startShardsAndReroute(allocationService, clusterState, routingNode.shardsWithState(INITIALIZING).toList()); } /** diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/routing/RoutingNodesHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/routing/RoutingNodesHelper.java index fc123429cea71..4ed514fa16706 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/routing/RoutingNodesHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/routing/RoutingNodesHelper.java @@ -24,7 +24,7 @@ private RoutingNodesHelper() {} public static List shardsWithState(RoutingNodes routingNodes, ShardRoutingState state) { return state == ShardRoutingState.UNASSIGNED ? iterableAsArrayList(routingNodes.unassigned()) - : routingNodes.stream().flatMap(routingNode -> routingNode.shardsWithState(state).stream()).toList(); + : routingNodes.stream().flatMap(routingNode -> routingNode.shardsWithState(state)).toList(); } public static List shardsWithState(RoutingNodes routingNodes, String index, ShardRoutingState states) { diff --git a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java index 806bf22d26884..faae39f94ce64 100644 --- a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java +++ b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java @@ -267,7 +267,6 @@ static ShutdownShardMigrationStatus shardMigrationStatus( Optional> unmovableShard = currentState.getRoutingNodes() .node(nodeId) .shardsWithState(ShardRoutingState.STARTED) - .stream() .map(shardRouting -> new Tuple<>(shardRouting, allocationService.explainShardAllocation(shardRouting, allocation))) // Given that we're checking the status of a node that's shutting down, no shards should be allowed to remain .filter(pair -> { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java index 26ee1a85a271a..ca91d51a2d129 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java @@ -234,7 +234,7 @@ private void checkWatchIndexHasChanged(IndexMetadata metadata, ClusterChangedEve RoutingNode routingNode = state.getRoutingNodes().node(localNodeId); // no local shards, exit early - List localShardRouting = routingNode.shardsWithState(watchIndex, STARTED, RELOCATING); + List localShardRouting = routingNode.shardsWithState(watchIndex, STARTED, RELOCATING).toList(); if (localShardRouting.isEmpty()) { configuration = INACTIVE; } else { @@ -281,7 +281,6 @@ private boolean hasShardAllocationIdChanged(String watchIndex, ClusterState stat Set clusterStateLocalShardIds = state.getRoutingNodes() .node(localNodeId) .shardsWithState(watchIndex, STARTED, RELOCATING) - .stream() .map(ShardRouting::shardId) .collect(Collectors.toSet()); Set configuredLocalShardIds = new HashSet<>(configuration.localShards.keySet()); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index b5f29085fd5f8..73c3a4c093fdf 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -144,7 +144,7 @@ public void clusterChanged(ClusterChangedEvent event) { } String watchIndex = watcherIndexMetadata.getIndex().getName(); - List localShards = routingNode.shardsWithState(watchIndex, RELOCATING, STARTED); + List localShards = routingNode.shardsWithState(watchIndex, RELOCATING, STARTED).toList(); // no local shards, empty out watcher and dont waste resources! if (localShards.isEmpty()) { pauseExecution("no local watcher shards found"); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index 347ca20f7a94e..9a0fa93f0605c 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -323,7 +323,7 @@ private Collection loadWatches(ClusterState clusterState) { if (routingNode == null) { return Collections.emptyList(); } - List localShards = routingNode.shardsWithState(watchIndexName, RELOCATING, STARTED); + List localShards = routingNode.shardsWithState(watchIndexName, RELOCATING, STARTED).toList(); // find out all allocation ids List watchIndexShardRoutings = clusterState.getRoutingTable().allShards(watchIndexName); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java index 985a0948516eb..7e71c9a8f94e6 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java @@ -56,6 +56,7 @@ import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import java.util.stream.Stream; import static java.util.Arrays.asList; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; @@ -330,14 +331,13 @@ public void testClusterChangedWatchAliasChanged() throws Exception { boolean emptyShards = randomBoolean(); if (emptyShards) { - when(routingNode.shardsWithState(eq(newActiveWatchIndex), any(ShardRoutingState[].class))).thenReturn(Collections.emptyList()); + when(routingNode.shardsWithState(eq(newActiveWatchIndex), any(ShardRoutingState[].class))).thenReturn(Stream.empty()); } else { Index index = new Index(newActiveWatchIndex, "uuid"); ShardId shardId = new ShardId(index, 0); ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, "node_1", true, STARTED); - List routing = Collections.singletonList(shardRouting); - when(routingNode.shardsWithState(eq(newActiveWatchIndex), eq(STARTED), eq(RELOCATING))).thenReturn(routing); - when(routingTable.allShards(eq(newActiveWatchIndex))).thenReturn(routing); + when(routingNode.shardsWithState(eq(newActiveWatchIndex), eq(STARTED), eq(RELOCATING))).thenReturn(Stream.of(shardRouting)); + when(routingTable.allShards(eq(newActiveWatchIndex))).thenReturn(List.of(shardRouting)); IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(index).addShard(shardRouting).build(); when(routingTable.index(newActiveWatchIndex)).thenReturn(indexRoutingTable); }