Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add GRPC DocumentService and Bulk endpoint ([#17727](https://github.com/opensearch-project/OpenSearch/pull/17727))
- Added scale to zero (`search_only` mode) support for OpenSearch reader writer separation ([#17299](https://github.com/opensearch-project/OpenSearch/pull/17299)
- [Star Tree] [Search] Resolving numeric range aggregation with metric aggregation using star-tree ([#17273](https://github.com/opensearch-project/OpenSearch/pull/17273))
- Added Search Only strict routing setting ([#17803](https://github.com/opensearch-project/OpenSearch/pull/17803))

### Changed
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.indices.settings;

import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.ClusterState;
Expand All @@ -30,6 +31,8 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.core.IsEqual.equalTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchOnlyReplicaIT extends RemoteStoreBaseIntegTestCase {
Expand Down Expand Up @@ -236,6 +239,80 @@ public void testSearchReplicaRoutingPreference() throws IOException {
assertEquals(nodeId, indexShardRoutingTable.searchOnlyReplicas().get(0).currentNodeId());
}

public void testSearchReplicaRoutingPreferenceWhenSearchReplicaUnassigned() {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
createIndex(TEST_INDEX, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1).build());
ensureYellow(TEST_INDEX);
client().prepareIndex(TEST_INDEX).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

// By default cluster.routing.search_only.strict is set as true
// When cluster.routing.search_only.strict is set as true, and no assigned search replica is available,
// search request will fail since it will route only to search replica but it's not available
Throwable throwable = assertThrows(
SearchPhaseExecutionException.class,
() -> client().prepareSearch(TEST_INDEX).setPreference(null).setQuery(QueryBuilders.matchAllQuery()).get()
);

assertEquals("all shards failed", throwable.getMessage());

// Set cluster.routing.search_only.strict as false
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.routing.search_only.strict", false))
.get();

// When cluster.routing.search_only.strict is set as false, and no assigned search replica is available;
// search request will fall back to querying writers
SearchResponse response = client().prepareSearch(TEST_INDEX).setPreference(null).setQuery(QueryBuilders.matchAllQuery()).get();

String nodeId = response.getHits().getAt(0).getShard().getNodeId();
IndexShardRoutingTable indexShardRoutingTable = getIndexShardRoutingTable();
assertEquals(nodeId, indexShardRoutingTable.primaryShard().currentNodeId());
}

public void testSearchReplicaRoutingPreferenceWhenSearchReplicaAssigned() {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
createIndex(TEST_INDEX, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1).build());
ensureYellow(TEST_INDEX);
client().prepareIndex(TEST_INDEX).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

internalCluster().startSearchOnlyNode();
ensureGreen(TEST_INDEX);

// By default cluster.routing.search_only.strict is set as true
// When cluster.routing.search_only.strict is set as true, and assigned search replica is available;
// search request will succeed
SearchResponse response = client().prepareSearch(TEST_INDEX).setPreference(null).setQuery(QueryBuilders.matchAllQuery()).get();

String nodeId = response.getHits().getAt(0).getShard().getNodeId();
IndexShardRoutingTable indexShardRoutingTable = getIndexShardRoutingTable();
assertEquals(nodeId, indexShardRoutingTable.searchOnlyReplicas().get(0).currentNodeId());

// Set cluster.routing.search_only.strict as false
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.routing.search_only.strict", false))
.get();

// When cluster.routing.search_only.strict is set as false, and assigned search replica is available;
// search request can land on either writer or reader
response = client().prepareSearch(TEST_INDEX).setPreference(null).setQuery(QueryBuilders.matchAllQuery()).get();

nodeId = response.getHits().getAt(0).getShard().getNodeId();
indexShardRoutingTable = getIndexShardRoutingTable();
assertThat(
nodeId,
anyOf(
equalTo(indexShardRoutingTable.primaryShard().currentNodeId()),
equalTo(indexShardRoutingTable.searchOnlyReplicas().get(0).currentNodeId())
)
);
}

public void testUnableToAllocateSearchReplicaWontBlockRegularReplicaAllocation() {
int numSearchReplicas = 1;
int numWriterReplicas = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,21 @@ public class OperationRouting {
Preference.PREFER_NODES
);

public static final Setting<Boolean> STRICT_SEARCH_ONLY_ROUTING_ENABLED = Setting.boolSetting(
"cluster.routing.search_only.strict",
true,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private volatile List<String> awarenessAttributes;
private volatile boolean useAdaptiveReplicaSelection;
private volatile boolean ignoreAwarenessAttr;
private volatile double weightedRoutingDefaultWeight;
private volatile boolean isFailOpenEnabled;
private volatile boolean isStrictWeightedShardRouting;
private volatile boolean ignoreWeightedRouting;
private volatile boolean isStrictSearchOnlyShardRouting;
private final boolean isReaderWriterSplitEnabled;

public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
Expand All @@ -140,12 +148,14 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
this.isFailOpenEnabled = WEIGHTED_ROUTING_FAILOPEN_ENABLED.get(settings);
this.isStrictWeightedShardRouting = STRICT_WEIGHTED_SHARD_ROUTING_ENABLED.get(settings);
this.ignoreWeightedRouting = IGNORE_WEIGHTED_SHARD_ROUTING.get(settings);
this.isStrictSearchOnlyShardRouting = STRICT_SEARCH_ONLY_ROUTING_ENABLED.get(settings);
clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection);
clusterSettings.addSettingsUpdateConsumer(IGNORE_AWARENESS_ATTRIBUTES_SETTING, this::setIgnoreAwarenessAttributes);
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_DEFAULT_WEIGHT, this::setWeightedRoutingDefaultWeight);
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled);
clusterSettings.addSettingsUpdateConsumer(STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, this::setStrictWeightedShardRouting);
clusterSettings.addSettingsUpdateConsumer(IGNORE_WEIGHTED_SHARD_ROUTING, this::setIgnoreWeightedRouting);
clusterSettings.addSettingsUpdateConsumer(STRICT_SEARCH_ONLY_ROUTING_ENABLED, this::setStrictSearchOnlyShardRouting);
this.isReaderWriterSplitEnabled = FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(settings);
}

Expand Down Expand Up @@ -193,6 +203,10 @@ public double getWeightedRoutingDefaultWeight() {
return this.weightedRoutingDefaultWeight;
}

void setStrictSearchOnlyShardRouting(boolean strictSearchOnlyShardRouting) {
this.isStrictSearchOnlyShardRouting = strictSearchOnlyShardRouting;
}

public ShardIterator indexShards(ClusterState clusterState, String index, String id, @Nullable String routing) {
return shards(clusterState, index, id, routing).shardsIt();
}
Expand Down Expand Up @@ -265,7 +279,7 @@ public GroupShardsIterator<ShardIterator> searchShards(

if (isReaderWriterSplitEnabled) {
if (preference == null || preference.isEmpty()) {
if (indexMetadataForShard.getNumberOfSearchOnlyReplicas() > 0) {
if (indexMetadataForShard.getNumberOfSearchOnlyReplicas() > 0 && isStrictSearchOnlyShardRouting) {
preference = Preference.SEARCH_REPLICA.type();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ public void apply(Settings value, Settings current, Settings previous) {
OperationRouting.WEIGHTED_ROUTING_FAILOPEN_ENABLED,
OperationRouting.STRICT_WEIGHTED_SHARD_ROUTING_ENABLED,
OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING,
OperationRouting.STRICT_SEARCH_ONLY_ROUTING_ENABLED,
IndexGraveyard.SETTING_MAX_TOMBSTONES,
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,77 @@ public void testSearchReplicaDefaultRouting() throws Exception {
}
}

public void testSearchReplicaRoutingWhenSearchOnlyStrictSettingIsFalse() throws Exception {
final int numShards = 1;
final int numReplicas = 2;
final int numSearchReplicas = 2;
final String indexName = "test";
final String[] indexNames = new String[] { indexName };

ClusterService clusterService = null;
ThreadPool threadPool = null;

try {
OperationRouting opRouting = new OperationRouting(
Settings.builder().put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, "true").build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
opRouting.setStrictSearchOnlyShardRouting(false);

ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(
indexNames,
numShards,
numReplicas,
numSearchReplicas
);
IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index(indexName).getShards().get(0);
ShardId shardId = indexShardRoutingTable.searchOnlyReplicas().get(0).shardId();

threadPool = new TestThreadPool("testSearchReplicaDefaultRouting");
clusterService = ClusterServiceUtils.createClusterService(threadPool);

// add a search replica in initializing state:
DiscoveryNode node = new DiscoveryNode(
"node_initializing",
OpenSearchTestCase.buildNewFakeTransportAddress(),
Collections.emptyMap(),
new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES),
Version.CURRENT
);

IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
.settings(Settings.builder().put(state.metadata().index(indexName).getSettings()).build())
.numberOfSearchReplicas(3)
.numberOfReplicas(2)
.build();
Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).put(indexMetadata, false).generateClusterUuidIfNeeded();
IndexRoutingTable.Builder indexShardRoutingBuilder = IndexRoutingTable.builder(indexMetadata.getIndex());
indexShardRoutingBuilder.addIndexShard(indexShardRoutingTable);
indexShardRoutingBuilder.addShard(
TestShardRouting.newShardRouting(shardId, node.getId(), null, false, true, ShardRoutingState.INITIALIZING, null)
);
state = ClusterState.builder(state)
.routingTable(RoutingTable.builder().add(indexShardRoutingBuilder).build())
.metadata(metadataBuilder.build())
.build();

GroupShardsIterator<ShardIterator> groupIterator = opRouting.searchShards(state, indexNames, null, null);
assertThat("one group per shard", groupIterator.size(), equalTo(numShards));
for (ShardIterator shardIterator : groupIterator) {
assertEquals("We should have all 6 shards returned", shardIterator.size(), 6);
for (ShardRouting shardRouting : shardIterator) {
assertTrue(
"Any shard can exist with when cluster.routing.search_only.strict is set as false",
shardRouting.isSearchOnly() || shardRouting.primary() || shardRouting.isSearchOnly() == false
);
}
}
} finally {
IOUtils.close(clusterService);
terminate(threadPool);
}
}

private DiscoveryNode[] setupNodes() {
// Sets up two data nodes in zone-a and one data node in zone-b
List<String> zones = Arrays.asList("a", "a", "b");
Expand Down
Loading