Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Mute the query profile IT with concurrent execution ([#9840](https://github.com/opensearch-project/OpenSearch/pull/9840))
- Force merge with `only_expunge_deletes` honors max segment size ([#10036](https://github.com/opensearch-project/OpenSearch/pull/10036))
- Add the means to extract the contextual properties from HttpChannel, TcpCChannel and TrasportChannel without excessive typecasting ([#10562](https://github.com/opensearch-project/OpenSearch/pull/10562))
- Search pipelines now support asynchronous request and response processors to avoid blocking on a transport thread ([#10598](https://github.com/opensearch-project/OpenSearch/pull/10598))
- [Remote Store] Add Remote Store backpressure rejection stats to `_nodes/stats` ([#10524](https://github.com/opensearch-project/OpenSearch/pull/10524))
- [BUG] Fix java.lang.SecurityException in repository-gcs plugin ([#10642](https://github.com/opensearch-project/OpenSearch/pull/10642))
- Add telemetry tracer/metric enable flag and integ test. ([#10395](https://github.com/opensearch-project/OpenSearch/pull/10395))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,24 +506,51 @@ private void executeRequest(
ActionListener<SearchResponse> listener;
try {
searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest);
listener = ActionListener.wrap(
r -> originalListener.onResponse(searchRequest.transformResponse(r)),
originalListener::onFailure
);
listener = searchRequest.transformResponseListener(originalListener);
} catch (Exception e) {
originalListener.onFailure(e);
return;
}

if (searchQueryMetricsEnabled) {
try {
searchQueryCategorizer.categorize(searchRequest.source());
} catch (Exception e) {
logger.error("Error while trying to categorize the query.", e);
ActionListener<SearchRequest> requestTransformListener = ActionListener.wrap(sr -> {
if (searchQueryMetricsEnabled) {
try {
searchQueryCategorizer.categorize(sr.source());
} catch (Exception e) {
logger.error("Error while trying to categorize the query.", e);
}
}
}

ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
ActionListener<SearchSourceBuilder> rewriteListener = buildRewriteListener(
sr,
task,
timeProvider,
searchAsyncActionProvider,
listener,
searchRequestOperationsListener
);
if (sr.source() == null) {
rewriteListener.onResponse(sr.source());
} else {
Rewriteable.rewriteAndFetch(
sr.source(),
searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener
);
}
}, listener::onFailure);
searchRequest.transformRequest(requestTransformListener);
}

private ActionListener<SearchSourceBuilder> buildRewriteListener(
SearchRequest searchRequest,
Task task,
SearchTimeProvider timeProvider,
SearchAsyncActionProvider searchAsyncActionProvider,
ActionListener<SearchResponse> listener,
SearchRequestOperationsListener searchRequestOperationsListener
) {
return ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
// only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
// situations when source is rewritten to null due to a bug
Expand Down Expand Up @@ -634,15 +661,6 @@ private void executeRequest(
}
}
}, listener::onFailure);
if (searchRequest.source() == null) {
rewriteListener.onResponse(searchRequest.source());
} else {
Rewriteable.rewriteAndFetch(
searchRequest.source(),
searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener
);
}
}

static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) {
Expand Down
206 changes: 127 additions & 79 deletions server/src/main/java/org/opensearch/search/pipeline/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.search.SearchPhaseResult;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -117,92 +119,138 @@ protected void afterResponseProcessor(Processor processor, long timeInNanos) {}

protected void onResponseProcessorFailed(Processor processor) {}

SearchRequest transformRequest(SearchRequest request) throws SearchPipelineProcessingException {
if (searchRequestProcessors.isEmpty() == false) {
long pipelineStart = relativeTimeSupplier.getAsLong();
beforeTransformRequest();
try {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
request.writeTo(bytesStreamOutput);
try (StreamInput in = bytesStreamOutput.bytes().streamInput()) {
try (StreamInput input = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry)) {
request = new SearchRequest(input);
}
}
}
for (SearchRequestProcessor processor : searchRequestProcessors) {
beforeRequestProcessor(processor);
long start = relativeTimeSupplier.getAsLong();
try {
request = processor.processRequest(request);
} catch (Exception e) {
onRequestProcessorFailed(processor);
if (processor.isIgnoreFailure()) {
logger.warn(
"The exception from request processor ["
+ processor.getType()
+ "] in the search pipeline ["
+ id
+ "] was ignored",
e
);
} else {
throw e;
}
} finally {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterRequestProcessor(processor, took);
}
void transformRequest(SearchRequest request, ActionListener<SearchRequest> requestListener) throws SearchPipelineProcessingException {
if (searchRequestProcessors.isEmpty()) {
requestListener.onResponse(request);
return;
}

try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
request.writeTo(bytesStreamOutput);
try (StreamInput in = bytesStreamOutput.bytes().streamInput()) {
try (StreamInput input = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry)) {
request = new SearchRequest(input);
}
} catch (Exception e) {
onTransformRequestFailure();
throw new SearchPipelineProcessingException(e);
} finally {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart);
afterTransformRequest(took);
}
} catch (IOException e) {
requestListener.onFailure(new SearchPipelineProcessingException(e));
return;
}
return request;

ActionListener<SearchRequest> finalListener = getTerminalSearchRequestActionListener(requestListener);

// Chain listeners back-to-front
ActionListener<SearchRequest> currentListener = finalListener;
for (int i = searchRequestProcessors.size() - 1; i >= 0; i--) {
final ActionListener<SearchRequest> nextListener = currentListener;
SearchRequestProcessor processor = searchRequestProcessors.get(i);
currentListener = ActionListener.wrap(r -> {
long start = relativeTimeSupplier.getAsLong();
beforeRequestProcessor(processor);
processor.processRequestAsync(r, ActionListener.wrap(rr -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterRequestProcessor(processor, took);
nextListener.onResponse(rr);
}, e -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterRequestProcessor(processor, took);
onRequestProcessorFailed(processor);
if (processor.isIgnoreFailure()) {
logger.warn(
"The exception from request processor ["
+ processor.getType()
+ "] in the search pipeline ["
+ id
+ "] was ignored",
e
);
nextListener.onResponse(r);
} else {
nextListener.onFailure(new SearchPipelineProcessingException(e));
}
}));
}, finalListener::onFailure);
}

beforeTransformRequest();
currentListener.onResponse(request);
}

SearchResponse transformResponse(SearchRequest request, SearchResponse response) throws SearchPipelineProcessingException {
if (searchResponseProcessors.isEmpty() == false) {
long pipelineStart = relativeTimeSupplier.getAsLong();
beforeTransformResponse();
try {
for (SearchResponseProcessor processor : searchResponseProcessors) {
beforeResponseProcessor(processor);
long start = relativeTimeSupplier.getAsLong();
try {
response = processor.processResponse(request, response);
} catch (Exception e) {
onResponseProcessorFailed(processor);
if (processor.isIgnoreFailure()) {
logger.warn(
"The exception from response processor ["
+ processor.getType()
+ "] in the search pipeline ["
+ id
+ "] was ignored",
e
);
} else {
throw e;
}
} finally {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterResponseProcessor(processor, took);
private ActionListener<SearchRequest> getTerminalSearchRequestActionListener(ActionListener<SearchRequest> requestListener) {
final long pipelineStart = relativeTimeSupplier.getAsLong();

return ActionListener.wrap(r -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart);
afterTransformRequest(took);
requestListener.onResponse(new PipelinedRequest(this, r));
}, e -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart);
afterTransformRequest(took);
onTransformRequestFailure();
requestListener.onFailure(new SearchPipelineProcessingException(e));
});
}

ActionListener<SearchResponse> transformResponseListener(SearchRequest request, ActionListener<SearchResponse> responseListener) {
if (searchResponseProcessors.isEmpty()) {
// No response transformation necessary
return responseListener;
}

long[] pipelineStart = new long[1];

final ActionListener<SearchResponse> originalListener = responseListener;
responseListener = ActionListener.wrap(r -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]);
afterTransformResponse(took);
originalListener.onResponse(r);
}, e -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]);
afterTransformResponse(took);
onTransformResponseFailure();
originalListener.onFailure(e);
});
ActionListener<SearchResponse> finalListener = responseListener; // Jump directly to this one on exception.

for (int i = searchResponseProcessors.size() - 1; i >= 0; i--) {
final ActionListener<SearchResponse> currentFinalListener = responseListener;
final SearchResponseProcessor processor = searchResponseProcessors.get(i);

responseListener = ActionListener.wrap(r -> {
beforeResponseProcessor(processor);
final long start = relativeTimeSupplier.getAsLong();
processor.processResponseAsync(request, r, ActionListener.wrap(rr -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterResponseProcessor(processor, took);
currentFinalListener.onResponse(rr);
}, e -> {
onResponseProcessorFailed(processor);
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterResponseProcessor(processor, took);
if (processor.isIgnoreFailure()) {
logger.warn(
"The exception from response processor ["
+ processor.getType()
+ "] in the search pipeline ["
+ id
+ "] was ignored",
e
);
// Pass the previous response through to the next processor in the chain
currentFinalListener.onResponse(r);
} else {
currentFinalListener.onFailure(new SearchPipelineProcessingException(e));
}
}
} catch (Exception e) {
onTransformResponseFailure();
throw new SearchPipelineProcessingException(e);
} finally {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart);
afterTransformResponse(took);
}
}));
}, finalListener::onFailure);
}
return response;
final ActionListener<SearchResponse> chainListener = responseListener;
return ActionListener.wrap(r -> {
beforeTransformResponse();
pipelineStart[0] = relativeTimeSupplier.getAsLong();
chainListener.onResponse(r);
}, originalListener::onFailure);

}

