diff --git a/CHANGELOG.md b/CHANGELOG.md index 720fd7fc9ac1e..c6ebbb68f613a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -129,7 +129,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix AutoDateHistogramAggregator rounding assertion failure ([#17023](https://github.com/opensearch-project/OpenSearch/pull/17023)) - Add highlighting for wildcard search on `match_only_text` field ([#17101](https://github.com/opensearch-project/OpenSearch/pull/17101)) - Fix the failing CI's with `Failed to load eclipse jdt formatter` error ([#17172](https://github.com/opensearch-project/OpenSearch/pull/17172)) - +- Fix getting replication type from node settings in NodeVersionAllocationDecider ([#12811](https://github.com/opensearch-project/OpenSearch/pull/12811)) ### Security [Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.18...2.x diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index 9081432093106..6279338489540 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -387,7 +387,7 @@ public static Collection createAllocationDeciders( addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new ConcurrentRecoveriesAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings)); - addAllocationDecider(deciders, new NodeVersionAllocationDecider(settings)); + addAllocationDecider(deciders, new NodeVersionAllocationDecider()); addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider()); addAllocationDecider(deciders, new RestoreInProgressAllocationDecider()); addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings)); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java index 9344b4c87830d..b4df80ac7b3f4 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java @@ -39,7 +39,6 @@ import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.RoutingAllocation; -import org.opensearch.common.settings.Settings; import org.opensearch.indices.replication.common.ReplicationType; import java.util.List; @@ -58,15 +57,13 @@ public class NodeVersionAllocationDecider extends AllocationDecider { public static final String NAME = "node_version"; - private final ReplicationType replicationType; - - public NodeVersionAllocationDecider(Settings settings) { - replicationType = IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(settings); - } + public NodeVersionAllocationDecider() {} @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { if (shardRouting.primary()) { + IndexMetadata indexMd = allocation.metadata().getIndexSafe(shardRouting.index()); + final ReplicationType replicationType = IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(indexMd.getSettings()); if (replicationType == ReplicationType.SEGMENT) { List replicas = allocation.routingNodes() .assignedShards(shardRouting.shardId()) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java index ac16c5db05a99..f6648eccb4d02 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java @@ -56,6 +56,7 @@ import org.opensearch.cluster.routing.TestShardRouting; import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.opensearch.cluster.routing.allocation.command.AllocationCommands; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.Decision; @@ -440,9 +441,7 @@ public void testRebalanceDoesNotAllocatePrimaryAndReplicasOnDifferentVersionNode .routingTable(routingTable) .nodes(DiscoveryNodes.builder().add(newNode).add(oldNode1).add(oldNode2)) .build(); - AllocationDeciders allocationDeciders = new AllocationDeciders( - Collections.singleton(new NodeVersionAllocationDecider(Settings.EMPTY)) - ); + AllocationDeciders allocationDeciders = new AllocationDeciders(Collections.singleton(new NodeVersionAllocationDecider())); AllocationService strategy = new MockAllocationService( allocationDeciders, new TestGatewayAllocator(), @@ -512,7 +511,7 @@ public void testRestoreDoesNotAllocateSnapshotOnOlderNodes() { .nodes(DiscoveryNodes.builder().add(newNode).add(oldNode1).add(oldNode2)) .build(); AllocationDeciders allocationDeciders = new AllocationDeciders( - Arrays.asList(new ReplicaAfterPrimaryActiveAllocationDecider(), new NodeVersionAllocationDecider(Settings.EMPTY)) + Arrays.asList(new ReplicaAfterPrimaryActiveAllocationDecider(), new NodeVersionAllocationDecider()) ); AllocationService strategy = new MockAllocationService( allocationDeciders, @@ -647,9 +646,7 @@ public void testRebalanceDoesNotAllocatePrimaryOnHigherVersionNodesSegrepEnabled .routingTable(routingTable) .nodes(DiscoveryNodes.builder().add(newNode1).add(newNode2).add(oldNode1).add(oldNode2)) .build(); - AllocationDeciders allocationDeciders = new AllocationDeciders( - Collections.singleton(new NodeVersionAllocationDecider(segmentReplicationSettings)) - ); + AllocationDeciders allocationDeciders = new AllocationDeciders(Collections.singleton(new NodeVersionAllocationDecider())); AllocationService strategy = new MockAllocationService( allocationDeciders, new TestGatewayAllocator(), @@ -671,6 +668,98 @@ public void testRebalanceDoesNotAllocatePrimaryOnHigherVersionNodesSegrepEnabled ); } + public void testCanAllocatePrimaryOnHigherVersionNodesDocRepEnabled() { + ShardId shard1 = new ShardId("test1", "_na_", 0); + final DiscoveryNode newNode1 = new DiscoveryNode( + "newNode1", + buildNewFakeTransportAddress(), + emptyMap(), + CLUSTER_MANAGER_DATA_ROLES, + Version.CURRENT + ); + final DiscoveryNode oldNode1 = new DiscoveryNode( + "oldNode1", + buildNewFakeTransportAddress(), + emptyMap(), + CLUSTER_MANAGER_DATA_ROLES, + VersionUtils.getPreviousVersion() + ); + final DiscoveryNode oldNode2 = new DiscoveryNode( + "oldNode2", + buildNewFakeTransportAddress(), + emptyMap(), + CLUSTER_MANAGER_DATA_ROLES, + VersionUtils.getPreviousVersion() + ); + AllocationId allocationId1P = AllocationId.newInitializing(); + AllocationId allocationId1R = AllocationId.newInitializing(); + + Settings documentReplicationSettings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT) + .build(); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(shard1.getIndexName()) + .settings(settings(Version.CURRENT).put(documentReplicationSettings)) + .numberOfShards(1) + .numberOfReplicas(1) + .putInSyncAllocationIds(0, Sets.newHashSet(allocationId1P.getId(), allocationId1R.getId())) + ) + .build(); + RoutingTable routingTable = RoutingTable.builder() + .add( + IndexRoutingTable.builder(shard1.getIndex()) + .addIndexShard( + new IndexShardRoutingTable.Builder(shard1).addShard( + TestShardRouting.newShardRouting( + shard1.getIndexName(), + shard1.getId(), + oldNode1.getId(), + null, + true, + ShardRoutingState.STARTED, + allocationId1P + ) + ) + .addShard( + TestShardRouting.newShardRouting( + shard1.getIndexName(), + shard1.getId(), + oldNode2.getId(), + null, + false, + ShardRoutingState.STARTED, + allocationId1R + ) + ) + .build() + ) + ) + .build(); + ClusterState state = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(newNode1).add(oldNode1).add(oldNode2)) + .build(); + AllocationDeciders allocationDeciders = new AllocationDeciders(Collections.singleton(new NodeVersionAllocationDecider())); + AllocationService strategy = new MockAllocationService( + allocationDeciders, + new TestGatewayAllocator(), + new BalancedShardsAllocator(Settings.EMPTY), + EmptyClusterInfoService.INSTANCE, + EmptySnapshotsInfoService.INSTANCE + ); + // move the primary shard to the node with higher version + AllocationCommands allocationCommands = new AllocationCommands(new MoveAllocationCommand("test1", 0, "oldNode1", "newNode1")); + state = strategy.reroute(state, allocationCommands, true, false).getClusterState(); + // the primary shard will be moved successfully + assertThat(state.routingTable().index(shard1.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1)); + assertThat( + state.routingTable().index(shard1.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).get(0).primary(), + equalTo(true) + ); + } + private ClusterState stabilize(ClusterState clusterState, AllocationService service) { logger.trace("RoutingNodes: {}", clusterState.getRoutingNodes()); @@ -771,7 +860,7 @@ public void testMessages() { RoutingAllocation routingAllocation = new RoutingAllocation(null, clusterState.getRoutingNodes(), clusterState, null, null, 0); routingAllocation.debugDecision(true); - final NodeVersionAllocationDecider allocationDecider = new NodeVersionAllocationDecider(Settings.EMPTY); + final NodeVersionAllocationDecider allocationDecider = new NodeVersionAllocationDecider(); Decision decision = allocationDecider.canAllocate(primaryShard, newNode, routingAllocation); assertThat(decision.type(), is(Decision.Type.YES)); assertThat(decision.getExplanation(), is("the primary shard is new or already existed on the node"));