Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix AutoDateHistogramAggregator rounding assertion failure ([#17023](https://github.com/opensearch-project/OpenSearch/pull/17023))
- Add highlighting for wildcard search on `match_only_text` field ([#17101](https://github.com/opensearch-project/OpenSearch/pull/17101))
- Fix the failing CI's with `Failed to load eclipse jdt formatter` error ([#17172](https://github.com/opensearch-project/OpenSearch/pull/17172))

- Fix getting replication type from node settings in NodeVersionAllocationDecider ([#12811](https://github.com/opensearch-project/OpenSearch/pull/12811))
### Security

[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.18...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ConcurrentRecoveriesAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeVersionAllocationDecider(settings));
addAllocationDecider(deciders, new NodeVersionAllocationDecider());
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.replication.common.ReplicationType;

import java.util.List;
Expand All @@ -58,15 +57,13 @@ public class NodeVersionAllocationDecider extends AllocationDecider {

public static final String NAME = "node_version";

private final ReplicationType replicationType;

public NodeVersionAllocationDecider(Settings settings) {
replicationType = IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(settings);
}
public NodeVersionAllocationDecider() {}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (shardRouting.primary()) {
IndexMetadata indexMd = allocation.metadata().getIndexSafe(shardRouting.index());
final ReplicationType replicationType = IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(indexMd.getSettings());
if (replicationType == ReplicationType.SEGMENT) {
Comment thread
KunjueYu marked this conversation as resolved.
List<ShardRouting> replicas = allocation.routingNodes()
.assignedShards(shardRouting.shardId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.cluster.routing.TestShardRouting;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.cluster.routing.allocation.command.AllocationCommands;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.Decision;
Expand Down Expand Up @@ -440,9 +441,7 @@ public void testRebalanceDoesNotAllocatePrimaryAndReplicasOnDifferentVersionNode
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().add(newNode).add(oldNode1).add(oldNode2))
.build();
AllocationDeciders allocationDeciders = new AllocationDeciders(
Collections.singleton(new NodeVersionAllocationDecider(Settings.EMPTY))
);
AllocationDeciders allocationDeciders = new AllocationDeciders(Collections.singleton(new NodeVersionAllocationDecider()));
AllocationService strategy = new MockAllocationService(
allocationDeciders,
new TestGatewayAllocator(),
Expand Down Expand Up @@ -512,7 +511,7 @@ public void testRestoreDoesNotAllocateSnapshotOnOlderNodes() {
.nodes(DiscoveryNodes.builder().add(newNode).add(oldNode1).add(oldNode2))
.build();
AllocationDeciders allocationDeciders = new AllocationDeciders(
Arrays.asList(new ReplicaAfterPrimaryActiveAllocationDecider(), new NodeVersionAllocationDecider(Settings.EMPTY))
Arrays.asList(new ReplicaAfterPrimaryActiveAllocationDecider(), new NodeVersionAllocationDecider())
);
AllocationService strategy = new MockAllocationService(
allocationDeciders,
Expand Down Expand Up @@ -647,9 +646,7 @@ public void testRebalanceDoesNotAllocatePrimaryOnHigherVersionNodesSegrepEnabled
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().add(newNode1).add(newNode2).add(oldNode1).add(oldNode2))
.build();
AllocationDeciders allocationDeciders = new AllocationDeciders(
Collections.singleton(new NodeVersionAllocationDecider(segmentReplicationSettings))
);
AllocationDeciders allocationDeciders = new AllocationDeciders(Collections.singleton(new NodeVersionAllocationDecider()));
AllocationService strategy = new MockAllocationService(
allocationDeciders,
new TestGatewayAllocator(),
Expand All @@ -671,6 +668,98 @@ public void testRebalanceDoesNotAllocatePrimaryOnHigherVersionNodesSegrepEnabled
);
}

public void testCanAllocatePrimaryOnHigherVersionNodesDocRepEnabled() {
Comment thread
KunjueYu marked this conversation as resolved.
ShardId shard1 = new ShardId("test1", "_na_", 0);
final DiscoveryNode newNode1 = new DiscoveryNode(
"newNode1",
buildNewFakeTransportAddress(),
emptyMap(),
CLUSTER_MANAGER_DATA_ROLES,
Version.CURRENT
);
final DiscoveryNode oldNode1 = new DiscoveryNode(
"oldNode1",
buildNewFakeTransportAddress(),
emptyMap(),
CLUSTER_MANAGER_DATA_ROLES,
VersionUtils.getPreviousVersion()
);
final DiscoveryNode oldNode2 = new DiscoveryNode(
"oldNode2",
buildNewFakeTransportAddress(),
emptyMap(),
CLUSTER_MANAGER_DATA_ROLES,
VersionUtils.getPreviousVersion()
);
AllocationId allocationId1P = AllocationId.newInitializing();
AllocationId allocationId1R = AllocationId.newInitializing();

Settings documentReplicationSettings = Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
.build();
Metadata metadata = Metadata.builder()
.put(
IndexMetadata.builder(shard1.getIndexName())
.settings(settings(Version.CURRENT).put(documentReplicationSettings))
.numberOfShards(1)
.numberOfReplicas(1)
.putInSyncAllocationIds(0, Sets.newHashSet(allocationId1P.getId(), allocationId1R.getId()))
)
.build();
RoutingTable routingTable = RoutingTable.builder()
.add(
IndexRoutingTable.builder(shard1.getIndex())
.addIndexShard(
new IndexShardRoutingTable.Builder(shard1).addShard(
TestShardRouting.newShardRouting(
shard1.getIndexName(),
shard1.getId(),
oldNode1.getId(),
null,
true,
ShardRoutingState.STARTED,
allocationId1P
)
)
.addShard(
TestShardRouting.newShardRouting(
shard1.getIndexName(),
shard1.getId(),
oldNode2.getId(),
null,
false,
ShardRoutingState.STARTED,
allocationId1R
)
)
.build()
)
)
.build();
ClusterState state = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().add(newNode1).add(oldNode1).add(oldNode2))
.build();
AllocationDeciders allocationDeciders = new AllocationDeciders(Collections.singleton(new NodeVersionAllocationDecider()));
AllocationService strategy = new MockAllocationService(
allocationDeciders,
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE
);
// move the primary shard to the node with higher version
AllocationCommands allocationCommands = new AllocationCommands(new MoveAllocationCommand("test1", 0, "oldNode1", "newNode1"));
state = strategy.reroute(state, allocationCommands, true, false).getClusterState();
// the primary shard will be moved successfully
assertThat(state.routingTable().index(shard1.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1));
assertThat(
state.routingTable().index(shard1.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).get(0).primary(),
equalTo(true)
);
}

private ClusterState stabilize(ClusterState clusterState, AllocationService service) {
logger.trace("RoutingNodes: {}", clusterState.getRoutingNodes());

Expand Down Expand Up @@ -771,7 +860,7 @@ public void testMessages() {
RoutingAllocation routingAllocation = new RoutingAllocation(null, clusterState.getRoutingNodes(), clusterState, null, null, 0);
routingAllocation.debugDecision(true);

final NodeVersionAllocationDecider allocationDecider = new NodeVersionAllocationDecider(Settings.EMPTY);
final NodeVersionAllocationDecider allocationDecider = new NodeVersionAllocationDecider();
Decision decision = allocationDecider.canAllocate(primaryShard, newNode, routingAllocation);
assertThat(decision.type(), is(Decision.Type.YES));
assertThat(decision.getExplanation(), is("the primary shard is new or already existed on the node"));
Expand Down