Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Indexed IP field supports `terms_query` with more than 1025 IP masks [#16391](https://github.com/opensearch-project/OpenSearch/pull/16391)
- Make entries for dependencies from server/build.gradle to gradle version catalog ([#16707](https://github.com/opensearch-project/OpenSearch/pull/16707))
- Sliced search only fans out to shards matched by the selected slice, reducing open search contexts ([#16771](https://github.com/opensearch-project/OpenSearch/pull/16771))
- Allow extended plugins to be optional ([#16909](https://github.com/opensearch-project/OpenSearch/pull/16909))

### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@
"default":"open",
"description":"Whether to expand wildcard expression to concrete indices that are open, closed or both."
}
},
"body":{
"description":"The search source (in order to specify slice parameters)"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
---
"Search shards with slice specified in body":
- skip:
version: " - 2.99.99"
reason: "Added slice body to search_shards in 2.19"
- do:
indices.create:
index: test_index
body:
settings:
index:
number_of_shards: 7
number_of_replicas: 0

- do:
search_shards:
index: test_index
body:
slice:
id: 0
max: 3
- length: { shards: 3 }
- match: { shards.0.0.index: "test_index" }
- match: { shards.0.0.shard: 0 }
- match: { shards.1.0.shard: 3 }
- match: { shards.2.0.shard: 6 }

- do:
search_shards:
index: test_index
body:
slice:
id: 1
max: 3
- length: { shards: 2 }
- match: { shards.0.0.index: "test_index" }
- match: { shards.0.0.shard: 1 }
- match: { shards.1.0.shard: 4 }

- do:
search_shards:
index: test_index
body:
slice:
id: 2
max: 3
- length: { shards: 2 }
- match: { shards.0.0.index: "test_index" }
- match: { shards.0.0.shard: 2 }
- match: { shards.1.0.shard: 5 }


- do:
search_shards:
index: test_index
preference: "_shards:0,2,4,6"
body:
slice:
id: 0
max: 3
- length: { shards: 2 }
- match: { shards.0.0.index: "test_index" }
- match: { shards.0.0.shard: 0 }
- match: { shards.1.0.shard: 6 }

- do:
search_shards:
index: test_index
preference: "_shards:0,2,4,6"
body:
slice:
id: 1
max: 3
- length: { shards: 1 }
- match: { shards.0.0.index: "test_index" }
- match: { shards.0.0.shard: 2 }

- do:
search_shards:
index: test_index
preference: "_shards:0,2,4,6"
body:
slice:
id: 2
max: 3
- length: { shards: 1 }
- match: { shards.0.0.index: "test_index" }
- match: { shards.0.0.shard: 4 }
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.Version;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.IndicesRequest;
import org.opensearch.action.support.IndicesOptions;
Expand All @@ -41,6 +42,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.search.slice.SliceBuilder;

import java.io.IOException;
import java.util.Objects;
Expand All @@ -61,6 +63,8 @@ public class ClusterSearchShardsRequest extends ClusterManagerNodeReadRequest<Cl
@Nullable
private String preference;
private IndicesOptions indicesOptions = IndicesOptions.lenientExpandOpen();
@Nullable
private SliceBuilder sliceBuilder;

public ClusterSearchShardsRequest() {}

Expand All @@ -76,6 +80,12 @@ public ClusterSearchShardsRequest(StreamInput in) throws IOException {
preference = in.readOptionalString();

indicesOptions = IndicesOptions.readIndicesOptions(in);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
boolean hasSlice = in.readBoolean();
if (hasSlice) {
sliceBuilder = new SliceBuilder(in);
}
}
}

@Override
Expand All @@ -84,8 +94,15 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(indices);
out.writeOptionalString(routing);
out.writeOptionalString(preference);

indicesOptions.writeIndicesOptions(out);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
if (sliceBuilder != null) {
out.writeBoolean(true);
sliceBuilder.writeTo(out);
} else {
out.writeBoolean(false);
}
}
}

@Override
Expand Down Expand Up @@ -166,4 +183,13 @@ public ClusterSearchShardsRequest preference(String preference) {
public String preference() {
return this.preference;
}

public ClusterSearchShardsRequest slice(SliceBuilder sliceBuilder) {
this.sliceBuilder = sliceBuilder;
return this;
}

public SliceBuilder slice() {
return this.sliceBuilder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ protected void clusterManagerOperation(

Set<String> nodeIds = new HashSet<>();
GroupShardsIterator<ShardIterator> groupShardsIterator = clusterService.operationRouting()
.searchShards(clusterState, concreteIndices, routingMap, request.preference());
.searchShards(clusterState, concreteIndices, routingMap, request.preference(), null, null, request.slice());
ShardRouting shard;
ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()];
int currentGroup = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,7 @@ private AsyncShardsAction(FieldCapabilitiesIndexRequest request, ActionListener<
throw blockException;
}

shardsIt = clusterService.operationRouting()
.searchShards(clusterService.state(), new String[] { request.index() }, null, null, null, null);
shardsIt = clusterService.operationRouting().searchShards(clusterService.state(), new String[] { request.index() }, null, null);
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.slice.SliceBuilder;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResourceTrackingService;
Expand Down Expand Up @@ -551,6 +552,7 @@ private ActionListener<SearchSourceBuilder> buildRewriteListener(
);
} else {
AtomicInteger skippedClusters = new AtomicInteger(0);
SliceBuilder slice = searchRequest.source() == null ? null : searchRequest.source().slice();
collectSearchShards(
searchRequest.indicesOptions(),
searchRequest.preference(),
Expand All @@ -559,6 +561,7 @@ private ActionListener<SearchSourceBuilder> buildRewriteListener(
remoteClusterIndices,
remoteClusterService,
threadPool,
slice,
ActionListener.wrap(searchShardsResponses -> {
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = getRemoteClusterNodeLookup(
searchShardsResponses
Expand Down Expand Up @@ -787,6 +790,7 @@ static void collectSearchShards(
Map<String, OriginalIndices> remoteIndicesByCluster,
RemoteClusterService remoteClusterService,
ThreadPool threadPool,
SliceBuilder slice,
ActionListener<Map<String, ClusterSearchShardsResponse>> listener
) {
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
Expand All @@ -800,7 +804,8 @@ static void collectSearchShards(
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices).indicesOptions(indicesOptions)
.local(true)
.preference(preference)
.routing(routing);
.routing(routing)
.slice(slice);
clusterClient.admin()
.cluster()
.searchShards(
Expand Down Expand Up @@ -1042,14 +1047,16 @@ private void executeSearch(
concreteLocalIndices[i] = indices[i].getName();
}
Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
SliceBuilder slice = searchRequest.source() == null ? null : searchRequest.source().slice();
GroupShardsIterator<ShardIterator> localShardRoutings = clusterService.operationRouting()
.searchShards(
clusterState,
concreteLocalIndices,
routingMap,
searchRequest.preference(),
searchService.getResponseCollectorService(),
nodeSearchCounts
nodeSearchCounts,
slice
);
localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false)
.map(it -> new SearchShardIterator(searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.cluster.routing;

import org.apache.lucene.util.CollectionUtil;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
Expand All @@ -44,14 +45,17 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.Strings;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.node.ResponseCollectorService;
import org.opensearch.search.slice.SliceBuilder;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -230,7 +234,7 @@ public GroupShardsIterator<ShardIterator> searchShards(
@Nullable Map<String, Set<String>> routing,
@Nullable String preference
) {
return searchShards(clusterState, concreteIndices, routing, preference, null, null);
return searchShards(clusterState, concreteIndices, routing, preference, null, null, null);
}

public GroupShardsIterator<ShardIterator> searchShards(
Expand All @@ -239,11 +243,14 @@ public GroupShardsIterator<ShardIterator> searchShards(
@Nullable Map<String, Set<String>> routing,
@Nullable String preference,
@Nullable ResponseCollectorService collectorService,
@Nullable Map<String, Long> nodeCounts
@Nullable Map<String, Long> nodeCounts,
@Nullable SliceBuilder slice
) {
final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
final Set<ShardIterator> set = new HashSet<>(shards.size());

Map<Index, List<ShardIterator>> shardIterators = new HashMap<>();
for (IndexShardRoutingTable shard : shards) {

IndexMetadata indexMetadataForShard = indexMetadata(clusterState, shard.shardId.getIndex().getName());
if (indexMetadataForShard.isRemoteSnapshot() && (preference == null || preference.isEmpty())) {
preference = Preference.PRIMARY.type();
Expand Down Expand Up @@ -274,10 +281,31 @@ public GroupShardsIterator<ShardIterator> searchShards(
clusterState.metadata().weightedRoutingMetadata()
);
if (iterator != null) {
set.add(iterator);
shardIterators.computeIfAbsent(iterator.shardId().getIndex(), k -> new ArrayList<>()).add(iterator);
}
}
List<ShardIterator> allShardIterators = new ArrayList<>();
if (slice != null) {
for (List<ShardIterator> indexIterators : shardIterators.values()) {
// Filter the returned shards for the given slice
CollectionUtil.timSort(indexIterators);
// We use the ordinal of the iterator in the group (after sorting) rather than the shard id, because
// computeTargetedShards may return a subset of shards for an index, if a routing parameter was
// specified. In that case, the set of routable shards is considered the full universe of available
// shards for each index, when mapping shards to slices. If no routing parameter was specified,
// then ordinals and shard IDs are the same. This mimics the logic in
// org.opensearch.search.slice.SliceBuilder.toFilter.
for (int i = 0; i < indexIterators.size(); i++) {
if (slice.shardMatches(i, indexIterators.size())) {
allShardIterators.add(indexIterators.get(i));
}
}
}
} else {
shardIterators.values().forEach(allShardIterators::addAll);
}
return GroupShardsIterator.sortAndCreate(new ArrayList<>(set));

return GroupShardsIterator.sortAndCreate(allShardIterators);
}

public static ShardIterator getShards(ClusterState clusterState, ShardId shardId) {
Expand Down Expand Up @@ -311,6 +339,7 @@ private Set<IndexShardRoutingTable> computeTargetedShards(
set.add(indexShard);
}
}

}
return set;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -81,6 +82,13 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
clusterSearchShardsRequest.routing(request.param("routing"));
clusterSearchShardsRequest.preference(request.param("preference"));
clusterSearchShardsRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterSearchShardsRequest.indicesOptions()));
if (request.hasContentOrSourceParam()) {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.parseXContent(request.contentOrSourceParamParser());
if (sourceBuilder.slice() != null) {
clusterSearchShardsRequest.slice(sourceBuilder.slice());
}
}
return channel -> client.admin().cluster().searchShards(clusterSearchShardsRequest, new RestToXContentListener<>(channel));
}
}
Loading