From 0809d162b62b6a48e5a1ac163a856e8bfb3c9d1a Mon Sep 17 00:00:00 2001 From: Wenting Wang Date: Thu, 19 Feb 2026 10:32:44 -0800 Subject: [PATCH 1/6] Populate HeadStats in _tsdb/stats endpoint Collect HeadStats from LiveSeriesIndexLeafReader in TSDBStatsAggregator, merge across shards by summing numSeries/chunkCount and taking min/max of time bounds, and include in the endpoint response. Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Wenting Wang rebase for reviewing stacked diff, unit tests failed, will fix in the next commit Signed-off-by: Wenting Wang fix unit test due to rebase Signed-off-by: Wenting Wang add includeHeadStats in TSDBStatsAggregator Signed-off-by: Wenting Wang --- .../integrationTests/TSDBStatsRestIT.java | 24 +++ .../query/aggregator/InternalTSDBStats.java | 40 ++++- .../TSDBStatsAggregationBuilder.java | 45 ++++- .../query/aggregator/TSDBStatsAggregator.java | 41 ++++- .../TSDBStatsAggregatorFactory.java | 17 +- .../tsdb/query/rest/RestTSDBStatsAction.java | 10 +- .../query/aggregator/AggregatorTestUtils.java | 52 ++++++ .../TSDBStatsAggregationBuilderTests.java | 93 ++++++++--- .../TSDBStatsAggregatorFactoryTests.java | 3 +- .../aggregator/TSDBStatsAggregatorTests.java | 156 +++++++++++++++--- 10 files changed, 426 insertions(+), 55 deletions(-) diff --git a/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java b/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java index 336e25950..9f9116914 100644 --- a/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java +++ b/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java @@ -55,6 +55,12 @@ public void testBasicEndpoint() throws Exception { String expectedJson = """ { + "headStats": { + "numSeries": 10, + "chunkCount": 0, + "minTime": 0, + "maxTime": 9223372036854775807 + }, "labelStats": { "numSeries": 10, "name": { @@ -274,6 +280,12 @@ public void testFormatOptions() throws Exception { // Grouped format - same as default/testTSDBStatsEndpointExists String expectedGroupedJson = """ { + "headStats": { + "numSeries": 10, + "chunkCount": 0, + "minTime": 0, + "maxTime": 9223372036854775807 + }, "labelStats": { "numSeries": 10, "name": { @@ -348,6 +360,12 @@ public void testFormatOptions() throws Exception { // Flat format has arrays sorted by count descending, then name ascending String expectedFlatJson = """ { + "headStats": { + "numSeries": 10, + "chunkCount": 0, + "minTime": 0, + "maxTime": 9223372036854775807 + }, "seriesCountByMetricName": [ {"name": "http_requests_total", "value": 6}, {"name": "db_connections", "value": 2}, @@ -410,6 +428,12 @@ public void testQueryFiltering() throws Exception { // Filtered to service:api AND name:http_* (5 series - all api series have http_* names) String expectedJson = """ { + "headStats": { + "numSeries": 10, + "chunkCount": 0, + "minTime": 0, + "maxTime": 9223372036854775807 + }, "labelStats": { "numSeries": 5, "name": { diff --git a/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java b/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java index d440ef092..be4afcbf9 100644 --- a/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java +++ b/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java @@ -280,6 +280,7 @@ public void writeTo(StreamOutput out) throws IOException { * Factory method for creating shard-level stats (with seriesIds ). * * @param name the name of the aggregation + * @param headStats the head statistics (null if not populated) * @param shardStats the shard-level statistics with seriesIds * @param metadata the aggregation metadata * @return InternalTSDBStats instance for shard-level phase @@ -442,6 +443,9 @@ private InternalTSDBStats reduceShardLevel(List aggregation Set mergedSeriesIds = new HashSet<>(); Map>> mergedLabelStats = new HashMap<>(); + // Merge HeadStats from all shard-level aggregations + HeadStats mergedHeadStats = mergeHeadStats(aggregations); + // Capture global includeValueStats flag from first shard (all shards have same value) boolean includeValueStats = false; if (!aggregations.isEmpty()) { @@ -538,7 +542,7 @@ private InternalTSDBStats reduceShardLevel(List aggregation // Return coordinator-level stats (seriesId converted to counts to save network bandwidth) CoordinatorLevelStats coordinatorStats = new CoordinatorLevelStats(totalSeries, finalLabelStats); - return forCoordinatorLevel(name, null, coordinatorStats, metadata); + return forCoordinatorLevel(name, mergedHeadStats, coordinatorStats, metadata); } /** @@ -548,7 +552,7 @@ private InternalTSDBStats reduceShardLevel(List aggregation * is needed because each time series exists on only one shard (guaranteed by routing).

*/ private InternalTSDBStats reduceCoordinatorLevel(List aggregations) { - HeadStats mergedHeadStats = null; // TODO: Merge HeadStats in future when populated + HeadStats mergedHeadStats = mergeHeadStats(aggregations); Long totalSeries = null; Map builders = new HashMap<>(); @@ -610,6 +614,38 @@ private static class LabelStatsBuilder { Map valueCounts = new LinkedHashMap<>(); } + /** + * Merges HeadStats from multiple aggregations by summing numSeries and chunkCount, + * taking the minimum minTime, and taking the maximum maxTime. + * + * @param aggregations the list of aggregations containing HeadStats to merge + * @return merged HeadStats, or null if no aggregation had HeadStats + */ + static HeadStats mergeHeadStats(List aggregations) { + long totalNumSeries = 0; + long totalChunkCount = 0; + long minTime = Long.MAX_VALUE; + long maxTime = Long.MIN_VALUE; + boolean hasAny = false; + + for (InternalAggregation agg : aggregations) { + InternalTSDBStats stats = (InternalTSDBStats) agg; + if (stats.headStats != null) { + hasAny = true; + totalNumSeries += stats.headStats.numSeries(); + totalChunkCount += stats.headStats.chunkCount(); + if (stats.headStats.minTime() < minTime) { + minTime = stats.headStats.minTime(); + } + if (stats.headStats.maxTime() > maxTime) { + maxTime = stats.headStats.maxTime(); + } + } + } + + return hasAny ? new HeadStats(totalNumSeries, totalChunkCount, minTime, maxTime) : null; + } + /** * Retrieves a property value based on the given path. * diff --git a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilder.java b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilder.java index 441044952..ea24f8cc6 100644 --- a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilder.java +++ b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilder.java @@ -38,7 +38,7 @@ * *

Usage Example:

*
{@code
- * TSDBStatsAggregationBuilder builder = new TSDBStatsAggregationBuilder("tsdb_stats", 1000000L, 2000000L, true);
+ * TSDBStatsAggregationBuilder builder = new TSDBStatsAggregationBuilder("tsdb_stats", 1000000L, 2000000L, true, true);
  * }
* * @since 0.0.1 @@ -50,6 +50,7 @@ public class TSDBStatsAggregationBuilder extends AbstractAggregationBuilder> ordinalToSeriesIdsMap; + // HeadStats accumulator: tracks stats from LiveSeriesIndexLeafReader segments + private long headNumSeries; + private long headChunkCount; + private long headMinTime; + private long headMaxTime; + private boolean hasHeadStats; + /** * Creates a TSDB stats aggregator. * @@ -74,6 +83,7 @@ public class TSDBStatsAggregator extends MetricsAggregator { * @param minTimestamp The minimum timestamp for filtering * @param maxTimestamp The maximum timestamp for filtering * @param includeValueStats Whether to include per-value statistics + * @param includeHeadStats Whether to include head (in-memory) statistics * @param metadata The aggregation metadata * @throws IOException If an error occurs during initialization */ @@ -84,12 +94,14 @@ public TSDBStatsAggregator( long minTimestamp, long maxTimestamp, boolean includeValueStats, + boolean includeHeadStats, Map metadata ) throws IOException { super(name, context, parent, metadata); this.minTimestamp = minTimestamp; this.maxTimestamp = maxTimestamp; this.includeValueStats = includeValueStats; + this.includeHeadStats = includeHeadStats; this.seenSeriesIds = new HashSet<>(); @@ -97,6 +109,13 @@ public TSDBStatsAggregator( this.labelValuePairOrdinalMap = new BytesRefHash(pool); this.ordinalToSeriesIdsMap = includeValueStats ? new HashMap<>() : null; + + // Initialize HeadStats accumulator + this.headNumSeries = 0; + this.headChunkCount = 0; + this.headMinTime = Long.MAX_VALUE; + this.headMaxTime = Long.MIN_VALUE; + this.hasHeadStats = false; } @Override @@ -111,6 +130,21 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol return sub; } + // Collect HeadStats from LiveSeriesIndexLeafReader segments + if (includeHeadStats && tsdbLeafReader instanceof LiveSeriesIndexLeafReader) { + hasHeadStats = true; + // LiveSeriesIndex guarantees one doc per series, so numDocs == numSeries + headNumSeries += ctx.reader().numDocs(); + long leafMinTime = tsdbLeafReader.getMinIndexTimestamp(); + long leafMaxTime = tsdbLeafReader.getMaxIndexTimestamp(); + if (leafMinTime < headMinTime) { + headMinTime = leafMinTime; + } + if (leafMaxTime > headMaxTime) { + headMaxTime = leafMaxTime; + } + } + return new TSDBStatsLeafBucketCollector(ctx, tsdbLeafReader, sub); } @@ -206,8 +240,13 @@ public InternalAggregation buildAggregation(long bucket) throws IOException { includeValueStats // Pass global flag so coordinator knows if value stats were collected ); + // Build HeadStats if any LiveSeriesIndexLeafReader segments were encountered + InternalTSDBStats.HeadStats headStats = hasHeadStats + ? new InternalTSDBStats.HeadStats(headNumSeries, headChunkCount, headMinTime, headMaxTime) + : null; + // Return using factory method - return InternalTSDBStats.forShardLevel(name, null, shardStats, metadata()); + return InternalTSDBStats.forShardLevel(name, headStats, shardStats, metadata()); } @Override diff --git a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactory.java b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactory.java index f3ecb1f60..d68318200 100644 --- a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactory.java +++ b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactory.java @@ -44,6 +44,7 @@ public class TSDBStatsAggregatorFactory extends AggregatorFactory { private final long minTimestamp; private final long maxTimestamp; private final boolean includeValueStats; + private final boolean includeHeadStats; /** * Creates a TSDB stats aggregator factory. @@ -56,6 +57,7 @@ public class TSDBStatsAggregatorFactory extends AggregatorFactory { * @param minTimestamp The minimum timestamp for filtering * @param maxTimestamp The maximum timestamp for filtering * @param includeValueStats Whether to include per-value statistics + * @param includeHeadStats Whether to include head (in-memory) statistics * @throws IOException If an error occurs during initialization */ public TSDBStatsAggregatorFactory( @@ -66,12 +68,14 @@ public TSDBStatsAggregatorFactory( Map metadata, long minTimestamp, long maxTimestamp, - boolean includeValueStats + boolean includeValueStats, + boolean includeHeadStats ) throws IOException { super(name, queryShardContext, parent, subFactoriesBuilder, metadata); this.minTimestamp = minTimestamp; this.maxTimestamp = maxTimestamp; this.includeValueStats = includeValueStats; + this.includeHeadStats = includeHeadStats; } @Override @@ -81,7 +85,16 @@ public Aggregator createInternal( CardinalityUpperBound cardinality, Map metadata ) throws IOException { - return new TSDBStatsAggregator(name, searchContext, parent, minTimestamp, maxTimestamp, includeValueStats, metadata); + return new TSDBStatsAggregator( + name, + searchContext, + parent, + minTimestamp, + maxTimestamp, + includeValueStats, + includeHeadStats, + metadata + ); } @Override diff --git a/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java b/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java index fba92f292..366c350aa 100644 --- a/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java +++ b/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java @@ -401,13 +401,21 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli // Determine what statistics to include boolean includeValueStats = includeOptions.contains(TSDBStatsConstants.INCLUDE_ALL) || includeOptions.contains(TSDBStatsConstants.INCLUDE_VALUE_STATS); + boolean includeHeadStats = includeOptions.contains(TSDBStatsConstants.INCLUDE_ALL) + || includeOptions.contains(TSDBStatsConstants.INCLUDE_HEAD_STATS); try { // Build QueryBuilder from the already-parsed FetchPlanNode QueryBuilder filter = buildQueryFromFetch(fetchPlan, startMs, endMs); // Build aggregation - TSDBStatsAggregationBuilder aggBuilder = new TSDBStatsAggregationBuilder(AGGREGATION_NAME, startMs, endMs, includeValueStats); + TSDBStatsAggregationBuilder aggBuilder = new TSDBStatsAggregationBuilder( + AGGREGATION_NAME, + startMs, + endMs, + includeValueStats, + includeHeadStats + ); // Build search request SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(filter).aggregation(aggBuilder).size(0); diff --git a/src/test/java/org/opensearch/tsdb/query/aggregator/AggregatorTestUtils.java b/src/test/java/org/opensearch/tsdb/query/aggregator/AggregatorTestUtils.java index 212a1ff20..163c18b43 100644 --- a/src/test/java/org/opensearch/tsdb/query/aggregator/AggregatorTestUtils.java +++ b/src/test/java/org/opensearch/tsdb/query/aggregator/AggregatorTestUtils.java @@ -9,6 +9,7 @@ import org.apache.lucene.codecs.StoredFieldsReader; import org.apache.lucene.document.Document; +import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.CompositeReader; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexWriter; @@ -20,7 +21,12 @@ import org.apache.lucene.index.TermVectors; import org.apache.lucene.store.ByteBuffersDirectory; import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; import org.opensearch.tsdb.core.chunk.ChunkIterator; +import org.opensearch.tsdb.core.index.live.LiveSeriesIndexLeafReader; +import org.opensearch.tsdb.core.index.live.MemChunkReader; +import org.opensearch.tsdb.core.mapping.Constants; +import org.opensearch.tsdb.core.mapping.LabelStorageType; import org.opensearch.tsdb.core.model.ByteLabels; import org.opensearch.tsdb.core.model.Labels; import org.opensearch.tsdb.core.reader.TSDBDocValues; @@ -32,6 +38,7 @@ import java.util.Map; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Shared test utilities for aggregator tests. @@ -239,4 +246,49 @@ public long getSumTotalTermFreq(String field) throws IOException { } }; } + + public static TSDBLeafReaderWithContext createLiveSeriesIndexLeafReaderWithLabels( + long minTimestamp, + long maxTimestamp, + long seriesId, + Map labelPairs + ) throws IOException { + Directory directory = new ByteBuffersDirectory(); + IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig()); + + Document doc = new Document(); + // Add REFERENCE field (LiveSeriesIndex field) + doc.add(new NumericDocValuesField(Constants.IndexSchema.REFERENCE, seriesId)); + // Add labels as binary doc values for LabelStorageType.BINARY + Labels labels = ByteLabels.fromMap(labelPairs); + doc.add( + new org.apache.lucene.document.BinaryDocValuesField( + Constants.IndexSchema.LABELS, + new BytesRef(((ByteLabels) labels).getRawBytes()) + ) + ); + indexWriter.addDocument(doc); + indexWriter.commit(); + + DirectoryReader tempReader = DirectoryReader.open(indexWriter); + LeafReader baseReader = tempReader.leaves().get(0).reader(); + + // Create a real LiveSeriesIndexLeafReader with mocked chunk reader + MemChunkReader mockChunkReader = mock(MemChunkReader.class); + when(mockChunkReader.getChunks(org.mockito.ArgumentMatchers.anyLong())).thenReturn(List.of()); + + LiveSeriesIndexLeafReader liveReader = new LiveSeriesIndexLeafReader( + baseReader, + mockChunkReader, + LabelStorageType.BINARY, + minTimestamp, + java.util.Collections.emptyMap() + ); + + CompositeReader compositeReader = createCompositeReaderWrapper(liveReader); + LeafReaderContext context = compositeReader.leaves().get(0); + + return new TSDBLeafReaderWithContext(liveReader, context, compositeReader, tempReader, indexWriter, directory); + } + } diff --git a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilderTests.java b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilderTests.java index bc91dc0cd..6a95d1c7b 100644 --- a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilderTests.java +++ b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilderTests.java @@ -35,12 +35,13 @@ public class TSDBStatsAggregationBuilderTests extends OpenSearchTestCase { // ========== Constructor Tests ========== public void testConstructorWithValidParameters() { - TSDBStatsAggregationBuilder builder = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true); + TSDBStatsAggregationBuilder builder = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, true); assertEquals(TEST_NAME, builder.getName()); assertEquals(MIN_TIMESTAMP, builder.getMinTimestamp()); assertEquals(MAX_TIMESTAMP, builder.getMaxTimestamp()); assertTrue(builder.isIncludeValueStats()); + assertTrue(builder.isIncludeHeadStats()); assertEquals("tsdb_stats_agg", builder.getType()); assertEquals(AggregationBuilder.BucketCardinality.NONE, builder.bucketCardinality()); } @@ -49,14 +50,14 @@ public void testConstructorRejectsInvalidTimeRange() { // max < min IllegalArgumentException ex1 = expectThrows( IllegalArgumentException.class, - () -> new TSDBStatsAggregationBuilder(TEST_NAME, 2000L, 1000L, true) + () -> new TSDBStatsAggregationBuilder(TEST_NAME, 2000L, 1000L, true, true) ); assertTrue(ex1.getMessage().contains("maxTimestamp must be greater than minTimestamp")); // max == min IllegalArgumentException ex2 = expectThrows( IllegalArgumentException.class, - () -> new TSDBStatsAggregationBuilder(TEST_NAME, 1000L, 1000L, true) + () -> new TSDBStatsAggregationBuilder(TEST_NAME, 1000L, 1000L, true, true) ); assertTrue(ex2.getMessage().contains("maxTimestamp must be greater than minTimestamp")); } @@ -64,8 +65,8 @@ public void testConstructorRejectsInvalidTimeRange() { // ========== Serialization Tests ========== public void testSerializationRoundTrip() throws IOException { - // includeValueStats=true - TSDBStatsAggregationBuilder original = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true); + // includeValueStats=true, includeHeadStats=true + TSDBStatsAggregationBuilder original = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, true); TSDBStatsAggregationBuilder deserialized = serializeAndDeserialize(original); assertEquals(original, deserialized); @@ -74,16 +75,18 @@ public void testSerializationRoundTrip() throws IOException { assertEquals(original.getMinTimestamp(), deserialized.getMinTimestamp()); assertEquals(original.getMaxTimestamp(), deserialized.getMaxTimestamp()); assertEquals(original.isIncludeValueStats(), deserialized.isIncludeValueStats()); + assertEquals(original.isIncludeHeadStats(), deserialized.isIncludeHeadStats()); - // includeValueStats=false - TSDBStatsAggregationBuilder original2 = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, false); + // includeValueStats=false, includeHeadStats=false + TSDBStatsAggregationBuilder original2 = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, false, false); TSDBStatsAggregationBuilder deserialized2 = serializeAndDeserialize(original2); assertEquals(original2, deserialized2); assertFalse(deserialized2.isIncludeValueStats()); + assertFalse(deserialized2.isIncludeHeadStats()); } public void testSerializationWithEdgeCaseTimestamps() throws IOException { - TSDBStatsAggregationBuilder original = new TSDBStatsAggregationBuilder(TEST_NAME, Long.MIN_VALUE, Long.MAX_VALUE, true); + TSDBStatsAggregationBuilder original = new TSDBStatsAggregationBuilder(TEST_NAME, Long.MIN_VALUE, Long.MAX_VALUE, true, true); TSDBStatsAggregationBuilder deserialized = serializeAndDeserialize(original); assertEquals(original, deserialized); @@ -94,7 +97,7 @@ public void testSerializationWithEdgeCaseTimestamps() throws IOException { // ========== XContent Tests ========== public void testXContentGeneration() throws IOException { - TSDBStatsAggregationBuilder builder = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true); + TSDBStatsAggregationBuilder builder = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, true); XContentBuilder xContentBuilder = XContentFactory.jsonBuilder(); builder.internalXContent(xContentBuilder, null); @@ -103,13 +106,14 @@ public void testXContentGeneration() throws IOException { assertTrue(json.contains("\"min_timestamp\":" + MIN_TIMESTAMP)); assertTrue(json.contains("\"max_timestamp\":" + MAX_TIMESTAMP)); assertTrue(json.contains("\"include_value_stats\":true")); + assertTrue(json.contains("\"include_head_stats\":true")); } public void testXContentParsing() throws IOException { // Full round-trip: parse with all fields String json = String.format( Locale.ROOT, - "{\"min_timestamp\":%d,\"max_timestamp\":%d,\"include_value_stats\":true}", + "{\"min_timestamp\":%d,\"max_timestamp\":%d,\"include_value_stats\":true,\"include_head_stats\":true}", MIN_TIMESTAMP, MAX_TIMESTAMP ); @@ -122,12 +126,13 @@ public void testXContentParsing() throws IOException { assertEquals(MIN_TIMESTAMP, parsed.getMinTimestamp()); assertEquals(MAX_TIMESTAMP, parsed.getMaxTimestamp()); assertTrue(parsed.isIncludeValueStats()); + assertTrue(parsed.isIncludeHeadStats()); } - // With include_value_stats=false + // With include_value_stats=false, include_head_stats=false String json2 = String.format( Locale.ROOT, - "{\"min_timestamp\":%d,\"max_timestamp\":%d,\"include_value_stats\":false}", + "{\"min_timestamp\":%d,\"max_timestamp\":%d,\"include_value_stats\":false,\"include_head_stats\":false}", MIN_TIMESTAMP, MAX_TIMESTAMP ); @@ -136,12 +141,18 @@ public void testXContentParsing() throws IOException { parser.nextToken(); TSDBStatsAggregationBuilder parsed = TSDBStatsAggregationBuilder.parse(TEST_NAME, parser); assertFalse(parsed.isIncludeValueStats()); + assertFalse(parsed.isIncludeHeadStats()); } } public void testXContentParsingMissingRequiredFields() throws IOException { // Missing min_timestamp - try (XContentParser parser = createParser(XContentType.JSON.xContent(), "{\"max_timestamp\":2000,\"include_value_stats\":true}")) { + try ( + XContentParser parser = createParser( + XContentType.JSON.xContent(), + "{\"max_timestamp\":2000,\"include_value_stats\":true,\"include_head_stats\":true}" + ) + ) { parser.nextToken(); IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, @@ -151,7 +162,12 @@ public void testXContentParsingMissingRequiredFields() throws IOException { } // Missing max_timestamp - try (XContentParser parser = createParser(XContentType.JSON.xContent(), "{\"min_timestamp\":1000,\"include_value_stats\":true}")) { + try ( + XContentParser parser = createParser( + XContentType.JSON.xContent(), + "{\"min_timestamp\":1000,\"include_value_stats\":true,\"include_head_stats\":true}" + ) + ) { parser.nextToken(); IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, @@ -161,7 +177,12 @@ public void testXContentParsingMissingRequiredFields() throws IOException { } // Missing include_value_stats - try (XContentParser parser = createParser(XContentType.JSON.xContent(), "{\"min_timestamp\":1000,\"max_timestamp\":2000}")) { + try ( + XContentParser parser = createParser( + XContentType.JSON.xContent(), + "{\"min_timestamp\":1000,\"max_timestamp\":2000,\"include_head_stats\":true}" + ) + ) { parser.nextToken(); IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, @@ -169,6 +190,21 @@ public void testXContentParsingMissingRequiredFields() throws IOException { ); assertTrue(ex.getMessage().contains("Required parameter 'include_value_stats' is missing")); } + + // Missing include_head_stats + try ( + XContentParser parser = createParser( + XContentType.JSON.xContent(), + "{\"min_timestamp\":1000,\"max_timestamp\":2000,\"include_value_stats\":true}" + ) + ) { + parser.nextToken(); + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + () -> TSDBStatsAggregationBuilder.parse(TEST_NAME, parser) + ); + assertTrue(ex.getMessage().contains("Required parameter 'include_head_stats' is missing")); + } } public void testXContentParsingWithUnknownFields() throws IOException { @@ -176,7 +212,7 @@ public void testXContentParsingWithUnknownFields() throws IOException { String json = String.format( Locale.ROOT, "{\"min_timestamp\":%d,\"unknown_num\":99,\"max_timestamp\":%d," - + "\"unknown_bool\":false,\"include_value_stats\":true," + + "\"unknown_bool\":false,\"include_value_stats\":true,\"include_head_stats\":true," + "\"nested\":{\"a\":1},\"arr\":[1,2],\"unknown_str\":\"val\"}", MIN_TIMESTAMP, MAX_TIMESTAMP @@ -195,15 +231,29 @@ public void testXContentParsingWithUnknownFields() throws IOException { // ========== Equals and HashCode Tests ========== public void testEqualsAndHashCode() { - TSDBStatsAggregationBuilder builder1 = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true); - TSDBStatsAggregationBuilder builder2 = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true); - TSDBStatsAggregationBuilder differentValueStats = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, false); - TSDBStatsAggregationBuilder differentMax = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, 3000L, true); + TSDBStatsAggregationBuilder builder1 = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, true); + TSDBStatsAggregationBuilder builder2 = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, true); + TSDBStatsAggregationBuilder differentValueStats = new TSDBStatsAggregationBuilder( + TEST_NAME, + MIN_TIMESTAMP, + MAX_TIMESTAMP, + false, + true + ); + TSDBStatsAggregationBuilder differentHeadStats = new TSDBStatsAggregationBuilder( + TEST_NAME, + MIN_TIMESTAMP, + MAX_TIMESTAMP, + true, + false + ); + TSDBStatsAggregationBuilder differentMax = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, 3000L, true, true); // equals assertEquals(builder1, builder2); assertEquals(builder1, builder1); assertNotEquals(builder1, differentValueStats); + assertNotEquals(builder1, differentHeadStats); assertNotEquals(builder1, differentMax); assertNotEquals(builder1, null); assertNotEquals(builder1, new Object()); @@ -215,7 +265,7 @@ public void testEqualsAndHashCode() { // ========== Shallow Copy Tests ========== public void testShallowCopy() { - TSDBStatsAggregationBuilder original = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true); + TSDBStatsAggregationBuilder original = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, true); org.opensearch.search.aggregations.AggregatorFactories.Builder subFactoriesBuilder = new org.opensearch.search.aggregations.AggregatorFactories.Builder(); @@ -225,6 +275,7 @@ public void testShallowCopy() { assertEquals(original.getMinTimestamp(), copy.getMinTimestamp()); assertEquals(original.getMaxTimestamp(), copy.getMaxTimestamp()); assertEquals(original.isIncludeValueStats(), copy.isIncludeValueStats()); + assertEquals(original.isIncludeHeadStats(), copy.isIncludeHeadStats()); assertNotSame(original, copy); // With metadata diff --git a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactoryTests.java b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactoryTests.java index 913ff0e86..860b8652e 100644 --- a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactoryTests.java +++ b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactoryTests.java @@ -146,7 +146,8 @@ private TSDBStatsAggregatorFactory createFactory(String name, long minTimestamp, Map.of(), // metadata minTimestamp, maxTimestamp, - includeValueStats + includeValueStats, + true // includeHeadStats - default to true for existing tests ); } catch (Exception e) { throw new RuntimeException("Failed to create factory for test", e); diff --git a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorTests.java b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorTests.java index aea3ca6cc..f21ad9257 100644 --- a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorTests.java +++ b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorTests.java @@ -57,7 +57,7 @@ public class TSDBStatsAggregatorTests extends OpenSearchTestCase { public void testBuildEmptyAggregationBehavior() throws IOException { // Test with includeValueStats=true - TSDBStatsAggregator aggregator1 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true); + TSDBStatsAggregator aggregator1 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, false); InternalAggregation result1 = aggregator1.buildEmptyAggregation(); assertNotNull("Empty aggregation should not be null", result1); @@ -72,7 +72,7 @@ public void testBuildEmptyAggregationBehavior() throws IOException { aggregator1.close(); // Test with includeValueStats=false - TSDBStatsAggregator aggregator2 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, false); + TSDBStatsAggregator aggregator2 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, false, false); InternalAggregation result2 = aggregator2.buildEmptyAggregation(); assertNotNull("Empty aggregation should not be null", result2); @@ -81,7 +81,7 @@ public void testBuildEmptyAggregationBehavior() throws IOException { aggregator2.close(); // Test multiple empty aggregations (idempotency) - TSDBStatsAggregator aggregator3 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true); + TSDBStatsAggregator aggregator3 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, false); InternalAggregation firstResult = aggregator3.buildEmptyAggregation(); InternalAggregation secondResult = aggregator3.buildEmptyAggregation(); assertEquals("Multiple calls should return equivalent results", firstResult, secondResult); @@ -90,15 +90,15 @@ public void testBuildEmptyAggregationBehavior() throws IOException { public void testResourceCleanup() throws IOException { // Test close with includeValueStats=true - TSDBStatsAggregator aggregator1 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true); + TSDBStatsAggregator aggregator1 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, false); aggregator1.close(); // Test close with includeValueStats=false (ordinalFingerprintSets is null) - TSDBStatsAggregator aggregator2 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, false); + TSDBStatsAggregator aggregator2 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, false, false); aggregator2.close(); // Test close after buildEmptyAggregation - TSDBStatsAggregator aggregator3 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true); + TSDBStatsAggregator aggregator3 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, false); aggregator3.buildEmptyAggregation(); aggregator3.close(); } @@ -106,14 +106,14 @@ public void testResourceCleanup() throws IOException { public void testAggregatorWithMetadata() throws IOException { // Test with metadata Map metadata = Map.of("key1", "value1", "key2", 42); - TSDBStatsAggregator aggregator1 = createAggregatorWithMetadata(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, metadata); + TSDBStatsAggregator aggregator1 = createAggregatorWithMetadata(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, false, metadata); InternalAggregation result1 = aggregator1.buildEmptyAggregation(); assertNotNull("Result should have metadata", result1.getMetadata()); assertEquals("Metadata should match", metadata, result1.getMetadata()); aggregator1.close(); // Test without metadata - TSDBStatsAggregator aggregator2 = createAggregatorWithMetadata(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, null); + TSDBStatsAggregator aggregator2 = createAggregatorWithMetadata(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, false, null); InternalAggregation result2 = aggregator2.buildEmptyAggregation(); assertNull("Result should have null metadata", result2.getMetadata()); aggregator2.close(); @@ -125,7 +125,7 @@ public void testLeafCollectorBehavior() throws IOException { // Test 1: Non-TSDB LeafReader should throw exception long minTimestamp = 1000L; long maxTimestamp = 5000L; - TSDBStatsAggregator aggregator1 = createAggregator("test", minTimestamp, maxTimestamp, true); + TSDBStatsAggregator aggregator1 = createAggregator("test", minTimestamp, maxTimestamp, true, false); Directory directory = new ByteBuffersDirectory(); IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig()); @@ -147,7 +147,7 @@ public void testLeafCollectorBehavior() throws IOException { // Test 2: Non-overlapping time range (should prune) long queryMinTimestamp = 1000L; long queryMaxTimestamp = 5000L; - TSDBStatsAggregator aggregator2 = createAggregator("test", queryMinTimestamp, queryMaxTimestamp, true); + TSDBStatsAggregator aggregator2 = createAggregator("test", queryMinTimestamp, queryMaxTimestamp, true, false); long leafMinTimestamp = 6000L; long leafMaxTimestamp = 10000L; @@ -165,7 +165,7 @@ public void testLeafCollectorBehavior() throws IOException { aggregator2.close(); // Test 3: Overlapping time range (should return new collector) - TSDBStatsAggregator aggregator3 = createAggregator("test", queryMinTimestamp, queryMaxTimestamp, true); + TSDBStatsAggregator aggregator3 = createAggregator("test", queryMinTimestamp, queryMaxTimestamp, true, false); long leafMinTimestamp2 = 2000L; long leafMaxTimestamp2 = 6000L; @@ -184,7 +184,7 @@ public void testLeafCollectorBehavior() throws IOException { aggregator3.close(); // Test 4: Exact boundary (no overlap - should prune) - TSDBStatsAggregator aggregator4 = createAggregator("test", queryMinTimestamp, queryMaxTimestamp, true); + TSDBStatsAggregator aggregator4 = createAggregator("test", queryMinTimestamp, queryMaxTimestamp, true, false); long leafMinTimestamp3 = 0L; long leafMaxTimestamp3 = 999L; @@ -208,7 +208,7 @@ public void testCollectWithSingleSeries() throws IOException { // Arrange long minTimestamp = 1000L; long maxTimestamp = 5000L; - TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true); + TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true, false); Map labels = new HashMap<>(); labels.put("service", "api"); @@ -250,7 +250,7 @@ public void testCollectWithDuplicateSeries() throws IOException { // Arrange: two segments with the same labels → same stableHash → second should be deduped long minTimestamp = 1000L; long maxTimestamp = 5000L; - TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true); + TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true, false); // Same labels = same stableHash = dedup Map sameLabels = Map.of("service", "api"); @@ -296,7 +296,7 @@ public void testBuildAggregationWithValueStats() throws IOException { // Arrange long minTimestamp = 1000L; long maxTimestamp = 5000L; - TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true); + TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true, false); Map labels = Map.of("service", "api", "region", "us-west"); @@ -327,7 +327,7 @@ public void testBuildAggregationWithoutValueStats() throws IOException { // Arrange long minTimestamp = 1000L; long maxTimestamp = 5000L; - TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, false); + TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, false, false); Map labels = Map.of("service", "api"); @@ -359,7 +359,7 @@ public void testFullCollectionLifecycle() throws IOException { // Arrange: two different series with different labels long minTimestamp = 1000L; long maxTimestamp = 5000L; - TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true); + TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true, false); Map labels1 = Map.of("service", "api", "host", "server1"); Map labels2 = Map.of("service", "web", "host", "server2"); @@ -411,7 +411,7 @@ public void testCollectWithDuplicateLabelValues() throws IOException { // To get different stableHash (avoid dedup) but share a label value, use different labels overall. long minTimestamp = 1000L; long maxTimestamp = 5000L; - TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true); + TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true, false); // Two series that share "service:api" but differ on another label → different stableHash, same ordinal for "service:api" Map labels1 = Map.of("service", "api", "host", "server1"); @@ -449,6 +449,103 @@ public void testCollectWithDuplicateLabelValues() throws IOException { } } + // ========== HeadStats Collection Tests ========== + + public void testCollectHeadStatsFromLiveSeriesIndexLeafReader() throws IOException { + // Arrange + long minTimestamp = 1000L; + long maxTimestamp = 5000L; + TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true, true); + + Map labels = Map.of("service", "api"); + + AggregatorTestUtils.TSDBLeafReaderWithContext readerCtx = AggregatorTestUtils.createLiveSeriesIndexLeafReaderWithLabels( + minTimestamp, + maxTimestamp, + 12345L, + labels + ); + + try { + // Act + LeafBucketCollector collector = aggregator.getLeafCollector(readerCtx.context, LeafBucketCollector.NO_OP_COLLECTOR); + collector.collect(0, 0); + InternalAggregation result = aggregator.buildAggregation(0); + + // Assert + InternalTSDBStats stats = (InternalTSDBStats) result; + assertNotNull("Stats should not be null", stats); + // HeadStats should be populated since we used LiveSeriesIndexLeafReader with includeHeadStats=true + assertNotNull("HeadStats should be populated for LiveSeriesIndexLeafReader", stats.getHeadStats()); + assertEquals("HeadStats numSeries should match numDocs", 1L, stats.getHeadStats().numSeries()); + assertEquals("HeadStats minTime should match reader minTimestamp", minTimestamp, stats.getHeadStats().minTime()); + // LiveSeriesIndexLeafReader sets maxTimestamp to Long.MAX_VALUE (live/open-ended) + assertEquals("HeadStats maxTime should match LiveSeriesIndexLeafReader max", Long.MAX_VALUE, stats.getHeadStats().maxTime()); + } finally { + readerCtx.close(); + aggregator.close(); + } + } + + public void testNoHeadStatsForClosedChunkReader() throws IOException { + // Arrange - ClosedChunkIndex reader should NOT populate HeadStats + long minTimestamp = 1000L; + long maxTimestamp = 5000L; + TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true, true); + + Map labels = Map.of("service", "api"); + + AggregatorTestUtils.TSDBLeafReaderWithContext readerCtx = AggregatorTestUtils.createMockTSDBLeafReaderWithLabels( + minTimestamp, + maxTimestamp, + labels + ); + + try { + LeafBucketCollector collector = aggregator.getLeafCollector(readerCtx.context, LeafBucketCollector.NO_OP_COLLECTOR); + collector.collect(0, 0); + InternalAggregation result = aggregator.buildAggregation(0); + + InternalTSDBStats stats = (InternalTSDBStats) result; + assertNotNull("Stats should not be null", stats); + // HeadStats should NOT be populated for non-LiveSeriesIndex readers + assertNull("HeadStats should be null for ClosedChunkIndex reader", stats.getHeadStats()); + } finally { + readerCtx.close(); + aggregator.close(); + } + } + + public void testNoHeadStatsWhenIncludeHeadStatsFalse() throws IOException { + // Arrange - Even LiveSeriesIndexLeafReader should NOT populate HeadStats when includeHeadStats=false + long minTimestamp = 1000L; + long maxTimestamp = 5000L; + TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true, false); + + Map labels = Map.of("service", "api"); + + AggregatorTestUtils.TSDBLeafReaderWithContext readerCtx = AggregatorTestUtils.createLiveSeriesIndexLeafReaderWithLabels( + minTimestamp, + maxTimestamp, + 12345L, + labels + ); + + try { + LeafBucketCollector collector = aggregator.getLeafCollector(readerCtx.context, LeafBucketCollector.NO_OP_COLLECTOR); + collector.collect(0, 0); + InternalAggregation result = aggregator.buildAggregation(0); + + InternalTSDBStats stats = (InternalTSDBStats) result; + assertNotNull("Stats should not be null", stats); + // HeadStats should be null because includeHeadStats=false + assertNull("HeadStats should be null when includeHeadStats=false", stats.getHeadStats()); + } finally { + readerCtx.close(); + aggregator.close(); + } + } + // ========== Helper Methods ========== /** @@ -468,9 +565,14 @@ private InternalTSDBStats reduceToCoordinator(InternalAggregation shardResult) { return (InternalTSDBStats) shardResult.reduce(List.of(shardResult), reduceContext); } - private TSDBStatsAggregator createAggregator(String name, long minTimestamp, long maxTimestamp, boolean includeValueStats) - throws IOException { - return createAggregatorWithMetadata(name, minTimestamp, maxTimestamp, includeValueStats, Map.of()); + private TSDBStatsAggregator createAggregator( + String name, + long minTimestamp, + long maxTimestamp, + boolean includeValueStats, + boolean includeHeadStats + ) throws IOException { + return createAggregatorWithMetadata(name, minTimestamp, maxTimestamp, includeValueStats, includeHeadStats, Map.of()); } private TSDBStatsAggregator createAggregatorWithMetadata( @@ -478,6 +580,7 @@ private TSDBStatsAggregator createAggregatorWithMetadata( long minTimestamp, long maxTimestamp, boolean includeValueStats, + boolean includeHeadStats, Map metadata ) throws IOException { CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService(); @@ -486,6 +589,15 @@ private TSDBStatsAggregator createAggregatorWithMetadata( SearchContext searchContext = mock(SearchContext.class); when(searchContext.bigArrays()).thenReturn(bigArrays); - return new TSDBStatsAggregator(name, searchContext, null, minTimestamp, maxTimestamp, includeValueStats, metadata); + return new TSDBStatsAggregator( + name, + searchContext, + null, + minTimestamp, + maxTimestamp, + includeValueStats, + includeHeadStats, + metadata + ); } } From 8331508ac9297d4877321e2ef80685e003196f22 Mon Sep 17 00:00:00 2001 From: Wenting Wang Date: Wed, 4 Mar 2026 15:49:00 -0800 Subject: [PATCH 2/6] fix test coverage Signed-off-by: Wenting Wang --- .../integrationTests/TSDBStatsRestIT.java | 4 - .../query/aggregator/InternalTSDBStats.java | 17 +-- .../TSDBStatsAggregationBuilder.java | 4 +- .../query/aggregator/TSDBStatsAggregator.java | 4 +- .../tsdb/query/rest/RestTSDBStatsAction.java | 2 - .../query/rest/TSDBStatsResponseListener.java | 3 +- .../aggregator/InternalTSDBStatsTests.java | 143 +++++++++++++++++- .../TSDBStatsAggregationBuilderTests.java | 9 +- .../TSDBStatsAggregatorFactoryTests.java | 45 +++++- .../rest/TSDBStatsResponseListenerTests.java | 15 +- 10 files changed, 189 insertions(+), 57 deletions(-) diff --git a/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java b/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java index 9f9116914..dc4d097a1 100644 --- a/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java +++ b/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java @@ -57,7 +57,6 @@ public void testBasicEndpoint() throws Exception { { "headStats": { "numSeries": 10, - "chunkCount": 0, "minTime": 0, "maxTime": 9223372036854775807 }, @@ -282,7 +281,6 @@ public void testFormatOptions() throws Exception { { "headStats": { "numSeries": 10, - "chunkCount": 0, "minTime": 0, "maxTime": 9223372036854775807 }, @@ -362,7 +360,6 @@ public void testFormatOptions() throws Exception { { "headStats": { "numSeries": 10, - "chunkCount": 0, "minTime": 0, "maxTime": 9223372036854775807 }, @@ -430,7 +427,6 @@ public void testQueryFiltering() throws Exception { { "headStats": { "numSeries": 10, - "chunkCount": 0, "minTime": 0, "maxTime": 9223372036854775807 }, diff --git a/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java b/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java index be4afcbf9..fd50a7442 100644 --- a/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java +++ b/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java @@ -61,11 +61,10 @@ public class InternalTSDBStats extends InternalAggregation { * Statistics for the head (in-memory time series). * * @param numSeries the number of active time series in the head - * @param chunkCount the total number of memory chunks currently held * @param minTime the minimum sample timestamp present in the head * @param maxTime the maximum sample timestamp present in the head */ - public record HeadStats(long numSeries, long chunkCount, long minTime, long maxTime) { + public record HeadStats(long numSeries, long minTime, long maxTime) { /** * Deserializes a {@code HeadStats} instance from a stream. @@ -74,7 +73,7 @@ public record HeadStats(long numSeries, long chunkCount, long minTime, long maxT * @throws IOException if an I/O error occurs during reading */ public HeadStats(StreamInput in) throws IOException { - this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong()); + this(in.readVLong(), in.readLong(), in.readLong()); } /** @@ -85,9 +84,8 @@ public HeadStats(StreamInput in) throws IOException { */ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(numSeries); - out.writeVLong(chunkCount); - out.writeVLong(minTime); - out.writeVLong(maxTime); + out.writeLong(minTime); + out.writeLong(maxTime); } } @@ -615,7 +613,7 @@ private static class LabelStatsBuilder { } /** - * Merges HeadStats from multiple aggregations by summing numSeries and chunkCount, + * Merges HeadStats from multiple aggregations by summing numSeries, * taking the minimum minTime, and taking the maximum maxTime. * * @param aggregations the list of aggregations containing HeadStats to merge @@ -623,7 +621,6 @@ private static class LabelStatsBuilder { */ static HeadStats mergeHeadStats(List aggregations) { long totalNumSeries = 0; - long totalChunkCount = 0; long minTime = Long.MAX_VALUE; long maxTime = Long.MIN_VALUE; boolean hasAny = false; @@ -633,7 +630,6 @@ static HeadStats mergeHeadStats(List aggregations) { if (stats.headStats != null) { hasAny = true; totalNumSeries += stats.headStats.numSeries(); - totalChunkCount += stats.headStats.chunkCount(); if (stats.headStats.minTime() < minTime) { minTime = stats.headStats.minTime(); } @@ -643,7 +639,7 @@ static HeadStats mergeHeadStats(List aggregations) { } } - return hasAny ? new HeadStats(totalNumSeries, totalChunkCount, minTime, maxTime) : null; + return hasAny ? new HeadStats(totalNumSeries, minTime, maxTime) : null; } /** @@ -716,7 +712,6 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th if (headStats != null) { builder.startObject("headStats"); builder.field("numSeries", headStats.numSeries()); - builder.field("chunkCount", headStats.chunkCount()); builder.field("minTime", headStats.minTime()); builder.field("maxTime", headStats.maxTime()); builder.endObject(); diff --git a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilder.java b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilder.java index ea24f8cc6..42a59838c 100644 --- a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilder.java +++ b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilder.java @@ -185,9 +185,7 @@ public static TSDBStatsAggregationBuilder parse(String aggregationName, XContent ); } if (includeHeadStats == null) { - throw new IllegalArgumentException( - "Required parameter 'include_head_stats' is missing for aggregation '" + aggregationName + "'" - ); + includeHeadStats = false; } return new TSDBStatsAggregationBuilder(aggregationName, minTimestamp, maxTimestamp, includeValueStats, includeHeadStats); diff --git a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java index add830de1..5029193e2 100644 --- a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java +++ b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java @@ -69,7 +69,6 @@ public class TSDBStatsAggregator extends MetricsAggregator { // HeadStats accumulator: tracks stats from LiveSeriesIndexLeafReader segments private long headNumSeries; - private long headChunkCount; private long headMinTime; private long headMaxTime; private boolean hasHeadStats; @@ -112,7 +111,6 @@ public TSDBStatsAggregator( // Initialize HeadStats accumulator this.headNumSeries = 0; - this.headChunkCount = 0; this.headMinTime = Long.MAX_VALUE; this.headMaxTime = Long.MIN_VALUE; this.hasHeadStats = false; @@ -242,7 +240,7 @@ public InternalAggregation buildAggregation(long bucket) throws IOException { // Build HeadStats if any LiveSeriesIndexLeafReader segments were encountered InternalTSDBStats.HeadStats headStats = hasHeadStats - ? new InternalTSDBStats.HeadStats(headNumSeries, headChunkCount, headMinTime, headMaxTime) + ? new InternalTSDBStats.HeadStats(headNumSeries, headMinTime, headMaxTime) : null; // Return using factory method diff --git a/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java b/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java index 366c350aa..2c753bfb7 100644 --- a/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java +++ b/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java @@ -95,7 +95,6 @@ * { * "headStats": { * "numSeries": 508, - * "chunkCount": 937, * "minTime": 1591516800000, * "maxTime": 1598896800143 * }, @@ -130,7 +129,6 @@ * { * "headStats": { * "numSeries": 508, - * "chunkCount": 937, * "minTime": 1591516800000, * "maxTime": 1598896800143 * }, diff --git a/src/main/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListener.java b/src/main/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListener.java index 53ea7e112..c90cab74c 100644 --- a/src/main/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListener.java +++ b/src/main/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListener.java @@ -42,7 +42,7 @@ public class TSDBStatsResponseListener extends RestToXContentListener labelStats = createTestLabelStats(); // Act @@ -273,7 +273,7 @@ public void testForCoordinatorLevelXContent() throws IOException { // Test all XContent variations in one test // Variation 1: With all fields - InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(508L, 937L, 1591516800000L, 1598896800143L); + InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(508L, 1591516800000L, 1598896800143L); Map labelStats = new HashMap<>(); labelStats.put("cluster", new InternalTSDBStats.CoordinatorLevelStats.LabelStats(100L, Map.of("prod", 80L, "staging", 15L))); @@ -417,7 +417,7 @@ public void testGetPropertyBehavior() { } public void testEqualsAndHashCode() { - InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(100L, 200L, 1000L, 2000L); + InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(100L, 1000L, 2000L); Map labelStats = createTestLabelStats(); InternalTSDBStats stats1 = InternalTSDBStats.forCoordinatorLevel( @@ -460,7 +460,7 @@ public void testEqualsAndHashCode() { stats1, InternalTSDBStats.forCoordinatorLevel( TEST_NAME, - new InternalTSDBStats.HeadStats(999L, 200L, 1000L, 2000L), + new InternalTSDBStats.HeadStats(999L, 1000L, 2000L), new InternalTSDBStats.CoordinatorLevelStats(500L, labelStats), TEST_METADATA ) @@ -515,7 +515,7 @@ public void testTopLevelSerializationRoundTrip() throws IOException { assertNull(shardDeserialized.getHeadStats()); // Scenario 2: Coordinator-level with headStats - InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(100L, 200L, 1000L, 2000L); + InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(100L, 1000L, 2000L); InternalTSDBStats coordOriginal = InternalTSDBStats.forCoordinatorLevel( TEST_NAME, headStats, @@ -983,6 +983,137 @@ public void testReduceShardLevelWithEmptySeriesIdsForValue() { assertEquals(1L, resultLabelStats.get("cluster").valuesStats().get("staging").longValue()); } + // ============================================================================================ + // Section 9: mergeHeadStats — unit tests and HeadStats propagation through reduce + // ============================================================================================ + + public void testMergeHeadStatsWithMultipleInputs() { + // All three aggregations have HeadStats; verify sum, min, and max are taken correctly. + InternalTSDBStats agg1 = InternalTSDBStats.forCoordinatorLevel( + TEST_NAME, + new InternalTSDBStats.HeadStats(100L, 1000L, 3000L), + new InternalTSDBStats.CoordinatorLevelStats(100L, new HashMap<>()), + TEST_METADATA + ); + InternalTSDBStats agg2 = InternalTSDBStats.forCoordinatorLevel( + TEST_NAME, + new InternalTSDBStats.HeadStats(200L, 500L, 4000L), // smallest minTime, largest maxTime + new InternalTSDBStats.CoordinatorLevelStats(200L, new HashMap<>()), + TEST_METADATA + ); + InternalTSDBStats agg3 = InternalTSDBStats.forCoordinatorLevel( + TEST_NAME, + new InternalTSDBStats.HeadStats(50L, 2000L, 2500L), // middle minTime, smallest maxTime + new InternalTSDBStats.CoordinatorLevelStats(50L, new HashMap<>()), + TEST_METADATA + ); + + InternalTSDBStats.HeadStats merged = InternalTSDBStats.mergeHeadStats(List.of(agg1, agg2, agg3)); + + assertNotNull("mergeHeadStats should be non-null when any input has HeadStats", merged); + assertEquals("numSeries should be sum of all inputs", 350L, merged.numSeries()); + assertEquals("minTime should be minimum across all inputs", 500L, merged.minTime()); + assertEquals("maxTime should be maximum across all inputs", 4000L, merged.maxTime()); + } + + public void testMergeHeadStatsWithMixedNullAndNonNull() { + // One aggregation has HeadStats, one does not; only non-null contributes. + InternalTSDBStats withHead = InternalTSDBStats.forCoordinatorLevel( + TEST_NAME, + new InternalTSDBStats.HeadStats(75L, 2000L, 5000L), + new InternalTSDBStats.CoordinatorLevelStats(75L, new HashMap<>()), + TEST_METADATA + ); + InternalTSDBStats noHead = InternalTSDBStats.forCoordinatorLevel( + TEST_NAME, + null, + new InternalTSDBStats.CoordinatorLevelStats(100L, new HashMap<>()), + TEST_METADATA + ); + + InternalTSDBStats.HeadStats merged = InternalTSDBStats.mergeHeadStats(List.of(withHead, noHead)); + + assertNotNull("mergeHeadStats should be non-null when at least one input has HeadStats", merged); + assertEquals(75L, merged.numSeries()); + assertEquals(2000L, merged.minTime()); + assertEquals(5000L, merged.maxTime()); + } + + public void testMergeHeadStatsWithAllNull() { + InternalTSDBStats agg1 = InternalTSDBStats.forCoordinatorLevel( + TEST_NAME, + null, + new InternalTSDBStats.CoordinatorLevelStats(100L, new HashMap<>()), + TEST_METADATA + ); + InternalTSDBStats agg2 = InternalTSDBStats.forCoordinatorLevel( + TEST_NAME, + null, + new InternalTSDBStats.CoordinatorLevelStats(200L, new HashMap<>()), + TEST_METADATA + ); + + InternalTSDBStats.HeadStats merged = InternalTSDBStats.mergeHeadStats(List.of(agg1, agg2)); + + assertNull("mergeHeadStats should return null when all inputs have null HeadStats", merged); + } + + public void testMergeHeadStatsEmptyList() { + InternalTSDBStats.HeadStats merged = InternalTSDBStats.mergeHeadStats(List.of()); + assertNull("mergeHeadStats should return null for empty list", merged); + } + + public void testReduceShardLevelWithHeadStats() { + // HeadStats from shard-level aggregations should be merged (sum, min, max) in reduceShardLevel. + InternalTSDBStats agg1 = InternalTSDBStats.forShardLevel( + TEST_NAME, + new InternalTSDBStats.HeadStats(10L, 1000L, 5000L), + new InternalTSDBStats.ShardLevelStats(setOf(1L, 2L), new HashMap<>(), true), + TEST_METADATA + ); + InternalTSDBStats agg2 = InternalTSDBStats.forShardLevel( + TEST_NAME, + new InternalTSDBStats.HeadStats(20L, 500L, 6000L), + new InternalTSDBStats.ShardLevelStats(setOf(3L, 4L), new HashMap<>(), true), + TEST_METADATA + ); + + InternalAggregation result = agg1.reduce(List.of(agg1, agg2), createPartialReduceContext()); + InternalTSDBStats reducedStats = (InternalTSDBStats) result; + + InternalTSDBStats.HeadStats headStats = reducedStats.getHeadStats(); + assertNotNull("HeadStats should be merged from shard-level aggregations", headStats); + assertEquals("numSeries should be sum", 30L, headStats.numSeries()); + assertEquals("minTime should be minimum", 500L, headStats.minTime()); + assertEquals("maxTime should be maximum", 6000L, headStats.maxTime()); + } + + public void testReduceCoordinatorLevelWithHeadStats() { + // HeadStats should survive the coordinator-level reduce phase. + InternalTSDBStats agg1 = InternalTSDBStats.forCoordinatorLevel( + TEST_NAME, + new InternalTSDBStats.HeadStats(10L, 1000L, 5000L), + new InternalTSDBStats.CoordinatorLevelStats(100L, new HashMap<>()), + TEST_METADATA + ); + InternalTSDBStats agg2 = InternalTSDBStats.forCoordinatorLevel( + TEST_NAME, + new InternalTSDBStats.HeadStats(20L, 500L, 6000L), + new InternalTSDBStats.CoordinatorLevelStats(200L, new HashMap<>()), + TEST_METADATA + ); + + InternalAggregation result = agg1.reduce(List.of(agg1, agg2), createFinalReduceContext()); + InternalTSDBStats reducedStats = (InternalTSDBStats) result; + + InternalTSDBStats.HeadStats headStats = reducedStats.getHeadStats(); + assertNotNull("HeadStats should be preserved through coordinator-level reduce", headStats); + assertEquals(30L, headStats.numSeries()); + assertEquals(500L, headStats.minTime()); + assertEquals(6000L, headStats.maxTime()); + assertEquals(300L, reducedStats.getNumSeries().longValue()); + } + // ============================================================================================ // Helper Methods // ============================================================================================ diff --git a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilderTests.java b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilderTests.java index 6a95d1c7b..eea273ce7 100644 --- a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilderTests.java +++ b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilderTests.java @@ -191,7 +191,7 @@ public void testXContentParsingMissingRequiredFields() throws IOException { assertTrue(ex.getMessage().contains("Required parameter 'include_value_stats' is missing")); } - // Missing include_head_stats + // Missing include_head_stats: should default to false (not throw) try ( XContentParser parser = createParser( XContentType.JSON.xContent(), @@ -199,11 +199,8 @@ public void testXContentParsingMissingRequiredFields() throws IOException { ) ) { parser.nextToken(); - IllegalArgumentException ex = expectThrows( - IllegalArgumentException.class, - () -> TSDBStatsAggregationBuilder.parse(TEST_NAME, parser) - ); - assertTrue(ex.getMessage().contains("Required parameter 'include_head_stats' is missing")); + TSDBStatsAggregationBuilder parsed = TSDBStatsAggregationBuilder.parse(TEST_NAME, parser); + assertFalse("include_head_stats should default to false when not provided", parsed.isIncludeHeadStats()); } } diff --git a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactoryTests.java b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactoryTests.java index 860b8652e..210ad582e 100644 --- a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactoryTests.java +++ b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactoryTests.java @@ -33,8 +33,8 @@ public class TSDBStatsAggregatorFactoryTests extends OpenSearchTestCase { public void testSupportsConcurrentSegmentSearch() { // Arrange - Test with both true and false for includeValueStats - TSDBStatsAggregatorFactory factory1 = createFactory("test_css_true", 1000L, 2000L, true); - TSDBStatsAggregatorFactory factory2 = createFactory("test_css_false", 1000L, 2000L, false); + TSDBStatsAggregatorFactory factory1 = createFactory("test_css_true", 1000L, 2000L, true, false); + TSDBStatsAggregatorFactory factory2 = createFactory("test_css_false", 1000L, 2000L, false, false); // Act & Assert - CSS support should be unconditional assertTrue("TSDBStatsAggregatorFactory should support CSS", factory1.supportsConcurrentSegmentSearch()); @@ -50,7 +50,7 @@ public void testFactoryConfiguration() { long maxTimestamp = 10000L; // Act - TSDBStatsAggregatorFactory factory = createFactory("config_test", minTimestamp, maxTimestamp, true); + TSDBStatsAggregatorFactory factory = createFactory("config_test", minTimestamp, maxTimestamp, true, true); // Assert - Verify factory was created successfully with correct name assertNotNull("Factory should be created successfully", factory); @@ -62,7 +62,7 @@ public void testCreateInternal() throws Exception { long minTimestamp = 1000L; long maxTimestamp = 5000L; boolean includeValueStats = true; - TSDBStatsAggregatorFactory factory = createFactory("test_create", minTimestamp, maxTimestamp, includeValueStats); + TSDBStatsAggregatorFactory factory = createFactory("test_create", minTimestamp, maxTimestamp, includeValueStats, false); // Create mock SearchContext SearchContext searchContext = mock(SearchContext.class); @@ -86,7 +86,7 @@ public void testCreateInternal() throws Exception { public void testCreateInternalWithDifferentParameters() throws Exception { // Arrange - Test with includeValueStats=false and custom metadata - TSDBStatsAggregatorFactory factory = createFactory("test_no_values", 2000L, 6000L, false); + TSDBStatsAggregatorFactory factory = createFactory("test_no_values", 2000L, 6000L, false, false); SearchContext searchContext = mock(SearchContext.class); when(searchContext.bigArrays()).thenReturn(new BigArrays(null, new NoneCircuitBreakerService(), "test")); @@ -109,7 +109,7 @@ public void testCreateInternalWithDifferentParameters() throws Exception { public void testCreateInternalWithExtremeTimestamps() throws Exception { // Arrange - Test edge case with extreme timestamp values - TSDBStatsAggregatorFactory factory = createFactory("test_extreme", Long.MIN_VALUE, Long.MAX_VALUE, true); + TSDBStatsAggregatorFactory factory = createFactory("test_extreme", Long.MIN_VALUE, Long.MAX_VALUE, true, true); SearchContext searchContext = mock(SearchContext.class); when(searchContext.bigArrays()).thenReturn(new BigArrays(null, new NoneCircuitBreakerService(), "test")); @@ -129,11 +129,40 @@ public void testCreateInternalWithExtremeTimestamps() throws Exception { aggregator.close(); } + public void testCreateInternalWithHeadStatsDisabled() throws Exception { + // Arrange - explicitly disable head stats collection + TSDBStatsAggregatorFactory factory = createFactory("test_no_head", 1000L, 5000L, true, false); + + SearchContext searchContext = mock(SearchContext.class); + when(searchContext.bigArrays()).thenReturn(new BigArrays(null, new NoneCircuitBreakerService(), "test")); + + // Act + TSDBStatsAggregator aggregator = (TSDBStatsAggregator) factory.createInternal( + searchContext, + null, + CardinalityUpperBound.NONE, + Map.of() + ); + + // Assert + assertNotNull("Should create aggregator with head stats disabled", aggregator); + assertEquals("test_no_head", aggregator.name()); + + // Cleanup + aggregator.close(); + } + /** * Helper method to create TSDBStatsAggregatorFactory with minimal parameters. * Uses null for complex OpenSearch infrastructure components that aren't needed for basic tests. */ - private TSDBStatsAggregatorFactory createFactory(String name, long minTimestamp, long maxTimestamp, boolean includeValueStats) { + private TSDBStatsAggregatorFactory createFactory( + String name, + long minTimestamp, + long maxTimestamp, + boolean includeValueStats, + boolean includeHeadStats + ) { try { // Create an empty AggregatorFactories.Builder to satisfy the constructor AggregatorFactories.Builder subFactoriesBuilder = new AggregatorFactories.Builder(); @@ -147,7 +176,7 @@ private TSDBStatsAggregatorFactory createFactory(String name, long minTimestamp, minTimestamp, maxTimestamp, includeValueStats, - true // includeHeadStats - default to true for existing tests + includeHeadStats ); } catch (Exception e) { throw new RuntimeException("Failed to create factory for test", e); diff --git a/src/test/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListenerTests.java b/src/test/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListenerTests.java index 98c1ba64c..48e1b1703 100644 --- a/src/test/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListenerTests.java +++ b/src/test/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListenerTests.java @@ -37,16 +37,11 @@ public class TSDBStatsResponseListenerTests extends OpenSearchTestCase { // ========== Shared Test Fixtures ========== - /** HeadStats: 508 series, 937 chunks */ - private static final InternalTSDBStats.HeadStats HEAD_STATS = new InternalTSDBStats.HeadStats( - 508L, - 937L, - 1591516800000L, - 1598896800143L - ); + /** HeadStats: 508 series */ + private static final InternalTSDBStats.HeadStats HEAD_STATS = new InternalTSDBStats.HeadStats(508L, 1591516800000L, 1598896800143L); /** HeadStats: small numbers for simple tests */ - private static final InternalTSDBStats.HeadStats SIMPLE_HEAD_STATS = new InternalTSDBStats.HeadStats(100L, 200L, 1000L, 2000L); + private static final InternalTSDBStats.HeadStats SIMPLE_HEAD_STATS = new InternalTSDBStats.HeadStats(100L, 1000L, 2000L); /** Full stats: headStats + 2 labels (name, cluster) with multiple values, numSeries=25644 */ private static InternalTSDBStats createFullStats() { @@ -149,7 +144,6 @@ public void testGroupedFormatWithAllFields() throws IOException { { "headStats": { "numSeries": 508, - "chunkCount": 937, "minTime": 1591516800000, "maxTime": 1598896800143 }, @@ -232,7 +226,6 @@ public void testFlatFormatWithAllFields() throws IOException { { "headStats": { "numSeries": 508, - "chunkCount": 937, "minTime": 1591516800000, "maxTime": 1598896800143 }, @@ -324,7 +317,6 @@ public void testIncludeOnlyHeadStats() throws IOException { { "headStats": { "numSeries": 100, - "chunkCount": 200, "minTime": 1000, "maxTime": 2000 } @@ -357,7 +349,6 @@ public void testIncludeLabelStatsAndAll() throws IOException { { "headStats": { "numSeries": 100, - "chunkCount": 200, "minTime": 1000, "maxTime": 2000 }, From df14bdbe03937f94e6cec3794bc341654bc57915 Mon Sep 17 00:00:00 2001 From: Wenting Wang Date: Thu, 5 Mar 2026 15:27:35 -0800 Subject: [PATCH 3/6] add numChunks and fix integrationTest Signed-off-by: Wenting Wang --- .../framework/RestTSDBEngineIngestor.java | 18 +++++++++- .../integrationTests/TSDBStatsRestIT.java | 35 +++++++++++++++---- .../index/live/LiveSeriesIndexLeafReader.java | 19 ++++++++++ .../query/aggregator/InternalTSDBStats.java | 23 +++++------- .../query/aggregator/TSDBStatsAggregator.java | 12 +++++-- .../query/rest/TSDBStatsResponseListener.java | 2 ++ .../aggregator/InternalTSDBStatsTests.java | 32 +++++++++-------- .../rest/TSDBStatsResponseListenerTests.java | 13 +++++-- 8 files changed, 112 insertions(+), 42 deletions(-) diff --git a/src/javaRestTest/java/org/opensearch/tsdb/framework/RestTSDBEngineIngestor.java b/src/javaRestTest/java/org/opensearch/tsdb/framework/RestTSDBEngineIngestor.java index 83e8770a9..8fa4a190a 100644 --- a/src/javaRestTest/java/org/opensearch/tsdb/framework/RestTSDBEngineIngestor.java +++ b/src/javaRestTest/java/org/opensearch/tsdb/framework/RestTSDBEngineIngestor.java @@ -22,6 +22,9 @@ import org.opensearch.tsdb.utils.TSDBTestUtils; import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -61,6 +64,7 @@ public record RestTSDBEngineIngestor(RestClient restClient) { * Ingest time series data from multiple input data configurations into their respective indices. * Each InputDataConfig specifies which index its data should go to. * Uses bulk API with batching for efficient ingestion. + * Sorted all time series samples by timestamp for the same index, to avoid ooo event cutoff * * @param inputDataConfigs List of input data configurations with their target indices * @throws IOException If ingestion fails @@ -70,12 +74,24 @@ public void ingestDataToMultipleIndices(List inputDataConfigs) return; } + // Collect all samples per index, then sort each index's samples by timestamp. + // Sorting ensures that samples across multiple time series are interleaved chronologically + // before ingestion. Without this, series are emitted one-at-a-time: a shard may process + // series1 up to its maxTime (e.g. 07:00) and then receive series2's first sample (e.g. 00:00), + // which falls outside a short OOO cutoff window and gets rejected. + Map> samplesByIndex = new HashMap<>(); for (InputDataConfig inputDataConfig : inputDataConfigs) { if (inputDataConfig.indexName() == null || inputDataConfig.indexName().isEmpty()) { throw new IllegalArgumentException("InputDataConfig must specify an index_name for data ingestion"); } List samples = TimeSeriesSampleGenerator.generateSamples(inputDataConfig); - ingestInBulk(samples, inputDataConfig.indexName(), DEFAULT_BULK_SIZE); + samplesByIndex.computeIfAbsent(inputDataConfig.indexName(), k -> new ArrayList<>()).addAll(samples); + } + + for (Map.Entry> entry : samplesByIndex.entrySet()) { + List samples = entry.getValue(); + samples.sort(Comparator.comparing(TimeSeriesSample::timestamp)); + ingestInBulk(samples, entry.getKey(), DEFAULT_BULK_SIZE); } } diff --git a/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java b/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java index dc4d097a1..9a7a344eb 100644 --- a/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java +++ b/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java @@ -33,6 +33,19 @@ public class TSDBStatsRestIT extends RestTimeSeriesTestFramework { private static final String TEST_DATA_YAML = "test_cases/tsdb_stats_rest_it.yaml"; private boolean dataLoaded = false; + // OVERRIDE max_closeable_chunks_per_chunk_range_percentage = 100, otherwise only 15% of closed chunk will be dropped + // This is to test numChunks in HeadStats + private static final String CUSTOMIZED_INDEX_SETTINGS_YAML = """ + index.refresh_interval: "1s" + index.tsdb_engine.enabled: true + index.tsdb_engine.labels.storage_type: binary + index.tsdb_engine.lang.m3.default_step_size: "10s" + index.tsdb_engine.max_closeable_chunks_per_chunk_range_percentage: 100 + index.queries.cache.enabled: false + index.requests.cache.enable: false + index.translog.durability: async + index.translog.sync_interval: "1s" + """; /** * Lazily initialize test data on first use for each test. @@ -40,8 +53,11 @@ public class TSDBStatsRestIT extends RestTimeSeriesTestFramework { */ private void ensureDataLoaded() throws Exception { if (!dataLoaded) { - initializeTest(TEST_DATA_YAML); + initializeTest(TEST_DATA_YAML, CUSTOMIZED_INDEX_SETTINGS_YAML); setupTest(); + // Force CCI creation and MemChunk drop + Request flushRequest = new Request("POST", "/tsdb_stats_test/_flush?force=true"); + client().performRequest(flushRequest); dataLoaded = true; } } @@ -53,11 +69,13 @@ private void ensureDataLoaded() throws Exception { public void testBasicEndpoint() throws Exception { ensureDataLoaded(); + // "numChunks": 20, because 1 MemChunk(20m OOO cutoff window) + 1 open MemChunk String expectedJson = """ { "headStats": { "numSeries": 10, - "minTime": 0, + "numChunks": 20, + "minTime": 1735713600000, "maxTime": 9223372036854775807 }, "labelStats": { @@ -133,7 +151,7 @@ public void testBasicEndpoint() throws Exception { // Test POST request returns same result Request postRequest = new Request("POST", "/_tsdb/stats"); postRequest.addParameter("start", "1735689600000"); - postRequest.addParameter("end", "1735707600000"); + postRequest.addParameter("end", "1735714800000"); postRequest.setJsonEntity("{\"query\": \"fetch name:*\"}"); Response postResponse = client().performRequest(postRequest); @@ -281,7 +299,8 @@ public void testFormatOptions() throws Exception { { "headStats": { "numSeries": 10, - "minTime": 0, + "numChunks": 20, + "minTime": 1735713600000, "maxTime": 9223372036854775807 }, "labelStats": { @@ -360,7 +379,8 @@ public void testFormatOptions() throws Exception { { "headStats": { "numSeries": 10, - "minTime": 0, + "numChunks": 20, + "minTime": 1735713600000, "maxTime": 9223372036854775807 }, "seriesCountByMetricName": [ @@ -426,8 +446,9 @@ public void testQueryFiltering() throws Exception { String expectedJson = """ { "headStats": { - "numSeries": 10, - "minTime": 0, + "numSeries": 5, + "numChunks": 10, + "minTime": 1735713600000, "maxTime": 9223372036854775807 }, "labelStats": { diff --git a/src/main/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReader.java b/src/main/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReader.java index 4d7148ab7..dded2ca78 100644 --- a/src/main/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReader.java +++ b/src/main/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReader.java @@ -135,6 +135,25 @@ public List chunksForDoc(int docId, TSDBDocValues tsdbDocValues) return chunkIterators; } + /** + * Returns the number of MemChunks(open+OOO_cutoff) for the series corresponding to the given document. + * + * @param docId the document ID to look up + * @param tsdbDocValues the doc values reader for this leaf + * @return number of MemChunks for the series, or 0 if the doc has no series reference + * @throws IOException if an I/O error occurs + */ + public int numChunksForDoc(int docId, TSDBDocValues tsdbDocValues) throws IOException { + NumericDocValues seriesRefValue = tsdbDocValues.getChunkRefDocValues(); + if (!seriesRefValue.advanceExact(docId)) { + return 0; + } + long seriesRef = seriesRefValue.longValue(); + int total = memChunkReader.getChunks(seriesRef).size(); + int mmaped = mMappedChunks.getOrDefault(seriesRef, Collections.emptySet()).size(); + return total - mmaped; + } + @Override public Labels labelsForDoc(int docId, TSDBDocValues tsdbDocValues) throws IOException { return tsdbDocValues.getLabelsStorage().readLabels(docId); diff --git a/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java b/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java index fd50a7442..37e01a358 100644 --- a/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java +++ b/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java @@ -61,29 +61,19 @@ public class InternalTSDBStats extends InternalAggregation { * Statistics for the head (in-memory time series). * * @param numSeries the number of active time series in the head + * @param numChunks total MemChunks across all series (≥ numSeries due to OOO or not-yet-dropped chunks) * @param minTime the minimum sample timestamp present in the head * @param maxTime the maximum sample timestamp present in the head */ - public record HeadStats(long numSeries, long minTime, long maxTime) { + public record HeadStats(long numSeries, long numChunks, long minTime, long maxTime) { - /** - * Deserializes a {@code HeadStats} instance from a stream. - * - * @param in the stream input to read from - * @throws IOException if an I/O error occurs during reading - */ public HeadStats(StreamInput in) throws IOException { - this(in.readVLong(), in.readLong(), in.readLong()); + this(in.readVLong(), in.readVLong(), in.readLong(), in.readLong()); } - /** - * Serializes this {@code HeadStats} instance to a stream. - * - * @param out the stream output to write to - * @throws IOException if an I/O error occurs during writing - */ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(numSeries); + out.writeVLong(numChunks); out.writeLong(minTime); out.writeLong(maxTime); } @@ -621,6 +611,7 @@ private static class LabelStatsBuilder { */ static HeadStats mergeHeadStats(List aggregations) { long totalNumSeries = 0; + long totalNumChunks = 0; long minTime = Long.MAX_VALUE; long maxTime = Long.MIN_VALUE; boolean hasAny = false; @@ -630,6 +621,7 @@ static HeadStats mergeHeadStats(List aggregations) { if (stats.headStats != null) { hasAny = true; totalNumSeries += stats.headStats.numSeries(); + totalNumChunks += stats.headStats.numChunks(); if (stats.headStats.minTime() < minTime) { minTime = stats.headStats.minTime(); } @@ -639,7 +631,7 @@ static HeadStats mergeHeadStats(List aggregations) { } } - return hasAny ? new HeadStats(totalNumSeries, minTime, maxTime) : null; + return hasAny ? new HeadStats(totalNumSeries, totalNumChunks, minTime, maxTime) : null; } /** @@ -712,6 +704,7 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th if (headStats != null) { builder.startObject("headStats"); builder.field("numSeries", headStats.numSeries()); + builder.field("numChunks", headStats.numChunks()); builder.field("minTime", headStats.minTime()); builder.field("maxTime", headStats.maxTime()); builder.endObject(); diff --git a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java index 5029193e2..38da2b138 100644 --- a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java +++ b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java @@ -69,6 +69,7 @@ public class TSDBStatsAggregator extends MetricsAggregator { // HeadStats accumulator: tracks stats from LiveSeriesIndexLeafReader segments private long headNumSeries; + private long headNumChunks; private long headMinTime; private long headMaxTime; private boolean hasHeadStats; @@ -111,6 +112,7 @@ public TSDBStatsAggregator( // Initialize HeadStats accumulator this.headNumSeries = 0; + this.headNumChunks = 0; this.headMinTime = Long.MAX_VALUE; this.headMaxTime = Long.MIN_VALUE; this.hasHeadStats = false; @@ -131,8 +133,6 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol // Collect HeadStats from LiveSeriesIndexLeafReader segments if (includeHeadStats && tsdbLeafReader instanceof LiveSeriesIndexLeafReader) { hasHeadStats = true; - // LiveSeriesIndex guarantees one doc per series, so numDocs == numSeries - headNumSeries += ctx.reader().numDocs(); long leafMinTime = tsdbLeafReader.getMinIndexTimestamp(); long leafMaxTime = tsdbLeafReader.getMaxIndexTimestamp(); if (leafMinTime < headMinTime) { @@ -172,6 +172,12 @@ public void collect(int doc, long bucket) throws IOException { return; } + // Accumulate per-doc HeadStats for live series segments + if (includeHeadStats && tsdbLeafReader instanceof LiveSeriesIndexLeafReader) { + headNumSeries++; + headNumChunks += ((LiveSeriesIndexLeafReader) tsdbLeafReader).numChunksForDoc(doc, tsdbDocValues); + } + // TODO process each label using BytesRef directly (avoid String conversion) BytesRef[] keyValuePairs = labels.toKeyValueBytesRefs(); for (BytesRef keyValue : keyValuePairs) { @@ -240,7 +246,7 @@ public InternalAggregation buildAggregation(long bucket) throws IOException { // Build HeadStats if any LiveSeriesIndexLeafReader segments were encountered InternalTSDBStats.HeadStats headStats = hasHeadStats - ? new InternalTSDBStats.HeadStats(headNumSeries, headMinTime, headMaxTime) + ? new InternalTSDBStats.HeadStats(headNumSeries, headNumChunks, headMinTime, headMaxTime) : null; // Return using factory method diff --git a/src/main/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListener.java b/src/main/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListener.java index c90cab74c..2f28a8bfe 100644 --- a/src/main/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListener.java +++ b/src/main/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListener.java @@ -42,6 +42,7 @@ public class TSDBStatsResponseListener extends RestToXContentListener labelStats = createTestLabelStats(); // Act @@ -273,7 +273,7 @@ public void testForCoordinatorLevelXContent() throws IOException { // Test all XContent variations in one test // Variation 1: With all fields - InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(508L, 1591516800000L, 1598896800143L); + InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(508L, 508L, 1591516800000L, 1598896800143L); Map labelStats = new HashMap<>(); labelStats.put("cluster", new InternalTSDBStats.CoordinatorLevelStats.LabelStats(100L, Map.of("prod", 80L, "staging", 15L))); @@ -417,7 +417,7 @@ public void testGetPropertyBehavior() { } public void testEqualsAndHashCode() { - InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(100L, 1000L, 2000L); + InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(100L, 100L, 1000L, 2000L); Map labelStats = createTestLabelStats(); InternalTSDBStats stats1 = InternalTSDBStats.forCoordinatorLevel( @@ -460,7 +460,7 @@ public void testEqualsAndHashCode() { stats1, InternalTSDBStats.forCoordinatorLevel( TEST_NAME, - new InternalTSDBStats.HeadStats(999L, 1000L, 2000L), + new InternalTSDBStats.HeadStats(999L, 999L, 1000L, 2000L), new InternalTSDBStats.CoordinatorLevelStats(500L, labelStats), TEST_METADATA ) @@ -515,7 +515,7 @@ public void testTopLevelSerializationRoundTrip() throws IOException { assertNull(shardDeserialized.getHeadStats()); // Scenario 2: Coordinator-level with headStats - InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(100L, 1000L, 2000L); + InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(100L, 100L, 1000L, 2000L); InternalTSDBStats coordOriginal = InternalTSDBStats.forCoordinatorLevel( TEST_NAME, headStats, @@ -991,19 +991,19 @@ public void testMergeHeadStatsWithMultipleInputs() { // All three aggregations have HeadStats; verify sum, min, and max are taken correctly. InternalTSDBStats agg1 = InternalTSDBStats.forCoordinatorLevel( TEST_NAME, - new InternalTSDBStats.HeadStats(100L, 1000L, 3000L), + new InternalTSDBStats.HeadStats(100L, 50L, 1000L, 3000L), new InternalTSDBStats.CoordinatorLevelStats(100L, new HashMap<>()), TEST_METADATA ); InternalTSDBStats agg2 = InternalTSDBStats.forCoordinatorLevel( TEST_NAME, - new InternalTSDBStats.HeadStats(200L, 500L, 4000L), // smallest minTime, largest maxTime + new InternalTSDBStats.HeadStats(200L, 100L, 500L, 4000L), // smallest minTime, largest maxTime new InternalTSDBStats.CoordinatorLevelStats(200L, new HashMap<>()), TEST_METADATA ); InternalTSDBStats agg3 = InternalTSDBStats.forCoordinatorLevel( TEST_NAME, - new InternalTSDBStats.HeadStats(50L, 2000L, 2500L), // middle minTime, smallest maxTime + new InternalTSDBStats.HeadStats(50L, 25L, 2000L, 2500L), // middle minTime, smallest maxTime new InternalTSDBStats.CoordinatorLevelStats(50L, new HashMap<>()), TEST_METADATA ); @@ -1012,6 +1012,7 @@ public void testMergeHeadStatsWithMultipleInputs() { assertNotNull("mergeHeadStats should be non-null when any input has HeadStats", merged); assertEquals("numSeries should be sum of all inputs", 350L, merged.numSeries()); + assertEquals("numChunks should be sum of all inputs", 175L, merged.numChunks()); assertEquals("minTime should be minimum across all inputs", 500L, merged.minTime()); assertEquals("maxTime should be maximum across all inputs", 4000L, merged.maxTime()); } @@ -1020,7 +1021,7 @@ public void testMergeHeadStatsWithMixedNullAndNonNull() { // One aggregation has HeadStats, one does not; only non-null contributes. InternalTSDBStats withHead = InternalTSDBStats.forCoordinatorLevel( TEST_NAME, - new InternalTSDBStats.HeadStats(75L, 2000L, 5000L), + new InternalTSDBStats.HeadStats(75L, 40L, 2000L, 5000L), new InternalTSDBStats.CoordinatorLevelStats(75L, new HashMap<>()), TEST_METADATA ); @@ -1035,6 +1036,7 @@ public void testMergeHeadStatsWithMixedNullAndNonNull() { assertNotNull("mergeHeadStats should be non-null when at least one input has HeadStats", merged); assertEquals(75L, merged.numSeries()); + assertEquals(40L, merged.numChunks()); assertEquals(2000L, merged.minTime()); assertEquals(5000L, merged.maxTime()); } @@ -1067,13 +1069,13 @@ public void testReduceShardLevelWithHeadStats() { // HeadStats from shard-level aggregations should be merged (sum, min, max) in reduceShardLevel. InternalTSDBStats agg1 = InternalTSDBStats.forShardLevel( TEST_NAME, - new InternalTSDBStats.HeadStats(10L, 1000L, 5000L), + new InternalTSDBStats.HeadStats(10L, 10L, 1000L, 5000L), new InternalTSDBStats.ShardLevelStats(setOf(1L, 2L), new HashMap<>(), true), TEST_METADATA ); InternalTSDBStats agg2 = InternalTSDBStats.forShardLevel( TEST_NAME, - new InternalTSDBStats.HeadStats(20L, 500L, 6000L), + new InternalTSDBStats.HeadStats(20L, 20L, 500L, 6000L), new InternalTSDBStats.ShardLevelStats(setOf(3L, 4L), new HashMap<>(), true), TEST_METADATA ); @@ -1084,6 +1086,7 @@ public void testReduceShardLevelWithHeadStats() { InternalTSDBStats.HeadStats headStats = reducedStats.getHeadStats(); assertNotNull("HeadStats should be merged from shard-level aggregations", headStats); assertEquals("numSeries should be sum", 30L, headStats.numSeries()); + assertEquals("numChunks should be sum", 30L, headStats.numChunks()); assertEquals("minTime should be minimum", 500L, headStats.minTime()); assertEquals("maxTime should be maximum", 6000L, headStats.maxTime()); } @@ -1092,13 +1095,13 @@ public void testReduceCoordinatorLevelWithHeadStats() { // HeadStats should survive the coordinator-level reduce phase. InternalTSDBStats agg1 = InternalTSDBStats.forCoordinatorLevel( TEST_NAME, - new InternalTSDBStats.HeadStats(10L, 1000L, 5000L), + new InternalTSDBStats.HeadStats(10L, 10L, 1000L, 5000L), new InternalTSDBStats.CoordinatorLevelStats(100L, new HashMap<>()), TEST_METADATA ); InternalTSDBStats agg2 = InternalTSDBStats.forCoordinatorLevel( TEST_NAME, - new InternalTSDBStats.HeadStats(20L, 500L, 6000L), + new InternalTSDBStats.HeadStats(20L, 20L, 500L, 6000L), new InternalTSDBStats.CoordinatorLevelStats(200L, new HashMap<>()), TEST_METADATA ); @@ -1109,6 +1112,7 @@ public void testReduceCoordinatorLevelWithHeadStats() { InternalTSDBStats.HeadStats headStats = reducedStats.getHeadStats(); assertNotNull("HeadStats should be preserved through coordinator-level reduce", headStats); assertEquals(30L, headStats.numSeries()); + assertEquals(30L, headStats.numChunks()); assertEquals(500L, headStats.minTime()); assertEquals(6000L, headStats.maxTime()); assertEquals(300L, reducedStats.getNumSeries().longValue()); diff --git a/src/test/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListenerTests.java b/src/test/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListenerTests.java index 48e1b1703..f8ce57b42 100644 --- a/src/test/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListenerTests.java +++ b/src/test/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListenerTests.java @@ -38,10 +38,15 @@ public class TSDBStatsResponseListenerTests extends OpenSearchTestCase { // ========== Shared Test Fixtures ========== /** HeadStats: 508 series */ - private static final InternalTSDBStats.HeadStats HEAD_STATS = new InternalTSDBStats.HeadStats(508L, 1591516800000L, 1598896800143L); + private static final InternalTSDBStats.HeadStats HEAD_STATS = new InternalTSDBStats.HeadStats( + 508L, + 508L, + 1591516800000L, + 1598896800143L + ); /** HeadStats: small numbers for simple tests */ - private static final InternalTSDBStats.HeadStats SIMPLE_HEAD_STATS = new InternalTSDBStats.HeadStats(100L, 1000L, 2000L); + private static final InternalTSDBStats.HeadStats SIMPLE_HEAD_STATS = new InternalTSDBStats.HeadStats(100L, 100L, 1000L, 2000L); /** Full stats: headStats + 2 labels (name, cluster) with multiple values, numSeries=25644 */ private static InternalTSDBStats createFullStats() { @@ -144,6 +149,7 @@ public void testGroupedFormatWithAllFields() throws IOException { { "headStats": { "numSeries": 508, + "numChunks": 508, "minTime": 1591516800000, "maxTime": 1598896800143 }, @@ -226,6 +232,7 @@ public void testFlatFormatWithAllFields() throws IOException { { "headStats": { "numSeries": 508, + "numChunks": 508, "minTime": 1591516800000, "maxTime": 1598896800143 }, @@ -317,6 +324,7 @@ public void testIncludeOnlyHeadStats() throws IOException { { "headStats": { "numSeries": 100, + "numChunks": 100, "minTime": 1000, "maxTime": 2000 } @@ -349,6 +357,7 @@ public void testIncludeLabelStatsAndAll() throws IOException { { "headStats": { "numSeries": 100, + "numChunks": 100, "minTime": 1000, "maxTime": 2000 }, From 8170d0ea7ebd6f63efd13d9b595ab1b3ed3b551f Mon Sep 17 00:00:00 2001 From: Wenting Wang Date: Thu, 5 Mar 2026 15:52:28 -0800 Subject: [PATCH 4/6] triggers test again Signed-off-by: Wenting Wang From 5bea82aada2e019bc34def5575f8d9f52f71eb8b Mon Sep 17 00:00:00 2001 From: Wenting Wang Date: Thu, 5 Mar 2026 16:07:37 -0800 Subject: [PATCH 5/6] refactor and add unit test Signed-off-by: Wenting Wang --- .../index/live/LiveSeriesIndexLeafReader.java | 12 +++-- .../live/LiveSeriesIndexLeafReaderTests.java | 44 +++++++++++++++++++ 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReader.java b/src/main/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReader.java index dded2ca78..e5bc6bc91 100644 --- a/src/main/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReader.java +++ b/src/main/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReader.java @@ -149,9 +149,15 @@ public int numChunksForDoc(int docId, TSDBDocValues tsdbDocValues) throws IOExce return 0; } long seriesRef = seriesRefValue.longValue(); - int total = memChunkReader.getChunks(seriesRef).size(); - int mmaped = mMappedChunks.getOrDefault(seriesRef, Collections.emptySet()).size(); - return total - mmaped; + int numChunks = 0; + Set chunksToFilter = mMappedChunks.getOrDefault(seriesRef, Collections.emptySet()); + List memChunks = memChunkReader.getChunks(seriesRef); // get all memchunks for the series + for (MemChunk memChunk : memChunks) { + if (!chunksToFilter.contains(memChunk)) { + numChunks++; + } + } + return numChunks; } @Override diff --git a/src/test/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReaderTests.java b/src/test/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReaderTests.java index 85eb252f5..664a98b75 100644 --- a/src/test/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReaderTests.java +++ b/src/test/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReaderTests.java @@ -622,6 +622,50 @@ public void testMMappedChunksFilteringEdgeCases() throws IOException { } } + public void testNumChunksForDocWithMMappedChunks() throws IOException { + // 3 chunks total, 2 mMapped → should count only the 1 remaining in-memory chunk + MemChunk chunk1 = new MemChunk(1L, 1000L, 2000L, null, org.opensearch.tsdb.core.chunk.Encoding.XOR); + MemChunk chunk2 = new MemChunk(2L, 2000L, 3000L, null, org.opensearch.tsdb.core.chunk.Encoding.XOR); + MemChunk chunk3 = new MemChunk(3L, 3000L, 4000L, null, org.opensearch.tsdb.core.chunk.Encoding.XOR); + referenceToChunkMap.put(810L, List.of(chunk1, chunk2, chunk3)); + mMappedChunks.put(810L, java.util.Set.of(chunk1, chunk3)); // chunk1 and chunk3 are mMapped + + createTestDocument(810L, "partial_mmapped_metric", "server2", "region2"); + indexWriter.commit(); + + try (DirectoryReader reader = DirectoryReader.open(directory)) { + LiveSeriesIndexLeafReader leafReader = new LiveSeriesIndexLeafReader( + reader.leaves().getFirst().reader(), + memChunkReader, + mMappedChunks, + LabelStorageType.BINARY + ); + TSDBDocValues tsdbDocValues = leafReader.getTSDBDocValues(); + + assertEquals("Should count only chunk2 (the one not mMapped)", 1, leafReader.numChunksForDoc(0, tsdbDocValues)); + } + } + + public void testNumChunksForDocNoChunks() throws IOException { + // Series reference exists but has no chunks (empty list from memChunkReader) → should return 0 + referenceToChunkMap.put(830L, List.of()); + + createTestDocument(830L, "no_chunks_metric", "server4", "region4"); + indexWriter.commit(); + + try (DirectoryReader reader = DirectoryReader.open(directory)) { + LiveSeriesIndexLeafReader leafReader = new LiveSeriesIndexLeafReader( + reader.leaves().getFirst().reader(), + memChunkReader, + mMappedChunks, + LabelStorageType.BINARY + ); + TSDBDocValues tsdbDocValues = leafReader.getTSDBDocValues(); + + assertEquals("Should return 0 when series has no chunks", 0, leafReader.numChunksForDoc(0, tsdbDocValues)); + } + } + private void createTestDocument(long reference, String metricName, String host, String region) throws IOException { Document doc = new Document(); doc.add(new NumericDocValuesField(REFERENCE, reference)); From 43e06d4a7581eddfd1c3ce3aa0b0e720850aaa13 Mon Sep 17 00:00:00 2001 From: Wenting Wang Date: Wed, 11 Mar 2026 09:51:57 -0700 Subject: [PATCH 6/6] address comments Signed-off-by: Wenting Wang --- .../tsdb/query/aggregator/TSDBStatsAggregator.java | 11 +++++------ .../tsdb/query/rest/RestTSDBStatsAction.java | 1 + 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java index 38da2b138..22ef335fa 100644 --- a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java +++ b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java @@ -160,6 +160,11 @@ public TSDBStatsLeafBucketCollector(LeafReaderContext ctx, TSDBLeafReader tsdbLe @Override public void collect(int doc, long bucket) throws IOException { + // Accumulate per-doc HeadStats for live series segments + if (includeHeadStats && tsdbLeafReader instanceof LiveSeriesIndexLeafReader) { + headNumSeries++; + headNumChunks += ((LiveSeriesIndexLeafReader) tsdbLeafReader).numChunksForDoc(doc, tsdbDocValues); + } Labels labels = tsdbLeafReader.labelsForDoc(doc, tsdbDocValues); // We assume labels hash is the seriesId // This need to be changed when we start accepting reference/seriesId from indexing @@ -172,12 +177,6 @@ public void collect(int doc, long bucket) throws IOException { return; } - // Accumulate per-doc HeadStats for live series segments - if (includeHeadStats && tsdbLeafReader instanceof LiveSeriesIndexLeafReader) { - headNumSeries++; - headNumChunks += ((LiveSeriesIndexLeafReader) tsdbLeafReader).numChunksForDoc(doc, tsdbDocValues); - } - // TODO process each label using BytesRef directly (avoid String conversion) BytesRef[] keyValuePairs = labels.toKeyValueBytesRefs(); for (BytesRef keyValue : keyValuePairs) { diff --git a/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java b/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java index 2c753bfb7..f961076b2 100644 --- a/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java +++ b/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java @@ -95,6 +95,7 @@ * { * "headStats": { * "numSeries": 508, + * "numChunks": 937, * "minTime": 1591516800000, * "maxTime": 1598896800143 * },