Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Make GRPC transport extensible to allow plugins to register and expose their own GRPC services ([#18516](https://github.com/opensearch-project/OpenSearch/pull/18516))
- Added approximation support for range queries with now in date field ([#18511](https://github.com/opensearch-project/OpenSearch/pull/18511))
- Upgrade to protobufs 0.6.0 and clean up deprecated TermQueryProtoUtils code ([#18880](https://github.com/opensearch-project/OpenSearch/pull/18880))
- Expand fetch phase profiling to multi-shard queries ([#18887](https://github.com/opensearch-project/OpenSearch/pull/18887))
- Prevent shard initialization failure due to streaming consumer errors ([#18877](https://github.com/opensearch-project/OpenSearch/pull/18877))
- APIs for stream transport and new stream-based search api action ([#18722](https://github.com/opensearch-project/OpenSearch/pull/18722))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@

public class FetchProfilerIT extends OpenSearchIntegTestCase {

@Override
protected int numberOfShards() {
return 1; // Use a single shard to ensure all documents are in one shard
}

/**
* This test verifies that the fetch profiler returns reasonable results for a simple match_all query
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@ public InternalSearchResponse merge(
assert currentOffset == sortedDocs.length : "expected no more score doc slices";
}
}
return reducedQueryPhase.buildResponse(hits);

return reducedQueryPhase.buildResponse(hits, fetchResults, this);
}

private SearchHits getHits(
Expand Down Expand Up @@ -736,11 +737,29 @@ public static final class ReducedQueryPhase {
}

/**
* Creates a new search response from the given merged hits.
* Creates a new search response from the given merged hits with fetch profile merging.
* @param hits the merged search hits
* @param fetchResults the fetch results to merge profiles from
* @param controller the SearchPhaseController instance to access mergeFetchProfiles method
* @see #merge(boolean, ReducedQueryPhase, Collection, IntFunction)
*/
public InternalSearchResponse buildResponse(SearchHits hits) {
return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly, numReducePhases);
public InternalSearchResponse buildResponse(
SearchHits hits,
Collection<? extends SearchPhaseResult> fetchResults,
SearchPhaseController controller
) {
SearchProfileShardResults mergedProfileResults = shardResults != null
? controller.mergeFetchProfiles(shardResults, fetchResults)
: null;
return new InternalSearchResponse(
hits,
aggregations,
suggest,
mergedProfileResults,
timedOut,
terminatedEarly,
numReducePhases
);
}
}

Expand Down Expand Up @@ -870,4 +889,40 @@ static final class SortedTopDocs {
this.collapseValues = collapseValues;
}
}

/**
* Merges fetch phase profile results with query phase profile results.
*
* @param queryProfiles the query phase profile results (must not be null)
* @param fetchResults the fetch phase results to merge profiles from
* @return merged profile results containing both query and fetch phase data
*/
public SearchProfileShardResults mergeFetchProfiles(
SearchProfileShardResults queryProfiles,
Collection<? extends SearchPhaseResult> fetchResults
) {
Map<String, ProfileShardResult> mergedResults = new HashMap<>(queryProfiles.getShardResults());

// Merge fetch profiles into existing query profiles
for (SearchPhaseResult fetchResult : fetchResults) {
if (fetchResult.fetchResult() != null && fetchResult.fetchResult().getProfileResults() != null) {
ProfileShardResult fetchProfile = fetchResult.fetchResult().getProfileResults();
String shardId = fetchResult.getSearchShardTarget().toString();

ProfileShardResult existingProfile = mergedResults.get(shardId);
if (existingProfile != null) {
// Merge fetch profile data into existing query profile
ProfileShardResult merged = new ProfileShardResult(
existingProfile.getQueryProfileResults(),
existingProfile.getAggregationProfileResults(),
fetchProfile.getFetchProfileResult(), // Use fetch profile data
existingProfile.getNetworkTime()
);
mergedResults.put(shardId, merged);
}
}
}

return new SearchProfileShardResults(mergedResults);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,13 @@ public void executeFetchPhase(
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, true, System.nanoTime())
) {
fetchPhase.execute(searchContext);
if (searchContext.getProfilers() != null) {
ProfileShardResult shardResults = SearchProfileShardResults.buildFetchOnlyShardResults(
searchContext.getProfilers(),
searchContext.request()
);
searchContext.fetchResult().profileResults(shardResults);
}
if (readerContext.singleSession()) {
freeReaderContext(request.contextId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.search.fetch;

import org.opensearch.Version;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand All @@ -40,6 +41,7 @@
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchShardTarget;
import org.opensearch.search.internal.ShardSearchContextId;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.query.QuerySearchResult;

import java.io.IOException;
Expand All @@ -55,13 +57,19 @@ public final class FetchSearchResult extends SearchPhaseResult {
private SearchHits hits;
// client side counter
private transient int counter;
private ProfileShardResult profileShardResults;

public FetchSearchResult() {}

public FetchSearchResult(StreamInput in) throws IOException {
super(in);
contextId = new ShardSearchContextId(in);
hits = new SearchHits(in);
if (in.getVersion().onOrAfter(Version.V_3_2_0)) {
profileShardResults = in.readOptionalWriteable(ProfileShardResult::new);
} else {
profileShardResults = null;
}
}

public FetchSearchResult(ShardSearchContextId id, SearchShardTarget shardTarget) {
Expand Down Expand Up @@ -104,9 +112,20 @@ public int counterGetAndIncrement() {
return counter++;
}

public void profileResults(ProfileShardResult shardResults) {
this.profileShardResults = shardResults;
}

public ProfileShardResult getProfileResults() {
return profileShardResults;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
contextId.writeTo(out);
hits.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_3_2_0)) {
out.writeOptionalWriteable(profileShardResults);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,4 +228,30 @@ public static ProfileShardResult buildShardResults(Profilers profilers, ShardSea
}
return new ProfileShardResult(queryResults, aggResults, fetchResult, networkTime);
}

/**
* Helper method to build ProfileShardResult containing only fetch profile data.
* Used in multi-shard fetch phase where query profiling data is not available.
*
* @param profilers The {@link Profilers} to extract fetch data from
* @param request The shard search request
* @return A {@link ProfileShardResult} containing only fetch profile data
*/
public static ProfileShardResult buildFetchOnlyShardResults(Profilers profilers, ShardSearchRequest request) {
FetchProfiler fetchProfiler = profilers.getFetchProfiler();
List<ProfileResult> fetchTree = fetchProfiler.getTree();
FetchProfileShardResult fetchResult = new FetchProfileShardResult(fetchTree);
NetworkTime networkTime = new NetworkTime(0, 0);
if (request != null) {
networkTime.setInboundNetworkTime(request.getInboundNetworkTime());
networkTime.setOutboundNetworkTime(request.getOutboundNetworkTime());
}
// Return ProfileShardResult with empty query/agg results and only fetch data
return new ProfileShardResult(
Collections.emptyList(), // No query results in fetch-only phase
new AggregationProfileShardResult(Collections.emptyList()), // No aggregation results
fetchResult,
networkTime
);
}
}
Loading
Loading