<Result extends SearchPhaseResult> void runSearchPhaseResultsTransformer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.action.search.SearchPhaseResults;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.search.SearchPhaseResult;

/**
Expand All @@ -27,8 +28,12 @@ public final class PipelinedRequest extends SearchRequest {
this.pipeline = pipeline;
}

public SearchResponse transformResponse(SearchResponse response) {
return pipeline.transformResponse(this, response);
public void transformRequest(ActionListener<SearchRequest> requestListener) {
pipeline.transformRequest(this, requestListener);
}

public ActionListener<SearchResponse> transformResponseListener(ActionListener<SearchResponse> responseListener) {
return pipeline.transformResponseListener(this, responseListener);
}

public <Result extends SearchPhaseResult> void transformSearchPhaseResults(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,7 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) {
pipeline = pipelineHolder.pipeline;
}
}
SearchRequest transformedRequest = pipeline.transformRequest(searchRequest);
return new PipelinedRequest(pipeline, transformedRequest);
return new PipelinedRequest(pipeline, searchRequest);
}

Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessorFactories() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,37 @@
package org.opensearch.search.pipeline;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.core.action.ActionListener;

/**
* Interface for a search pipeline processor that modifies a search request.
*/
public interface SearchRequestProcessor extends Processor {

/**
* Transform a {@link SearchRequest}. Executed on the coordinator node before any {@link org.opensearch.action.search.SearchPhase}
* executes.
* <p>
* Implement this method if the processor makes no asynchronous calls.
* @param request the executed {@link SearchRequest}
* @return a new {@link SearchRequest} (or the input {@link SearchRequest} if no changes)
* @throws Exception if an error occurs during processing
*/
SearchRequest processRequest(SearchRequest request) throws Exception;

/**
* Transform a {@link SearchRequest}. Executed on the coordinator node before any {@link org.opensearch.action.search.SearchPhase}
* executes.
* <p>
* Expert method: Implement this if the processor needs to make asynchronous calls. Otherwise, implement processRequest.
* @param request the executed {@link SearchRequest}
* @param requestListener callback to be invoked on successful processing or on failure
*/
default void processRequestAsync(SearchRequest request, ActionListener<SearchRequest> requestListener) {
try {
requestListener.onResponse(processRequest(request));
} catch (Exception e) {
requestListener.onFailure(e);
}
}
}
Loading