diff --git a/plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java b/plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java index 71ac9cf558006..e9aa5cfe8e2d4 100644 --- a/plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java +++ b/plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java @@ -47,6 +47,7 @@ import java.util.List; import static org.opensearch.common.util.FeatureFlags.STREAM_TRANSPORT; +import static org.opensearch.index.query.QueryBuilders.existsQuery; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; import static org.opensearch.search.aggregations.AggregationBuilders.terms; @@ -290,8 +291,10 @@ private void assertAggregatorUsed(SearchResponse resp, Class expectedAggregat @LockFeatureFlag(STREAM_TRANSPORT) public void testStreamingAggregationUsed() throws Exception { // This test validates streaming aggregation with 3 shards, each with at least 3 segments + // Use existsQuery to avoid match-all optimization that would disable streaming TermsAggregationBuilder agg = terms("agg1").field("field1").subAggregation(AggregationBuilders.max("agg2").field("field2")); ActionFuture future = client().prepareStreamSearch("index") + .setQuery(existsQuery("field1")) .addAggregation(agg) .setSize(0) .setRequestCache(false) @@ -528,7 +531,9 @@ public void testOrderByMaxSubAggregationDescending() throws Exception { .order(org.opensearch.search.aggregations.BucketOrder.aggregation("max_value", false)) .subAggregation(AggregationBuilders.max("max_value").field("value")); + // Use existsQuery to avoid match-all optimization that would disable streaming SearchResponse resp = client().prepareStreamSearch("order_test") + .setQuery(existsQuery("category")) .addAggregation(agg) .setSize(0) 
.setProfile(true) @@ -555,7 +560,9 @@ public void testOrderByMaxSubAggregationAscending() throws Exception { .order(org.opensearch.search.aggregations.BucketOrder.aggregation("max_value", true)) .subAggregation(AggregationBuilders.max("max_value").field("value")); + // Use existsQuery to avoid match-all optimization that would disable streaming SearchResponse resp = client().prepareStreamSearch("order_test") + .setQuery(existsQuery("category")) .addAggregation(agg) .setSize(0) .setProfile(true) @@ -582,7 +589,9 @@ public void testOrderByCardinalitySubAggregationDescending() throws Exception { .order(org.opensearch.search.aggregations.BucketOrder.aggregation("unique_users", false)) .subAggregation(AggregationBuilders.cardinality("unique_users").field("user_id")); + // Use existsQuery to avoid match-all optimization that would disable streaming SearchResponse resp = client().prepareStreamSearch("order_test") + .setQuery(existsQuery("category")) .addAggregation(agg) .setSize(0) .setProfile(true) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index 98c026ac03730..4759edff68168 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -32,7 +32,11 @@ package org.opensearch.search.aggregations.bucket.terms; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; import org.opensearch.core.ParseField; import org.opensearch.index.query.QueryShardContext; import org.opensearch.search.DocValueFormat; @@ -63,6 +67,7 @@ import java.io.IOException; import java.util.Arrays; +import 
java.util.List; import java.util.Map; import java.util.function.Function; @@ -709,13 +714,84 @@ public StreamingCostMetrics estimateStreamingCost(SearchContext searchContext) { return StreamingCostMetrics.nonStreamable(); } - if (valuesSource instanceof WithOrdinals || valuesSource instanceof ValuesSource.Numeric) { + if (valuesSource instanceof WithOrdinals ordinalsValuesSource) { + if (shouldDisableStreamingForOrdinals(searchContext, ordinalsValuesSource)) { + return StreamingCostMetrics.nonStreamable(); + } + return new StreamingCostMetrics(true, segmentTopN); + } + + if (valuesSource instanceof ValuesSource.Numeric) { return new StreamingCostMetrics(true, segmentTopN); } return StreamingCostMetrics.nonStreamable(); } + /** + * Determines if streaming should be disabled for ordinal-based string terms aggregation. + * + *

Streaming is disabled when: + * <ul> + * <li>the field's maximum per-segment ordinal cardinality is below the streaming minimum estimated bucket count</li> + * <li>the query is match-all and the majority of documents are in segments without deletions (the traditional aggregator's term frequency optimization applies)</li> + * </ul>
+ */ + private boolean shouldDisableStreamingForOrdinals(SearchContext searchContext, WithOrdinals valuesSource) { + List<LeafReaderContext> leaves = searchContext.searcher().getIndexReader().leaves(); + if (leaves.isEmpty()) { + return false; + } + + long minBucketCount = searchContext.getStreamingMinEstimatedBucketCount(); + long maxCardinality = 0; + long totalDocs = 0; + // A segment is "clean" if it has no deletions + long docsInCleanSegments = 0; + + for (LeafReaderContext leaf : leaves) { + try { + SortedSetDocValues docValues = valuesSource.ordinalsValues(leaf); + if (docValues != null) { + maxCardinality = Math.max(maxCardinality, docValues.getValueCount()); + } + } catch (IOException e) { + // If we can't read doc values, skip this check + return false; + } + + int segmentDocs = leaf.reader().maxDoc(); + totalDocs += segmentDocs; + + if (!leaf.reader().hasDeletions()) { + docsInCleanSegments += segmentDocs; + } + } + + // Check 1: Low cardinality - streaming overhead not worth it + if (maxCardinality < minBucketCount) { + return true; + } + + // Check 2: Match-all query with the majority of docs in clean segments + // Traditional aggregator can use term frequency optimization for these segments + if (isMatchAllQuery(searchContext.query())) { + double cleanRatio = totalDocs > 0 ? (double) docsInCleanSegments / totalDocs : 0; + return cleanRatio > 0.8; + } + + return false; + } + + /** + * Checks if the query is a match-all query. 
+ */ + private static boolean isMatchAllQuery(Query query) { + return query instanceof MatchAllDocsQuery; + } + @Override protected boolean supportsConcurrentSegmentSearch() { return true; diff --git a/server/src/test/java/org/opensearch/search/aggregations/FactoryStreamingCostEstimationTests.java b/server/src/test/java/org/opensearch/search/aggregations/FactoryStreamingCostEstimationTests.java index 69e3f10afa8fc..0074a106d1cb1 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/FactoryStreamingCostEstimationTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/FactoryStreamingCostEstimationTests.java @@ -15,7 +15,13 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; import org.opensearch.core.common.breaker.CircuitBreaker; @@ -35,6 +41,7 @@ import java.io.IOException; import static org.opensearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS; +import static org.mockito.Mockito.when; /** * Tests for factory-level streaming cost estimation. @@ -519,6 +526,147 @@ public void testSiblingAggregationCostCombination() throws IOException { } } + // ======================================== + // Streaming Fallback Tests + // ======================================== + + /** + * Test that string terms aggregation falls back to non-streamable when cardinality is low. + * Low cardinality means maxCardinality is less than minEstimatedBucketCount setting. 
+ */ + public void testTermsFactoryFallbackLowCardinality() throws IOException { + try (Directory directory = newDirectory()) { + try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig())) { + // Create index with low cardinality (only 5 unique terms) + for (int i = 0; i < 100; i++) { + Document doc = new Document(); + doc.add(new SortedSetDocValuesField("category", new BytesRef("cat_" + (i % 5)))); + writer.addDocument(doc); + } + + try (IndexReader reader = DirectoryReader.open(writer)) { + IndexSearcher searcher = newIndexSearcher(reader); + MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("category"); + + TermsAggregationBuilder termsBuilder = new TermsAggregationBuilder("terms").field("category").size(5); + + // Create context with high minBucketCount so cardinality (5) < minBucketCount (1000) + FactoryAndContext result = createAggregatorFactoryWithMinBucketCount(termsBuilder, searcher, 1000L, fieldType); + StreamingCostMetrics metrics = ((StreamingCostEstimable) result.factory).estimateStreamingCost(result.searchContext); + + assertFalse("Low cardinality should NOT be streamable", metrics.streamable()); + } + } + } + } + + /** + * Test that string terms aggregation falls back to non-streamable for match-all query + * when majority of segments have no deleted docs (traditional aggregator can use term frequency optimization). 
+ */ + public void testTermsFactoryFallbackMatchAllWithCleanSegments() throws IOException { + try (Directory directory = newDirectory()) { + try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig())) { + // Create index with high cardinality (more than default minBucketCount) + for (int i = 0; i < 5000; i++) { + Document doc = new Document(); + doc.add(new SortedSetDocValuesField("category", new BytesRef("cat_" + i))); + writer.addDocument(doc); + } + + try (IndexReader reader = DirectoryReader.open(writer)) { + IndexSearcher searcher = newIndexSearcher(reader); + MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("category"); + + TermsAggregationBuilder termsBuilder = new TermsAggregationBuilder("terms").field("category").size(10); + + // Create context with MatchAllDocsQuery - should fall back due to term frequency optimization + FactoryAndContext result = createAggregatorFactoryWithQuery(termsBuilder, searcher, new MatchAllDocsQuery(), fieldType); + StreamingCostMetrics metrics = ((StreamingCostEstimable) result.factory).estimateStreamingCost(result.searchContext); + + assertFalse("Match-all with clean segments should NOT be streamable", metrics.streamable()); + } + } + } + } + + /** + * Test that string terms with match-all but deleted docs IS streamable + * (term frequency optimization won't work when segments have deletions). 
+ */ + public void testTermsFactoryStreamableMatchAllWithDeletedDocs() throws IOException { + try (Directory directory = newDirectory()) { + // Use IndexWriterConfig that doesn't merge away deletions + IndexWriterConfig config = new IndexWriterConfig(); + config.setMergePolicy(org.apache.lucene.index.NoMergePolicy.INSTANCE); + + try (IndexWriter writer = new IndexWriter(directory, config)) { + // Create index with high cardinality in multiple batches to create multiple segments + for (int batch = 0; batch < 5; batch++) { + for (int i = 0; i < 1000; i++) { + Document doc = new Document(); + String catValue = "cat_" + (batch * 1000 + i); + doc.add(new SortedSetDocValuesField("category", new BytesRef(catValue))); + // Add indexed field for deletion + doc.add(new org.apache.lucene.document.StringField("id", catValue, org.apache.lucene.document.Field.Store.NO)); + writer.addDocument(doc); + } + writer.commit(); // Create a segment per batch + } + + // Delete documents from all segments to make them "dirty" + for (int batch = 0; batch < 5; batch++) { + writer.deleteDocuments(new org.apache.lucene.index.Term("id", "cat_" + (batch * 1000))); + } + writer.commit(); + + try (IndexReader reader = DirectoryReader.open(writer)) { + IndexSearcher searcher = newIndexSearcher(reader); + MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("category"); + + TermsAggregationBuilder termsBuilder = new TermsAggregationBuilder("terms").field("category").size(10); + + // Match-all but with deleted docs - should be streamable + FactoryAndContext result = createAggregatorFactoryWithQuery(termsBuilder, searcher, new MatchAllDocsQuery(), fieldType); + StreamingCostMetrics metrics = ((StreamingCostEstimable) result.factory).estimateStreamingCost(result.searchContext); + + // With deletions in all segments, cleanRatio will be 0, so streaming is preferred + assertTrue("Match-all with deleted docs should be streamable", metrics.streamable()); + } + } + } + } + + /** + * Test that 
non-match-all query with high cardinality IS streamable. + */ + public void testTermsFactoryStreamableNonMatchAllQuery() throws IOException { + try (Directory directory = newDirectory()) { + try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig())) { + // Create index with high cardinality + for (int i = 0; i < 5000; i++) { + Document doc = new Document(); + doc.add(new SortedSetDocValuesField("category", new BytesRef("cat_" + i))); + writer.addDocument(doc); + } + + try (IndexReader reader = DirectoryReader.open(writer)) { + IndexSearcher searcher = newIndexSearcher(reader); + MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("category"); + + TermsAggregationBuilder termsBuilder = new TermsAggregationBuilder("terms").field("category").size(10); + + // Non-match-all query (using a term query) - should be streamable even with clean segments + Query termQuery = new TermQuery(new Term("category", "cat_100")); + FactoryAndContext result = createAggregatorFactoryWithQuery(termsBuilder, searcher, termQuery, fieldType); + StreamingCostMetrics metrics = ((StreamingCostEstimable) result.factory).estimateStreamingCost(result.searchContext); + + assertTrue("Non-match-all query should be streamable", metrics.streamable()); + } + } + } + } + // ======================================== // Helper methods // ======================================== @@ -533,13 +681,55 @@ private MultiBucketConsumerService.MultiBucketConsumer createBucketConsumer() { /** * Creates an AggregatorFactory from an AggregationBuilder using the test infrastructure. * Returns the factory and the SearchContext used to create it. + * Uses a non-match-all query to avoid triggering the match-all optimization fallback. */ private FactoryAndContext createAggregatorFactory( AggregationBuilder aggregationBuilder, IndexSearcher searcher, MappedFieldType... 
fieldTypes + ) throws IOException { + // Use a BooleanQuery wrapper to avoid being detected as match-all + // This simulates a filtered query scenario + Query nonMatchAllQuery = new BooleanQuery.Builder().add(new MatchAllDocsQuery(), BooleanClause.Occur.MUST).build(); + SearchContext searchContext = createSearchContext( + searcher, + createIndexSettings(), + nonMatchAllQuery, + createBucketConsumer(), + fieldTypes + ); + QueryShardContext queryShardContext = searchContext.getQueryShardContext(); + AggregatorFactory factory = aggregationBuilder.rewrite(queryShardContext).build(queryShardContext, null); + return new FactoryAndContext(factory, searchContext); + } + + /** + * Creates an AggregatorFactory with a specific query. + */ + private FactoryAndContext createAggregatorFactoryWithQuery( + AggregationBuilder aggregationBuilder, + IndexSearcher searcher, + Query query, + MappedFieldType... fieldTypes + ) throws IOException { + SearchContext searchContext = createSearchContext(searcher, createIndexSettings(), query, createBucketConsumer(), fieldTypes); + QueryShardContext queryShardContext = searchContext.getQueryShardContext(); + AggregatorFactory factory = aggregationBuilder.rewrite(queryShardContext).build(queryShardContext, null); + return new FactoryAndContext(factory, searchContext); + } + + /** + * Creates an AggregatorFactory with a mocked minEstimatedBucketCount. + */ + private FactoryAndContext createAggregatorFactoryWithMinBucketCount( + AggregationBuilder aggregationBuilder, + IndexSearcher searcher, + long minBucketCount, + MappedFieldType... 
fieldTypes ) throws IOException { SearchContext searchContext = createSearchContext(searcher, createIndexSettings(), null, createBucketConsumer(), fieldTypes); + // Mock the minEstimatedBucketCount setting + when(searchContext.getStreamingMinEstimatedBucketCount()).thenReturn(minBucketCount); QueryShardContext queryShardContext = searchContext.getQueryShardContext(); AggregatorFactory factory = aggregationBuilder.rewrite(queryShardContext).build(queryShardContext, null); return new FactoryAndContext(factory, searchContext);