diff --git a/CHANGELOG.md b/CHANGELOG.md index 51e725413c01f..d3a31cfe421f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Introduce concurrent translog recovery to accelerate segment replication primary promotion ([#20251](https://github.com/opensearch-project/OpenSearch/pull/20251)) - Update to `almalinux:10` ([#20482](https://github.com/opensearch-project/OpenSearch/pull/20482)) - Add X-Request-Id to uniquely identify a search request ([#19798](https://github.com/opensearch-project/OpenSearch/pull/19798)) +- Added TopN selection logic for streaming terms aggregations ([#20481](https://github.com/opensearch-project/OpenSearch/pull/20481)) ### Changed - Handle custom metadata files in subdirectory-store ([#20157](https://github.com/opensearch-project/OpenSearch/pull/20157)) diff --git a/plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java b/plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java index f4f6a5b726a48..846af582db40e 100644 --- a/plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java +++ b/plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java @@ -170,6 +170,88 @@ public void setUp() throws Exception { indexShardSegments.getShards()[0].getSegments().size() >= MIN_SEGMENTS_PER_SHARD ); }); + + // Create order_test index for string field sub-aggregation ordering tests + Settings orderIndexSettings = Settings.builder() + .put("index.number_of_shards", NUM_SHARDS) + .put("index.number_of_replicas", 0) + .put("index.search.concurrent_segment_search.mode", "none") + .put("index.merge.policy.max_merged_segment", "1kb") + .put("index.merge.policy.segments_per_tier", "20") + .put("index.merge.scheduler.max_thread_count", "1") + .build(); + + CreateIndexRequest orderIndexRequest = new CreateIndexRequest("order_test").settings(orderIndexSettings); + orderIndexRequest.mapping( + "{\"properties\":{\"category\":{\"type\":\"keyword\"},\"value\":{\"type\":\"integer\"},\"user_id\":{\"type\":\"keyword\"}}}", + XContentType.JSON + ); + client().admin().indices().create(orderIndexRequest).actionGet(); + client().admin().cluster().prepareHealth("order_test").setWaitForGreenStatus().setTimeout(TimeValue.timeValueSeconds(30)).get(); + + for (int seg = 0; seg < 3; seg++) { + BulkRequest orderBulkRequest = new BulkRequest(); + for (int i = 0; i < 10; i++) { + int uniqueUsers = (i + 1) * 2; + int docsPerSegment = uniqueUsers / 3 + (seg < uniqueUsers % 3 ? 
1 : 0); + for (int j = 0; j < docsPerSegment; j++) { + orderBulkRequest.add( + new IndexRequest("order_test").source( + XContentType.JSON, + "category", + "cat_" + i, + "value", + (i + 1) * 100, + "user_id", + "user_" + (i * 100 + seg * 100 + j) + ) + ); + } + } + BulkResponse orderBulkResponse = client().bulk(orderBulkRequest).actionGet(); + assertFalse(orderBulkResponse.hasFailures()); + client().admin().indices().flush(new FlushRequest("order_test").force(true)).actionGet(); + client().admin().indices().refresh(new RefreshRequest("order_test")).actionGet(); + } + + // Create numeric_order_test index for numeric field sub-aggregation ordering tests + CreateIndexRequest numericOrderIndexRequest = new CreateIndexRequest("numeric_order_test").settings(orderIndexSettings); + numericOrderIndexRequest.mapping( + "{\"properties\":{\"category\":{\"type\":\"integer\"},\"value\":{\"type\":\"integer\"},\"user_id\":{\"type\":\"keyword\"}}}", + XContentType.JSON + ); + client().admin().indices().create(numericOrderIndexRequest).actionGet(); + client().admin() + .cluster() + .prepareHealth("numeric_order_test") + .setWaitForGreenStatus() + .setTimeout(TimeValue.timeValueSeconds(30)) + .get(); + + for (int seg = 0; seg < 3; seg++) { + BulkRequest numericBulkRequest = new BulkRequest(); + for (int i = 0; i < 10; i++) { + int uniqueUsers = (i + 1) * 2; + int docsPerSegment = uniqueUsers / 3 + (seg < uniqueUsers % 3 ? 1 : 0); + for (int j = 0; j < docsPerSegment; j++) { + numericBulkRequest.add( + new IndexRequest("numeric_order_test").source( + XContentType.JSON, + "category", + i, + "value", + (i + 1) * 100, + "user_id", + "user_" + (i * 100 + seg * 100 + j) + ) + ); + } + } + BulkResponse numericBulkResponse = client().bulk(numericBulkRequest).actionGet(); + assertFalse(numericBulkResponse.hasFailures()); + client().admin().indices().flush(new FlushRequest("numeric_order_test").force(true)).actionGet(); + client().admin().indices().refresh(new RefreshRequest("numeric_order_test")).actionGet(); + } } @Override @@ -188,6 +270,28 @@ public void tearDown() throws Exception { super.tearDown(); } + private void assertStreamingTermsUsed(SearchResponse resp, String expectedStrategy) { + assertNotNull("Profile response should be present", resp.getProfileResults()); + boolean foundStreaming = false; + for (var shardProfile : resp.getProfileResults().values()) { + List aggProfileResults = shardProfile.getAggregationProfileResults().getProfileResults(); + for (var profileResult : aggProfileResults) { + String queryName = profileResult.getQueryName(); + if (StreamStringTermsAggregator.class.getSimpleName().equals(queryName) + || StreamNumericTermsAggregator.class.getSimpleName().equals(queryName)) { + var debug = profileResult.getDebugInfo(); + if (debug != null && expectedStrategy.equals(debug.get("result_strategy"))) { + foundStreaming = true; + assertTrue("streaming_enabled should be true", (Boolean) debug.get("streaming_enabled")); + break; + } + } + } + if (foundStreaming) break; + } + assertTrue("Expected to find " + expectedStrategy + " in profile", foundStreaming); + } + @LockFeatureFlag(STREAM_TRANSPORT) public void testStreamingAggregationUsed() throws Exception { // This test validates streaming aggregation with 3 shards, each with at least 3 segments @@ -568,4 +672,215 @@ public void testStreamingCardinalityAsSubAggregation() throws Exception { ); } } + + @LockFeatureFlag(STREAM_TRANSPORT) + public void testOrderByMaxSubAggregationDescending() throws Exception { + TermsAggregationBuilder agg = 
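+        // order_test indexes every doc in cat_i with value (i + 1) * 100, so ordering by the
+        // max_value sub-aggregation descending should surface cat_9 (1000), cat_8 (900) and cat_7 (800).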
terms("categories").field("category") + .size(3) + .order(org.opensearch.search.aggregations.BucketOrder.aggregation("max_value", false)) + .subAggregation(AggregationBuilders.max("max_value").field("value")); + + SearchResponse resp = client().prepareStreamSearch("order_test") + .addAggregation(agg) + .setSize(0) + .setProfile(true) + .execute() + .actionGet(); + + assertStreamingTermsUsed(resp, "streaming_terms"); + + StringTerms termsAgg = resp.getAggregations().get("categories"); + List buckets = termsAgg.getBuckets(); + assertEquals(3, buckets.size()); + assertEquals("cat_9", buckets.get(0).getKeyAsString()); + assertEquals(1000.0, ((Max) buckets.get(0).getAggregations().get("max_value")).getValue(), 0.001); + assertEquals("cat_8", buckets.get(1).getKeyAsString()); + assertEquals(900.0, ((Max) buckets.get(1).getAggregations().get("max_value")).getValue(), 0.001); + assertEquals("cat_7", buckets.get(2).getKeyAsString()); + assertEquals(800.0, ((Max) buckets.get(2).getAggregations().get("max_value")).getValue(), 0.001); + } + + @LockFeatureFlag(STREAM_TRANSPORT) + public void testOrderByMaxSubAggregationAscending() throws Exception { + TermsAggregationBuilder agg = terms("categories").field("category") + .size(3) + .order(org.opensearch.search.aggregations.BucketOrder.aggregation("max_value", true)) + .subAggregation(AggregationBuilders.max("max_value").field("value")); + + SearchResponse resp = client().prepareStreamSearch("order_test") + .addAggregation(agg) + .setSize(0) + .setProfile(true) + .execute() + .actionGet(); + + assertStreamingTermsUsed(resp, "streaming_terms"); + + StringTerms termsAgg = resp.getAggregations().get("categories"); + List buckets = termsAgg.getBuckets(); + assertEquals(3, buckets.size()); + assertEquals("cat_0", buckets.get(0).getKeyAsString()); + assertEquals(100.0, ((Max) buckets.get(0).getAggregations().get("max_value")).getValue(), 0.001); + assertEquals("cat_1", buckets.get(1).getKeyAsString()); + assertEquals(200.0, ((Max) buckets.get(1).getAggregations().get("max_value")).getValue(), 0.001); + assertEquals("cat_2", buckets.get(2).getKeyAsString()); + assertEquals(300.0, ((Max) buckets.get(2).getAggregations().get("max_value")).getValue(), 0.001); + } + + @LockFeatureFlag(STREAM_TRANSPORT) + public void testOrderByCardinalitySubAggregationDescending() throws Exception { + TermsAggregationBuilder agg = terms("categories").field("category") + .size(5) + .order(org.opensearch.search.aggregations.BucketOrder.aggregation("unique_users", false)) + .subAggregation(AggregationBuilders.cardinality("unique_users").field("user_id")); + + SearchResponse resp = client().prepareStreamSearch("order_test") + .addAggregation(agg) + .setSize(0) + .setProfile(true) + .execute() + .actionGet(); + + assertStreamingTermsUsed(resp, "streaming_terms"); + + StringTerms termsAgg = resp.getAggregations().get("categories"); + List buckets = termsAgg.getBuckets(); + assertEquals(5, buckets.size()); + assertEquals("cat_9", buckets.get(0).getKeyAsString()); + assertEquals("cat_8", buckets.get(1).getKeyAsString()); + assertEquals("cat_7", buckets.get(2).getKeyAsString()); + assertEquals("cat_6", buckets.get(3).getKeyAsString()); + assertEquals("cat_5", buckets.get(4).getKeyAsString()); + } + + @LockFeatureFlag(STREAM_TRANSPORT) + public void testNoSortOrderWithSubAgg() throws Exception { + // Default order: by doc count DESC, then by key ASC + // Expected top 5: cat_9(20 docs), cat_8(18 docs), cat_7(16 docs), cat_6(14 docs), cat_5(12 docs) + TermsAggregationBuilder agg = 
terms("categories").field("category") + .size(5) + .subAggregation(AggregationBuilders.max("max_value").field("value")); + + SearchResponse resp = client().prepareStreamSearch("order_test").addAggregation(agg).setSize(0).execute().actionGet(); + + StringTerms termsAgg = resp.getAggregations().get("categories"); + List buckets = termsAgg.getBuckets(); + assertEquals(5, buckets.size()); + + // Verify ordered by doc count DESC + assertEquals("cat_9", buckets.get(0).getKeyAsString()); + assertEquals(20, buckets.get(0).getDocCount()); + assertEquals("cat_8", buckets.get(1).getKeyAsString()); + assertEquals(18, buckets.get(1).getDocCount()); + assertEquals("cat_7", buckets.get(2).getKeyAsString()); + assertEquals(16, buckets.get(2).getDocCount()); + assertEquals("cat_6", buckets.get(3).getKeyAsString()); + assertEquals(14, buckets.get(3).getDocCount()); + assertEquals("cat_5", buckets.get(4).getKeyAsString()); + assertEquals(12, buckets.get(4).getDocCount()); + } + + @LockFeatureFlag(STREAM_TRANSPORT) + public void testNumericOrderByMaxSubAggregationDescending() throws Exception { + TermsAggregationBuilder agg = terms("categories").field("category") + .size(3) + .order(org.opensearch.search.aggregations.BucketOrder.aggregation("max_value", false)) + .subAggregation(AggregationBuilders.max("max_value").field("value")); + + SearchResponse resp = client().prepareStreamSearch("numeric_order_test") + .addAggregation(agg) + .setSize(0) + .setProfile(true) + .execute() + .actionGet(); + + assertStreamingTermsUsed(resp, "stream_long_terms"); + + LongTerms termsAgg = resp.getAggregations().get("categories"); + List buckets = termsAgg.getBuckets(); + assertEquals(3, buckets.size()); + assertEquals(9L, buckets.get(0).getKeyAsNumber().longValue()); + assertEquals(1000.0, ((Max) buckets.get(0).getAggregations().get("max_value")).getValue(), 0.001); + assertEquals(8L, buckets.get(1).getKeyAsNumber().longValue()); + assertEquals(900.0, ((Max) buckets.get(1).getAggregations().get("max_value")).getValue(), 0.001); + assertEquals(7L, buckets.get(2).getKeyAsNumber().longValue()); + assertEquals(800.0, ((Max) buckets.get(2).getAggregations().get("max_value")).getValue(), 0.001); + } + + @LockFeatureFlag(STREAM_TRANSPORT) + public void testNumericOrderByMaxSubAggregationAscending() throws Exception { + TermsAggregationBuilder agg = terms("categories").field("category") + .size(3) + .order(org.opensearch.search.aggregations.BucketOrder.aggregation("max_value", true)) + .subAggregation(AggregationBuilders.max("max_value").field("value")); + + SearchResponse resp = client().prepareStreamSearch("numeric_order_test") + .addAggregation(agg) + .setSize(0) + .setProfile(true) + .execute() + .actionGet(); + + assertStreamingTermsUsed(resp, "stream_long_terms"); + + LongTerms termsAgg = resp.getAggregations().get("categories"); + List buckets = termsAgg.getBuckets(); + assertEquals(3, buckets.size()); + assertEquals(0L, buckets.get(0).getKeyAsNumber().longValue()); + assertEquals(100.0, ((Max) buckets.get(0).getAggregations().get("max_value")).getValue(), 0.001); + assertEquals(1L, buckets.get(1).getKeyAsNumber().longValue()); + assertEquals(200.0, ((Max) buckets.get(1).getAggregations().get("max_value")).getValue(), 0.001); + assertEquals(2L, buckets.get(2).getKeyAsNumber().longValue()); + assertEquals(300.0, ((Max) buckets.get(2).getAggregations().get("max_value")).getValue(), 0.001); + } + + @LockFeatureFlag(STREAM_TRANSPORT) + public void testNumericOrderByCardinalitySubAggregationDescending() throws Exception { + 
TermsAggregationBuilder agg = terms("categories").field("category") + .size(5) + .order(org.opensearch.search.aggregations.BucketOrder.aggregation("unique_users", false)) + .subAggregation(AggregationBuilders.cardinality("unique_users").field("user_id")); + + SearchResponse resp = client().prepareStreamSearch("numeric_order_test") + .addAggregation(agg) + .setSize(0) + .setProfile(true) + .execute() + .actionGet(); + + assertStreamingTermsUsed(resp, "stream_long_terms"); + + LongTerms termsAgg = resp.getAggregations().get("categories"); + List buckets = termsAgg.getBuckets(); + assertEquals(5, buckets.size()); + assertEquals(9L, buckets.get(0).getKeyAsNumber().longValue()); + assertEquals(8L, buckets.get(1).getKeyAsNumber().longValue()); + assertEquals(7L, buckets.get(2).getKeyAsNumber().longValue()); + assertEquals(6L, buckets.get(3).getKeyAsNumber().longValue()); + assertEquals(5L, buckets.get(4).getKeyAsNumber().longValue()); + } + + @LockFeatureFlag(STREAM_TRANSPORT) + public void testNumericNoSortOrderWithSubAgg() throws Exception { + TermsAggregationBuilder agg = terms("categories").field("category") + .size(5) + .subAggregation(AggregationBuilders.max("max_value").field("value")); + + SearchResponse resp = client().prepareStreamSearch("numeric_order_test").addAggregation(agg).setSize(0).execute().actionGet(); + + LongTerms termsAgg = resp.getAggregations().get("categories"); + List buckets = termsAgg.getBuckets(); + assertEquals(5, buckets.size()); + + assertEquals(9L, buckets.get(0).getKeyAsNumber().longValue()); + assertEquals(20, buckets.get(0).getDocCount()); + assertEquals(8L, buckets.get(1).getKeyAsNumber().longValue()); + assertEquals(18, buckets.get(1).getDocCount()); + assertEquals(7L, buckets.get(2).getKeyAsNumber().longValue()); + assertEquals(16, buckets.get(2).getDocCount()); + assertEquals(6L, buckets.get(3).getKeyAsNumber().longValue()); + assertEquals(14, buckets.get(3).getDocCount()); + assertEquals(5L, buckets.get(4).getKeyAsNumber().longValue()); + assertEquals(12, buckets.get(4).getDocCount()); + } } diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 2ea37077a4dbc..b049608117df1 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -171,6 +171,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.MAX_SLICES_PER_SCROLL, IndexSettings.MAX_SLICES_PER_PIT, IndexSettings.MAX_REGEX_LENGTH_SETTING, + IndexSettings.STREAMING_AGGREGATION_MIN_SHARD_SIZE_SETTING, ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING, ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING, ShardsLimitAllocationDecider.INDEX_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING, diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index d358171eb1295..b407ed5ccb772 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -633,6 +633,19 @@ public static IndexMergePolicy fromString(String text) { Property.IndexScope ); + /** + * Minimum shard size for streaming aggregations to ensure accuracy. + * This applies per-segment in streaming mode, not per-shard. + * Default is 1000. Can be adjusted based on accuracy requirements. 
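+     * The setting is dynamic, so it can be adjusted per index at runtime, e.g.
+     * {@code PUT /my-index/_settings {"index.aggregation.streaming.min_shard_size": 5000}}.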
+ */ + public static final Setting STREAMING_AGGREGATION_MIN_SHARD_SIZE_SETTING = Setting.intSetting( + "index.aggregation.streaming.min_shard_size", + 1000, + 1, + Property.Dynamic, + Property.IndexScope + ); + public static final Setting DEFAULT_PIPELINE = new Setting<>( "index.default_pipeline", IngestService.NOOP_PIPELINE_NAME, @@ -998,6 +1011,10 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { * The maximum length of regex string allowed in a regexp query. */ private volatile int maxRegexLength; + /** + * The minimum shard size for streaming aggregations. + */ + private volatile int streamingAggregationMinShardSize; /** * The max amount of time to wait for merges @@ -1167,6 +1184,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti maxTermsCount = scopedSettings.get(MAX_TERMS_COUNT_SETTING); maxNestedQueryDepth = scopedSettings.get(MAX_NESTED_QUERY_DEPTH_SETTING); maxRegexLength = scopedSettings.get(MAX_REGEX_LENGTH_SETTING); + streamingAggregationMinShardSize = scopedSettings.get(STREAMING_AGGREGATION_MIN_SHARD_SIZE_SETTING); this.tieredMergePolicyProvider = new TieredMergePolicyProvider(logger, this); this.logByteSizeMergePolicyProvider = new LogByteSizeMergePolicyProvider(logger, this); this.indexSortConfig = new IndexSortConfig(this); @@ -1299,6 +1317,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields); scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter); scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength); + scopedSettings.addSettingsUpdateConsumer(STREAMING_AGGREGATION_MIN_SHARD_SIZE_SETTING, this::setStreamingAggregationMinShardSize); scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline); scopedSettings.addSettingsUpdateConsumer(FINAL_PIPELINE, this::setRequiredPipeline); scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations); @@ -2052,6 +2071,17 @@ private void setMaxRegexLength(int maxRegexLength) { this.maxRegexLength = maxRegexLength; } + /** + * Returns the minimum shard size for streaming aggregations. + */ + public int getStreamingAggregationMinShardSize() { + return streamingAggregationMinShardSize; + } + + private void setStreamingAggregationMinShardSize(int streamingAggregationMinShardSize) { + this.streamingAggregationMinShardSize = streamingAggregationMinShardSize; + } + /** * Returns the index sort config that should be used for this index. 
*/ diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java index 8ba8fe5ca35b5..f5e4761d3a02f 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java @@ -8,13 +8,17 @@ package org.opensearch.search.aggregations.bucket.terms; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.PointValues; import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.util.IntroSelector; import org.apache.lucene.util.NumericUtils; import org.opensearch.common.Numbers; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; +import org.opensearch.common.util.IntArray; import org.opensearch.index.fielddata.FieldData; import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.mapper.NumberFieldMapper.NumberFieldType; @@ -51,6 +55,7 @@ * @opensearch.internal */ public class StreamNumericTermsAggregator extends TermsAggregator implements Streamable { + private static final Logger logger = LogManager.getLogger(StreamNumericTermsAggregator.class); private final ResultStrategy resultStrategy; private final ValuesSource.Numeric valuesSource; private final IncludeExclude.LongFilter longFilter; @@ -79,6 +84,14 @@ public StreamNumericTermsAggregator( this.cardinality = cardinality; } + /** + * Returns the bucket order for this aggregator. + * @return the bucket order + */ + public BucketOrder getBucketOrder() { + return order; + } + @Override public void doReset() { super.doReset(); @@ -124,12 +137,27 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I return resultStrategy.buildAggregationsBatch(owningBucketOrds); } + /** + * Get segment size for TopN filtering at segment level. + * Returns max of requested shard_size and index-level min_shard_size setting. + */ + protected int getSegmentSize() { + int requestedShardSize = bucketCountThresholds.getShardSize(); + int minShardSize = context.indexShard().indexSettings().getStreamingAggregationMinShardSize(); + return Math.max(requestedShardSize, minShardSize); + } + /** * Strategy for building results. 
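+     * For each owning bucket, {@code buildAggregationsBatch} delegates bucket selection to
+     * {@code selectTopBuckets}, which drops buckets below min_doc_count and keeps at most
+     * {@code segmentSize} of the rest (the larger of the requested shard_size and the
+     * index.aggregation.streaming.min_shard_size setting); sub-aggregations are only built
+     * for the surviving buckets.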
*/ public abstract class ResultStrategy implements Releasable { + protected IntArray reusableIndices; + protected Aggregator.BucketComparator ordinalComparator; + protected B tempBucket1; + protected B tempBucket2; + private InternalAggregation[] buildAggregationsBatch(long[] owningBucketOrds) throws IOException { if (bucketOrds == null) { // no data collected InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; @@ -140,35 +168,151 @@ private InternalAggregation[] buildAggregationsBatch(long[] owningBucketOrds) th } LocalBucketCountThresholds localBucketCountThresholds = context.asLocalBucketCountThresholds(bucketCountThresholds); B[][] topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.length); - long[] otherDocCounts = new long[owningBucketOrds.length]; + long[] otherDocCount = new long[owningBucketOrds.length]; + int segmentSize = getSegmentSize(); + for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { checkCancelled(); collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]); LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); - List bucketsPerOwningOrd = new ArrayList<>(); - while (ordsEnum.next()) { - long docCount = bucketDocCount(ordsEnum.ord()); - otherDocCounts[ordIdx] += docCount; - if (docCount < localBucketCountThresholds.getMinDocCount()) { - continue; - } - B finalBucket = buildFinalBucket(ordsEnum, docCount, owningBucketOrds[ordIdx]); - bucketsPerOwningOrd.add(finalBucket); - } - topBucketsPerOrd[ordIdx] = buildBuckets(bucketsPerOwningOrd.size()); + long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]); + logger.debug("Cardinality post collection for ordIdx {}: {}", ordIdx, bucketsInOrd); + + SelectionResult selectionResult = selectTopBuckets( + ordsEnum, + bucketsInOrd, + segmentSize, + bucketCountThresholds, + owningBucketOrds[ordIdx] + ); + + otherDocCount[ordIdx] = selectionResult.otherDocCount; + topBucketsPerOrd[ordIdx] = buildBuckets(selectionResult.buckets.size()); for (int i = 0; i < topBucketsPerOrd[ordIdx].length; i++) { - topBucketsPerOrd[ordIdx][i] = bucketsPerOwningOrd.get(i); + topBucketsPerOrd[ordIdx][i] = selectionResult.buckets.get(i); } } buildSubAggs(topBucketsPerOrd); InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length]; for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - result[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCounts[ordIdx], topBucketsPerOrd[ordIdx]); + result[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCount[ordIdx], topBucketsPerOrd[ordIdx]); } return result; } + private void prepareIndicesArray(long valueCount) { + if (reusableIndices == null) { + reusableIndices = context.bigArrays().newIntArray(valueCount, false); + } else if (reusableIndices.size() < valueCount) { + reusableIndices = context.bigArrays().grow(reusableIndices, valueCount); + } + } + + protected void ensureOrdinalComparator() { + // Override in subclasses if needed + } + + abstract B createTempBucket(); + + private static class SelectionResult { + final List buckets; + final long otherDocCount; + + SelectionResult(List buckets, long otherDocCount) { + this.buckets = buckets; + this.otherDocCount = otherDocCount; + } + } + + private SelectionResult selectTopBuckets( + LongKeyedBucketOrds.BucketOrdsEnum ordsEnum, + long totalBuckets, + int segmentSize, + BucketCountThresholds thresholds, + long owningBucketOrd + ) throws IOException { + prepareIndicesArray(totalBuckets); + + int candidateCount = 0; + 
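+            // Phase 1: record the ordinal of every bucket that clears min_doc_count into reusableIndices
+            // and accumulate the total doc count so the "other" doc count can be derived later.
+            // Phase 2 (further down): when there are more candidates than segmentSize, a Lucene
+            // IntroSelector partitions the ordinals so the first segmentSize entries are the top buckets
+            // under the configured order (ordinalComparator for sub-aggregation order, doc count otherwise).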
long totalDocCount = 0; + while (ordsEnum.next()) { + long docCount = StreamNumericTermsAggregator.this.bucketDocCount(ordsEnum.ord()); + totalDocCount += docCount; + if (docCount >= thresholds.getMinDocCount()) { + reusableIndices.set(candidateCount++, (int) ordsEnum.ord()); + } + } + + segmentSize = Math.min(segmentSize, candidateCount); + + if (candidateCount <= segmentSize) { + ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); + List result = new ArrayList<>(candidateCount); + long selectedDocCount = 0; + while (ordsEnum.next()) { + long docCount = StreamNumericTermsAggregator.this.bucketDocCount(ordsEnum.ord()); + if (docCount >= thresholds.getMinDocCount()) { + result.add(buildFinalBucket(ordsEnum.ord(), ordsEnum.value(), docCount, owningBucketOrd)); + selectedDocCount += docCount; + } + } + return new SelectionResult<>(result, totalDocCount - selectedDocCount); + } + + ensureOrdinalComparator(); + + IntroSelector selector = new IntroSelector() { + int pivotOrdinal; + + @Override + protected void swap(int i, int j) { + int temp = reusableIndices.get(i); + reusableIndices.set(i, reusableIndices.get(j)); + reusableIndices.set(j, temp); + } + + @Override + protected void setPivot(int i) { + pivotOrdinal = reusableIndices.get(i); + } + + @Override + protected int comparePivot(int j) { + long leftOrd = reusableIndices.get(j); + long rightOrd = pivotOrdinal; + if (ordinalComparator != null) { + return -ordinalComparator.compare(leftOrd, rightOrd); + } + // Fallback to doc count for _count ordering + long leftDocCount = StreamNumericTermsAggregator.this.bucketDocCount(leftOrd); + long rightDocCount = StreamNumericTermsAggregator.this.bucketDocCount(rightOrd); + return Long.compare(leftDocCount, rightDocCount); + } + }; + + selector.select(0, candidateCount, segmentSize); + + // Build result directly from selected ordinals (O(segmentSize) instead of O(totalBuckets * segmentSize)) + List result = new ArrayList<>(segmentSize); + long selectedDocCount = 0; + for (int i = 0; i < segmentSize; i++) { + int selectedOrd = reusableIndices.get(i); + long value = bucketOrds.get(selectedOrd); + long docCount = StreamNumericTermsAggregator.this.bucketDocCount(selectedOrd); + result.add(buildFinalBucket(selectedOrd, value, docCount, owningBucketOrd)); + selectedDocCount += docCount; + } + + return new SelectionResult<>(result, totalDocCount - selectedDocCount); + } + + @Override + public final void close() { + Releasables.close(reusableIndices); + reusableIndices = null; + } + /** * Short description of the collection mechanism added to the profile * output to help with debugging. @@ -223,7 +367,7 @@ private InternalAggregation[] buildAggregationsBatch(long[] owningBucketOrds) th /** * Build a final bucket directly with the provided data, skipping temporary bucket creation. */ - abstract B buildFinalBucket(LongKeyedBucketOrds.BucketOrdsEnum ordinal, long docCount, long owningBucketOrd) throws IOException; + abstract B buildFinalBucket(long ord, long value, long docCount, long owningBucketOrd) throws IOException; } abstract class StandardTermsResultStrategy, B extends InternalTerms.Bucket> extends @@ -234,6 +378,28 @@ abstract class StandardTermsResultStrategy, this.showTermDocCountError = showTermDocCountError; } + @Override + protected void ensureOrdinalComparator() { + if (ordinalComparator == null) { + if (isKeyOrder(order)) { + throw new IllegalArgumentException( + "Streaming aggregation does not support key-based ordering for numeric fields. " + + "Use traditional aggregation approach instead." 
+ ); + } else if (partiallyBuiltBucketComparator != null) { + tempBucket1 = createTempBucket(); + tempBucket2 = createTempBucket(); + ordinalComparator = (leftOrd, rightOrd) -> { + tempBucket1.bucketOrd = leftOrd; + tempBucket1.docCount = StreamNumericTermsAggregator.this.bucketDocCount(leftOrd); + tempBucket2.bucketOrd = rightOrd; + tempBucket2.docCount = StreamNumericTermsAggregator.this.bucketDocCount(rightOrd); + return partiallyBuiltBucketComparator.compare(tempBucket1, tempBucket2); + }; + } + } + } + @Override final LeafBucketCollector wrapCollector(LeafBucketCollector primary) { return primary; @@ -268,9 +434,6 @@ final void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOExceptio } } } - - @Override - public final void close() {} } /** @@ -283,6 +446,20 @@ public LongTermsResults(boolean showTermDocCountError) { super(showTermDocCountError); } + @Override + LongTerms.Bucket createTempBucket() { + return new LongTerms.Bucket(0, 0, null, showTermDocCountError, 0, format) { + @Override + public int compareKey(LongTerms.Bucket other) { + // For tie-breaking when sub-aggregation values are equal, compare actual bucket values + // instead of ordinals. Ordinals are assigned dynamically and don't guarantee numeric order. + long thisValue = bucketOrds.get(this.bucketOrd); + long otherValue = bucketOrds.get(other.bucketOrd); + return Long.compare(thisValue, otherValue); + } + }; + } + @Override String describe() { return "stream_long_terms"; @@ -345,9 +522,9 @@ LongTerms buildEmptyResult() { } @Override - LongTerms.Bucket buildFinalBucket(LongKeyedBucketOrds.BucketOrdsEnum ordsEnum, long docCount, long owningBucketOrd) { - LongTerms.Bucket result = new LongTerms.Bucket(ordsEnum.value(), docCount, null, showTermDocCountError, 0, format); - result.bucketOrd = ordsEnum.ord(); + LongTerms.Bucket buildFinalBucket(long ord, long value, long docCount, long owningBucketOrd) { + LongTerms.Bucket result = new LongTerms.Bucket(value, docCount, null, showTermDocCountError, 0, format); + result.bucketOrd = ord; result.setDocCountError(0); return result; } @@ -364,6 +541,20 @@ public DoubleTermsResults(boolean showTermDocCountError) { super(showTermDocCountError); } + @Override + DoubleTerms.Bucket createTempBucket() { + return new DoubleTerms.Bucket(0.0, 0, null, showTermDocCountError, 0, format) { + @Override + public int compareKey(DoubleTerms.Bucket other) { + // For tie-breaking when sub-aggregation values are equal, compare actual bucket values + // instead of ordinals. Ordinals are assigned dynamically and don't guarantee numeric order. 
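+                    // The double keys are stored in bucketOrds as sortable longs, so they are decoded
+                    // with NumericUtils.sortableLongToDouble before the comparison.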
+ long thisValue = bucketOrds.get(this.bucketOrd); + long otherValue = bucketOrds.get(other.bucketOrd); + return Double.compare(NumericUtils.sortableLongToDouble(thisValue), NumericUtils.sortableLongToDouble(otherValue)); + } + }; + } + @Override String describe() { return "stream_double_terms"; @@ -426,16 +617,16 @@ DoubleTerms buildEmptyResult() { } @Override - DoubleTerms.Bucket buildFinalBucket(LongKeyedBucketOrds.BucketOrdsEnum ordsEnum, long docCount, long owningBucketOrd) { + DoubleTerms.Bucket buildFinalBucket(long ord, long value, long docCount, long owningBucketOrd) { DoubleTerms.Bucket result = new DoubleTerms.Bucket( - NumericUtils.sortableLongToDouble(ordsEnum.value()), + NumericUtils.sortableLongToDouble(value), docCount, null, showTermDocCountError, 0, format ); - result.bucketOrd = ordsEnum.ord(); + result.bucketOrd = ord; result.setDocCountError(0); return result; } @@ -451,6 +642,20 @@ public UnsignedLongTermsResults(boolean showTermDocCountError) { super(showTermDocCountError); } + @Override + UnsignedLongTerms.Bucket createTempBucket() { + return new UnsignedLongTerms.Bucket(Numbers.toUnsignedBigInteger(0), 0, null, showTermDocCountError, 0, format) { + @Override + public int compareKey(UnsignedLongTerms.Bucket other) { + // For tie-breaking when sub-aggregation values are equal, compare actual bucket values + // instead of ordinals. Ordinals are assigned dynamically and don't guarantee numeric order. + long thisValue = bucketOrds.get(this.bucketOrd); + long otherValue = bucketOrds.get(other.bucketOrd); + return Long.compareUnsigned(thisValue, otherValue); + } + }; + } + @Override String describe() { return "stream_unsigned_long_terms"; @@ -513,16 +718,16 @@ UnsignedLongTerms buildEmptyResult() { } @Override - UnsignedLongTerms.Bucket buildFinalBucket(LongKeyedBucketOrds.BucketOrdsEnum ordsEnum, long docCount, long owningBucketOrd) { + UnsignedLongTerms.Bucket buildFinalBucket(long ord, long value, long docCount, long owningBucketOrd) { UnsignedLongTerms.Bucket result = new UnsignedLongTerms.Bucket( - Numbers.toUnsignedBigInteger(ordsEnum.value()), + Numbers.toUnsignedBigInteger(value), docCount, null, showTermDocCountError, 0, format ); - result.bucketOrd = ordsEnum.ord(); + result.bucketOrd = ord; result.setDocCountError(0); return result; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregator.java index cb95b0efc2e75..6d149a1c5ee15 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregator.java @@ -8,12 +8,17 @@ package org.opensearch.search.aggregations.bucket.terms; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.index.DocValues; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IntroSelector; import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lease.Releasables; +import org.opensearch.common.util.IntArray; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; @@ -43,6 +48,7 @@ * Stream 
search terms aggregation */ public class StreamStringTermsAggregator extends AbstractStringTermsAggregator implements Streamable { + private static final Logger logger = LogManager.getLogger(StreamStringTermsAggregator.class); private SortedSetDocValues sortedDocValuesPerBatch; private long valueCount; private final ValuesSource.Bytes.WithOrdinals valuesSource; @@ -51,6 +57,10 @@ public class StreamStringTermsAggregator extends AbstractStringTermsAggregator i protected final ResultStrategy resultStrategy; private boolean leafCollectorCreated = false; + private Aggregator.BucketComparator ordinalComparator; + private StringTerms.Bucket tempBucket1; + private StringTerms.Bucket tempBucket2; + public StreamStringTermsAggregator( String name, AggregatorFactories factories, @@ -76,6 +86,43 @@ public void doReset() { valueCount = 0; sortedDocValuesPerBatch = null; this.leafCollectorCreated = false; + this.ordinalComparator = null; + this.tempBucket1 = null; + this.tempBucket2 = null; + } + + private void ensureOrdinalComparator() { + if (ordinalComparator == null) { + if (isKeyOrder(order)) { + // For key-based ordering, compare ordinals directly (alphabetical order) + // Reverse comparison for descending order + boolean ascending = InternalOrder.isKeyAsc(order); + ordinalComparator = (leftOrd, rightOrd) -> { + return ascending ? Long.compare(leftOrd, rightOrd) : Long.compare(rightOrd, leftOrd); + }; + } else if (partiallyBuiltBucketComparator != null) { + // For sub-aggregation ordering, use bucket comparator + tempBucket1 = new StringTerms.Bucket(null, 0, null, false, 0, format) { + @Override + public int compareKey(StringTerms.Bucket other) { + return Long.compare(this.bucketOrd, other.bucketOrd); + } + }; + tempBucket2 = new StringTerms.Bucket(null, 0, null, false, 0, format) { + @Override + public int compareKey(StringTerms.Bucket other) { + return Long.compare(this.bucketOrd, other.bucketOrd); + } + }; + ordinalComparator = (leftOrd, rightOrd) -> { + tempBucket1.bucketOrd = leftOrd; + tempBucket1.docCount = bucketDocCount(leftOrd); + tempBucket2.bucketOrd = rightOrd; + tempBucket2.docCount = bucketDocCount(rightOrd); + return partiallyBuiltBucketComparator.compare(tempBucket1, tempBucket2); + }; + } + } } @Override @@ -83,6 +130,12 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I return resultStrategy.buildAggregationsBatch(owningBucketOrds); } + protected int getSegmentSize() { + int requestedShardSize = bucketCountThresholds.getShardSize(); + int minShardSize = context.indexShard().indexSettings().getStreamingAggregationMinShardSize(); + return Math.max(requestedShardSize, minShardSize); + } + @Override public InternalAggregation buildEmptyAggregation() { return resultStrategy.buildEmptyResult(); @@ -98,8 +151,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol this.leafCollectorCreated = true; } this.sortedDocValuesPerBatch = valuesSource.ordinalsValues(ctx); - this.valueCount = sortedDocValuesPerBatch.getValueCount(); // for streaming case, the value count is reset to per batch - // cardinality + this.valueCount = sortedDocValuesPerBatch.getValueCount(); if (docCounts == null) { this.docCounts = context.bigArrays().newLongArray(valueCount, true); } else { @@ -174,10 +226,21 @@ public abstract class ResultStrategy bucketsPerOwningOrd = new ArrayList<>(); - for (long ordinal = 0; ordinal < valueCount; ordinal++) { - long docCount = bucketDocCount(ordinal); - if (bucketCountThresholds.getMinDocCount() == 0 || docCount > 
0) { - if (docCount >= localBucketCountThresholds.getMinDocCount()) { - B finalBucket = buildFinalBucket(ordinal, docCount); - bucketsPerOwningOrd.add(finalBucket); - } - } - } - - // Get the top buckets - // ordered contains the top buckets for the owning bucket - topBucketsPerOwningOrd[ordIdx] = buildBuckets(bucketsPerOwningOrd.size()); + logger.debug("Cardinality post collection for ordIdx {}: {}", ordIdx, valueCount); + // using bucketCountThresholds since we don't do reduce across slice + // and send results per segment to coordinator + SelectionResult selectionResult = selectTopBuckets(segmentSize, bucketCountThresholds); + topBucketsPerOwningOrd[ordIdx] = buildBuckets(selectionResult.buckets.size()); for (int i = 0; i < topBucketsPerOwningOrd[ordIdx].length; i++) { - topBucketsPerOwningOrd[ordIdx][i] = bucketsPerOwningOrd.get(i); + topBucketsPerOwningOrd[ordIdx][i] = selectionResult.buckets.get(i); } + otherDocCount[ordIdx] = selectionResult.otherDocCount; } buildSubAggs(topBucketsPerOwningOrd); @@ -221,10 +278,103 @@ InternalAggregation[] buildAggregationsBatch(long[] owningBucketOrds) throws IOE return results; } - /** - * Short description of the collection mechanism added to the profile - * output to help with debugging. - */ + private static class SelectionResult { + final List buckets; + final long otherDocCount; + + SelectionResult(List buckets, long otherDocCount) { + this.buckets = buckets; + this.otherDocCount = otherDocCount; + } + } + + private SelectionResult selectTopBuckets(int segmentSize, BucketCountThresholds thresholds) throws IOException { + prepareIndicesArray(valueCount); + + int cnt = 0; + long totalDocCount = 0; + for (int i = 0; i < valueCount; i++) { + long docCount = bucketDocCount(i); + totalDocCount += docCount; + if (docCount >= thresholds.getMinDocCount()) { + reusableIndices.set(cnt++, i); + } + } + + segmentSize = Math.min(segmentSize, cnt); + + if (cnt <= segmentSize) { + List result = new ArrayList<>(); + long selectedDocCount = 0; + for (int i = 0; i < cnt; i++) { + long docCount = bucketDocCount(reusableIndices.get(i)); + result.add(buildFinalBucket(reusableIndices.get(i), docCount)); + selectedDocCount += docCount; + } + return new SelectionResult<>(result, totalDocCount - selectedDocCount); + } + + IntroSelector selector = new IntroSelector() { + int pivotOrdinal; + + @Override + protected void swap(int i, int j) { + int temp = reusableIndices.get(i); + reusableIndices.set(i, reusableIndices.get(j)); + reusableIndices.set(j, temp); + } + + @Override + protected void setPivot(int i) { + pivotOrdinal = reusableIndices.get(i); + } + + @Override + protected int comparePivot(int j) { + long leftOrd = reusableIndices.get(j); + long rightOrd = pivotOrdinal; + if (ordinalComparator != null) { + return -ordinalComparator.compare(leftOrd, rightOrd); + } + // Fallback to doc count for _count ordering + long leftDocCount = bucketDocCount(leftOrd); + long rightDocCount = bucketDocCount(rightOrd); + return Long.compare(leftDocCount, rightDocCount); + } + }; + + ensureOrdinalComparator(); + selector.select(0, cnt, segmentSize); + + int[] selected = new int[segmentSize]; + for (int i = 0; i < segmentSize; i++) { + selected[i] = reusableIndices.get(i); + } + + reusableIndices.fill(0, valueCount, 0); + for (int i = 0; i < segmentSize; i++) { + reusableIndices.set(selected[i], 1); + } + + List result = new ArrayList<>(segmentSize); + long selectedDocCount = 0; + for (int ordinal = 0; ordinal < valueCount; ordinal++) { + if (reusableIndices.get(ordinal) == 1) { + 
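+                        // reusableIndices was turned into a membership mask above, so scanning ordinals in
+                        // ascending order rebuilds the selected buckets in segment term order.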
long docCount = bucketDocCount(ordinal); + result.add(buildFinalBucket(ordinal, docCount)); + selectedDocCount += docCount; + } + } + + return new SelectionResult<>(result, totalDocCount - selectedDocCount); + } + + @Override + public void close() { + Releasables.close(reusableIndices); + reusableIndices = null; + } + abstract String describe(); /** @@ -348,9 +498,6 @@ StringTerms.Bucket buildFinalBucket(long ordinal, long docCount) throws IOExcept result.setDocCountError(0); return result; } - - @Override - public void close() {} } @Override @@ -367,4 +514,9 @@ public void collectDebugInfo(BiConsumer add) { add.accept("streaming_estimated_docs", metrics.estimatedDocCount()); add.accept("streaming_segment_count", metrics.segmentCount()); } + + @Override + public void doClose() { + Releasables.close(resultStrategy); + } } diff --git a/server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java b/server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java index d06d533f5f960..8253367a42836 100644 --- a/server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java +++ b/server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java @@ -16,8 +16,11 @@ import org.opensearch.common.settings.Setting; import org.opensearch.search.aggregations.AggregatorBase; import org.opensearch.search.aggregations.MultiBucketCollector; +import org.opensearch.search.aggregations.bucket.terms.StreamNumericTermsAggregator; import org.opensearch.search.profile.aggregation.ProfilingAggregator; +import static org.opensearch.search.aggregations.InternalOrder.isKeyOrder; + /** * Analyzes collector trees to determine optimal {@link FlushMode} for streaming aggregations. * @@ -125,6 +128,10 @@ private static StreamingCostMetrics collectMetrics(Collector collector) { if (!nodeMetrics.isStreamable()) { return StreamingCostMetrics.nonStreamable(); } + // Reject numeric aggregators with key-based ordering + if (collector instanceof StreamNumericTermsAggregator numericAgg && isKeyOrder(numericAgg.getBucketOrder())) { + return StreamingCostMetrics.nonStreamable(); + } } else { return StreamingCostMetrics.nonStreamable(); } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregatorTests.java index 34585ac7c7777..b96ad2a2fc743 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregatorTests.java @@ -11,6 +11,7 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; @@ -19,12 +20,17 @@ import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.NumericUtils; +import org.opensearch.Version; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.MockBigArrays; import org.opensearch.common.util.MockPageCacheRecycler; 
import org.opensearch.core.common.breaker.CircuitBreaker; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.mapper.KeywordFieldMapper; import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.mapper.NumberFieldMapper; import org.opensearch.search.aggregations.AggregatorTestCase; @@ -33,6 +39,8 @@ import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.metrics.Avg; import org.opensearch.search.aggregations.metrics.AvgAggregationBuilder; +import org.opensearch.search.aggregations.metrics.Cardinality; +import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder; import org.opensearch.search.aggregations.metrics.InternalSum; import org.opensearch.search.aggregations.metrics.Max; import org.opensearch.search.aggregations.metrics.MaxAggregationBuilder; @@ -1760,4 +1768,1216 @@ public void testCollectDebugInfo() throws IOException { } } } + + public void testOrderByMaxSubAggregationDescending() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + // Create 10 categories with varying max values (size=3, total=10) + for (int i = 0; i < 10; i++) { + Document doc = new Document(); + doc.add(new NumericDocValuesField("category", i)); + doc.add(new NumericDocValuesField("value", (i + 1) * 100)); + indexWriter.addDocument(doc); + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType categoryFieldType = new NumberFieldMapper.NumberFieldType( + "category", + NumberFieldMapper.NumberType.LONG + ); + MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.LONG); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("categories").field("category") + .size(3) + .shardSize(3) + .order(BucketOrder.aggregation("max_value", false)) + .subAggregation(new MaxAggregationBuilder("max_value").field("value")); + + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("_index") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY + ); + + StreamNumericTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + categoryFieldType, + valueFieldType + ); + + aggregator.preCollection(); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + LongTerms result = (LongTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + + assertThat(result, notNullValue()); + List buckets = result.getBuckets(); + assertThat(buckets.size(), equalTo(3)); + + assertThat(buckets.get(0).getKeyAsNumber().longValue(), equalTo(7L)); + assertThat(buckets.get(1).getKeyAsNumber().longValue(), equalTo(8L)); + assertThat(buckets.get(2).getKeyAsNumber().longValue(), equalTo(9L)); + + Max max0 = buckets.get(0).getAggregations().get("max_value"); + Max max1 = 
buckets.get(1).getAggregations().get("max_value"); + Max max2 = buckets.get(2).getAggregations().get("max_value"); + assertThat(max0.getValue(), equalTo(800.0)); + assertThat(max1.getValue(), equalTo(900.0)); + assertThat(max2.getValue(), equalTo(1000.0)); + + // Verify otherDocCount: 10 categories * 1 doc = 10 total, selected 3*1=3, other=7 + assertThat(result.getSumOfOtherDocCounts(), equalTo(7L)); + } + } + } + } + + public void testOrderByMaxSubAggregationAscending() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + // Create 10 categories where TOP 3 by max ASC won't be alphabetically first + for (int i = 0; i < 10; i++) { + int numDocs = 5; + for (int j = 0; j < numDocs; j++) { + Document doc = new Document(); + doc.add(new NumericDocValuesField("category", i)); + doc.add(new NumericDocValuesField("value", (i + 1) * 100 + j)); + indexWriter.addDocument(doc); + } + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType categoryFieldType = new NumberFieldMapper.NumberFieldType( + "category", + NumberFieldMapper.NumberType.LONG + ); + MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.LONG); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("categories").field("category") + .size(3) + .shardSize(3) + .order(BucketOrder.aggregation("max_value", true)) + .subAggregation(new MaxAggregationBuilder("max_value").field("value")); + + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("_index") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY + ); + + StreamNumericTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + categoryFieldType, + valueFieldType + ); + + aggregator.preCollection(); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + LongTerms result = (LongTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + + assertThat(result, notNullValue()); + List buckets = result.getBuckets(); + assertThat(buckets.size(), equalTo(3)); + + assertThat(buckets.get(0).getKeyAsNumber().longValue(), equalTo(0L)); + assertThat(buckets.get(1).getKeyAsNumber().longValue(), equalTo(1L)); + assertThat(buckets.get(2).getKeyAsNumber().longValue(), equalTo(2L)); + + Max max0 = buckets.get(0).getAggregations().get("max_value"); + Max max1 = buckets.get(1).getAggregations().get("max_value"); + Max max2 = buckets.get(2).getAggregations().get("max_value"); + assertThat(max0.getValue(), equalTo(104.0)); + assertThat(max1.getValue(), equalTo(204.0)); + assertThat(max2.getValue(), equalTo(304.0)); + + // Verify otherDocCount: 10 categories * 5 docs = 50 total, selected 3*5=15, other=35 + assertThat(result.getSumOfOtherDocCounts(), equalTo(35L)); + } + } + } + } + + public void testOrderByMinSubAggregation() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter 
indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + for (int i = 0; i < 4; i++) { + for (int j = 0; j < 3; j++) { + Document doc = new Document(); + doc.add(new NumericDocValuesField("category", i)); + doc.add(new NumericDocValuesField("value", (i + 1) * 10 + j)); + indexWriter.addDocument(doc); + } + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType categoryFieldType = new NumberFieldMapper.NumberFieldType( + "category", + NumberFieldMapper.NumberType.LONG + ); + MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.LONG); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("categories").field("category") + .size(2) + .shardSize(2) + .order(BucketOrder.aggregation("min_value", true)) + .subAggregation(new MinAggregationBuilder("min_value").field("value")); + + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("_index") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY + ); + + StreamNumericTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + categoryFieldType, + valueFieldType + ); + + aggregator.preCollection(); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + LongTerms result = (LongTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + + assertThat(result, notNullValue()); + assertThat(result.getBuckets().size(), equalTo(2)); + + List buckets = result.getBuckets(); + assertEquals(0L, buckets.get(0).getKeyAsNumber().longValue()); + assertEquals(1L, buckets.get(1).getKeyAsNumber().longValue()); + + assertEquals(6L, result.getSumOfOtherDocCounts()); + } + } + } + } + + public void testNoSortOrderWithSubAggregation() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + // Create 10 categories where alphabetical order != count order + // 9=10 docs (highest), 8=9, 7=8, 6=7, 5=6, 4=5, 3=4, 2=3, 1=2, 0=1 (lowest) + for (int i = 0; i < 10; i++) { + int numDocs = 10 - i; + for (int j = 0; j < numDocs; j++) { + Document doc = new Document(); + doc.add(new NumericDocValuesField("category", 9 - i)); + doc.add(new NumericDocValuesField("value", (9 - i + 1) * 100 + j)); + indexWriter.addDocument(doc); + } + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType categoryFieldType = new NumberFieldMapper.NumberFieldType( + "category", + NumberFieldMapper.NumberType.LONG + ); + MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.LONG); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("categories").field("category") + .size(5) + .shardSize(5) + .subAggregation(new MaxAggregationBuilder("max_value").field("value")); + + IndexSettings indexSettings = new 
IndexSettings( + IndexMetadata.builder("_index") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY + ); + + StreamNumericTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + categoryFieldType, + valueFieldType + ); + + aggregator.preCollection(); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + LongTerms result = (LongTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + + assertThat(result, notNullValue()); + List buckets = result.getBuckets(); + assertThat(buckets.size(), equalTo(5)); + + // Default order is count DESC, so top 5 should be 9, 8, 7, 6, 5 (highest counts) + // Returned in alphabetical order at shard level + assertThat(buckets.get(0).getKeyAsNumber().longValue(), equalTo(5L)); + assertThat(buckets.get(0).getDocCount(), equalTo(6L)); + assertThat(buckets.get(1).getKeyAsNumber().longValue(), equalTo(6L)); + assertThat(buckets.get(1).getDocCount(), equalTo(7L)); + assertThat(buckets.get(2).getKeyAsNumber().longValue(), equalTo(7L)); + assertThat(buckets.get(2).getDocCount(), equalTo(8L)); + assertThat(buckets.get(3).getKeyAsNumber().longValue(), equalTo(8L)); + assertThat(buckets.get(3).getDocCount(), equalTo(9L)); + assertThat(buckets.get(4).getKeyAsNumber().longValue(), equalTo(9L)); + assertThat(buckets.get(4).getDocCount(), equalTo(10L)); + + // Verify otherDocCount: total=55 docs (10+9+8+7+6+5+4+3+2+1), selected=40 (10+9+8+7+6), other=15 + assertThat(result.getSumOfOtherDocCounts(), equalTo(15L)); + } + } + } + } + + public void testOrderByAvgSubAggregation() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + // Category 0: values 10, 20 (avg=15) + Document doc = new Document(); + doc.add(new NumericDocValuesField("category", 0)); + doc.add(new NumericDocValuesField("value", 10)); + indexWriter.addDocument(doc); + doc = new Document(); + doc.add(new NumericDocValuesField("category", 0)); + doc.add(new NumericDocValuesField("value", 20)); + indexWriter.addDocument(doc); + + // Category 1: values 30, 40 (avg=35) + doc = new Document(); + doc.add(new NumericDocValuesField("category", 1)); + doc.add(new NumericDocValuesField("value", 30)); + indexWriter.addDocument(doc); + doc = new Document(); + doc.add(new NumericDocValuesField("category", 1)); + doc.add(new NumericDocValuesField("value", 40)); + indexWriter.addDocument(doc); + + // Category 2: values 50, 60 (avg=55) + doc = new Document(); + doc.add(new NumericDocValuesField("category", 2)); + doc.add(new NumericDocValuesField("value", 50)); + indexWriter.addDocument(doc); + doc = new Document(); + doc.add(new NumericDocValuesField("category", 2)); + doc.add(new NumericDocValuesField("value", 60)); + indexWriter.addDocument(doc); + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType categoryFieldType = new NumberFieldMapper.NumberFieldType( + "category", + NumberFieldMapper.NumberType.LONG + ); + 
MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.LONG); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("categories").field("category") + .size(2) + .shardSize(2) + .order(BucketOrder.aggregation("avg_value", false)) + .subAggregation(new AvgAggregationBuilder("avg_value").field("value")); + + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("_index") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY + ); + + StreamNumericTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + categoryFieldType, + valueFieldType + ); + + aggregator.preCollection(); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + LongTerms result = (LongTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + + assertThat(result, notNullValue()); + assertThat(result.getBuckets().size(), equalTo(2)); + + List buckets = result.getBuckets(); + assertEquals(1L, buckets.get(0).getKeyAsNumber().longValue()); + assertEquals(35.0, ((Avg) buckets.get(0).getAggregations().get("avg_value")).getValue(), 0.001); + assertEquals(2L, buckets.get(1).getKeyAsNumber().longValue()); + assertEquals(55.0, ((Avg) buckets.get(1).getAggregations().get("avg_value")).getValue(), 0.001); + + assertEquals(2L, result.getSumOfOtherDocCounts()); + } + } + } + } + + public void testOrderBySumSubAggregation() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + // Category 0: 3 docs with value 10 each (sum=30) + for (int i = 0; i < 3; i++) { + Document doc = new Document(); + doc.add(new NumericDocValuesField("category", 0)); + doc.add(new NumericDocValuesField("value", 10)); + indexWriter.addDocument(doc); + } + + // Category 1: 2 docs with value 25 each (sum=50) + for (int i = 0; i < 2; i++) { + Document doc = new Document(); + doc.add(new NumericDocValuesField("category", 1)); + doc.add(new NumericDocValuesField("value", 25)); + indexWriter.addDocument(doc); + } + + // Category 2: 4 docs with value 20 each (sum=80) + for (int i = 0; i < 4; i++) { + Document doc = new Document(); + doc.add(new NumericDocValuesField("category", 2)); + doc.add(new NumericDocValuesField("value", 20)); + indexWriter.addDocument(doc); + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType categoryFieldType = new NumberFieldMapper.NumberFieldType( + "category", + NumberFieldMapper.NumberType.LONG + ); + MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.LONG); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("categories").field("category") + .size(2) + .shardSize(2) + .order(BucketOrder.aggregation("sum_value", false)) + .subAggregation(new SumAggregationBuilder("sum_value").field("value")); + + IndexSettings indexSettings = new IndexSettings( + 
IndexMetadata.builder("_index") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY + ); + + StreamNumericTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + categoryFieldType, + valueFieldType + ); + + aggregator.preCollection(); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + LongTerms result = (LongTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + + assertThat(result, notNullValue()); + assertThat(result.getBuckets().size(), equalTo(2)); + + List buckets = result.getBuckets(); + assertEquals(1L, buckets.get(0).getKeyAsNumber().longValue()); + assertEquals(50.0, ((InternalSum) buckets.get(0).getAggregations().get("sum_value")).getValue(), 0.001); + assertEquals(2L, buckets.get(1).getKeyAsNumber().longValue()); + assertEquals(80.0, ((InternalSum) buckets.get(1).getAggregations().get("sum_value")).getValue(), 0.001); + + assertEquals(3L, result.getSumOfOtherDocCounts()); + } + } + } + } + + public void testMinDocCount() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + // Create categories with varying doc counts: 0=1, 1=2, 2=3, 3=4, 4=5 + for (int i = 0; i < 5; i++) { + for (int j = 0; j <= i; j++) { + Document doc = new Document(); + doc.add(new NumericDocValuesField("category", i)); + indexWriter.addDocument(doc); + } + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType categoryFieldType = new NumberFieldMapper.NumberFieldType( + "category", + NumberFieldMapper.NumberType.LONG + ); + + // Test with minDocCount=3, should only return categories 2, 3, 4 + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("categories").field("category").minDocCount(3); + + StreamNumericTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + createIndexSettings(), + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + categoryFieldType + ); + + aggregator.preCollection(); + assertEquals("strictly single segment", 1, indexSearcher.getIndexReader().leaves().size()); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + LongTerms result = (LongTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + + assertThat(result, notNullValue()); + List buckets = result.getBuckets(); + assertThat(buckets.size(), equalTo(3)); + + assertThat(buckets.get(0).getKeyAsNumber().longValue(), equalTo(2L)); + assertThat(buckets.get(0).getDocCount(), equalTo(3L)); + assertThat(buckets.get(1).getKeyAsNumber().longValue(), equalTo(3L)); + assertThat(buckets.get(1).getDocCount(), equalTo(4L)); + assertThat(buckets.get(2).getKeyAsNumber().longValue(), equalTo(4L)); + assertThat(buckets.get(2).getDocCount(), equalTo(5L)); + + // Verify otherDocCount: category 0=1 + category 1=2 = 3 docs excluded + 
assertThat(result.getSumOfOtherDocCounts(), equalTo(3L)); + } + } + } + } + + public void testOrderByCardinalitySubAggregationDescending() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + // Create 10 categories with varying cardinality (size=5, total=10) + for (int i = 0; i < 10; i++) { + int uniqueUsers = (i + 1) * 2; // cat_0=2, cat_1=4, ..., cat_9=20 + for (int j = 0; j < uniqueUsers; j++) { + Document doc = new Document(); + doc.add(new NumericDocValuesField("category", i)); + doc.add(new SortedSetDocValuesField("user_id", new BytesRef("user_" + (i * 100 + j)))); + indexWriter.addDocument(doc); + } + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType categoryFieldType = new NumberFieldMapper.NumberFieldType( + "category", + NumberFieldMapper.NumberType.LONG + ); + MappedFieldType userIdFieldType = new KeywordFieldMapper.KeywordFieldType("user_id"); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("categories").field("category") + .size(5) + .shardSize(5) + .order(BucketOrder.aggregation("unique_users", false)) + .subAggregation(new CardinalityAggregationBuilder("unique_users").field("user_id")); + + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("_index") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY + ); + + StreamNumericTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + categoryFieldType, + userIdFieldType + ); + + aggregator.preCollection(); + assertEquals("strictly single segment", 1, indexSearcher.getIndexReader().leaves().size()); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + LongTerms result = (LongTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + + assertThat(result, notNullValue()); + List buckets = result.getBuckets(); + assertThat(buckets.size(), equalTo(5)); + + assertThat(buckets.get(0).getKeyAsNumber().longValue(), equalTo(5L)); + assertThat(buckets.get(1).getKeyAsNumber().longValue(), equalTo(6L)); + assertThat(buckets.get(2).getKeyAsNumber().longValue(), equalTo(7L)); + assertThat(buckets.get(3).getKeyAsNumber().longValue(), equalTo(8L)); + assertThat(buckets.get(4).getKeyAsNumber().longValue(), equalTo(9L)); + + Cardinality card0 = buckets.get(0).getAggregations().get("unique_users"); + Cardinality card1 = buckets.get(1).getAggregations().get("unique_users"); + Cardinality card2 = buckets.get(2).getAggregations().get("unique_users"); + Cardinality card3 = buckets.get(3).getAggregations().get("unique_users"); + Cardinality card4 = buckets.get(4).getAggregations().get("unique_users"); + assertThat(card0.getValue(), equalTo(12L)); + assertThat(card1.getValue(), equalTo(14L)); + assertThat(card2.getValue(), equalTo(16L)); + assertThat(card3.getValue(), equalTo(18L)); + assertThat(card4.getValue(), equalTo(20L)); + + // Verify otherDocCount: total docs = 
2+4+6+8+10+12+14+16+18+20=110, selected=12+14+16+18+20=80, other=30 + assertThat(result.getSumOfOtherDocCounts(), equalTo(30L)); + } + } + } + } + + public void testOrderByCardinalitySubAggregationAscending() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + // Create 10 categories with cardinality increasing with the key, so the TOP 3 by cardinality ASC are the lowest keys + for (int i = 0; i < 10; i++) { + int uniqueUsers = (i + 1) * 2; + for (int j = 0; j < uniqueUsers; j++) { + Document doc = new Document(); + doc.add(new NumericDocValuesField("category", i)); + doc.add(new SortedSetDocValuesField("user_id", new BytesRef("user_" + (i * 100 + j)))); + indexWriter.addDocument(doc); + } + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType categoryFieldType = new NumberFieldMapper.NumberFieldType( + "category", + NumberFieldMapper.NumberType.LONG + ); + MappedFieldType userIdFieldType = new KeywordFieldMapper.KeywordFieldType("user_id"); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("categories").field("category") + .size(3) + .shardSize(3) + .order(BucketOrder.aggregation("unique_users", true)) + .subAggregation(new CardinalityAggregationBuilder("unique_users").field("user_id")); + + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("_index") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY + ); + + StreamNumericTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + categoryFieldType, + userIdFieldType + ); + + aggregator.preCollection(); + assertEquals("strictly single segment", 1, indexSearcher.getIndexReader().leaves().size()); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + LongTerms result = (LongTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + + assertThat(result, notNullValue()); + List buckets = result.getBuckets(); + assertThat(buckets.size(), equalTo(3)); + + assertThat(buckets.get(0).getKeyAsNumber().longValue(), equalTo(0L)); + assertThat(buckets.get(1).getKeyAsNumber().longValue(), equalTo(1L)); + assertThat(buckets.get(2).getKeyAsNumber().longValue(), equalTo(2L)); + + Cardinality card0 = buckets.get(0).getAggregations().get("unique_users"); + Cardinality card1 = buckets.get(1).getAggregations().get("unique_users"); + Cardinality card2 = buckets.get(2).getAggregations().get("unique_users"); + assertThat(card0.getValue(), equalTo(2L)); + assertThat(card1.getValue(), equalTo(4L)); + assertThat(card2.getValue(), equalTo(6L)); + + // Verify otherDocCount: total docs = 2+4+6+8+10+12+14+16+18+20=110, selected=2+4+6=12, other=98 + assertThat(result.getSumOfOtherDocCounts(), equalTo(98L)); + } + } + } + } + + public void testDoubleTermsWithSubAggregationOrdering() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + for (int i = 0; 
i < 3; i++) { + Document doc = new Document(); + doc.add(new NumericDocValuesField("price", NumericUtils.doubleToSortableLong(10.0 + i))); + doc.add(new NumericDocValuesField("quantity", (i + 1) * 10)); + indexWriter.addDocument(doc); + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType priceFieldType = new NumberFieldMapper.NumberFieldType("price", NumberFieldMapper.NumberType.DOUBLE); + MappedFieldType quantityFieldType = new NumberFieldMapper.NumberFieldType( + "quantity", + NumberFieldMapper.NumberType.LONG + ); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("prices").field("price") + .size(2) + .shardSize(2) + .order(BucketOrder.aggregation("max_qty", false)) + .subAggregation(new MaxAggregationBuilder("max_qty").field("quantity")); + + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("_index") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY + ); + + StreamNumericTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + priceFieldType, + quantityFieldType + ); + + aggregator.preCollection(); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + DoubleTerms result = (DoubleTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + + assertThat(result, notNullValue()); + assertThat(result.getBuckets().size(), equalTo(2)); + + List buckets = result.getBuckets(); + assertThat(buckets.get(0).getKeyAsNumber().doubleValue(), equalTo(11.0)); + assertThat(buckets.get(1).getKeyAsNumber().doubleValue(), equalTo(12.0)); + + Max max0 = buckets.get(0).getAggregations().get("max_qty"); + Max max1 = buckets.get(1).getAggregations().get("max_qty"); + assertThat(max0.getValue(), equalTo(20.0)); + assertThat(max1.getValue(), equalTo(30.0)); + } + } + } + } + + public void testUnsignedLongTermsWithSubAggregationOrdering() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + for (int i = 0; i < 3; i++) { + Document doc = new Document(); + doc.add(new NumericDocValuesField("id", Long.MAX_VALUE - i)); + doc.add(new NumericDocValuesField("score", (i + 1) * 100)); + indexWriter.addDocument(doc); + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType idFieldType = new NumberFieldMapper.NumberFieldType("id", NumberFieldMapper.NumberType.UNSIGNED_LONG); + MappedFieldType scoreFieldType = new NumberFieldMapper.NumberFieldType("score", NumberFieldMapper.NumberType.LONG); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("ids").field("id") + .size(2) + .shardSize(2) + .order(BucketOrder.aggregation("max_score", false)) + .subAggregation(new MaxAggregationBuilder("max_score").field("score")); + + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("_index") + .settings( + 
Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY + ); + + StreamNumericTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + idFieldType, + scoreFieldType + ); + + aggregator.preCollection(); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + UnsignedLongTerms result = (UnsignedLongTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + + assertThat(result, notNullValue()); + assertThat(result.getBuckets().size(), equalTo(2)); + + List buckets = result.getBuckets(); + assertThat(buckets.get(0).getKeyAsNumber().longValue(), equalTo(Long.MAX_VALUE - 2)); + assertThat(buckets.get(1).getKeyAsNumber().longValue(), equalTo(Long.MAX_VALUE - 1)); + + Max max0 = buckets.get(0).getAggregations().get("max_score"); + Max max1 = buckets.get(1).getAggregations().get("max_score"); + assertThat(max0.getValue(), equalTo(300.0)); + assertThat(max1.getValue(), equalTo(200.0)); + } + } + } + } + + public void testKeyOrderWithSizeLimitDropsCorrectBuckets() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + // Create terms where top 5 by count != first 5 numerically + long[] terms = { 95, 94, 93, 92, 91, 1, 2, 3, 4, 5 }; + int[] counts = { 100, 90, 80, 70, 60, 50, 40, 30, 20, 10 }; + for (int i = 0; i < terms.length; i++) { + for (int j = 0; j < counts[i]; j++) { + Document doc = new Document(); + doc.add(new NumericDocValuesField("field", terms[i])); + indexWriter.addDocument(doc); + } + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("field", NumberFieldMapper.NumberType.LONG); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("test").field("field") + .size(5) + .shardSize(5) + .order(BucketOrder.key(true)); + + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("_index") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY + ); + + StreamNumericTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + fieldType + ); + + aggregator.preCollection(); + assertEquals("strictly single segment", 1, indexSearcher.getIndexReader().leaves().size()); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + // Streaming aggregation does not support key-based ordering for numeric fields + IllegalArgumentException exception = expectThrows( + IllegalArgumentException.class, + () -> aggregator.buildAggregations(new long[] { 0 }) + ); + 
assertThat( + exception.getMessage(), + equalTo( + "Streaming aggregation does not support key-based ordering for numeric fields. Use traditional aggregation approach instead." + ) + ); + } + } + } + } + + public void testKeyOrderDescendingWithSizeLimitDropsCorrectBuckets() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + // Create terms where top 5 by count != last 5 numerically + long[] terms = { 95, 94, 93, 92, 91, 1, 2, 3, 4, 5 }; + int[] counts = { 100, 90, 80, 70, 60, 50, 40, 30, 20, 10 }; + for (int i = 0; i < terms.length; i++) { + for (int j = 0; j < counts[i]; j++) { + Document doc = new Document(); + doc.add(new NumericDocValuesField("field", terms[i])); + indexWriter.addDocument(doc); + } + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("field", NumberFieldMapper.NumberType.LONG); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("test").field("field") + .size(5) + .shardSize(5) + .order(BucketOrder.key(false)); + + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("_index") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY + ); + + StreamNumericTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + fieldType + ); + + aggregator.preCollection(); + assertEquals("strictly single segment", 1, indexSearcher.getIndexReader().leaves().size()); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + // Streaming aggregation does not support key-based ordering for numeric fields + IllegalArgumentException exception = expectThrows( + IllegalArgumentException.class, + () -> aggregator.buildAggregations(new long[] { 0 }) + ); + assertThat( + exception.getMessage(), + equalTo( + "Streaming aggregation does not support key-based ordering for numeric fields. Use traditional aggregation approach instead." 
+ ) + ); + } + } + } + } + + public void testLongTermsSubAggregationTieBreaking() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + // Create buckets where all have same doc count and same max value + // This forces tie-breaking by key during quickselect + for (long key = 0; key < 20; key++) { + for (int doc = 0; doc < 5; doc++) { + Document d = new Document(); + d.add(new SortedNumericDocValuesField("number", key)); + d.add(new SortedNumericDocValuesField("value", 100)); // Same max for all + indexWriter.addDocument(d); + } + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType numberFieldType = new NumberFieldMapper.NumberFieldType("number", NumberFieldMapper.NumberType.LONG); + MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.LONG); + + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + .build(); + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("test").settings(settings).numberOfShards(1).numberOfReplicas(0).build(), + Settings.EMPTY + ); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("test").field("number") + .size(5) + .shardSize(5) + .subAggregation(new MaxAggregationBuilder("max_value").field("value")) + .order(BucketOrder.aggregation("max_value", false)); + + StreamNumericTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + numberFieldType, + valueFieldType + ); + + aggregator.preCollection(); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + LongTerms result = (LongTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + List buckets = result.getBuckets(); + + // Should get 5 buckets, and they should be sorted by key after coordinator reduce + assertEquals(5, buckets.size()); + for (int i = 0; i < buckets.size() - 1; i++) { + assertTrue(buckets.get(i).getKeyAsNumber().longValue() < buckets.get(i + 1).getKeyAsNumber().longValue()); + } + } + } + } + } + + public void testDoubleTermsSubAggregationTieBreaking() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + for (long key = 0; key < 20; key++) { + for (int doc = 0; doc < 5; doc++) { + Document d = new Document(); + d.add(new SortedNumericDocValuesField("number", NumericUtils.doubleToSortableLong(key + 0.5))); + d.add(new SortedNumericDocValuesField("value", NumericUtils.doubleToSortableLong(100.0))); + indexWriter.addDocument(d); + } + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType numberFieldType = new NumberFieldMapper.NumberFieldType("number", NumberFieldMapper.NumberType.DOUBLE); + MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.DOUBLE); + + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, 
Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + .build(); + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("test").settings(settings).numberOfShards(1).numberOfReplicas(0).build(), + Settings.EMPTY + ); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("test").field("number") + .size(5) + .shardSize(5) + .subAggregation(new MaxAggregationBuilder("max_value").field("value")) + .order(BucketOrder.aggregation("max_value", false)); + + StreamNumericTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + numberFieldType, + valueFieldType + ); + + aggregator.preCollection(); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + DoubleTerms result = (DoubleTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + List buckets = result.getBuckets(); + + assertEquals(5, buckets.size()); + for (int i = 0; i < buckets.size() - 1; i++) { + assertTrue(buckets.get(i).getKeyAsNumber().doubleValue() < buckets.get(i + 1).getKeyAsNumber().doubleValue()); + } + } + } + } + } + + public void testUnsignedLongTermsSubAggregationTieBreaking() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + for (long key = 0; key < 20; key++) { + for (int doc = 0; doc < 5; doc++) { + Document d = new Document(); + d.add(new SortedNumericDocValuesField("number", key)); + d.add(new SortedNumericDocValuesField("value", 100)); + indexWriter.addDocument(d); + } + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType numberFieldType = new NumberFieldMapper.NumberFieldType( + "number", + NumberFieldMapper.NumberType.UNSIGNED_LONG + ); + MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType( + "value", + NumberFieldMapper.NumberType.UNSIGNED_LONG + ); + + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + .build(); + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("test").settings(settings).numberOfShards(1).numberOfReplicas(0).build(), + Settings.EMPTY + ); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("test").field("number") + .size(5) + .shardSize(5) + .subAggregation(new MaxAggregationBuilder("max_value").field("value")) + .order(BucketOrder.aggregation("max_value", false)); + + StreamNumericTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + numberFieldType, + valueFieldType + ); + + aggregator.preCollection(); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + UnsignedLongTerms result = (UnsignedLongTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + List buckets = result.getBuckets(); + + assertEquals(5, buckets.size()); + for (int i = 0; i < buckets.size() - 1; i++) { + assertTrue(buckets.get(i).getKeyAsNumber().longValue() < 
buckets.get(i + 1).getKeyAsNumber().longValue()); + } + } + } + } + } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregatorTests.java index 112a47527d0f0..af9e7d492be16 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregatorTests.java @@ -20,9 +20,11 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; import org.apache.lucene.util.BytesRef; +import org.opensearch.Version; import org.opensearch.action.OriginalIndices; import org.opensearch.action.search.SearchShardTask; import org.opensearch.action.support.StreamSearchChannelListener; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.MockBigArrays; import org.opensearch.common.util.MockPageCacheRecycler; @@ -30,6 +32,7 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.index.IndexSettings; import org.opensearch.index.mapper.KeywordFieldMapper; import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.mapper.NumberFieldMapper; @@ -42,6 +45,8 @@ import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.metrics.Avg; import org.opensearch.search.aggregations.metrics.AvgAggregationBuilder; +import org.opensearch.search.aggregations.metrics.Cardinality; +import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder; import org.opensearch.search.aggregations.metrics.InternalSum; import org.opensearch.search.aggregations.metrics.Max; import org.opensearch.search.aggregations.metrics.MaxAggregationBuilder; @@ -294,11 +299,8 @@ public void testBuildAggregationsBatchWithSize() throws Exception { StringTerms result = (StringTerms) aggregator.buildAggregations(new long[] { 0 })[0]; assertThat(result, notNullValue()); - // For streaming aggregator, size limitation may not be applied at buildAggregations level - // but rather handled during the reduce phase. Test that we get all terms for this batch. 
assertThat(result.getBuckets().size(), equalTo(10)); - // Verify each term appears exactly twice (20 docs / 10 unique terms) for (StringTerms.Bucket bucket : result.getBuckets()) { assertThat(bucket.getDocCount(), equalTo(2L)); assertThat(bucket.getKeyAsString().startsWith("term_"), equalTo(true)); @@ -1056,7 +1058,6 @@ public void testReduceSimple() throws Exception { StringTerms terms = (StringTerms) reduced; assertThat(terms.getBuckets().size(), equalTo(3)); - // Check that electronics bucket has count 2 (from both aggregations) StringTerms.Bucket electronicsBucket = terms.getBuckets() .stream() .filter(bucket -> bucket.getKeyAsString().equals("electronics")) @@ -1065,7 +1066,6 @@ public void testReduceSimple() throws Exception { assertThat(electronicsBucket, notNullValue()); assertThat(electronicsBucket.getDocCount(), equalTo(2L)); - // Check that books and clothing buckets each have count 1 StringTerms.Bucket booksBucket = terms.getBuckets() .stream() .filter(bucket -> bucket.getKeyAsString().equals("books")) @@ -1155,11 +1155,10 @@ public void testReduceWithSubAggregations() throws Exception { StringTerms.Bucket electronicsBucket = terms.getBuckets().get(0); assertThat(electronicsBucket.getKeyAsString(), equalTo("electronics")); - assertThat(electronicsBucket.getDocCount(), equalTo(3L)); // 2 from first + 1 from second + assertThat(electronicsBucket.getDocCount(), equalTo(3L)); - // Check that sub-aggregation values are properly reduced InternalSum totalPrice = electronicsBucket.getAggregations().get("total_price"); - assertThat(totalPrice.getValue(), equalTo(450.0)); // 100 + 200 + 150 + assertThat(totalPrice.getValue(), equalTo(450.0)); } } @@ -1216,10 +1215,8 @@ public void testReduceWithSizeLimit() throws Exception { StringTerms terms = (StringTerms) reduced; - // Size limit should be applied during reduce phase assertThat(terms.getBuckets().size(), equalTo(3)); - // Check that overlapping terms (cat_3, cat_4) have doc count 2 for (StringTerms.Bucket bucket : terms.getBuckets()) { if (bucket.getKeyAsString().equals("cat_3") || bucket.getKeyAsString().equals("cat_4")) { assertThat(bucket.getDocCount(), equalTo(2L)); @@ -1304,21 +1301,16 @@ public void testReduceSingleAggregation() throws Exception { List buckets = reduced.getBuckets(); - // Verify the buckets are sorted by count (descending) - // electronics: 2 docs, books: 2 docs, clothing: 1 doc StringTerms.Bucket firstBucket = buckets.get(0); StringTerms.Bucket secondBucket = buckets.get(1); StringTerms.Bucket thirdBucket = buckets.get(2); - // First two buckets should have count 2 (electronics and books) assertThat(firstBucket.getDocCount(), equalTo(2L)); assertThat(secondBucket.getDocCount(), equalTo(2L)); assertThat(thirdBucket.getDocCount(), equalTo(1L)); - // Third bucket should be clothing with count 1 assertThat(thirdBucket.getKeyAsString(), equalTo("clothing")); - // Verify that electronics and books are the first two (order may vary for equal counts) assertTrue( "First two buckets should be electronics and books", (firstBucket.getKeyAsString().equals("electronics") || firstBucket.getKeyAsString().equals("books")) @@ -1326,7 +1318,6 @@ public void testReduceSingleAggregation() throws Exception { && !firstBucket.getKeyAsString().equals(secondBucket.getKeyAsString()) ); - // Verify total document count across all buckets long totalDocs = buckets.stream().mapToLong(StringTerms.Bucket::getDocCount).sum(); assertThat(totalDocs, equalTo(5L)); } @@ -1361,8 +1352,8 @@ public void testThrowOnManySegments() throws Exception { 
fieldType ); - // Execute the aggregator aggregator.preCollection(); + assertThrows(IllegalStateException.class, () -> { searcher.search(new MatchAllDocsQuery(), aggregator); }); } } @@ -1516,4 +1507,706 @@ public void testCollectDebugInfo() throws IOException { } } } + + public void testOrderByMaxSubAggregationDescending() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + // Create 10 categories with varying max values (size=3, total=10) + for (int i = 0; i < 10; i++) { + Document doc = new Document(); + doc.add(new SortedSetDocValuesField("category", new BytesRef("cat_" + i))); + doc.add(new NumericDocValuesField("value", (i + 1) * 100)); + indexWriter.addDocument(doc); + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType categoryFieldType = new KeywordFieldMapper.KeywordFieldType("category"); + MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.LONG); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("categories").field("category") + .size(3) + .shardSize(3) + .order(BucketOrder.aggregation("max_value", false)) + .subAggregation(new MaxAggregationBuilder("max_value").field("value")); + + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("_index") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY + ); + + StreamStringTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + categoryFieldType, + valueFieldType + ); + + aggregator.preCollection(); + assertEquals("strictly single segment", 1, indexSearcher.getIndexReader().leaves().size()); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + StringTerms result = (StringTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + + assertThat(result, notNullValue()); + List buckets = result.getBuckets(); + assertThat(buckets.size(), equalTo(3)); + + assertThat(buckets.get(0).getKeyAsString(), equalTo("cat_7")); + assertThat(buckets.get(1).getKeyAsString(), equalTo("cat_8")); + assertThat(buckets.get(2).getKeyAsString(), equalTo("cat_9")); + + Max max0 = buckets.get(0).getAggregations().get("max_value"); + Max max1 = buckets.get(1).getAggregations().get("max_value"); + Max max2 = buckets.get(2).getAggregations().get("max_value"); + assertThat(max0.getValue(), equalTo(800.0)); + assertThat(max1.getValue(), equalTo(900.0)); + assertThat(max2.getValue(), equalTo(1000.0)); + + // Verify otherDocCount: 10 categories * 1 doc = 10 total, selected 3*1=3, other=7 + assertThat(result.getSumOfOtherDocCounts(), equalTo(7L)); + } + } + } + } + + public void testOrderByCardinalitySubAggregationDescending() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + // Create 10 categories with varying cardinality (size=5, total=10) + for (int i = 0; i < 10; i++) 
{ + int uniqueUsers = (i + 1) * 2; // cat_0=2, cat_1=4, ..., cat_9=20 + for (int j = 0; j < uniqueUsers; j++) { + Document doc = new Document(); + doc.add(new SortedSetDocValuesField("category", new BytesRef("cat_" + i))); + doc.add(new SortedSetDocValuesField("user_id", new BytesRef("user_" + (i * 100 + j)))); + indexWriter.addDocument(doc); + } + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType categoryFieldType = new KeywordFieldMapper.KeywordFieldType("category"); + MappedFieldType userIdFieldType = new KeywordFieldMapper.KeywordFieldType("user_id"); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("categories").field("category") + .size(5) + .shardSize(5) + .order(BucketOrder.aggregation("unique_users", false)) + .subAggregation(new CardinalityAggregationBuilder("unique_users").field("user_id")); + + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("_index") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY + ); + + StreamStringTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + categoryFieldType, + userIdFieldType + ); + + aggregator.preCollection(); + assertEquals("strictly single segment", 1, indexSearcher.getIndexReader().leaves().size()); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + StringTerms result = (StringTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + + assertThat(result, notNullValue()); + List buckets = result.getBuckets(); + assertThat(buckets.size(), equalTo(5)); + + assertThat(buckets.get(0).getKeyAsString(), equalTo("cat_5")); + assertThat(buckets.get(1).getKeyAsString(), equalTo("cat_6")); + assertThat(buckets.get(2).getKeyAsString(), equalTo("cat_7")); + assertThat(buckets.get(3).getKeyAsString(), equalTo("cat_8")); + assertThat(buckets.get(4).getKeyAsString(), equalTo("cat_9")); + + Cardinality card0 = buckets.get(0).getAggregations().get("unique_users"); + Cardinality card1 = buckets.get(1).getAggregations().get("unique_users"); + Cardinality card2 = buckets.get(2).getAggregations().get("unique_users"); + Cardinality card3 = buckets.get(3).getAggregations().get("unique_users"); + Cardinality card4 = buckets.get(4).getAggregations().get("unique_users"); + assertThat(card0.getValue(), equalTo(12L)); + assertThat(card1.getValue(), equalTo(14L)); + assertThat(card2.getValue(), equalTo(16L)); + assertThat(card3.getValue(), equalTo(18L)); + assertThat(card4.getValue(), equalTo(20L)); + + // Verify otherDocCount: total docs = 2+4+6+8+10+12+14+16+18+20=110, selected=12+14+16+18+20=80, other=30 + assertThat(result.getSumOfOtherDocCounts(), equalTo(30L)); + } + } + } + } + + public void testOrderByCardinalitySubAggregationAscending() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + // Create 10 categories where TOP 3 by cardinality ASC won't be alphabetically first + // cat_z=2, cat_y=4, cat_x=6, 
cat_a=8, cat_b=10, cat_c=12, cat_d=14, cat_e=16, cat_f=18, cat_g=20 + String[] names = { "cat_z", "cat_y", "cat_x", "cat_a", "cat_b", "cat_c", "cat_d", "cat_e", "cat_f", "cat_g" }; + for (int i = 0; i < 10; i++) { + int uniqueUsers = (i + 1) * 2; + for (int j = 0; j < uniqueUsers; j++) { + Document doc = new Document(); + doc.add(new SortedSetDocValuesField("category", new BytesRef(names[i]))); + doc.add(new SortedSetDocValuesField("user_id", new BytesRef("user_" + (i * 100 + j)))); + indexWriter.addDocument(doc); + } + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType categoryFieldType = new KeywordFieldMapper.KeywordFieldType("category"); + MappedFieldType userIdFieldType = new KeywordFieldMapper.KeywordFieldType("user_id"); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("categories").field("category") + .size(3) + .shardSize(3) + .order(BucketOrder.aggregation("unique_users", true)) + .subAggregation(new CardinalityAggregationBuilder("unique_users").field("user_id")); + + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("_index") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY + ); + + StreamStringTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + categoryFieldType, + userIdFieldType + ); + + aggregator.preCollection(); + assertEquals("strictly single segment", 1, indexSearcher.getIndexReader().leaves().size()); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + StringTerms result = (StringTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + + assertThat(result, notNullValue()); + List buckets = result.getBuckets(); + assertThat(buckets.size(), equalTo(3)); + + assertThat(buckets.get(0).getKeyAsString(), equalTo("cat_x")); + assertThat(buckets.get(1).getKeyAsString(), equalTo("cat_y")); + assertThat(buckets.get(2).getKeyAsString(), equalTo("cat_z")); + + Cardinality card0 = buckets.get(0).getAggregations().get("unique_users"); + Cardinality card1 = buckets.get(1).getAggregations().get("unique_users"); + Cardinality card2 = buckets.get(2).getAggregations().get("unique_users"); + assertThat(card0.getValue(), equalTo(6L)); + assertThat(card1.getValue(), equalTo(4L)); + assertThat(card2.getValue(), equalTo(2L)); + + // Verify otherDocCount: total docs = 2+4+6+8+10+12+14+16+18+20=110, selected=6+4+2=12, other=98 + assertThat(result.getSumOfOtherDocCounts(), equalTo(98L)); + } + } + } + } + + public void testNoSortOrder() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + // Create categories where alphabetical order != count order + // cat_z=10 docs (highest), cat_y=9, cat_x=8, cat_a=7, cat_b=6, cat_c=5, cat_d=4, cat_e=3, cat_f=2, cat_g=1 (lowest) + String[] names = { "cat_z", "cat_y", "cat_x", "cat_a", "cat_b", "cat_c", "cat_d", "cat_e", "cat_f", "cat_g" }; + for (int i = 0; i < 10; i++) { + int numDocs = 10 - i; + for (int j 
= 0; j < numDocs; j++) { + Document doc = new Document(); + doc.add(new SortedSetDocValuesField("category", new BytesRef(names[i]))); + doc.add(new NumericDocValuesField("value", (i + 1) * 100 + j)); + indexWriter.addDocument(doc); + } + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType categoryFieldType = new KeywordFieldMapper.KeywordFieldType("category"); + MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.LONG); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("categories").field("category") + .size(5) + .shardSize(5) + .subAggregation(new MaxAggregationBuilder("max_value").field("value")); + + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("_index") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY + ); + + StreamStringTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + categoryFieldType, + valueFieldType + ); + + aggregator.preCollection(); + assertEquals("strictly single segment", 1, indexSearcher.getIndexReader().leaves().size()); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + StringTerms result = (StringTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + + assertThat(result, notNullValue()); + List buckets = result.getBuckets(); + assertThat(buckets.size(), equalTo(5)); + + // Default order is count DESC, so top 5 should be cat_z, cat_y, cat_x, cat_a, cat_b (highest counts) + // Returned in alphabetical order at shard level + assertThat(buckets.get(0).getKeyAsString(), equalTo("cat_a")); + assertThat(buckets.get(0).getDocCount(), equalTo(7L)); + assertThat(buckets.get(1).getKeyAsString(), equalTo("cat_b")); + assertThat(buckets.get(1).getDocCount(), equalTo(6L)); + assertThat(buckets.get(2).getKeyAsString(), equalTo("cat_x")); + assertThat(buckets.get(2).getDocCount(), equalTo(8L)); + assertThat(buckets.get(3).getKeyAsString(), equalTo("cat_y")); + assertThat(buckets.get(3).getDocCount(), equalTo(9L)); + assertThat(buckets.get(4).getKeyAsString(), equalTo("cat_z")); + assertThat(buckets.get(4).getDocCount(), equalTo(10L)); + + // Verify otherDocCount: total=55 docs (10+9+8+7+6+5+4+3+2+1), selected=40 (10+9+8+7+6), other=15 + assertThat(result.getSumOfOtherDocCounts(), equalTo(15L)); + } + } + } + } + + public void testOrderByMaxAscending() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + // Create 10 categories where TOP 3 by max ASC won't be alphabetically first + // cat_z=100, cat_y=200, cat_x=300, cat_a=400, ... 
+ String[] names = { "cat_z", "cat_y", "cat_x", "cat_a", "cat_b", "cat_c", "cat_d", "cat_e", "cat_f", "cat_g" }; + for (int i = 0; i < 10; i++) { + int numDocs = 5; + for (int j = 0; j < numDocs; j++) { + Document doc = new Document(); + doc.add(new SortedSetDocValuesField("category", new BytesRef(names[i]))); + doc.add(new NumericDocValuesField("value", (i + 1) * 100 + j)); + indexWriter.addDocument(doc); + } + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType categoryFieldType = new KeywordFieldMapper.KeywordFieldType("category"); + MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.LONG); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("categories").field("category") + .size(3) + .shardSize(3) + .order(BucketOrder.aggregation("max_value", true)) + .subAggregation(new MaxAggregationBuilder("max_value").field("value")); + + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("_index") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY + ); + + StreamStringTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + categoryFieldType, + valueFieldType + ); + + aggregator.preCollection(); + assertEquals("strictly single segment", 1, indexSearcher.getIndexReader().leaves().size()); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + StringTerms result = (StringTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + + assertThat(result, notNullValue()); + List buckets = result.getBuckets(); + assertThat(buckets.size(), equalTo(3)); + + assertThat(buckets.get(0).getKeyAsString(), equalTo("cat_x")); + assertThat(buckets.get(1).getKeyAsString(), equalTo("cat_y")); + assertThat(buckets.get(2).getKeyAsString(), equalTo("cat_z")); + + Max max0 = buckets.get(0).getAggregations().get("max_value"); + Max max1 = buckets.get(1).getAggregations().get("max_value"); + Max max2 = buckets.get(2).getAggregations().get("max_value"); + assertThat(max0.getValue(), equalTo(304.0)); + assertThat(max1.getValue(), equalTo(204.0)); + assertThat(max2.getValue(), equalTo(104.0)); + + // Verify otherDocCount: 10 categories * 5 docs = 50 total, selected 3*5=15, other=35 + assertThat(result.getSumOfOtherDocCounts(), equalTo(35L)); + } + } + } + } + + public void testOrderByMaxDescending() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + // Create 10 categories with varying max values + for (int i = 0; i < 10; i++) { + int numDocs = 5; + for (int j = 0; j < numDocs; j++) { + Document doc = new Document(); + doc.add(new SortedSetDocValuesField("category", new BytesRef("cat_" + i))); + doc.add(new NumericDocValuesField("value", (i + 1) * 100 + j)); + indexWriter.addDocument(doc); + } + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = 
newIndexSearcher(indexReader); + MappedFieldType categoryFieldType = new KeywordFieldMapper.KeywordFieldType("category"); + MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.LONG); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("categories").field("category") + .size(3) + .shardSize(3) + .order(BucketOrder.aggregation("max_value", false)) + .subAggregation(new MaxAggregationBuilder("max_value").field("value")); + + IndexSettings indexSettings = new IndexSettings( + IndexMetadata.builder("_index") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put("index.aggregation.streaming.min_shard_size", 1) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + Settings.EMPTY + ); + + StreamStringTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + indexSettings, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + categoryFieldType, + valueFieldType + ); + + aggregator.preCollection(); + assertEquals("strictly single segment", 1, indexSearcher.getIndexReader().leaves().size()); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + StringTerms result = (StringTerms) aggregator.buildAggregations(new long[] { 0 })[0]; + + assertThat(result, notNullValue()); + List buckets = result.getBuckets(); + assertThat(buckets.size(), equalTo(3)); + + assertThat(buckets.get(0).getKeyAsString(), equalTo("cat_7")); + assertThat(buckets.get(1).getKeyAsString(), equalTo("cat_8")); + assertThat(buckets.get(2).getKeyAsString(), equalTo("cat_9")); + + Max max0 = buckets.get(0).getAggregations().get("max_value"); + Max max1 = buckets.get(1).getAggregations().get("max_value"); + Max max2 = buckets.get(2).getAggregations().get("max_value"); + assertThat(max0.getValue(), equalTo(804.0)); + assertThat(max1.getValue(), equalTo(904.0)); + assertThat(max2.getValue(), equalTo(1004.0)); + + // Verify otherDocCount: 10 categories * 5 docs = 50 total, selected 3*5=15, other=35 + assertThat(result.getSumOfOtherDocCounts(), equalTo(35L)); + } + } + } + } + + public void testMinDocCount() throws Exception { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + // Create categories with varying doc counts: cat_0=1, cat_1=2, cat_2=3, cat_3=4, cat_4=5 + for (int i = 0; i < 5; i++) { + for (int j = 0; j <= i; j++) { + Document doc = new Document(); + doc.add(new SortedSetDocValuesField("category", new BytesRef("cat_" + i))); + indexWriter.addDocument(doc); + } + } + + try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType categoryFieldType = new KeywordFieldMapper.KeywordFieldType("category"); + + // Test with minDocCount=3, should only return cat_2, cat_3, cat_4 + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("categories").field("category").minDocCount(3); + + StreamStringTermsAggregator aggregator = createStreamAggregator( + null, + aggregationBuilder, + indexSearcher, + createIndexSettings(), + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + 
+                        categoryFieldType
+                    );
+
+                    aggregator.preCollection();
+                    assertEquals("strictly single segment", 1, indexSearcher.getIndexReader().leaves().size());
+                    indexSearcher.search(new MatchAllDocsQuery(), aggregator);
+                    aggregator.postCollection();
+
+                    StringTerms result = (StringTerms) aggregator.buildAggregations(new long[] { 0 })[0];
+
+                    assertThat(result, notNullValue());
+                    List buckets = result.getBuckets();
+                    assertThat(buckets.size(), equalTo(3));
+
+                    assertThat(buckets.get(0).getKeyAsString(), equalTo("cat_2"));
+                    assertThat(buckets.get(0).getDocCount(), equalTo(3L));
+                    assertThat(buckets.get(1).getKeyAsString(), equalTo("cat_3"));
+                    assertThat(buckets.get(1).getDocCount(), equalTo(4L));
+                    assertThat(buckets.get(2).getKeyAsString(), equalTo("cat_4"));
+                    assertThat(buckets.get(2).getDocCount(), equalTo(5L));
+
+                    // Verify otherDocCount: cat_0=1 + cat_1=2 = 3 docs excluded
+                    assertThat(result.getSumOfOtherDocCounts(), equalTo(3L));
+                }
+            }
+        }
+    }
+
+    public void testKeyOrderWithSizeLimitDropsCorrectBuckets() throws Exception {
+        try (Directory directory = newDirectory()) {
+            try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) {
+                // Create terms where top 5 by count != first 5 alphabetically
+                // Alphabetically: aaa, bbb, ccc, ddd, eee, fff, ggg, hhh, iii, jjj
+                // By count: zzz(100), yyy(90), xxx(80), www(70), vvv(60), aaa(50), bbb(40), ccc(30), ddd(20), eee(10)
+                String[] terms = { "zzz", "yyy", "xxx", "www", "vvv", "aaa", "bbb", "ccc", "ddd", "eee" };
+                int[] counts = { 100, 90, 80, 70, 60, 50, 40, 30, 20, 10 };
+                for (int i = 0; i < terms.length; i++) {
+                    for (int j = 0; j < counts[i]; j++) {
+                        Document doc = new Document();
+                        doc.add(new SortedSetDocValuesField("field", new BytesRef(terms[i])));
+                        indexWriter.addDocument(doc);
+                    }
+                }
+
+                try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) {
+                    IndexSearcher indexSearcher = newIndexSearcher(indexReader);
+                    MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("field");
+
+                    // Request size=5 with key order ascending - should return first 5 alphabetically
+                    TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("test").field("field")
+                        .size(5)
+                        .shardSize(5)
+                        .order(BucketOrder.key(true));
+
+                    IndexSettings indexSettings = new IndexSettings(
+                        IndexMetadata.builder("_index")
+                            .settings(
+                                Settings.builder()
+                                    .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                                    .put("index.aggregation.streaming.min_shard_size", 1)
+                            )
+                            .numberOfShards(1)
+                            .numberOfReplicas(0)
+                            .creationDate(System.currentTimeMillis())
+                            .build(),
+                        Settings.EMPTY
+                    );
+
+                    StreamStringTermsAggregator aggregator = createStreamAggregator(
+                        null,
+                        aggregationBuilder,
+                        indexSearcher,
+                        indexSettings,
+                        new MultiBucketConsumerService.MultiBucketConsumer(
+                            DEFAULT_MAX_BUCKETS,
+                            new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
+                        ),
+                        fieldType
+                    );
+
+                    aggregator.preCollection();
+                    assertEquals("strictly single segment", 1, indexSearcher.getIndexReader().leaves().size());
+                    indexSearcher.search(new MatchAllDocsQuery(), aggregator);
+                    aggregator.postCollection();
+
+                    StringTerms result = (StringTerms) aggregator.buildAggregations(new long[] { 0 })[0];
+
+                    assertThat(result, notNullValue());
+                    List buckets = result.getBuckets();
+                    assertThat(buckets.size(), equalTo(5));
+
+                    // With key order ASC, should return first 5 alphabetically: aaa, bbb, ccc, ddd, eee
+                    // NOT the top 5 by doc count (zzz, yyy, xxx, www, vvv)
+                    assertThat(buckets.get(0).getKeyAsString(), equalTo("aaa"));
+                    assertThat(buckets.get(1).getKeyAsString(), equalTo("bbb"));
+                    assertThat(buckets.get(2).getKeyAsString(), equalTo("ccc"));
+                    assertThat(buckets.get(3).getKeyAsString(), equalTo("ddd"));
+                    assertThat(buckets.get(4).getKeyAsString(), equalTo("eee"));
+                }
+            }
+        }
+    }
+
+    public void testKeyOrderDescendingWithSizeLimitDropsCorrectBuckets() throws Exception {
+        try (Directory directory = newDirectory()) {
+            try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) {
+                // Create terms where top 5 by count != last 5 alphabetically
+                // Alphabetically: aaa, bbb, ccc, ddd, eee, fff, ggg, hhh, iii, jjj
+                // By count: jjj(100), iii(90), hhh(80), ggg(70), fff(60), aaa(50), bbb(40), ccc(30), ddd(20), eee(10)
+                String[] terms = { "jjj", "iii", "hhh", "ggg", "fff", "aaa", "bbb", "ccc", "ddd", "eee" };
+                int[] counts = { 100, 90, 80, 70, 60, 50, 40, 30, 20, 10 };
+                for (int i = 0; i < terms.length; i++) {
+                    for (int j = 0; j < counts[i]; j++) {
+                        Document doc = new Document();
+                        doc.add(new SortedSetDocValuesField("field", new BytesRef(terms[i])));
+                        indexWriter.addDocument(doc);
+                    }
+                }
+
+                try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) {
+                    IndexSearcher indexSearcher = newIndexSearcher(indexReader);
+                    MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("field");
+
+                    // Request size=5 with key order descending - should return last 5 alphabetically
+                    TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("test").field("field")
+                        .size(5)
+                        .shardSize(5)
+                        .order(BucketOrder.key(false));
+
+                    IndexSettings indexSettings = new IndexSettings(
+                        IndexMetadata.builder("_index")
+                            .settings(
+                                Settings.builder()
+                                    .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                                    .put("index.aggregation.streaming.min_shard_size", 1)
+                            )
+                            .numberOfShards(1)
+                            .numberOfReplicas(0)
+                            .creationDate(System.currentTimeMillis())
+                            .build(),
+                        Settings.EMPTY
+                    );
+
+                    StreamStringTermsAggregator aggregator = createStreamAggregator(
+                        null,
+                        aggregationBuilder,
+                        indexSearcher,
+                        indexSettings,
+                        new MultiBucketConsumerService.MultiBucketConsumer(
+                            DEFAULT_MAX_BUCKETS,
+                            new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
+                        ),
+                        fieldType
+                    );
+
+                    aggregator.preCollection();
+                    assertEquals("strictly single segment", 1, indexSearcher.getIndexReader().leaves().size());
+                    indexSearcher.search(new MatchAllDocsQuery(), aggregator);
+                    aggregator.postCollection();
+
+                    StringTerms result = (StringTerms) aggregator.buildAggregations(new long[] { 0 })[0];
+
+                    assertThat(result, notNullValue());
+                    List buckets = result.getBuckets();
+                    assertThat(buckets.size(), equalTo(5));
+
+                    // With key order DESC, should return last 5 alphabetically: jjj, iii, hhh, ggg, fff
+                    // NOT the top 5 by doc count
+                    assertThat(buckets.get(0).getKeyAsString(), equalTo("fff"));
+                    assertThat(buckets.get(1).getKeyAsString(), equalTo("ggg"));
+                    assertThat(buckets.get(2).getKeyAsString(), equalTo("hhh"));
+                    assertThat(buckets.get(3).getKeyAsString(), equalTo("iii"));
+                    assertThat(buckets.get(4).getKeyAsString(), equalTo("jjj"));
+                }
+            }
+        }
+    }
 }
diff --git a/server/src/test/java/org/opensearch/search/streaming/FlushModeResolverTests.java b/server/src/test/java/org/opensearch/search/streaming/FlushModeResolverTests.java
index 10befbe76aa70..0cf8dec1e488e 100644
--- a/server/src/test/java/org/opensearch/search/streaming/FlushModeResolverTests.java
+++ b/server/src/test/java/org/opensearch/search/streaming/FlushModeResolverTests.java
@@ -419,4 +419,72 @@ public void testSettingsDefaults() {
         assertEquals(0.01, FlushModeResolver.STREAMING_MIN_CARDINALITY_RATIO.getDefault(Settings.EMPTY).doubleValue(), 0.001);
         assertEquals(1000L, FlushModeResolver.STREAMING_MIN_ESTIMATED_BUCKET_COUNT.getDefault(Settings.EMPTY).longValue());
     }
+
+    public void testResolveWithNumericKeyOrderAscending() throws IOException {
+        withIndex(writer -> {
+            for (int i = 0; i < 100; i++) {
+                Document document = new Document();
+                int value = i % 10;
+                document.add(new SortedNumericDocValuesField("number", value));
+                document.add(new IntPoint("number", value));
+                writer.addDocument(document);
+            }
+        }, searcher -> {
+            MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("number", NumberFieldMapper.NumberType.INTEGER);
+            TermsAggregationBuilder builder = new TermsAggregationBuilder("numbers").field("number")
+                .order(org.opensearch.search.aggregations.BucketOrder.key(true));
+            StreamNumericTermsAggregator aggregator = createStreamAggregator(
+                null,
+                builder,
+                searcher,
+                createIndexSettings(),
+                createBucketConsumer(),
+                fieldType
+            );
+
+            FlushMode result = FlushModeResolver.resolve(
+                aggregator,
+                FlushMode.PER_SHARD,
+                SMALL_BUCKET_LIMIT,
+                HIGH_CARDINALITY_RATIO,
+                MIN_BUCKET_THRESHOLD
+            );
+
+            assertEquals(FlushMode.PER_SHARD, result);
+        });
+    }
+
+    public void testResolveWithNumericCountOrder() throws IOException {
+        withIndex(writer -> {
+            for (int i = 0; i < 100; i++) {
+                Document document = new Document();
+                int value = i % 10;
+                document.add(new SortedNumericDocValuesField("number", value));
+                document.add(new IntPoint("number", value));
+                writer.addDocument(document);
+            }
+        }, searcher -> {
+            MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("number", NumberFieldMapper.NumberType.INTEGER);
+            TermsAggregationBuilder builder = new TermsAggregationBuilder("numbers").field("number")
+                .order(org.opensearch.search.aggregations.BucketOrder.count(true));
+            StreamNumericTermsAggregator aggregator = createStreamAggregator(
+                null,
+                builder,
+                searcher,
+                createIndexSettings(),
+                createBucketConsumer(),
+                fieldType
+            );
+
+            FlushMode result = FlushModeResolver.resolve(
+                aggregator,
+                FlushMode.PER_SHARD,
+                SMALL_BUCKET_LIMIT,
+                HIGH_CARDINALITY_RATIO,
+                MIN_BUCKET_THRESHOLD
+            );
+
+            assertEquals(FlushMode.PER_SEGMENT, result);
+        });
+    }
 }
diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java
index cb8efa2ad130c..198d1a28aabef 100644
--- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java
@@ -501,6 +501,7 @@ public boolean shouldCache(Query query) {
         when(searchContext.bitsetFilterCache()).thenReturn(new BitsetFilterCache(indexSettings, mock(Listener.class)));
         IndexShard indexShard = mock(IndexShard.class);
         when(indexShard.shardId()).thenReturn(new ShardId("test", "test", 0));
+        when(indexShard.indexSettings()).thenReturn(indexSettings);
         when(searchContext.indexShard()).thenReturn(indexShard);
         SearchOperationListener searchOperationListener = new SearchOperationListener() {
         };