Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.List;

import static org.opensearch.common.util.FeatureFlags.STREAM_TRANSPORT;
import static org.opensearch.index.query.QueryBuilders.existsQuery;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;

Expand Down Expand Up @@ -290,8 +291,10 @@ private void assertAggregatorUsed(SearchResponse resp, Class<?> expectedAggregat
@LockFeatureFlag(STREAM_TRANSPORT)
public void testStreamingAggregationUsed() throws Exception {
// This test validates streaming aggregation with 3 shards, each with at least 3 segments
// Use existsQuery to avoid match-all optimization that would disable streaming
TermsAggregationBuilder agg = terms("agg1").field("field1").subAggregation(AggregationBuilders.max("agg2").field("field2"));
ActionFuture<SearchResponse> future = client().prepareStreamSearch("index")
.setQuery(existsQuery("field1"))
.addAggregation(agg)
.setSize(0)
.setRequestCache(false)
Expand Down Expand Up @@ -528,7 +531,9 @@ public void testOrderByMaxSubAggregationDescending() throws Exception {
.order(org.opensearch.search.aggregations.BucketOrder.aggregation("max_value", false))
.subAggregation(AggregationBuilders.max("max_value").field("value"));

// Use existsQuery to avoid match-all optimization that would disable streaming
SearchResponse resp = client().prepareStreamSearch("order_test")
.setQuery(existsQuery("category"))
.addAggregation(agg)
.setSize(0)
.setProfile(true)
Expand All @@ -555,7 +560,9 @@ public void testOrderByMaxSubAggregationAscending() throws Exception {
.order(org.opensearch.search.aggregations.BucketOrder.aggregation("max_value", true))
.subAggregation(AggregationBuilders.max("max_value").field("value"));

// Use existsQuery to avoid match-all optimization that would disable streaming
SearchResponse resp = client().prepareStreamSearch("order_test")
.setQuery(existsQuery("category"))
.addAggregation(agg)
.setSize(0)
.setProfile(true)
Expand All @@ -582,7 +589,9 @@ public void testOrderByCardinalitySubAggregationDescending() throws Exception {
.order(org.opensearch.search.aggregations.BucketOrder.aggregation("unique_users", false))
.subAggregation(AggregationBuilders.cardinality("unique_users").field("user_id"));

// Use existsQuery to avoid match-all optimization that would disable streaming
SearchResponse resp = client().prepareStreamSearch("order_test")
.setQuery(existsQuery("category"))
.addAggregation(agg)
.setSize(0)
.setProfile(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@

package org.opensearch.search.aggregations.bucket.terms;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.opensearch.core.ParseField;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.search.DocValueFormat;
Expand Down Expand Up @@ -63,6 +67,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

Expand Down Expand Up @@ -709,13 +714,84 @@ public StreamingCostMetrics estimateStreamingCost(SearchContext searchContext) {
return StreamingCostMetrics.nonStreamable();
}

if (valuesSource instanceof WithOrdinals || valuesSource instanceof ValuesSource.Numeric) {
if (valuesSource instanceof WithOrdinals ordinalsValuesSource) {
if (shouldDisableStreamingForOrdinals(searchContext, ordinalsValuesSource)) {
return StreamingCostMetrics.nonStreamable();
}
return new StreamingCostMetrics(true, segmentTopN);
}

if (valuesSource instanceof ValuesSource.Numeric) {
return new StreamingCostMetrics(true, segmentTopN);
}

return StreamingCostMetrics.nonStreamable();
}

/**
 * Decides whether streaming should be skipped for an ordinal-backed string terms aggregation.
 *
 * <p>Two situations make the traditional aggregator preferable:
 * <ul>
 *   <li>the largest per-segment ordinal count falls short of the configured minimum
 *       estimated bucket count, so streaming overhead outweighs its benefit</li>
 *   <li>the query matches all documents and most documents live in segments without
 *       deletions, where the traditional aggregator can exploit its term frequency
 *       optimization</li>
 * </ul>
 */
private boolean shouldDisableStreamingForOrdinals(SearchContext searchContext, WithOrdinals valuesSource) {
    final List<LeafReaderContext> segments = searchContext.searcher().getIndexReader().leaves();
    if (segments.isEmpty()) {
        return false;
    }

    final long minimumBucketCount = searchContext.getStreamingMinEstimatedBucketCount();
    long highestSegmentCardinality = 0;
    long overallDocCount = 0;
    long undeletedSegmentDocCount = 0; // docs in "clean" segments, i.e. segments with no deletions

    for (LeafReaderContext segment : segments) {
        try {
            SortedSetDocValues ordinals = valuesSource.ordinalsValues(segment);
            if (ordinals != null) {
                highestSegmentCardinality = Math.max(highestSegmentCardinality, ordinals.getValueCount());
            }
        } catch (IOException e) {
            // Doc values unreadable: abandon the heuristic and leave streaming enabled
            return false;
        }

        final int docsInSegment = segment.reader().maxDoc();
        overallDocCount += docsInSegment;
        if (segment.reader().hasDeletions() == false) {
            undeletedSegmentDocCount += docsInSegment;
        }
    }

    // Heuristic 1: too few distinct terms for streaming to pay off
    if (highestSegmentCardinality < minimumBucketCount) {
        return true;
    }

    // Heuristic 2: a match-all query over mostly deletion-free segments lets the
    // traditional aggregator use its term frequency optimization
    if (isMatchAllQuery(searchContext.query())) {
        final double deletionFreeFraction = overallDocCount > 0 ? (double) undeletedSegmentDocCount / overallDocCount : 0;
        return deletionFreeFraction > 0.8;
    }

    return false;
}

/**
 * Returns {@code true} when the supplied query is Lucene's match-all query.
 */
private static boolean isMatchAllQuery(Query query) {
    return MatchAllDocsQuery.class.isInstance(query);
}

@Override
protected boolean supportsConcurrentSegmentSearch() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.opensearch.core.common.breaker.CircuitBreaker;
Expand All @@ -35,6 +41,7 @@
import java.io.IOException;

import static org.opensearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS;
import static org.mockito.Mockito.when;

/**
* Tests for factory-level streaming cost estimation.
Expand Down Expand Up @@ -519,6 +526,147 @@ public void testSiblingAggregationCostCombination() throws IOException {
}
}

// ========================================
// Streaming Fallback Tests
// ========================================

/**
 * Verifies that a string terms aggregation is reported as non-streamable when the field's
 * cardinality stays below the configured minimum estimated bucket count.
 */
public void testTermsFactoryFallbackLowCardinality() throws IOException {
    try (Directory dir = newDirectory()) {
        try (IndexWriter indexWriter = new IndexWriter(dir, new IndexWriterConfig())) {
            // 100 docs but only 5 distinct terms -> very low cardinality
            for (int docId = 0; docId < 100; docId++) {
                Document document = new Document();
                document.add(new SortedSetDocValuesField("category", new BytesRef("cat_" + (docId % 5))));
                indexWriter.addDocument(document);
            }

            try (IndexReader indexReader = DirectoryReader.open(indexWriter)) {
                IndexSearcher indexSearcher = newIndexSearcher(indexReader);
                MappedFieldType keywordField = new KeywordFieldMapper.KeywordFieldType("category");

                TermsAggregationBuilder builder = new TermsAggregationBuilder("terms").field("category").size(5);

                // minBucketCount of 1000 dwarfs the actual cardinality of 5
                FactoryAndContext created = createAggregatorFactoryWithMinBucketCount(builder, indexSearcher, 1000L, keywordField);
                StreamingCostMetrics costMetrics = ((StreamingCostEstimable) created.factory).estimateStreamingCost(created.searchContext);

                assertFalse("Low cardinality should NOT be streamable", costMetrics.streamable());
            }
        }
    }
}

/**
 * Verifies the match-all fallback: with a match-all query and segments free of deletions,
 * the traditional aggregator's term frequency optimization wins, so streaming is disabled.
 */
public void testTermsFactoryFallbackMatchAllWithCleanSegments() throws IOException {
    try (Directory dir = newDirectory()) {
        try (IndexWriter indexWriter = new IndexWriter(dir, new IndexWriterConfig())) {
            // 5000 distinct terms -> comfortably above the default minBucketCount
            for (int docId = 0; docId < 5000; docId++) {
                Document document = new Document();
                document.add(new SortedSetDocValuesField("category", new BytesRef("cat_" + docId)));
                indexWriter.addDocument(document);
            }

            try (IndexReader indexReader = DirectoryReader.open(indexWriter)) {
                IndexSearcher indexSearcher = newIndexSearcher(indexReader);
                MappedFieldType keywordField = new KeywordFieldMapper.KeywordFieldType("category");

                TermsAggregationBuilder builder = new TermsAggregationBuilder("terms").field("category").size(10);

                // MatchAllDocsQuery triggers the term-frequency-optimization fallback
                FactoryAndContext created = createAggregatorFactoryWithQuery(builder, indexSearcher, new MatchAllDocsQuery(), keywordField);
                StreamingCostMetrics costMetrics = ((StreamingCostEstimable) created.factory).estimateStreamingCost(created.searchContext);

                assertFalse("Match-all with clean segments should NOT be streamable", costMetrics.streamable());
            }
        }
    }
}

/**
 * Verifies that a match-all query stays streamable once every segment carries deletions,
 * because the term frequency optimization cannot be applied to dirty segments.
 */
public void testTermsFactoryStreamableMatchAllWithDeletedDocs() throws IOException {
    try (Directory dir = newDirectory()) {
        // Keep merges off so the deletions are not compacted away
        IndexWriterConfig writerConfig = new IndexWriterConfig();
        writerConfig.setMergePolicy(org.apache.lucene.index.NoMergePolicy.INSTANCE);

        try (IndexWriter indexWriter = new IndexWriter(dir, writerConfig)) {
            // Five committed batches -> five segments, each holding 1000 unique terms
            for (int segment = 0; segment < 5; segment++) {
                for (int docId = 0; docId < 1000; docId++) {
                    Document document = new Document();
                    String term = "cat_" + (segment * 1000 + docId);
                    document.add(new SortedSetDocValuesField("category", new BytesRef(term)));
                    // Indexed id field so the doc can be targeted for deletion below
                    document.add(new org.apache.lucene.document.StringField("id", term, org.apache.lucene.document.Field.Store.NO));
                    indexWriter.addDocument(document);
                }
                indexWriter.commit(); // one segment per batch
            }

            // Remove one document from each segment so every segment reports deletions
            for (int segment = 0; segment < 5; segment++) {
                indexWriter.deleteDocuments(new Term("id", "cat_" + (segment * 1000)));
            }
            indexWriter.commit();

            try (IndexReader indexReader = DirectoryReader.open(indexWriter)) {
                IndexSearcher indexSearcher = newIndexSearcher(indexReader);
                MappedFieldType keywordField = new KeywordFieldMapper.KeywordFieldType("category");

                TermsAggregationBuilder builder = new TermsAggregationBuilder("terms").field("category").size(10);

                // Every segment is dirty, so the clean ratio is 0 and streaming is preferred
                FactoryAndContext created = createAggregatorFactoryWithQuery(builder, indexSearcher, new MatchAllDocsQuery(), keywordField);
                StreamingCostMetrics costMetrics = ((StreamingCostEstimable) created.factory).estimateStreamingCost(created.searchContext);

                assertTrue("Match-all with deleted docs should be streamable", costMetrics.streamable());
            }
        }
    }
}

/**
 * Verifies that a high-cardinality terms aggregation remains streamable when the query is
 * not match-all, even though all segments are free of deletions.
 */
public void testTermsFactoryStreamableNonMatchAllQuery() throws IOException {
    try (Directory dir = newDirectory()) {
        try (IndexWriter indexWriter = new IndexWriter(dir, new IndexWriterConfig())) {
            // 5000 distinct terms -> high cardinality
            for (int docId = 0; docId < 5000; docId++) {
                Document document = new Document();
                document.add(new SortedSetDocValuesField("category", new BytesRef("cat_" + docId)));
                indexWriter.addDocument(document);
            }

            try (IndexReader indexReader = DirectoryReader.open(indexWriter)) {
                IndexSearcher indexSearcher = newIndexSearcher(indexReader);
                MappedFieldType keywordField = new KeywordFieldMapper.KeywordFieldType("category");

                TermsAggregationBuilder builder = new TermsAggregationBuilder("terms").field("category").size(10);

                // A plain TermQuery is not match-all, so clean segments do not matter
                Query nonMatchAll = new TermQuery(new Term("category", "cat_100"));
                FactoryAndContext created = createAggregatorFactoryWithQuery(builder, indexSearcher, nonMatchAll, keywordField);
                StreamingCostMetrics costMetrics = ((StreamingCostEstimable) created.factory).estimateStreamingCost(created.searchContext);

                assertTrue("Non-match-all query should be streamable", costMetrics.streamable());
            }
        }
    }
}

// ========================================
// Helper methods
// ========================================
Expand All @@ -533,13 +681,55 @@ private MultiBucketConsumerService.MultiBucketConsumer createBucketConsumer() {
/**
 * Builds an AggregatorFactory from an AggregationBuilder via the shared test infrastructure,
 * returning both the factory and the SearchContext it was built against.
 * The query is deliberately wrapped so it is not recognized as match-all, which would
 * otherwise trigger the match-all optimization fallback.
 */
private FactoryAndContext createAggregatorFactory(
    AggregationBuilder aggregationBuilder,
    IndexSearcher searcher,
    MappedFieldType... fieldTypes
) throws IOException {
    // Hide the match-all query inside a BooleanQuery so it is not detected as match-all,
    // simulating a filtered query scenario
    Query wrappedQuery = new BooleanQuery.Builder().add(new MatchAllDocsQuery(), BooleanClause.Occur.MUST).build();
    SearchContext context = createSearchContext(searcher, createIndexSettings(), wrappedQuery, createBucketConsumer(), fieldTypes);
    QueryShardContext shardContext = context.getQueryShardContext();
    AggregatorFactory builtFactory = aggregationBuilder.rewrite(shardContext).build(shardContext, null);
    return new FactoryAndContext(builtFactory, context);
}

/**
 * Builds an AggregatorFactory using an explicitly supplied query, returning the factory
 * together with the SearchContext it was created against.
 */
private FactoryAndContext createAggregatorFactoryWithQuery(
    AggregationBuilder aggregationBuilder,
    IndexSearcher searcher,
    Query query,
    MappedFieldType... fieldTypes
) throws IOException {
    SearchContext context = createSearchContext(searcher, createIndexSettings(), query, createBucketConsumer(), fieldTypes);
    QueryShardContext shardContext = context.getQueryShardContext();
    AggregatorFactory builtFactory = aggregationBuilder.rewrite(shardContext).build(shardContext, null);
    return new FactoryAndContext(builtFactory, context);
}

/**
* Creates an AggregatorFactory with a mocked minEstimatedBucketCount.
*/
private FactoryAndContext createAggregatorFactoryWithMinBucketCount(
AggregationBuilder aggregationBuilder,
IndexSearcher searcher,
long minBucketCount,
MappedFieldType... fieldTypes
) throws IOException {
SearchContext searchContext = createSearchContext(searcher, createIndexSettings(), null, createBucketConsumer(), fieldTypes);
// Mock the minEstimatedBucketCount setting
when(searchContext.getStreamingMinEstimatedBucketCount()).thenReturn(minBucketCount);
QueryShardContext queryShardContext = searchContext.getQueryShardContext();
AggregatorFactory factory = aggregationBuilder.rewrite(queryShardContext).build(queryShardContext, null);
return new FactoryAndContext(factory, searchContext);
Expand Down
Loading