diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d1010059028a..3652bfc1d330c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Make GRPC transport extensible to allow plugins to register and expose their own GRPC services ([#18516](https://github.com/opensearch-project/OpenSearch/pull/18516)) - Added approximation support for range queries with now in date field ([#18511](https://github.com/opensearch-project/OpenSearch/pull/18511)) - Upgrade to protobufs 0.6.0 and clean up deprecated TermQueryProtoUtils code ([#18880](https://github.com/opensearch-project/OpenSearch/pull/18880)) +- Expand fetch phase profiling to multi-shard queries ([#18887](https://github.com/opensearch-project/OpenSearch/pull/18887)) - Prevent shard initialization failure due to streaming consumer errors ([#18877](https://github.com/opensearch-project/OpenSearch/pull/18877)) - APIs for stream transport and new stream-based search api action ([#18722](https://github.com/opensearch-project/OpenSearch/pull/18722)) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/profile/fetch/FetchProfilerIT.java b/server/src/internalClusterTest/java/org/opensearch/search/profile/fetch/FetchProfilerIT.java index d85d6bb08b211..069a222b899bf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/profile/fetch/FetchProfilerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/profile/fetch/FetchProfilerIT.java @@ -28,11 +28,6 @@ public class FetchProfilerIT extends OpenSearchIntegTestCase { - @Override - protected int numberOfShards() { - return 1; // Use a single shard to ensure all documents are in one shard - } - /** * This test verifies that the fetch profiler returns reasonable results for a simple match_all query */ diff --git a/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java 
b/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java index 43132b5cf58ab..de8bfa0016414 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java @@ -332,7 +332,8 @@ public InternalSearchResponse merge( assert currentOffset == sortedDocs.length : "expected no more score doc slices"; } } - return reducedQueryPhase.buildResponse(hits); + + return reducedQueryPhase.buildResponse(hits, fetchResults, this); } private SearchHits getHits( @@ -736,11 +737,29 @@ public static final class ReducedQueryPhase { } /** - * Creates a new search response from the given merged hits. + * Creates a new search response from the given merged hits with fetch profile merging. + * @param hits the merged search hits + * @param fetchResults the fetch results to merge profiles from + * @param controller the SearchPhaseController instance to access mergeFetchProfiles method * @see #merge(boolean, ReducedQueryPhase, Collection, IntFunction) */ - public InternalSearchResponse buildResponse(SearchHits hits) { - return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly, numReducePhases); + public InternalSearchResponse buildResponse( + SearchHits hits, + Collection fetchResults, + SearchPhaseController controller + ) { + SearchProfileShardResults mergedProfileResults = shardResults != null + ? controller.mergeFetchProfiles(shardResults, fetchResults) + : null; + return new InternalSearchResponse( + hits, + aggregations, + suggest, + mergedProfileResults, + timedOut, + terminatedEarly, + numReducePhases + ); } } @@ -870,4 +889,40 @@ static final class SortedTopDocs { this.collapseValues = collapseValues; } } + + /** + * Merges fetch phase profile results with query phase profile results. 
+ * + * @param queryProfiles the query phase profile results (must not be null) + * @param fetchResults the fetch phase results to merge profiles from + * @return merged profile results containing both query and fetch phase data + */ + public SearchProfileShardResults mergeFetchProfiles( + SearchProfileShardResults queryProfiles, + Collection fetchResults + ) { + Map mergedResults = new HashMap<>(queryProfiles.getShardResults()); + + // Merge fetch profiles into existing query profiles + for (SearchPhaseResult fetchResult : fetchResults) { + if (fetchResult.fetchResult() != null && fetchResult.fetchResult().getProfileResults() != null) { + ProfileShardResult fetchProfile = fetchResult.fetchResult().getProfileResults(); + String shardId = fetchResult.getSearchShardTarget().toString(); + + ProfileShardResult existingProfile = mergedResults.get(shardId); + if (existingProfile != null) { + // Merge fetch profile data into existing query profile + ProfileShardResult merged = new ProfileShardResult( + existingProfile.getQueryProfileResults(), + existingProfile.getAggregationProfileResults(), + fetchProfile.getFetchProfileResult(), // Use fetch profile data + existingProfile.getNetworkTime() + ); + mergedResults.put(shardId, merged); + } + } + } + + return new SearchProfileShardResults(mergedResults); + } } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 2bb865820fcf8..94439fd098891 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -944,6 +944,13 @@ public void executeFetchPhase( SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, true, System.nanoTime()) ) { fetchPhase.execute(searchContext); + if (searchContext.getProfilers() != null) { + ProfileShardResult shardResults = SearchProfileShardResults.buildFetchOnlyShardResults( + 
searchContext.getProfilers(), + searchContext.request() + ); + searchContext.fetchResult().profileResults(shardResults); + } if (readerContext.singleSession()) { freeReaderContext(request.contextId()); } diff --git a/server/src/main/java/org/opensearch/search/fetch/FetchSearchResult.java b/server/src/main/java/org/opensearch/search/fetch/FetchSearchResult.java index 26fa90141c2a9..ecf0eb88c2bcf 100644 --- a/server/src/main/java/org/opensearch/search/fetch/FetchSearchResult.java +++ b/server/src/main/java/org/opensearch/search/fetch/FetchSearchResult.java @@ -32,6 +32,7 @@ package org.opensearch.search.fetch; +import org.opensearch.Version; import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -40,6 +41,7 @@ import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchShardTarget; import org.opensearch.search.internal.ShardSearchContextId; +import org.opensearch.search.profile.ProfileShardResult; import org.opensearch.search.query.QuerySearchResult; import java.io.IOException; @@ -55,6 +57,7 @@ public final class FetchSearchResult extends SearchPhaseResult { private SearchHits hits; // client side counter private transient int counter; + private ProfileShardResult profileShardResults; public FetchSearchResult() {} @@ -62,6 +65,11 @@ public FetchSearchResult(StreamInput in) throws IOException { super(in); contextId = new ShardSearchContextId(in); hits = new SearchHits(in); + if (in.getVersion().onOrAfter(Version.V_3_2_0)) { + profileShardResults = in.readOptionalWriteable(ProfileShardResult::new); + } else { + profileShardResults = null; + } } public FetchSearchResult(ShardSearchContextId id, SearchShardTarget shardTarget) { @@ -104,9 +112,20 @@ public int counterGetAndIncrement() { return counter++; } + public void profileResults(ProfileShardResult shardResults) { + this.profileShardResults = shardResults; + } + + public 
ProfileShardResult getProfileResults() { + return profileShardResults; + } + @Override public void writeTo(StreamOutput out) throws IOException { contextId.writeTo(out); hits.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_3_2_0)) { + out.writeOptionalWriteable(profileShardResults); + } } } diff --git a/server/src/main/java/org/opensearch/search/profile/SearchProfileShardResults.java b/server/src/main/java/org/opensearch/search/profile/SearchProfileShardResults.java index 1696604fe05e4..1d81c5e1d4f15 100644 --- a/server/src/main/java/org/opensearch/search/profile/SearchProfileShardResults.java +++ b/server/src/main/java/org/opensearch/search/profile/SearchProfileShardResults.java @@ -228,4 +228,30 @@ public static ProfileShardResult buildShardResults(Profilers profilers, ShardSea } return new ProfileShardResult(queryResults, aggResults, fetchResult, networkTime); } + + /** + * Helper method to build ProfileShardResult containing only fetch profile data. + * Used in multi-shard fetch phase where query profiling data is not available. 
+ * + * @param profilers The {@link Profilers} to extract fetch data from + * @param request The shard search request + * @return A {@link ProfileShardResult} containing only fetch profile data + */ + public static ProfileShardResult buildFetchOnlyShardResults(Profilers profilers, ShardSearchRequest request) { + FetchProfiler fetchProfiler = profilers.getFetchProfiler(); + List fetchTree = fetchProfiler.getTree(); + FetchProfileShardResult fetchResult = new FetchProfileShardResult(fetchTree); + NetworkTime networkTime = new NetworkTime(0, 0); + if (request != null) { + networkTime.setInboundNetworkTime(request.getInboundNetworkTime()); + networkTime.setOutboundNetworkTime(request.getOutboundNetworkTime()); + } + // Return ProfileShardResult with empty query/agg results and only fetch data + return new ProfileShardResult( + Collections.emptyList(), // No query results in fetch-only phase + new AggregationProfileShardResult(Collections.emptyList()), // No aggregation results + fetchResult, + networkTime + ); + } } diff --git a/server/src/test/java/org/opensearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/opensearch/action/search/SearchPhaseControllerTests.java index 964f79d23447a..14f1488c01dbb 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchPhaseControllerTests.java @@ -74,9 +74,19 @@ import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.fetch.FetchSearchResult; +import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.InternalSearchResponse; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.internal.ShardSearchContextId; +import org.opensearch.search.internal.ShardSearchRequest; +import org.opensearch.search.profile.NetworkTime; +import 
org.opensearch.search.profile.ProfileResult; +import org.opensearch.search.profile.ProfileShardResult; +import org.opensearch.search.profile.SearchProfileShardResults; +import org.opensearch.search.profile.aggregation.AggregationProfileShardResult; +import org.opensearch.search.profile.fetch.FetchProfileShardResult; +import org.opensearch.search.profile.query.CollectorResult; +import org.opensearch.search.profile.query.QueryProfileShardResult; import org.opensearch.search.query.QuerySearchResult; import org.opensearch.search.suggest.SortBy; import org.opensearch.search.suggest.Suggest; @@ -91,6 +101,7 @@ import org.junit.Before; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -330,8 +341,9 @@ public void testMerge() { } int nShards = randomIntBetween(1, 20); int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2); - AtomicArray queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false); for (int trackTotalHits : new int[] { SearchContext.TRACK_TOTAL_HITS_DISABLED, SearchContext.TRACK_TOTAL_HITS_ACCURATE }) { + // Generate fresh query results for each iteration since profiles get consumed + AtomicArray queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false); SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase( queryResults.asList(), new ArrayList<>(), @@ -370,6 +382,55 @@ public void testMerge() { } assertThat(suggestSize, lessThanOrEqualTo(maxSuggestSize)); assertThat(mergedResponse.hits().getHits().length, equalTo(reducedQueryPhase.sortedTopDocs.scoreDocs.length - suggestSize)); + + // Verify profile merging worked correctly (when profiles are present) + if (mergedResponse.profileResults != null) { + assertEquals("Should have profiles for all shards", nShards, mergedResponse.profileResults.getShardResults().size()); + + for (Map.Entry entry : 
mergedResponse.profileResults.getShardResults().entrySet()) { + ProfileShardResult mergedProfile = entry.getValue(); + + if (!mergedProfile.getQueryProfileResults().isEmpty()) { + QueryProfileShardResult queryResult = mergedProfile.getQueryProfileResults().get(0); + assertEquals( + "Main query should be BooleanQuery", + "BooleanQuery", + queryResult.getQueryResults().get(0).getQueryName() + ); + assertEquals("Should have 2 query children", 2, queryResult.getQueryResults().get(0).getProfiledChildren().size()); + assertEquals( + "Collector should be TotalHitCountCollector", + "TotalHitCountCollector", + queryResult.getCollectorResult().getName() + ); + } + + if (!mergedProfile.getAggregationProfileResults().getProfileResults().isEmpty()) { + ProfileResult aggProfile = mergedProfile.getAggregationProfileResults().getProfileResults().get(0); + assertEquals("Aggregation should be GlobalAggregator", "GlobalAggregator", aggProfile.getQueryName()); + assertEquals("Should have 1 aggregation child", 1, aggProfile.getProfiledChildren().size()); + } + + if (!mergedProfile.getFetchProfileResult().getFetchProfileResults().isEmpty()) { + ProfileResult fetchProfile = mergedProfile.getFetchProfileResult().getFetchProfileResults().get(0); + assertEquals("Fetch phase should be FetchPhase", "FetchPhase", fetchProfile.getQueryName()); + assertEquals("Should have 2 fetch children", 2, fetchProfile.getProfiledChildren().size()); + assertEquals( + "First fetch child should be LoadStoredFields", + "LoadStoredFields", + fetchProfile.getProfiledChildren().get(0).getQueryName() + ); + assertEquals( + "Second fetch child should be LoadDocValues", + "LoadDocValues", + fetchProfile.getProfiledChildren().get(1).getQueryName() + ); + } + + assertNotNull("Network time should be preserved", mergedProfile.getNetworkTime()); + } + } + Suggest suggestResult = mergedResponse.suggest(); for (Suggest.Suggestion suggestion : reducedQueryPhase.suggest) { assertThat(suggestion, 
instanceOf(CompletionSuggestion.class)); @@ -388,6 +449,215 @@ public void testMerge() { } } + public void testMergeFetchProfiles() { + int nShards = 2; + + List shardTargets = new ArrayList<>(); + for (int shardIndex = 0; shardIndex < nShards; shardIndex++) { + SearchShardTarget shardTarget = new SearchShardTarget( + "node_" + shardIndex, + new ShardId("index", "uuid", shardIndex), + null, + OriginalIndices.NONE + ); + shardTargets.add(shardTarget); + } + + Map queryProfileMap = new HashMap<>(); + + for (int shardIndex = 0; shardIndex < nShards; shardIndex++) { + List queryChildren = Arrays.asList( + new ProfileResult("TermQuery", "field:value", Collections.emptyMap(), Collections.emptyMap(), 15L, Collections.emptyList()), + new ProfileResult( + "BooleanQuery", + "must clauses", + Collections.emptyMap(), + Collections.emptyMap(), + 25L, + Collections.emptyList() + ) + ); + ProfileResult mainQuery = new ProfileResult( + "BooleanQuery", + "main query", + Collections.emptyMap(), + Collections.emptyMap(), + 100L, + queryChildren + ); + + List collectorChildren = Arrays.asList( + new CollectorResult("SimpleTopScoreDocCollector", "collecting docs", 30L, Collections.emptyList()) + ); + CollectorResult collectorResult = new CollectorResult("TotalHitCountCollector", "counting hits", 50L, collectorChildren); + + QueryProfileShardResult queryShardResult = new QueryProfileShardResult( + Arrays.asList(mainQuery), + 20L, // rewrite time + collectorResult + ); + + List aggChildren = Arrays.asList( + new ProfileResult( + "TermsAggregator", + "terms on field", + Collections.emptyMap(), + Collections.emptyMap(), + 40L, + Collections.emptyList() + ) + ); + ProfileResult aggProfile = new ProfileResult( + "GlobalAggregator", + "global agg", + Collections.emptyMap(), + Collections.emptyMap(), + 60L, + aggChildren + ); + AggregationProfileShardResult aggResults = new AggregationProfileShardResult(Arrays.asList(aggProfile)); + + // Create empty fetch profile (will be replaced by merge) + 
FetchProfileShardResult emptyFetchResults = new FetchProfileShardResult(Collections.emptyList()); + + NetworkTime networkTime = new NetworkTime(5L, 10L); + + ProfileShardResult queryProfile = new ProfileShardResult( + Arrays.asList(queryShardResult), + aggResults, + emptyFetchResults, + networkTime + ); + + String shardId = shardTargets.get(shardIndex).toString(); + queryProfileMap.put(shardId, queryProfile); + } + + SearchProfileShardResults queryProfiles = new SearchProfileShardResults(queryProfileMap); + + List fetchResults = new ArrayList<>(); + + for (int shardIndex = 0; shardIndex < nShards; shardIndex++) { + SearchShardTarget shardTarget = shardTargets.get(shardIndex); + + FetchSearchResult fetchResult = new FetchSearchResult(new ShardSearchContextId("", shardIndex), shardTarget); + + List fetchChildren = Arrays.asList( + new ProfileResult( + "LoadStoredFields", + "loading _source", + Collections.emptyMap(), + Collections.emptyMap(), + 12L, + Collections.emptyList() + ), + new ProfileResult( + "LoadDocValues", + "loading doc values", + Collections.emptyMap(), + Collections.emptyMap(), + 8L, + Collections.emptyList() + ) + ); + ProfileResult fetchPhase = new ProfileResult( + "FetchPhase", + "fetch documents", + Collections.emptyMap(), + Collections.emptyMap(), + 35L, + fetchChildren + ); + + FetchProfileShardResult fetchProfileShardResult = new FetchProfileShardResult(Arrays.asList(fetchPhase)); + + ProfileShardResult fetchProfileResult = new ProfileShardResult( + Collections.emptyList(), // no query results in fetch + new AggregationProfileShardResult(Collections.emptyList()), // no agg results in fetch + fetchProfileShardResult, + new NetworkTime(2L, 3L) + ); + + fetchResult.profileResults(fetchProfileResult); + fetchResults.add(fetchResult); + } + + SearchProfileShardResults mergedProfiles = searchPhaseController.mergeFetchProfiles(queryProfiles, fetchResults); + + assertNotNull("Merged profiles should not be null", mergedProfiles); + assertEquals("Should 
have profiles for all shards", nShards, mergedProfiles.getShardResults().size()); + + for (int shardIndex = 0; shardIndex < nShards; shardIndex++) { + String shardId = shardTargets.get(shardIndex).toString(); + ProfileShardResult mergedProfile = mergedProfiles.getShardResults().get(shardId); + + assertNotNull("Merged profile should exist for shard " + shardId, mergedProfile); + + List queryResults = mergedProfile.getQueryProfileResults(); + assertNotNull("Query profile results should be preserved", queryResults); + assertEquals("Should have one query profile result", 1, queryResults.size()); + + QueryProfileShardResult queryResult = queryResults.get(0); + assertEquals("Query rewrite time should be preserved", 20L, queryResult.getRewriteTime()); + assertNotNull("Collector result should be preserved", queryResult.getCollectorResult()); + assertEquals("Collector name should be preserved", "TotalHitCountCollector", queryResult.getCollectorResult().getName()); + assertEquals("Should have collector children", 1, queryResult.getCollectorResult().getProfiledChildren().size()); + + List queryProfileResults = queryResult.getQueryResults(); + assertEquals("Should have one main query", 1, queryProfileResults.size()); + ProfileResult mainQuery = queryProfileResults.get(0); + assertEquals("Main query name should be preserved", "BooleanQuery", mainQuery.getQueryName()); + assertEquals("Main query time should be preserved", 100L, mainQuery.getTime()); + assertEquals("Should have query children", 2, mainQuery.getProfiledChildren().size()); + + AggregationProfileShardResult aggResult = mergedProfile.getAggregationProfileResults(); + assertNotNull("Aggregation profile should be preserved", aggResult); + assertEquals("Should have one aggregation profile", 1, aggResult.getProfileResults().size()); + ProfileResult aggProfile = aggResult.getProfileResults().get(0); + assertEquals("Aggregation name should be preserved", "GlobalAggregator", aggProfile.getQueryName()); + 
assertEquals("Should have aggregation children", 1, aggProfile.getProfiledChildren().size()); + + FetchProfileShardResult fetchProfile = mergedProfile.getFetchProfileResult(); + assertNotNull("Fetch profile should be merged", fetchProfile); + assertEquals("Should have one fetch profile result", 1, fetchProfile.getFetchProfileResults().size()); + + ProfileResult fetchPhase = fetchProfile.getFetchProfileResults().get(0); + assertEquals("Fetch phase name should be correct", "FetchPhase", fetchPhase.getQueryName()); + assertEquals("Fetch phase time should be correct", 35L, fetchPhase.getTime()); + assertEquals("Should have fetch children", 2, fetchPhase.getProfiledChildren().size()); + + List fetchChildren = fetchPhase.getProfiledChildren(); + assertEquals("First fetch child should be LoadStoredFields", "LoadStoredFields", fetchChildren.get(0).getQueryName()); + assertEquals("First fetch child time should be correct", 12L, fetchChildren.get(0).getTime()); + assertEquals("Second fetch child should be LoadDocValues", "LoadDocValues", fetchChildren.get(1).getQueryName()); + assertEquals("Second fetch child time should be correct", 8L, fetchChildren.get(1).getTime()); + + NetworkTime networkTime = mergedProfile.getNetworkTime(); + assertNotNull("Network time should be preserved", networkTime); + assertEquals("Inbound network time should be from query phase", 5L, networkTime.getInboundNetworkTime()); + assertEquals("Outbound network time should be from query phase", 10L, networkTime.getOutboundNetworkTime()); + } + } + + public void testMergeFetchProfilesWithNullQueryProfiles() { + // Test that the caller properly handles null queryProfiles + SearchPhaseController searchPhaseController = new SearchPhaseController( + writableRegistry(), + r -> InternalAggregationTestCase.emptyReduceContextBuilder() + ); + + // Create some fetch results (though they won't be used since queryProfiles is null) + List fetchResults = new ArrayList<>(); + FetchSearchResult fetchResult = new 
FetchSearchResult(); + fetchResults.add(fetchResult); + + // The caller should handle null queryProfiles and not call mergeFetchProfiles + // This simulates what happens in buildResponse when shardResults is null + SearchProfileShardResults queryProfiles = null; + SearchProfileShardResults mergedProfiles = queryProfiles != null ? searchPhaseController.mergeFetchProfiles(queryProfiles, fetchResults) : null; + + assertNull("Merged profiles should be null when queryProfiles is null", mergedProfiles); + } + /** * Generate random query results received from the provided number of shards, including the provided * number of search hits and randomly generated completion suggestions based on the name and size of the provided ones. @@ -410,7 +680,11 @@ private static AtomicArray generateQueryResults( clusterAlias, OriginalIndices.NONE ); - QuerySearchResult querySearchResult = new QuerySearchResult(new ShardSearchContextId("", shardIndex), searchShardTarget, null); + QuerySearchResult querySearchResult = new QuerySearchResult( + new ShardSearchContextId("", shardIndex), + searchShardTarget, + createMockShardSearchRequest() + ); final TopDocs topDocs; float maxScore = 0; if (searchHitsSize == 0) { @@ -452,11 +726,99 @@ private static AtomicArray generateQueryResults( querySearchResult.size(searchHitsSize); querySearchResult.suggest(new Suggest(new ArrayList<>(shardSuggestion))); querySearchResult.setShardIndex(shardIndex); + + // Add query profile data + List queryChildren = Arrays.asList( + new ProfileResult( + "TermQuery", + "field:value", + Collections.emptyMap(), + Collections.emptyMap(), + randomIntBetween(10, 20), + Collections.emptyList() + ), + new ProfileResult( + "BooleanQuery", + "must clauses", + Collections.emptyMap(), + Collections.emptyMap(), + randomIntBetween(15, 30), + Collections.emptyList() + ) + ); + ProfileResult mainQuery = new ProfileResult( + "BooleanQuery", + "main query", + Collections.emptyMap(), + Collections.emptyMap(), + randomIntBetween(50, 100), + queryChildren + ); + + List collectorChildren = Arrays.asList( 
+ new CollectorResult("SimpleTopScoreDocCollector", "collecting docs", randomIntBetween(20, 40), Collections.emptyList()) + ); + CollectorResult collectorResult = new CollectorResult( + "TotalHitCountCollector", + "counting hits", + randomIntBetween(30, 60), + collectorChildren + ); + + QueryProfileShardResult queryShardResult = new QueryProfileShardResult( + Arrays.asList(mainQuery), + randomIntBetween(5, 15), // rewrite time + collectorResult + ); + + // Create aggregation profile + List aggChildren = Arrays.asList( + new ProfileResult( + "TermsAggregator", + "terms on field", + Collections.emptyMap(), + Collections.emptyMap(), + randomIntBetween(25, 50), + Collections.emptyList() + ) + ); + ProfileResult aggProfile = new ProfileResult( + "GlobalAggregator", + "global agg", + Collections.emptyMap(), + Collections.emptyMap(), + randomIntBetween(40, 80), + aggChildren + ); + AggregationProfileShardResult aggResults = new AggregationProfileShardResult(Arrays.asList(aggProfile)); + + // Create empty fetch profile (will be replaced by merge) + FetchProfileShardResult emptyFetchResults = new FetchProfileShardResult(Collections.emptyList()); + + ProfileShardResult profileShardResult = new ProfileShardResult( + Arrays.asList(queryShardResult), + aggResults, + emptyFetchResults, + new NetworkTime(randomIntBetween(2, 8), randomIntBetween(3, 10)) + ); + + querySearchResult.profileResults(profileShardResult); + queryResults.set(shardIndex, querySearchResult); } return queryResults; } + // Helper method to create a mock ShardSearchRequest for testing + private static ShardSearchRequest createMockShardSearchRequest() { + // Create a minimal mock ShardSearchRequest with network timing + ShardSearchRequest request = new ShardSearchRequest(new ShardId("test", "test", 0), System.currentTimeMillis(), AliasFilter.EMPTY); + // Set network timing + request.setInboundNetworkTime(randomIntBetween(1, 5)); + request.setOutboundNetworkTime(randomIntBetween(1, 5)); + return request; + } + 
private static AtomicArray generateQueryResultsWithIntSortedField( int nShards, List suggestions, @@ -635,6 +997,46 @@ private static AtomicArray generateFetchResults(int nShards, } SearchHit[] hits = searchHits.toArray(new SearchHit[0]); fetchSearchResult.hits(new SearchHits(hits, new TotalHits(hits.length, Relation.EQUAL_TO), maxScore)); + + // Add fetch profile data + List fetchChildren = Arrays.asList( + new ProfileResult( + "LoadStoredFields", + "loading _source", + Collections.emptyMap(), + Collections.emptyMap(), + randomIntBetween(5, 15), + Collections.emptyList() + ), + new ProfileResult( + "LoadDocValues", + "loading doc values", + Collections.emptyMap(), + Collections.emptyMap(), + randomIntBetween(3, 10), + Collections.emptyList() + ) + ); + ProfileResult fetchPhase = new ProfileResult( + "FetchPhase", + "fetch documents", + Collections.emptyMap(), + Collections.emptyMap(), + randomIntBetween(20, 40), + fetchChildren + ); + + FetchProfileShardResult fetchProfileShardResult = new FetchProfileShardResult(Arrays.asList(fetchPhase)); + + ProfileShardResult fetchProfileResult = new ProfileShardResult( + Collections.emptyList(), // no query results in fetch + new AggregationProfileShardResult(Collections.emptyList()), // no agg results in fetch + fetchProfileShardResult, + new NetworkTime(randomIntBetween(1, 3), randomIntBetween(1, 3)) + ); + + fetchSearchResult.profileResults(fetchProfileResult); + fetchResults.set(shardIndex, fetchSearchResult); } return fetchResults; diff --git a/server/src/test/java/org/opensearch/search/SearchServiceTests.java b/server/src/test/java/org/opensearch/search/SearchServiceTests.java index 9cf0d543d0874..82b6bc3346524 100644 --- a/server/src/test/java/org/opensearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/opensearch/search/SearchServiceTests.java @@ -1275,6 +1275,91 @@ public void testCreateReduceContext() { } } + public void testExecuteFetchPhaseWithProfiler() throws Exception { + createIndex("index"); + 
client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + SearchService service = getInstanceFromNode(SearchService.class); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index")); + IndexShard indexShard = indexService.getShard(0); + + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true).scroll(new Scroll(TimeValue.timeValueMinutes(1))); + searchRequest.source(new SearchSourceBuilder().profile(true)); + + ShardSearchRequest shardSearchRequest = new ShardSearchRequest( + OriginalIndices.NONE, + searchRequest, + indexShard.shardId(), + 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), + 1.0f, + -1, + null, + null + ); + + PlainActionFuture queryFuture = new PlainActionFuture<>(); + SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()); + service.executeQueryPhase(shardSearchRequest, randomBoolean(), task, queryFuture); + SearchPhaseResult queryResult = queryFuture.get(); + + List docIds = new ArrayList<>(); + docIds.add(0); + ShardFetchRequest fetchRequest = new ShardFetchRequest(queryResult.getContextId(), docIds, null); + + PlainActionFuture fetchFuture = new PlainActionFuture<>(); + service.executeFetchPhase(fetchRequest, task, fetchFuture); + FetchSearchResult fetchResult = fetchFuture.get(); + + assertNotNull("Profile results should be set when profiler is enabled", fetchResult.getProfileResults()); + assertNotNull("Profile results should contain fetch phase data", fetchResult.getProfileResults().getFetchProfileResult()); + + service.freeReaderContext(queryResult.getContextId()); + } + + public void testExecuteFetchPhaseWithoutProfiler() throws Exception { + createIndex("index"); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + SearchService service = 
getInstanceFromNode(SearchService.class); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index")); + IndexShard indexShard = indexService.getShard(0); + + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true).scroll(new Scroll(TimeValue.timeValueMinutes(1))); + searchRequest.source(new SearchSourceBuilder().profile(false)); + + ShardSearchRequest shardSearchRequest = new ShardSearchRequest( + OriginalIndices.NONE, + searchRequest, + indexShard.shardId(), + 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), + 1.0f, + -1, + null, + null + ); + + PlainActionFuture queryFuture = new PlainActionFuture<>(); + SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()); + service.executeQueryPhase(shardSearchRequest, randomBoolean(), task, queryFuture); + SearchPhaseResult queryResult = queryFuture.get(); + + List docIds = new ArrayList<>(); + docIds.add(0); + ShardFetchRequest fetchRequest = new ShardFetchRequest(queryResult.getContextId(), docIds, null); + + PlainActionFuture fetchFuture = new PlainActionFuture<>(); + service.executeFetchPhase(fetchRequest, task, fetchFuture); + FetchSearchResult fetchResult = fetchFuture.get(); + + assertNull("Profile results should be null when profiler is disabled", fetchResult.getProfileResults()); + + service.freeReaderContext(queryResult.getContextId()); + } + public void testCreateSearchContext() throws IOException { String index = randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT); IndexService indexService = createIndex(index);