Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add composite directory factory ([#17988](https://github.com/opensearch-project/OpenSearch/pull/17988))
- Add pull-based ingestion error metrics and make internal queue size configurable ([#18088](https://github.com/opensearch-project/OpenSearch/pull/18088))
- Enabled Async Shard Batch Fetch by default ([#18139](https://github.com/opensearch-project/OpenSearch/pull/18139))
- Allow to get the search request from the QueryCoordinatorContext ([#17818](https://github.com/opensearch-project/OpenSearch/pull/17818))

### Changed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,7 @@
request.query(rewrittenQuery);
super.doExecute(task, request, listener);
}, listener::onFailure);
Rewriteable.rewriteAndFetch(
request.query(),
searchService.getIndicesService().getRewriteContext(() -> request.nowInMillis),
rewriteListener
);
Rewriteable.rewriteAndFetch(request.query(), searchService.getRewriteContext(() -> request.nowInMillis, request), rewriteListener);

Check warning on line 115 in server/src/main/java/org/opensearch/action/explain/TransportExplainAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/explain/TransportExplainAction.java#L115

Added line #L115 was not covered by tests
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

package org.opensearch.index.query;

import org.opensearch.action.IndicesRequest;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.search.pipeline.PipelinedRequest;
import org.opensearch.transport.client.Client;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
Expand All @@ -31,9 +33,9 @@
@PublicApi(since = "2.19.0")
public class QueryCoordinatorContext implements QueryRewriteContext {
private final QueryRewriteContext rewriteContext;
private final PipelinedRequest searchRequest;
private final IndicesRequest searchRequest;

public QueryCoordinatorContext(QueryRewriteContext rewriteContext, PipelinedRequest searchRequest) {
public QueryCoordinatorContext(QueryRewriteContext rewriteContext, IndicesRequest searchRequest) {
this.rewriteContext = rewriteContext;
this.searchRequest = searchRequest;
}
Expand Down Expand Up @@ -84,10 +86,14 @@ public QueryCoordinatorContext convertToCoordinatorContext() {
}

public Map<String, Object> getContextVariables() {
if (searchRequest instanceof PipelinedRequest) {
return new HashMap<>(((PipelinedRequest) searchRequest).getPipelineProcessingContext().getAttributes());
} else {
return Collections.emptyMap();
}
}

// Read from pipeline context
Map<String, Object> contextVariables = new HashMap<>(searchRequest.getPipelineProcessingContext().getAttributes());

return contextVariables;
public IndicesRequest getSearchRequest() {
return searchRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.lucene.search.TopDocs;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.IndicesRequest;
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.search.DeletePitInfo;
import org.opensearch.action.search.DeletePitResponse;
Expand Down Expand Up @@ -127,7 +128,6 @@
import org.opensearch.search.internal.ShardSearchContextId;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.lookup.SearchLookup;
import org.opensearch.search.pipeline.PipelinedRequest;
import org.opensearch.search.profile.Profilers;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QuerySearchRequest;
Expand Down Expand Up @@ -1785,7 +1785,7 @@ private void rewriteAndFetchShardRequest(IndexShard shard, ShardSearchRequest re
/**
* Returns a new {@link QueryRewriteContext} with the given {@code now} provider
*/
public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis, PipelinedRequest searchRequest) {
public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis, IndicesRequest searchRequest) {
return new QueryCoordinatorContext(indicesService.getRewriteContext(nowInMillis), searchRequest);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.query;

import org.opensearch.action.IndicesRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.plugins.SearchPipelinePlugin;
import org.opensearch.search.pipeline.PipelinedRequest;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.SearchResponseProcessor;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;
import org.junit.Before;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class QueryCoordinatorContextTests extends OpenSearchTestCase {

private IndexNameExpressionResolver indexNameExpressionResolver;

@Before
public void setup() {
indexNameExpressionResolver = new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY));
}

public void testGetContextVariables_whenPipelinedSearchRequest_thenReturnVariables() {
final PipelinedRequest searchRequest = createDummyPipelinedRequest();
searchRequest.getPipelineProcessingContext().setAttribute("key", "value");

final QueryCoordinatorContext queryCoordinatorContext = new QueryCoordinatorContext(mock(QueryRewriteContext.class), searchRequest);

assertEquals(Map.of("key", "value"), queryCoordinatorContext.getContextVariables());
}

private PipelinedRequest createDummyPipelinedRequest() {
final Client client = mock(Client.class);
final ThreadPool threadPool = mock(ThreadPool.class);
final ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService();
when(threadPool.generic()).thenReturn(executorService);
when(threadPool.executor(anyString())).thenReturn(executorService);
final SearchPipelineService searchPipelineService = new SearchPipelineService(
mock(ClusterService.class),
threadPool,
null,
null,
null,
null,
this.writableRegistry(),
Collections.singletonList(new SearchPipelinePlugin() {
@Override
public Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Parameters parameters) {
return Collections.emptyMap();
}

@Override
public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Parameters parameters) {
return Collections.emptyMap();
}

@Override
public Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getSearchPhaseResultsProcessors(Parameters parameters) {
return Collections.emptyMap();
}

}),
client
);
final SearchRequest searchRequest = new SearchRequest();
return searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver);
}

public void testGetContextVariables_whenNotPipelinedSearchRequest_thenReturnEmpty() {
final IndicesRequest searchRequest = mock(IndicesRequest.class);

final QueryCoordinatorContext queryCoordinatorContext = new QueryCoordinatorContext(mock(QueryRewriteContext.class), searchRequest);

assertTrue(queryCoordinatorContext.getContextVariables().isEmpty());
}

public void testGetSearchRequest() {
final IndicesRequest searchRequest = mock(IndicesRequest.class);

final QueryCoordinatorContext queryCoordinatorContext = new QueryCoordinatorContext(mock(QueryRewriteContext.class), searchRequest);

assertEquals(searchRequest, queryCoordinatorContext.getSearchRequest());
}
}
Loading