diff --git a/CHANGELOG.md b/CHANGELOG.md index 241d88049214d..cdc4da175e75f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/)) - Update script supports java.lang.String.sha1() and java.lang.String.sha256() methods ([#16923](https://github.com/opensearch-project/OpenSearch/pull/16923)) - Added a precaution to handle extreme date values during sorting to prevent `arithmetic_exception: long overflow` ([#16812](https://github.com/opensearch-project/OpenSearch/pull/16812)). +- Add `verbose_pipeline` parameter to output each processor's execution details ([#16843](https://github.com/opensearch-project/OpenSearch/pull/16843)). - Add search replica stats to segment replication stats API ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678)) - Introduce a setting to disable download of full cluster state from remote on term mismatch([#16798](https://github.com/opensearch-project/OpenSearch/pull/16798/)) - Added ability to retrieve value from DocValues in a flat_object filed([#16802](https://github.com/opensearch-project/OpenSearch/pull/16802)) diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponse.java b/server/src/main/java/org/opensearch/action/search/SearchResponse.java index 899c71e91e3ab..0d55fbf2e7f88 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchResponse.java +++ b/server/src/main/java/org/opensearch/action/search/SearchResponse.java @@ -59,6 +59,7 @@ import org.opensearch.search.aggregations.Aggregations; import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.internal.InternalSearchResponse; +import org.opensearch.search.pipeline.ProcessorExecutionDetail; import org.opensearch.search.profile.ProfileShardResult; import org.opensearch.search.profile.SearchProfileShardResults; import org.opensearch.search.suggest.Suggest; @@ -73,6 +74,7 @@ import java.util.function.Supplier; import static org.opensearch.action.search.SearchResponseSections.EXT_FIELD; +import static org.opensearch.action.search.SearchResponseSections.PROCESSOR_RESULT_FIELD; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; /** @@ -394,6 +396,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE List failures = new ArrayList<>(); Clusters clusters = Clusters.EMPTY; List extBuilders = new ArrayList<>(); + List processorResult = new ArrayList<>(); for (Token token = parser.nextToken(); token != Token.END_OBJECT; token = parser.nextToken()) { if (token == Token.FIELD_NAME) { currentFieldName = parser.currentName(); @@ -517,6 +520,11 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE extBuilders.add(searchExtBuilder); } } + } else if (PROCESSOR_RESULT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { + while ((token = parser.nextToken()) != Token.END_ARRAY) { + ProcessorExecutionDetail detail = ProcessorExecutionDetail.fromXContent(parser); + processorResult.add(detail); + } } else { parser.skipChildren(); } @@ -530,7 +538,8 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE terminatedEarly, profile, numReducePhases, - extBuilders + extBuilders, + processorResult ); return new SearchResponse( searchResponseSections, diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java b/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java index bca2c8a52b691..5eb305d91ee04 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java +++ b/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java @@ -40,6 +40,7 @@ import org.opensearch.search.SearchExtBuilder; import org.opensearch.search.SearchHits; import org.opensearch.search.aggregations.Aggregations; +import org.opensearch.search.pipeline.ProcessorExecutionDetail; import org.opensearch.search.profile.ProfileShardResult; import org.opensearch.search.profile.SearchProfileShardResults; import org.opensearch.search.suggest.Suggest; @@ -65,7 +66,7 @@ public class SearchResponseSections implements ToXContentFragment { public static final ParseField EXT_FIELD = new ParseField("ext"); - + public static final ParseField PROCESSOR_RESULT_FIELD = new ParseField("processor_results"); protected final SearchHits hits; protected final Aggregations aggregations; protected final Suggest suggest; @@ -74,6 +75,7 @@ public class SearchResponseSections implements ToXContentFragment { protected final Boolean terminatedEarly; protected final int numReducePhases; protected final List searchExtBuilders = new ArrayList<>(); + protected final List processorResult = new ArrayList<>(); public SearchResponseSections( SearchHits hits, @@ -84,7 +86,17 @@ public SearchResponseSections( SearchProfileShardResults profileResults, int numReducePhases ) { - this(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, Collections.emptyList()); + this( + hits, + aggregations, + suggest, + timedOut, + terminatedEarly, + profileResults, + numReducePhases, + Collections.emptyList(), + Collections.emptyList() + ); } public SearchResponseSections( @@ -95,7 +107,8 @@ public SearchResponseSections( Boolean terminatedEarly, SearchProfileShardResults profileResults, int numReducePhases, - List searchExtBuilders + List searchExtBuilders, + List processorResult ) { this.hits = hits; this.aggregations = aggregations; @@ -104,6 +117,7 @@ public SearchResponseSections( this.timedOut = timedOut; this.terminatedEarly = terminatedEarly; this.numReducePhases = numReducePhases; + this.processorResult.addAll(processorResult); this.searchExtBuilders.addAll(Objects.requireNonNull(searchExtBuilders, "searchExtBuilders must not be null")); } @@ -166,6 +180,10 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params) } builder.endObject(); } + + if (!processorResult.isEmpty()) { + builder.field(PROCESSOR_RESULT_FIELD.getPreferredName(), processorResult); + } return builder; } @@ -173,6 +191,10 @@ public List getSearchExtBuilders() { return Collections.unmodifiableList(this.searchExtBuilders); } + public List getProcessorResult() { + return processorResult; + } + protected void writeTo(StreamOutput out) throws IOException { throw new UnsupportedOperationException(); } diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java index 05465e32631fd..8e2fa8246ac1b 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java @@ -256,6 +256,9 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil if (request.hasParam("timeout")) { searchSourceBuilder.timeout(request.paramAsTime("timeout", null)); } + if (request.hasParam("verbose_pipeline")) { + searchSourceBuilder.verbosePipeline(request.paramAsBoolean("verbose_pipeline", false)); + } if (request.hasParam("terminate_after")) { int terminateAfter = request.paramAsInt("terminate_after", SearchContext.DEFAULT_TERMINATE_AFTER); if (terminateAfter < 0) { diff --git a/server/src/main/java/org/opensearch/search/SearchHits.java b/server/src/main/java/org/opensearch/search/SearchHits.java index 8232643b353f5..fab0c8fb59d9c 100644 --- a/server/src/main/java/org/opensearch/search/SearchHits.java +++ b/server/src/main/java/org/opensearch/search/SearchHits.java @@ -37,6 +37,7 @@ import org.apache.lucene.search.TotalHits.Relation; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.Lucene; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -166,6 +167,22 @@ public SearchHit[] getHits() { return this.hits; } + /** + * Creates a deep copy of this SearchHits instance. + * + * @return a deep copy of the current SearchHits object + * @throws IOException if an I/O exception occurs during serialization or deserialization + */ + public SearchHits deepCopy() throws IOException { + try (BytesStreamOutput out = new BytesStreamOutput()) { + this.writeTo(out); + + try (StreamInput in = out.bytes().streamInput()) { + return new SearchHits(in); + } + } + } + /** * Return the hit as the provided position. */ diff --git a/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java b/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java index 9c438401b9fbe..fb21eaff5f857 100644 --- a/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java +++ b/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java @@ -136,6 +136,7 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R public static final ParseField SLICE = new ParseField("slice"); public static final ParseField POINT_IN_TIME = new ParseField("pit"); public static final ParseField SEARCH_PIPELINE = new ParseField("search_pipeline"); + public static final ParseField VERBOSE_SEARCH_PIPELINE = new ParseField("verbose_pipeline"); public static SearchSourceBuilder fromXContent(XContentParser parser) throws IOException { return fromXContent(parser, true); @@ -226,6 +227,8 @@ public static HighlightBuilder highlight() { private String searchPipeline; + private boolean verbosePipeline = false; + /** * Constructs a new search source builder. */ @@ -302,6 +305,9 @@ public SearchSourceBuilder(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_2_18_0)) { searchPipeline = in.readOptionalString(); } + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + verbosePipeline = in.readBoolean(); + } } @Override @@ -385,6 +391,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_18_0)) { out.writeOptionalString(searchPipeline); } + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeBoolean(verbosePipeline); + } } /** @@ -1142,6 +1151,26 @@ public SearchSourceBuilder pipeline(String searchPipeline) { return this; } + /** + * Enables or disables verbose mode for the search pipeline. + * + * When verbose mode is enabled, detailed information about each processor + * in the search pipeline is included in the search response. This includes + * the processor name, execution status, input, output, and time taken for processing. + * + * This parameter is primarily intended for debugging purposes, allowing users + * to track how data flows and transforms through the search pipeline. + * + */ + public SearchSourceBuilder verbosePipeline(Boolean verbosePipeline) { + this.verbosePipeline = verbosePipeline; + return this; + } + + public Boolean verbosePipeline() { + return verbosePipeline; + } + /** * Rewrites this search source builder into its primitive form. e.g. by * rewriting the QueryBuilder. If the builder did not change the identity @@ -1240,6 +1269,7 @@ private SearchSourceBuilder shallowCopy( rewrittenBuilder.derivedFieldsObject = derivedFieldsObject; rewrittenBuilder.derivedFields = derivedFields; rewrittenBuilder.searchPipeline = searchPipeline; + rewrittenBuilder.verbosePipeline = verbosePipeline; return rewrittenBuilder; } @@ -1309,6 +1339,8 @@ public void parseXContent(XContentParser parser, boolean checkTrailingTokens) th profile = parser.booleanValue(); } else if (SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) { searchPipeline = parser.text(); + } else if (VERBOSE_SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) { + verbosePipeline = parser.booleanValue(); } else { throw new ParsingException( parser.getTokenLocation(), @@ -1642,6 +1674,10 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t builder.field(SEARCH_PIPELINE.getPreferredName(), searchPipeline); } + if (verbosePipeline) { + builder.field(VERBOSE_SEARCH_PIPELINE.getPreferredName(), verbosePipeline); + } + return builder; } @@ -1920,7 +1956,8 @@ public int hashCode() { pointInTimeBuilder, derivedFieldsObject, derivedFields, - searchPipeline + searchPipeline, + verbosePipeline ); } @@ -1966,7 +2003,8 @@ public boolean equals(Object obj) { && Objects.equals(pointInTimeBuilder, other.pointInTimeBuilder) && Objects.equals(derivedFieldsObject, other.derivedFieldsObject) && Objects.equals(derivedFields, other.derivedFields) - && Objects.equals(searchPipeline, other.searchPipeline); + && Objects.equals(searchPipeline, other.searchPipeline) + && Objects.equals(verbosePipeline, other.verbosePipeline); } @Override diff --git a/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java b/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java index c9d7b0084c1e1..c014cd2577662 100644 --- a/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java +++ b/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java @@ -42,6 +42,7 @@ import org.opensearch.search.SearchExtBuilder; import org.opensearch.search.SearchHits; import org.opensearch.search.aggregations.InternalAggregations; +import org.opensearch.search.pipeline.ProcessorExecutionDetail; import org.opensearch.search.profile.SearchProfileShardResults; import org.opensearch.search.suggest.Suggest; @@ -73,7 +74,17 @@ public InternalSearchResponse( Boolean terminatedEarly, int numReducePhases ) { - this(hits, aggregations, suggest, profileResults, timedOut, terminatedEarly, numReducePhases, Collections.emptyList()); + this( + hits, + aggregations, + suggest, + profileResults, + timedOut, + terminatedEarly, + numReducePhases, + Collections.emptyList(), + Collections.emptyList() + ); } public InternalSearchResponse( @@ -84,9 +95,20 @@ public InternalSearchResponse( boolean timedOut, Boolean terminatedEarly, int numReducePhases, - List searchExtBuilderList + List searchExtBuilderList, + List processorResult ) { - super(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, searchExtBuilderList); + super( + hits, + aggregations, + suggest, + timedOut, + terminatedEarly, + profileResults, + numReducePhases, + searchExtBuilderList, + processorResult + ); } public InternalSearchResponse(StreamInput in) throws IOException { @@ -98,7 +120,8 @@ public InternalSearchResponse(StreamInput in) throws IOException { in.readOptionalBoolean(), in.readOptionalWriteable(SearchProfileShardResults::new), in.readVInt(), - readSearchExtBuildersOnOrAfter(in) + readSearchExtBuildersOnOrAfter(in), + readProcessorResultOnOrAfter(in) ); } @@ -112,6 +135,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(profileResults); out.writeVInt(numReducePhases); writeSearchExtBuildersOnOrAfter(out, searchExtBuilders); + writeProcessorResultOnOrAfter(out, processorResult); } private static List readSearchExtBuildersOnOrAfter(StreamInput in) throws IOException { @@ -123,4 +147,15 @@ private static void writeSearchExtBuildersOnOrAfter(StreamOutput out, List readProcessorResultOnOrAfter(StreamInput in) throws IOException { + return (in.getVersion().onOrAfter(Version.V_3_0_0)) ? in.readList(ProcessorExecutionDetail::new) : Collections.emptyList(); + } + + private static void writeProcessorResultOnOrAfter(StreamOutput out, List processorResult) throws IOException { + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeList(processorResult); + } + } + } diff --git a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java index c88dfb2060393..e2d00834bd57d 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java @@ -144,7 +144,11 @@ void transformRequest(SearchRequest request, ActionListener reque ActionListener currentListener = finalListener; for (int i = searchRequestProcessors.size() - 1; i >= 0; i--) { final ActionListener nextListener = currentListener; - SearchRequestProcessor processor = searchRequestProcessors.get(i); + // Conditionally wrap the current processor with a TrackingSearchRequestProcessorWrapper + // if verbosePipeline mode is enabled. This allows detailed execution tracking for debugging purposes. + final SearchRequestProcessor processor = request.source().verbosePipeline() + ? new TrackingSearchRequestProcessorWrapper(searchRequestProcessors.get(i)) + : searchRequestProcessors.get(i); currentListener = ActionListener.wrap(r -> { long start = relativeTimeSupplier.getAsLong(); beforeRequestProcessor(processor); @@ -156,7 +160,9 @@ void transformRequest(SearchRequest request, ActionListener reque long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); afterRequestProcessor(processor, took); onRequestProcessorFailed(processor); - if (processor.isIgnoreFailure()) { + // When verbosePipeline is enabled, all processor failures are ignored to ensure the execution chain continues without + // interruption.TrackingSearchResponseProcessorWrapper will log all errors in detail for debugging purposes + if (processor.isIgnoreFailure() || r.source().verbosePipeline()) { logger.warn( "The exception from request processor [" + processor.getType() @@ -201,7 +207,6 @@ ActionListener transformResponseListener( PipelineProcessingContext requestContext ) { if (searchResponseProcessors.isEmpty()) { - // No response transformation necessary return responseListener; } @@ -222,8 +227,9 @@ ActionListener transformResponseListener( for (int i = searchResponseProcessors.size() - 1; i >= 0; i--) { final ActionListener currentFinalListener = responseListener; - final SearchResponseProcessor processor = searchResponseProcessors.get(i); - + final SearchResponseProcessor processor = request.source().verbosePipeline() + ? new TrackingSearchResponseProcessorWrapper(searchResponseProcessors.get(i)) + : searchResponseProcessors.get(i); responseListener = ActionListener.wrap(r -> { beforeResponseProcessor(processor); final long start = relativeTimeSupplier.getAsLong(); @@ -235,7 +241,9 @@ ActionListener transformResponseListener( onResponseProcessorFailed(processor); long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); afterResponseProcessor(processor, took); - if (processor.isIgnoreFailure()) { + // When verbosePipeline is enabled, all processor failures are ignored to ensure the execution chain continues without + // interruption.TrackingSearchResponseProcessorWrapper will log all errors in detail for debugging purposes + if (processor.isIgnoreFailure() || request.source().verbosePipeline()) { logger.warn( "The exception from response processor [" + processor.getType() diff --git a/server/src/main/java/org/opensearch/search/pipeline/PipelineProcessingContext.java b/server/src/main/java/org/opensearch/search/pipeline/PipelineProcessingContext.java index a1f2b8b99d958..7e86c30ddbbd9 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/PipelineProcessingContext.java +++ b/server/src/main/java/org/opensearch/search/pipeline/PipelineProcessingContext.java @@ -8,7 +8,10 @@ package org.opensearch.search.pipeline; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -16,6 +19,7 @@ */ public class PipelineProcessingContext { private final Map attributes = new HashMap<>(); + private final List processorExecutionDetails = new ArrayList<>(); /** * Set a generic attribute in the state for this request. Overwrites any existing value. @@ -35,4 +39,22 @@ public void setAttribute(String name, Object value) { public Object getAttribute(String name) { return attributes.get(name); } + + /** + * Add a ProcessorExecutionDetail to the list of execution details. + * + * @param detail the ProcessorExecutionDetail to add + */ + public void addProcessorExecutionDetail(ProcessorExecutionDetail detail) { + processorExecutionDetails.add(detail); + } + + /** + * Get all ProcessorExecutionDetails recorded in this context. + * + * @return a list of ProcessorExecutionDetails + */ + public List getProcessorExecutionDetails() { + return Collections.unmodifiableList(processorExecutionDetails); + } } diff --git a/server/src/main/java/org/opensearch/search/pipeline/PipelinedRequest.java b/server/src/main/java/org/opensearch/search/pipeline/PipelinedRequest.java index d550fbb768133..f5ce94946dd32 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/PipelinedRequest.java +++ b/server/src/main/java/org/opensearch/search/pipeline/PipelinedRequest.java @@ -15,6 +15,8 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.search.SearchPhaseResult; +import java.util.List; + /** * Groups a search pipeline based on a request and the request after being transformed by the pipeline. * @@ -35,7 +37,15 @@ public void transformRequest(ActionListener requestListener) { } public ActionListener transformResponseListener(ActionListener responseListener) { - return pipeline.transformResponseListener(this, responseListener, requestContext); + return pipeline.transformResponseListener(this, ActionListener.wrap(response -> { + // Extract processor execution details + List details = requestContext.getProcessorExecutionDetails(); + // Add details to the response's InternalResponse if available + if (!details.isEmpty() && response.getInternalResponse() != null) { + response.getInternalResponse().getProcessorResult().addAll(details); + } + responseListener.onResponse(response); + }, responseListener::onFailure), requestContext); } public void transformSearchPhaseResults( diff --git a/server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java b/server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java new file mode 100644 index 0000000000000..61c627c3dc54c --- /dev/null +++ b/server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java @@ -0,0 +1,299 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.common.xcontent.XContentUtils; +import org.opensearch.core.ParseField; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.DeprecationHandler; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; + +import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; + +/** + * Detailed information about a processor execution in a search pipeline. + * + * @opensearch.api + */ +@PublicApi(since = "2.19.0") +public class ProcessorExecutionDetail implements Writeable, ToXContentObject { + + private final String processorName; + private long durationMillis; + private Object inputData; + private Object outputData; + private ProcessorStatus status; + private String errorMessage; + private String tag; + private static final ParseField PROCESSOR_NAME_FIELD = new ParseField("processor_name"); + private static final ParseField DURATION_MILLIS_FIELD = new ParseField("duration_millis"); + private static final ParseField INPUT_DATA_FIELD = new ParseField("input_data"); + private static final ParseField OUTPUT_DATA_FIELD = new ParseField("output_data"); + private static final ParseField STATUS_FIELD = new ParseField("status"); + private static final ParseField ERROR_MESSAGE_FIELD = new ParseField("error"); + private static final ParseField TAG_FIELD = new ParseField("tag"); + // Key for processor execution details + public static final String PROCESSOR_EXECUTION_DETAILS_KEY = "processorExecutionDetails"; + + /** + * Constructor for ProcessorExecutionDetail + */ + public ProcessorExecutionDetail( + String processorName, + long durationMillis, + Object inputData, + Object outputData, + ProcessorStatus status, + String errorMessage, + String tag + ) { + this.processorName = processorName; + this.durationMillis = durationMillis; + this.inputData = inputData; + this.outputData = outputData; + this.status = status; + this.errorMessage = errorMessage; + this.tag = tag; + } + + public ProcessorExecutionDetail(String processorName) { + this(processorName, 0, null, null, ProcessorStatus.SUCCESS, null, null); + } + + public ProcessorExecutionDetail(String processorName, String tag) { + this(processorName, 0, null, null, ProcessorStatus.SUCCESS, null, tag); + } + + public ProcessorExecutionDetail(StreamInput in) throws IOException { + this.processorName = in.readString(); + this.durationMillis = in.readLong(); + this.inputData = in.readGenericValue(); + this.outputData = in.readGenericValue(); + this.status = in.readEnum(ProcessorStatus.class); + this.errorMessage = in.readString(); + this.tag = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(processorName); + out.writeLong(durationMillis); + out.writeGenericValue(inputData); + out.writeGenericValue(outputData); + out.writeEnum(status); + out.writeString(errorMessage); + out.writeString(tag); + } + + public String getProcessorName() { + return processorName; + } + + public long getDurationMillis() { + return durationMillis; + } + + public Object getInputData() { + return inputData; + + } + + public Object getOutputData() { + return outputData; + } + + public void markProcessorAsFailed(ProcessorStatus status, String errorMessage) { + this.status = status; + this.errorMessage = errorMessage; + } + + public ProcessorStatus getStatus() { + return status; + } + + /** + * Adds or updates the input data for this processor execution detail. + * + * @param inputData the new input data + */ + public void addInput(Object inputData) { + this.inputData = inputData; + } + + /** + * Adds or updates the output data for this processor execution detail. + * + * @param outputData the new output data + */ + public void addOutput(Object outputData) { + this.outputData = outputData; + } + + /** + * Adds or updates the duration of the processor execution. + * + * @param durationMillis the new duration in milliseconds + */ + public void addTook(long durationMillis) { + this.durationMillis = durationMillis; + } + + /** + * Serializes the processor execution detail into XContent. + * Includes the error message only if the processor has failed. + */ + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(PROCESSOR_NAME_FIELD.getPreferredName(), processorName); + // tag is optional when setting up processor + if (tag != null) { + builder.field(TAG_FIELD.getPreferredName(), tag); + } + builder.field(DURATION_MILLIS_FIELD.getPreferredName(), durationMillis); + builder.field(STATUS_FIELD.getPreferredName(), status.name().toLowerCase(Locale.ROOT)); + if (status == ProcessorStatus.FAIL) { + builder.field(ERROR_MESSAGE_FIELD.getPreferredName(), errorMessage); + } + addFieldToXContent(builder, INPUT_DATA_FIELD.getPreferredName(), inputData, params); + addFieldToXContent(builder, OUTPUT_DATA_FIELD.getPreferredName(), outputData, params); + + builder.endObject(); + return builder; + } + + private void addFieldToXContent(XContentBuilder builder, String fieldName, Object data, Params params) throws IOException { + if (data == null) { + builder.nullField(fieldName); + return; + } + + if (data instanceof List) { + builder.startArray(fieldName); + for (Object item : (List) data) { + writeItemToXContent(builder, item, params); + } + builder.endArray(); + } else if (data instanceof Map) { + builder.startObject(fieldName); + for (Map.Entry entry : ((Map) data).entrySet()) { + addFieldToXContent(builder, entry.getKey().toString(), entry.getValue(), params); + } + builder.endObject(); + } else if (data instanceof ToXContentObject) { + builder.field(fieldName); + ((ToXContentObject) data).toXContent(builder, params); + } else if (data instanceof String) { + // If the data is a String, attempt to parse it as JSON + String jsonString = (String) data; + try { + // check if its json string + try ( + XContentParser parser = XContentType.JSON.xContent() + .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, jsonString) + ) { + Map parsedMap = parser.map(); + + builder.startObject(fieldName); + for (Map.Entry entry : parsedMap.entrySet()) { + addFieldToXContent(builder, entry.getKey(), entry.getValue(), params); + } + builder.endObject(); + } + } catch (IOException e) { + // If parsing fails, write the string as a plain field + builder.field(fieldName, jsonString); + } + } else { + builder.field(fieldName, data); + } + } + + private void writeItemToXContent(XContentBuilder builder, Object item, Params params) throws IOException { + if (item instanceof ToXContentObject) { + ((ToXContentObject) item).toXContent(builder, params); + } else { + builder.value(item); + } + } + + public static ProcessorExecutionDetail fromXContent(XContentParser parser) throws IOException { + String processorName = null; + long durationMillis = 0; + Object inputData = null; + Object outputData = null; + ProcessorStatus status = null; + String errorMessage = null; + String tag = null; + if (parser.currentToken() != XContentParser.Token.START_OBJECT) { + parser.nextToken(); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + } + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + + if (PROCESSOR_NAME_FIELD.match(fieldName, parser.getDeprecationHandler())) { + processorName = parser.text(); + } else if (TAG_FIELD.match(fieldName, parser.getDeprecationHandler())) { + tag = parser.text(); + } else if (DURATION_MILLIS_FIELD.match(fieldName, parser.getDeprecationHandler())) { + durationMillis = parser.longValue(); + } else if (STATUS_FIELD.match(fieldName, parser.getDeprecationHandler())) { + status = ProcessorStatus.valueOf(parser.text().toUpperCase(Locale.ROOT)); + } else if (ERROR_MESSAGE_FIELD.match(fieldName, parser.getDeprecationHandler())) { + errorMessage = parser.text(); + } else if (INPUT_DATA_FIELD.match(fieldName, parser.getDeprecationHandler())) { + inputData = XContentUtils.readValue(parser, parser.currentToken()); + } else if (OUTPUT_DATA_FIELD.match(fieldName, parser.getDeprecationHandler())) { + outputData = XContentUtils.readValue(parser, parser.currentToken()); + } else { + parser.skipChildren(); + } + } + + if (processorName == null) { + throw new IllegalArgumentException("Processor name is required"); + } + + return new ProcessorExecutionDetail(processorName, durationMillis, inputData, outputData, status, errorMessage, tag); + } + + @Override + public int hashCode() { + return Objects.hash(processorName, durationMillis, inputData, outputData, status, errorMessage, tag); + } + + /** + * Represents the status of a processor in the search pipeline. + * + *

This enum is used to indicate whether a processor has executed successfully + * or encountered a failure during its execution. It helps in categorizing the + * execution result of processors within a pipeline. + * + */ + @PublicApi(since = "2.19.0") + public enum ProcessorStatus { + SUCCESS, + FAIL + } +} diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index 012d6695c042b..27b837740c0ca 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -367,7 +367,6 @@ static ClusterState innerDelete(DeleteSearchPipelineRequest request, ClusterStat public PipelinedRequest resolvePipeline(SearchRequest searchRequest, IndexNameExpressionResolver indexNameExpressionResolver) { Pipeline pipeline = Pipeline.NO_OP_PIPELINE; - if (searchRequest.source() != null && searchRequest.source().searchPipelineSource() != null) { // Pipeline defined in search request (ad hoc pipeline). if (searchRequest.pipeline() != null) { @@ -426,6 +425,9 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest, IndexNameEx pipeline = pipelineHolder.pipeline; } } + if (searchRequest.source() != null && searchRequest.source().verbosePipeline() && pipeline.equals(Pipeline.NO_OP_PIPELINE)) { + throw new IllegalArgumentException("The 'verbose pipeline' option requires a search pipeline to be defined."); + } PipelineProcessingContext requestContext = new PipelineProcessingContext(); return new PipelinedRequest(pipeline, searchRequest, requestContext); } diff --git a/server/src/main/java/org/opensearch/search/pipeline/TrackingSearchRequestProcessorWrapper.java b/server/src/main/java/org/opensearch/search/pipeline/TrackingSearchRequestProcessorWrapper.java new file mode 100644 index 0000000000000..594b3b5e382db --- /dev/null +++ b/server/src/main/java/org/opensearch/search/pipeline/TrackingSearchRequestProcessorWrapper.java @@ -0,0 +1,83 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.action.search.SearchRequest; +import org.opensearch.core.action.ActionListener; + +/** + * Wrapper for SearchRequestProcessor to track execution details. + * + * @opensearch.internal + */ +public class TrackingSearchRequestProcessorWrapper implements SearchRequestProcessor { + + private final SearchRequestProcessor wrappedProcessor; + + /** + * Constructor for the wrapper. + * + * @param wrappedProcessor the actual processor to be wrapped + */ + public TrackingSearchRequestProcessorWrapper(SearchRequestProcessor wrappedProcessor) { + this.wrappedProcessor = wrappedProcessor; + } + + @Override + public String getType() { + return wrappedProcessor.getType(); + } + + @Override + public String getTag() { + return wrappedProcessor.getTag(); + } + + @Override + public String getDescription() { + return wrappedProcessor.getDescription(); + } + + @Override + public boolean isIgnoreFailure() { + return wrappedProcessor.isIgnoreFailure(); + } + + @Override + public SearchRequest processRequest(SearchRequest request) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public SearchRequest processRequest(SearchRequest request, PipelineProcessingContext requestContext) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public void processRequestAsync( + SearchRequest request, + PipelineProcessingContext requestContext, + ActionListener requestListener + ) { + ProcessorExecutionDetail detail = new ProcessorExecutionDetail(getType(), getTag()); + long start = System.nanoTime(); + detail.addInput(request.source().toString()); + wrappedProcessor.processRequestAsync(request, requestContext, ActionListener.wrap(result -> { + detail.addOutput(result.source().toString()); + long took = System.nanoTime() - start; + detail.addTook(took); + requestContext.addProcessorExecutionDetail(detail); + requestListener.onResponse(result); + }, e -> { + detail.markProcessorAsFailed(ProcessorExecutionDetail.ProcessorStatus.FAIL, e.getMessage()); + requestContext.addProcessorExecutionDetail(detail); + requestListener.onFailure(e); + })); + } +} diff --git a/server/src/main/java/org/opensearch/search/pipeline/TrackingSearchResponseProcessorWrapper.java b/server/src/main/java/org/opensearch/search/pipeline/TrackingSearchResponseProcessorWrapper.java new file mode 100644 index 0000000000000..51b0a5ebc7103 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/pipeline/TrackingSearchResponseProcessorWrapper.java @@ -0,0 +1,97 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.core.action.ActionListener; + +import java.io.IOException; +import java.util.Arrays; + +/** + * Wrapper for SearchResponseProcessor to track execution details. + * + * @opensearch.internal + */ +public class TrackingSearchResponseProcessorWrapper implements SearchResponseProcessor { + + private final SearchResponseProcessor wrappedProcessor; + + /** + * Constructor for the wrapper. + * + * @param wrappedProcessor the actual processor to be wrapped + */ + public TrackingSearchResponseProcessorWrapper(SearchResponseProcessor wrappedProcessor) { + if (wrappedProcessor == null) { + throw new IllegalArgumentException("Wrapped processor cannot be null."); + } + this.wrappedProcessor = wrappedProcessor; + } + + @Override + public String getType() { + return wrappedProcessor.getType(); + } + + @Override + public String getTag() { + return wrappedProcessor.getTag(); + } + + @Override + public String getDescription() { + return wrappedProcessor.getDescription(); + } + + @Override + public boolean isIgnoreFailure() { + return wrappedProcessor.isIgnoreFailure(); + } + + @Override + public SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelineProcessingContext requestContext) { + throw new UnsupportedOperationException(); + } + + @Override + public void processResponseAsync( + SearchRequest request, + SearchResponse response, + PipelineProcessingContext requestContext, + ActionListener responseListener + ) { + ProcessorExecutionDetail detail = new ProcessorExecutionDetail(getType(), getTag()); + long start = System.nanoTime(); + try { + detail.addInput(Arrays.asList(response.getHits().deepCopy().getHits())); + } catch (IOException e) { + responseListener.onFailure(e); + return; + } + wrappedProcessor.processResponseAsync(request, response, requestContext, ActionListener.wrap(result -> { + detail.addOutput(Arrays.asList(result.getHits().deepCopy().getHits())); + long took = System.nanoTime() - start; + detail.addTook(took); + requestContext.addProcessorExecutionDetail(detail); + responseListener.onResponse(result); + }, e -> { + detail.markProcessorAsFailed(ProcessorExecutionDetail.ProcessorStatus.FAIL, e.getMessage()); + requestContext.addProcessorExecutionDetail(detail); + responseListener.onFailure(e); + })); + } + +} diff --git a/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java b/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java index c9e59ab4ea04d..97586a0f96c42 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java @@ -62,6 +62,7 @@ import org.opensearch.search.aggregations.AggregationsTests; import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.internal.InternalSearchResponse; +import org.opensearch.search.pipeline.ProcessorExecutionDetail; import org.opensearch.search.profile.SearchProfileShardResults; import org.opensearch.search.profile.SearchProfileShardResultsTests; import org.opensearch.search.suggest.Suggest; @@ -76,6 +77,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.UUID; @@ -176,7 +178,8 @@ public SearchResponse createTestItem( timedOut, terminatedEarly, numReducePhases, - searchExtBuilders + searchExtBuilders, + Collections.emptyList() ); } else { internalSearchResponse = InternalSearchResponse.empty(); @@ -311,6 +314,26 @@ public void testToXContent() { hit.score(2.0f); SearchHit[] hits = new SearchHit[] { hit }; String dummyId = UUID.randomUUID().toString(); + List processorResults = List.of( + new ProcessorExecutionDetail( + "processor1", + 50, + List.of(1), + List.of(1), + ProcessorExecutionDetail.ProcessorStatus.SUCCESS, + null, + null + ), + new ProcessorExecutionDetail( + "processor2", + 30, + List.of(3), + List.of(3), + ProcessorExecutionDetail.ProcessorStatus.SUCCESS, + null, + null + ) + ); { SearchResponse response = new SearchResponse( new InternalSearchResponse( @@ -321,7 +344,8 @@ public void testToXContent() { false, null, 1, - List.of(new DummySearchExtBuilder(dummyId)) + List.of(new DummySearchExtBuilder(dummyId)), + processorResults ), null, 0, @@ -354,6 +378,22 @@ public void testToXContent() { { expectedString.append("{\"dummy\":\"" + dummyId + "\"}"); } + expectedString.append(",\"processor_results\":"); + expectedString.append("["); + for (int i = 0; i < processorResults.size(); i++) { + ProcessorExecutionDetail detail = processorResults.get(i); + expectedString.append("{"); + expectedString.append("\"processor_name\":\"").append(detail.getProcessorName()).append("\","); + expectedString.append("\"duration_millis\":").append(detail.getDurationMillis()).append(","); + expectedString.append("\"status\":\"").append(detail.getStatus().toString().toLowerCase(Locale.ROOT)).append("\","); + expectedString.append("\"input_data\":").append(detail.getInputData()).append(","); + expectedString.append("\"output_data\":").append(detail.getOutputData()); + expectedString.append("}"); + if (i < processorResults.size() - 1) { + expectedString.append(","); + } + } + expectedString.append("]"); } expectedString.append("}"); assertEquals(expectedString.toString(), Strings.toString(MediaTypeRegistry.JSON, response)); diff --git a/server/src/test/java/org/opensearch/search/GenericSearchExtBuilderTests.java b/server/src/test/java/org/opensearch/search/GenericSearchExtBuilderTests.java index 8fb1814962155..8d910d4cc57d3 100644 --- a/server/src/test/java/org/opensearch/search/GenericSearchExtBuilderTests.java +++ b/server/src/test/java/org/opensearch/search/GenericSearchExtBuilderTests.java @@ -40,6 +40,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -251,7 +252,8 @@ public SearchResponse createTestItem( timedOut, terminatedEarly, numReducePhases, - searchExtBuilders + searchExtBuilders, + Collections.emptyList() ); } else { internalSearchResponse = InternalSearchResponse.empty(); diff --git a/server/src/test/java/org/opensearch/search/builder/SearchSourceBuilderTests.java b/server/src/test/java/org/opensearch/search/builder/SearchSourceBuilderTests.java index da8ccc9e121e0..90962a5c613f1 100644 --- a/server/src/test/java/org/opensearch/search/builder/SearchSourceBuilderTests.java +++ b/server/src/test/java/org/opensearch/search/builder/SearchSourceBuilderTests.java @@ -703,6 +703,30 @@ public void testParseFromAndSize() throws IOException { } } + public void testVerbosePipeline() throws IOException { + { + String restContent = "{ \"verbose_pipeline\": true }"; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, restContent)) { + SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.fromXContent(parser); + assertTrue(searchSourceBuilder.verbosePipeline()); + } + } + { + String restContent = "{ \"verbose_pipeline\": false }"; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, restContent)) { + SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.fromXContent(parser); + assertFalse(searchSourceBuilder.verbosePipeline()); + } + } + { + String restContent = "{ \"query\": { \"match_all\": {} } }"; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, restContent)) { + SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.fromXContent(parser); + assertFalse(searchSourceBuilder.verbosePipeline()); + } + } + } + private void assertIndicesBoostParseErrorMessage(String restContent, String expectedErrorMessage) throws IOException { try (XContentParser parser = createParser(JsonXContent.jsonXContent, restContent)) { ParsingException e = expectThrows(ParsingException.class, () -> SearchSourceBuilder.fromXContent(parser)); diff --git a/server/src/test/java/org/opensearch/search/pipeline/ProcessorExecutionDetailTests.java b/server/src/test/java/org/opensearch/search/pipeline/ProcessorExecutionDetailTests.java new file mode 100644 index 0000000000000..0a5b62add6aa1 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/pipeline/ProcessorExecutionDetailTests.java @@ -0,0 +1,187 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.xcontent.DeprecationHandler; +import org.opensearch.core.xcontent.MediaType; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class ProcessorExecutionDetailTests extends OpenSearchTestCase { + + public void testSerializationRoundtrip() throws IOException { + ProcessorExecutionDetail detail = new ProcessorExecutionDetail( + "testProcessor", + 123L, + Map.of("key", "value"), + List.of(1, 2, 3), + ProcessorExecutionDetail.ProcessorStatus.SUCCESS, + "", + "" + ); + ProcessorExecutionDetail deserialized; + try (BytesStreamOutput output = new BytesStreamOutput()) { + detail.writeTo(output); + try (StreamInput input = output.bytes().streamInput()) { + deserialized = new ProcessorExecutionDetail(input); + } + } + assertEquals("testProcessor", deserialized.getProcessorName()); + assertEquals(123L, deserialized.getDurationMillis()); + assertEquals(Map.of("key", "value"), deserialized.getInputData()); + assertEquals(List.of(1, 2, 3), deserialized.getOutputData()); + } + + public void testAddMethods() { + ProcessorExecutionDetail detail = new ProcessorExecutionDetail("testProcessor"); + detail.addTook(456L); + detail.addInput(Map.of("newKey", "newValue")); + detail.addOutput(List.of(4, 5, 6)); + assertEquals(456L, detail.getDurationMillis()); + assertEquals(Map.of("newKey", "newValue"), detail.getInputData()); + assertEquals(List.of(4, 5, 6), detail.getOutputData()); + } + + public void testToXContent() throws IOException { + ProcessorExecutionDetail detail = new ProcessorExecutionDetail( + "testProcessor", + 123L, + Map.of("key1", "value1"), + List.of(1, 2, 3), + ProcessorExecutionDetail.ProcessorStatus.SUCCESS, + "", + null + ); + + XContentBuilder actualBuilder = XContentBuilder.builder(JsonXContent.jsonXContent); + detail.toXContent(actualBuilder, ToXContent.EMPTY_PARAMS); + + String expected = "{" + + " \"processor_name\": \"testProcessor\"," + + " \"duration_millis\": 123," + + " \"status\": \"success\"," + + " \"input_data\": {\"key1\": \"value1\"}," + + " \"output_data\": [1, 2, 3]" + + "}"; + + XContentParser expectedParser = JsonXContent.jsonXContent.createParser( + this.xContentRegistry(), + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + expected + ); + XContentBuilder expectedBuilder = XContentBuilder.builder(JsonXContent.jsonXContent); + expectedBuilder.generator().copyCurrentStructure(expectedParser); + + assertEquals( + XContentHelper.convertToMap(BytesReference.bytes(expectedBuilder), false, (MediaType) MediaTypeRegistry.JSON), + XContentHelper.convertToMap(BytesReference.bytes(actualBuilder), false, (MediaType) MediaTypeRegistry.JSON) + ); + } + + public void testToXContentWithProcessorError() throws IOException { + ProcessorExecutionDetail detail = new ProcessorExecutionDetail( + "testProcessor", + 123L, + Map.of("key1", "value1"), + List.of(1, 2, 3), + ProcessorExecutionDetail.ProcessorStatus.FAIL, + "processor 1 fail", + "123" + ); + + XContentBuilder actualBuilder = XContentBuilder.builder(JsonXContent.jsonXContent); + detail.toXContent(actualBuilder, ToXContent.EMPTY_PARAMS); + + String expected = "{" + + " \"processor_name\": \"testProcessor\"," + + " \"tag\": \"123\"," + + " \"duration_millis\": 123," + + " \"status\": \"fail\"," + + " \"error\": \"processor 1 fail\"," + + " \"input_data\": {\"key1\": \"value1\"}," + + " \"output_data\": [1, 2, 3]" + + "}"; + + XContentParser expectedParser = JsonXContent.jsonXContent.createParser( + this.xContentRegistry(), + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + expected + ); + XContentBuilder expectedBuilder = XContentBuilder.builder(JsonXContent.jsonXContent); + expectedBuilder.generator().copyCurrentStructure(expectedParser); + + assertEquals( + XContentHelper.convertToMap(BytesReference.bytes(expectedBuilder), false, (MediaType) MediaTypeRegistry.JSON), + XContentHelper.convertToMap(BytesReference.bytes(actualBuilder), false, (MediaType) MediaTypeRegistry.JSON) + ); + } + + public void testFromXContent() throws IOException { + String json = "{" + + " \"processor_name\": \"testProcessor\"," + + " \"duration_millis\": 123," + + " \"input_data\": {\"key1\": \"value1\"}," + + " \"output_data\": [1, 2, 3]" + + "}"; + + try ( + XContentParser parser = JsonXContent.jsonXContent.createParser( + this.xContentRegistry(), + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + json + ) + ) { + ProcessorExecutionDetail detail = ProcessorExecutionDetail.fromXContent(parser); + + assertEquals("testProcessor", detail.getProcessorName()); + assertEquals(123L, detail.getDurationMillis()); + assertEquals(Map.of("key1", "value1"), detail.getInputData()); + assertEquals(List.of(1, 2, 3), detail.getOutputData()); + } + } + + public void testFromXContentWithPRocessorError() throws IOException { + String json = "{" + + " \"processor_name\": \"testProcessor\"," + + " \"duration_millis\": 123," + + " \"status\": \"fail\"," + + " \"error\": \"processor 1 fail\"," + + " \"input_data\": {\"key1\": \"value1\"}," + + " \"output_data\": [1, 2, 3]" + + "}"; + + try ( + XContentParser parser = JsonXContent.jsonXContent.createParser( + this.xContentRegistry(), + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + json + ) + ) { + ProcessorExecutionDetail detail = ProcessorExecutionDetail.fromXContent(parser); + + assertEquals("testProcessor", detail.getProcessorName()); + assertEquals(123L, detail.getDurationMillis()); + assertEquals(Map.of("key1", "value1"), detail.getInputData()); + assertEquals(List.of(1, 2, 3), detail.getOutputData()); + assertEquals(ProcessorExecutionDetail.ProcessorStatus.FAIL, detail.getStatus()); + } + } +} diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index b52205996f34b..9668bf57db40f 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -669,12 +669,14 @@ public void testTransformResponse() throws Exception { // First try without specifying a pipeline, which should be a no-op. SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(createDefaultSearchSourceBuilder()); PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); SearchResponse notTransformedResponse = syncTransformResponse(pipelinedRequest, searchResponse); assertSame(searchResponse, notTransformedResponse); // Now apply a pipeline searchRequest = new SearchRequest().pipeline("p1"); + searchRequest.source(createDefaultSearchSourceBuilder()); pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); SearchResponse transformedResponse = syncTransformResponse(pipelinedRequest, searchResponse); assertEquals(size, transformedResponse.getHits().getHits().length); @@ -762,6 +764,7 @@ public void testTransformSearchPhase() { // First try without specifying a pipeline, which should be a no-op. SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(createDefaultSearchSourceBuilder()); PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); AtomicArray notTransformedSearchPhaseResults = searchPhaseResults.getAtomicArray(); pipelinedRequest.transformSearchPhaseResults( @@ -774,6 +777,7 @@ public void testTransformSearchPhase() { // Now set the pipeline as p1 searchRequest = new SearchRequest().pipeline("p1"); + searchRequest.source(createDefaultSearchSourceBuilder()); pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); pipelinedRequest.transformSearchPhaseResults( @@ -792,6 +796,7 @@ public void testTransformSearchPhase() { // Check Processor doesn't run for between other phases searchRequest = new SearchRequest().pipeline("p1"); + searchRequest.source(createDefaultSearchSourceBuilder()); pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); AtomicArray notTransformedSearchPhaseResult = searchPhaseResults.getAtomicArray(); pipelinedRequest.transformSearchPhaseResults( @@ -1105,7 +1110,7 @@ public void testExceptionOnResponseProcessing() throws Exception { PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); - SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); + SearchResponse response = createDefaultSearchResponse(); // Exception thrown when processing response expectThrows(SearchPipelineProcessingException.class, () -> syncTransformResponse(pipelinedRequest, response)); } @@ -1169,7 +1174,7 @@ public void testCatchExceptionOnResponseProcessing() throws Exception { PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); - SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); + SearchResponse response = createDefaultSearchResponse(); // Caught Exception thrown when processing response and produced warn level logging message try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(Pipeline.class))) { @@ -1209,7 +1214,8 @@ public void testStats() throws Exception { SearchPipelineService searchPipelineService = getSearchPipelineService(requestProcessors, responseProcessors); SearchRequest request = new SearchRequest(); - SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); + request.source(createDefaultSearchSourceBuilder()); + SearchResponse response = createDefaultSearchResponse(); syncExecutePipeline( searchPipelineService.resolvePipeline(request.pipeline("good_request_pipeline"), indexNameExpressionResolver), @@ -1307,7 +1313,8 @@ public void testStatsEnabledIgnoreFailure() throws Exception { ); SearchRequest request = new SearchRequest(); - SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); + request.source(createDefaultSearchSourceBuilder()); + SearchResponse response = createDefaultSearchResponse(); syncExecutePipeline( searchPipelineService.resolvePipeline(request.pipeline("good_request_pipeline"), indexNameExpressionResolver), @@ -1578,9 +1585,12 @@ public void testStatefulProcessors() throws Exception { .build(); searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState)); - PipelinedRequest request = searchPipelineService.resolvePipeline(new SearchRequest().pipeline("p1"), indexNameExpressionResolver); + PipelinedRequest request = searchPipelineService.resolvePipeline( + new SearchRequest().source(createDefaultSearchSourceBuilder()).pipeline("p1"), + indexNameExpressionResolver + ); assertNull(contextHolder.get()); - syncExecutePipeline(request, new SearchResponse(null, null, 0, 0, 0, 0, null, null)); + syncExecutePipeline(request, createDefaultSearchResponse()); assertNotNull(contextHolder.get()); assertEquals("b", contextHolder.get()); } @@ -1757,4 +1767,184 @@ public void testInvalidIndexResolveIndexDefaultPipeline() throws Exception { assertEquals(5, pipelinedRequest.source().size()); } + public void testVerbosePipelineExecution() throws Exception { + SearchPipelineService searchPipelineService = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "verbose_pipeline", + new PipelineConfiguration( + "verbose_pipeline", + new BytesArray( + "{" + + "\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ]," + + "\"response_processors\": [ { \"fixed_score\": { \"score\": 5.0 } } ]" + + "}" + ), + MediaTypeRegistry.JSON + ) + ) + ); + + ClusterState initialState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState updatedState = ClusterState.builder(initialState) + .metadata(Metadata.builder().putCustom(SearchPipelineMetadata.TYPE, metadata)) + .build(); + + searchPipelineService.applyClusterState(new ClusterChangedEvent("clusterStateUpdated", updatedState, initialState)); + + SearchRequest searchRequest = new SearchRequest().source(SearchSourceBuilder.searchSource().size(10)).pipeline("verbose_pipeline"); + searchRequest.source().verbosePipeline(true); + + PipelinedRequest pipelinedRequest = syncTransformRequest( + searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) + ); + + SearchResponseSections sections = new SearchResponseSections( + new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0.0f), + null, + null, + false, + null, + null, + 1, + List.of(), + List.of() + ); + + SearchResponse searchResponse = new SearchResponse(sections, null, 0, 0, 0, 0, null, null); + SearchResponse transformedResponse = syncTransformResponse(pipelinedRequest, searchResponse); + List executionDetails = transformedResponse.getInternalResponse().getProcessorResult(); + + assertNotNull(executionDetails); + assertEquals(2, executionDetails.size()); + assertEquals("scale_request_size", executionDetails.get(0).getProcessorName()); + assertEquals("fixed_score", executionDetails.get(1).getProcessorName()); + } + + public void testVerbosePipelineWithoutDefinedPipelineThrowsException() { + SearchPipelineService searchPipelineService = createWithProcessors(); + + SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(SearchSourceBuilder.searchSource().verbosePipeline(true)); + + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) + ); + assertTrue(e.getMessage(), e.getMessage().contains("The 'verbose pipeline' option requires a search pipeline to be defined.")); + } + + public void testVerbosePipelineWithRequestProcessorOnly() throws Exception { + SearchPipelineService searchPipelineService = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "request_only_pipeline", + new PipelineConfiguration( + "request_only_pipeline", + new BytesArray("{" + "\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ]" + "}"), + MediaTypeRegistry.JSON + ) + ) + ); + + ClusterState initialState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState updatedState = ClusterState.builder(initialState) + .metadata(Metadata.builder().putCustom(SearchPipelineMetadata.TYPE, metadata)) + .build(); + + searchPipelineService.applyClusterState(new ClusterChangedEvent("clusterStateUpdated", updatedState, initialState)); + + SearchRequest searchRequest = new SearchRequest().source(SearchSourceBuilder.searchSource().size(10)) + .pipeline("request_only_pipeline"); + searchRequest.source().verbosePipeline(true); + + PipelinedRequest pipelinedRequest = syncTransformRequest( + searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) + ); + + SearchResponseSections sections = new SearchResponseSections( + new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0.0f), + null, + null, + false, + null, + null, + 1, + List.of(), + List.of() + ); + + SearchResponse searchResponse = new SearchResponse(sections, null, 0, 0, 0, 0, null, null); + SearchResponse transformedResponse = syncTransformResponse(pipelinedRequest, searchResponse); + List executionDetails = transformedResponse.getInternalResponse().getProcessorResult(); + + assertNotNull(executionDetails); + assertEquals(1, executionDetails.size()); + assertEquals("scale_request_size", executionDetails.get(0).getProcessorName()); + } + + public void testVerbosePipelineWithResponseProcessorOnly() throws Exception { + SearchPipelineService searchPipelineService = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "response_only_pipeline", + new PipelineConfiguration( + "response_only_pipeline", + new BytesArray("{" + "\"response_processors\": [ { \"fixed_score\": { \"score\": 5.0 } } ]" + "}"), + MediaTypeRegistry.JSON + ) + ) + ); + + ClusterState initialState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState updatedState = ClusterState.builder(initialState) + .metadata(Metadata.builder().putCustom(SearchPipelineMetadata.TYPE, metadata)) + .build(); + + searchPipelineService.applyClusterState(new ClusterChangedEvent("clusterStateUpdated", updatedState, initialState)); + + SearchRequest searchRequest = new SearchRequest().source(SearchSourceBuilder.searchSource().size(10)) + .pipeline("response_only_pipeline"); + searchRequest.source().verbosePipeline(true); + + PipelinedRequest pipelinedRequest = syncTransformRequest( + searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) + ); + + SearchResponseSections sections = new SearchResponseSections( + new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0.0f), + null, + null, + false, + null, + null, + 1, + List.of(), + List.of() + ); + + SearchResponse searchResponse = new SearchResponse(sections, null, 0, 0, 0, 0, null, null); + SearchResponse transformedResponse = syncTransformResponse(pipelinedRequest, searchResponse); + List executionDetails = transformedResponse.getInternalResponse().getProcessorResult(); + + assertNotNull(executionDetails); + assertEquals(1, executionDetails.size()); + assertEquals("fixed_score", executionDetails.get(0).getProcessorName()); + } + + private SearchSourceBuilder createDefaultSearchSourceBuilder() { + return SearchSourceBuilder.searchSource().size(10); + } + + private SearchResponse createDefaultSearchResponse() { + SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0.0f); + + SearchResponseSections sections = new SearchResponseSections(searchHits, null, null, false, null, null, 1, List.of(), List.of()); + + return new SearchResponse(sections, null, 0, 0, 0, 0, null, null); + } + } diff --git a/server/src/test/java/org/opensearch/search/pipeline/TrackingSearchRequestProcessorWrapperTests.java b/server/src/test/java/org/opensearch/search/pipeline/TrackingSearchRequestProcessorWrapperTests.java new file mode 100644 index 0000000000000..adbbc9efc0686 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/pipeline/TrackingSearchRequestProcessorWrapperTests.java @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.action.search.SearchRequest; +import org.opensearch.core.action.ActionListener; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import org.mockito.Mockito; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; + +public class TrackingSearchRequestProcessorWrapperTests extends OpenSearchTestCase { + private SearchRequestProcessor mockProcessor; + private TrackingSearchRequestProcessorWrapper wrapper; + private PipelineProcessingContext context; + + @Before + public void setUp() throws Exception { + super.setUp(); + mockProcessor = Mockito.mock(SearchRequestProcessor.class); + wrapper = new TrackingSearchRequestProcessorWrapper(mockProcessor); + context = new PipelineProcessingContext(); + } + + public void testProcessRequestAsyncSuccess() { + SearchRequest inputRequest = new SearchRequest(); + inputRequest.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())); + + SearchRequest outputRequest = new SearchRequest(); + outputRequest.source(new SearchSourceBuilder().query(QueryBuilders.termQuery("field", "value"))); + + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(2); + listener.onResponse(outputRequest); + return null; + }).when(mockProcessor).processRequestAsync(any(SearchRequest.class), eq(context), any()); + + ActionListener listener = ActionListener.wrap(response -> { + assertEquals(outputRequest, response); + ProcessorExecutionDetail detail = context.getProcessorExecutionDetails().get(0); + assertEquals(wrapper.getType(), detail.getProcessorName()); + assertEquals(ProcessorExecutionDetail.ProcessorStatus.SUCCESS, detail.getStatus()); + }, e -> fail("Unexpected exception: " + e.getMessage())); + + wrapper.processRequestAsync(inputRequest, context, listener); + } + +} diff --git a/server/src/test/java/org/opensearch/search/pipeline/TrackingSearchResponseProcessorWrapperTests.java b/server/src/test/java/org/opensearch/search/pipeline/TrackingSearchResponseProcessorWrapperTests.java new file mode 100644 index 0000000000000..4075274948d30 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/pipeline/TrackingSearchResponseProcessorWrapperTests.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.core.action.ActionListener; +import org.opensearch.search.SearchHits; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import org.mockito.Mockito; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TrackingSearchResponseProcessorWrapperTests extends OpenSearchTestCase { + private SearchResponseProcessor mockProcessor; + private TrackingSearchResponseProcessorWrapper wrapper; + private PipelineProcessingContext context; + + @Before + public void setUp() throws Exception { + super.setUp(); + mockProcessor = Mockito.mock(SearchResponseProcessor.class); + wrapper = new TrackingSearchResponseProcessorWrapper(mockProcessor); + context = new PipelineProcessingContext(); + } + + public void testConstructorThrowsExceptionWhenProcessorIsNull() { + IllegalArgumentException exception = expectThrows( + IllegalArgumentException.class, + () -> new TrackingSearchResponseProcessorWrapper(null) + ); + + assertEquals("Wrapped processor cannot be null.", exception.getMessage()); + } + + public void testProcessResponseAsync() { + SearchRequest mockRequest = new SearchRequest(); + SearchResponse inputResponse = Mockito.mock(SearchResponse.class); + SearchResponse outputResponse = Mockito.mock(SearchResponse.class); + + when(inputResponse.getHits()).thenReturn(SearchHits.empty()); + when(outputResponse.getHits()).thenReturn(SearchHits.empty()); + + wrapper.processResponseAsync(mockRequest, inputResponse, context, new ActionListener<>() { + @Override + public void onResponse(SearchResponse result) { + assertEquals(outputResponse, result); + assertFalse(context.getProcessorExecutionDetails().isEmpty()); + ProcessorExecutionDetail detail = context.getProcessorExecutionDetails().get(0); + assertEquals(wrapper.getType(), detail.getProcessorName()); + assertNotNull(detail.getInputData()); + assertNotNull(detail.getOutputData()); + assertEquals(ProcessorExecutionDetail.ProcessorStatus.SUCCESS, detail.getStatus()); + } + + @Override + public void onFailure(Exception e) { + fail("Should not trigger failure"); + } + }); + + verify(mockProcessor).processResponseAsync(eq(mockRequest), eq(inputResponse), eq(context), any()); + } +}