Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement fixed interval refresh task scheduling ([#17777](https://github.com/opensearch-project/OpenSearch/pull/17777))
- Add GRPC DocumentService and Bulk endpoint ([#17727](https://github.com/opensearch-project/OpenSearch/pull/17727))
- Added scale to zero (`search_only` mode) support for OpenSearch reader writer separation ([#17299](https://github.com/opensearch-project/OpenSearch/pull/17299)
- Add target indices info to the QueryCoordinatorContext ([#17818](https://github.com/opensearch-project/OpenSearch/pull/17818))

### Changed
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,7 @@ protected void doExecute(Task task, ExplainRequest request, ActionListener<Expla
request.query(rewrittenQuery);
super.doExecute(task, request, listener);
}, listener::onFailure);
Rewriteable.rewriteAndFetch(
request.query(),
searchService.getIndicesService().getRewriteContext(() -> request.nowInMillis),
rewriteListener
);
Rewriteable.rewriteAndFetch(request.query(), searchService.getRewriteContext(() -> request.nowInMillis, request), rewriteListener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@

package org.opensearch.index.query;

import org.opensearch.action.IndicesRequest;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.index.IndexService;
import org.opensearch.search.pipeline.PipelinedRequest;
import org.opensearch.transport.client.Client;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

Expand All @@ -31,11 +35,17 @@
@PublicApi(since = "2.19.0")
public class QueryCoordinatorContext implements QueryRewriteContext {
private final QueryRewriteContext rewriteContext;
private final PipelinedRequest searchRequest;

public QueryCoordinatorContext(QueryRewriteContext rewriteContext, PipelinedRequest searchRequest) {
private final IndicesRequest searchRequest;
private final List<IndexService> targetIndexServices;

public QueryCoordinatorContext(
QueryRewriteContext rewriteContext,
IndicesRequest searchRequest,
List<IndexService> targetIndexServices
) {
this.rewriteContext = rewriteContext;
this.searchRequest = searchRequest;
this.targetIndexServices = targetIndexServices;
}

@Override
Expand Down Expand Up @@ -84,10 +94,14 @@ public QueryCoordinatorContext convertToCoordinatorContext() {
}

public Map<String, Object> getContextVariables() {
if (searchRequest instanceof PipelinedRequest) {
return new HashMap<>(((PipelinedRequest) searchRequest).getPipelineProcessingContext().getAttributes());
} else {
return Collections.emptyMap();
}
}

// Read from pipeline context
Map<String, Object> contextVariables = new HashMap<>(searchRequest.getPipelineProcessingContext().getAttributes());

return contextVariables;
public List<IndexService> getTargetIndexServices() {
return Collections.unmodifiableList(targetIndexServices);
}
}
16 changes: 16 additions & 0 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.OpenSearchException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.IndicesRequest;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags.Flag;
Expand Down Expand Up @@ -1988,6 +1989,21 @@ public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) {
return getRewriteContext(nowInMillis, false);
}

/**
* Returns a target index services {@link IndexService} with the given {@link IndicesRequest} request
*/
public List<IndexService> getTargetIndexServiceList(IndicesRequest searchRequest) {
final Index[] targetIndices = indexNameExpressionResolver.concreteIndices(clusterService().state(), searchRequest);
final List<IndexService> targetIndicesList = new ArrayList<>();
for (Index index : targetIndices) {
final IndexService indexService = indexServiceSafe(index);
if (indexService != null) {
targetIndicesList.add(indexService);
}
}
return targetIndicesList;
}

/**
* Returns a new {@link QueryRewriteContext} for query validation with the given {@code now} provider
*/
Expand Down
10 changes: 7 additions & 3 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.lucene.search.TopDocs;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.IndicesRequest;
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.search.DeletePitInfo;
import org.opensearch.action.search.DeletePitResponse;
Expand Down Expand Up @@ -126,7 +127,6 @@
import org.opensearch.search.internal.ShardSearchContextId;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.lookup.SearchLookup;
import org.opensearch.search.pipeline.PipelinedRequest;
import org.opensearch.search.profile.Profilers;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QuerySearchRequest;
Expand Down Expand Up @@ -1773,8 +1773,12 @@ private void rewriteAndFetchShardRequest(IndexShard shard, ShardSearchRequest re
/**
* Returns a new {@link QueryRewriteContext} with the given {@code now} provider
*/
public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis, PipelinedRequest searchRequest) {
return new QueryCoordinatorContext(indicesService.getRewriteContext(nowInMillis), searchRequest);
public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis, IndicesRequest searchRequest) {
return new QueryCoordinatorContext(
indicesService.getRewriteContext(nowInMillis),
searchRequest,
indicesService.getTargetIndexServiceList(searchRequest)
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.Version;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.admin.indices.stats.IndexShardStats;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchType;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -684,6 +685,20 @@ public void testCanCacheSizeNonzero() {
}
}

public void testGetTargetIndexServiceList() {
// prepare test data
createIndex("index");
IndicesService indicesService = getIndicesService();
SearchRequest searchRequest = new SearchRequest("index");

// invoke
final List<IndexService> targetIndices = indicesService.getTargetIndexServiceList(searchRequest);

// verify
assertEquals(1, targetIndices.size());
assertEquals("index", targetIndices.get(0).index().getName());
}

private void setupMocksForCanCache(TestSearchContext context, IndexReader.CacheHelper cacheHelper) {
ContextIndexSearcher searcher = mock(ContextIndexSearcher.class);
context.setSearcher(searcher);
Expand Down
Loading