From 0809d162b62b6a48e5a1ac163a856e8bfb3c9d1a Mon Sep 17 00:00:00 2001
From: Wenting Wang
Date: Thu, 19 Feb 2026 10:32:44 -0800
Subject: [PATCH 1/6] Populate HeadStats in _tsdb/stats endpoint
Collect HeadStats from LiveSeriesIndexLeafReader in TSDBStatsAggregator,
merge across shards by summing numSeries/chunkCount and taking min/max
of time bounds, and include in the endpoint response.
Co-authored-by: Claude Opus 4.6 (1M context)
Signed-off-by: Wenting Wang
Rebase for reviewing the stacked diff; unit tests are failing and will be fixed in the next commit.
Signed-off-by: Wenting Wang
Fix unit tests broken by the rebase.
Signed-off-by: Wenting Wang
Add an includeHeadStats flag to TSDBStatsAggregator.
Signed-off-by: Wenting Wang
---
.../integrationTests/TSDBStatsRestIT.java | 24 +++
.../query/aggregator/InternalTSDBStats.java | 40 ++++-
.../TSDBStatsAggregationBuilder.java | 45 ++++-
.../query/aggregator/TSDBStatsAggregator.java | 41 ++++-
.../TSDBStatsAggregatorFactory.java | 17 +-
.../tsdb/query/rest/RestTSDBStatsAction.java | 10 +-
.../query/aggregator/AggregatorTestUtils.java | 52 ++++++
.../TSDBStatsAggregationBuilderTests.java | 93 ++++++++---
.../TSDBStatsAggregatorFactoryTests.java | 3 +-
.../aggregator/TSDBStatsAggregatorTests.java | 156 +++++++++++++++---
10 files changed, 426 insertions(+), 55 deletions(-)
diff --git a/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java b/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java
index 336e25950..9f9116914 100644
--- a/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java
+++ b/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java
@@ -55,6 +55,12 @@ public void testBasicEndpoint() throws Exception {
String expectedJson = """
{
+ "headStats": {
+ "numSeries": 10,
+ "chunkCount": 0,
+ "minTime": 0,
+ "maxTime": 9223372036854775807
+ },
"labelStats": {
"numSeries": 10,
"name": {
@@ -274,6 +280,12 @@ public void testFormatOptions() throws Exception {
// Grouped format - same as default/testTSDBStatsEndpointExists
String expectedGroupedJson = """
{
+ "headStats": {
+ "numSeries": 10,
+ "chunkCount": 0,
+ "minTime": 0,
+ "maxTime": 9223372036854775807
+ },
"labelStats": {
"numSeries": 10,
"name": {
@@ -348,6 +360,12 @@ public void testFormatOptions() throws Exception {
// Flat format has arrays sorted by count descending, then name ascending
String expectedFlatJson = """
{
+ "headStats": {
+ "numSeries": 10,
+ "chunkCount": 0,
+ "minTime": 0,
+ "maxTime": 9223372036854775807
+ },
"seriesCountByMetricName": [
{"name": "http_requests_total", "value": 6},
{"name": "db_connections", "value": 2},
@@ -410,6 +428,12 @@ public void testQueryFiltering() throws Exception {
// Filtered to service:api AND name:http_* (5 series - all api series have http_* names)
String expectedJson = """
{
+ "headStats": {
+ "numSeries": 10,
+ "chunkCount": 0,
+ "minTime": 0,
+ "maxTime": 9223372036854775807
+ },
"labelStats": {
"numSeries": 5,
"name": {
diff --git a/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java b/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java
index d440ef092..be4afcbf9 100644
--- a/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java
+++ b/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java
@@ -280,6 +280,7 @@ public void writeTo(StreamOutput out) throws IOException {
* Factory method for creating shard-level stats (with seriesIds ).
*
* @param name the name of the aggregation
+ * @param headStats the head statistics (null if not populated)
* @param shardStats the shard-level statistics with seriesIds
* @param metadata the aggregation metadata
* @return InternalTSDBStats instance for shard-level phase
@@ -442,6 +443,9 @@ private InternalTSDBStats reduceShardLevel(List aggregation
Set mergedSeriesIds = new HashSet<>();
Map>> mergedLabelStats = new HashMap<>();
+ // Merge HeadStats from all shard-level aggregations
+ HeadStats mergedHeadStats = mergeHeadStats(aggregations);
+
// Capture global includeValueStats flag from first shard (all shards have same value)
boolean includeValueStats = false;
if (!aggregations.isEmpty()) {
@@ -538,7 +542,7 @@ private InternalTSDBStats reduceShardLevel(List aggregation
// Return coordinator-level stats (seriesId converted to counts to save network bandwidth)
CoordinatorLevelStats coordinatorStats = new CoordinatorLevelStats(totalSeries, finalLabelStats);
- return forCoordinatorLevel(name, null, coordinatorStats, metadata);
+ return forCoordinatorLevel(name, mergedHeadStats, coordinatorStats, metadata);
}
/**
@@ -548,7 +552,7 @@ private InternalTSDBStats reduceShardLevel(List aggregation
* is needed because each time series exists on only one shard (guaranteed by routing).
*/
private InternalTSDBStats reduceCoordinatorLevel(List aggregations) {
- HeadStats mergedHeadStats = null; // TODO: Merge HeadStats in future when populated
+ HeadStats mergedHeadStats = mergeHeadStats(aggregations);
Long totalSeries = null;
Map builders = new HashMap<>();
@@ -610,6 +614,38 @@ private static class LabelStatsBuilder {
Map valueCounts = new LinkedHashMap<>();
}
+ /**
+ * Merges HeadStats from multiple aggregations by summing numSeries and chunkCount,
+ * taking the minimum minTime, and taking the maximum maxTime.
+ *
+ * @param aggregations the list of aggregations containing HeadStats to merge
+ * @return merged HeadStats, or null if no aggregation had HeadStats
+ */
+ static HeadStats mergeHeadStats(List aggregations) {
+ long totalNumSeries = 0;
+ long totalChunkCount = 0;
+ long minTime = Long.MAX_VALUE;
+ long maxTime = Long.MIN_VALUE;
+ boolean hasAny = false;
+
+ for (InternalAggregation agg : aggregations) {
+ InternalTSDBStats stats = (InternalTSDBStats) agg;
+ if (stats.headStats != null) {
+ hasAny = true;
+ totalNumSeries += stats.headStats.numSeries();
+ totalChunkCount += stats.headStats.chunkCount();
+ if (stats.headStats.minTime() < minTime) {
+ minTime = stats.headStats.minTime();
+ }
+ if (stats.headStats.maxTime() > maxTime) {
+ maxTime = stats.headStats.maxTime();
+ }
+ }
+ }
+
+ return hasAny ? new HeadStats(totalNumSeries, totalChunkCount, minTime, maxTime) : null;
+ }
+
/**
* Retrieves a property value based on the given path.
*
diff --git a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilder.java b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilder.java
index 441044952..ea24f8cc6 100644
--- a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilder.java
+++ b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilder.java
@@ -38,7 +38,7 @@
*
* Usage Example:
* {@code
- * TSDBStatsAggregationBuilder builder = new TSDBStatsAggregationBuilder("tsdb_stats", 1000000L, 2000000L, true);
+ * TSDBStatsAggregationBuilder builder = new TSDBStatsAggregationBuilder("tsdb_stats", 1000000L, 2000000L, true, true);
* }
*
* @since 0.0.1
@@ -50,6 +50,7 @@ public class TSDBStatsAggregationBuilder extends AbstractAggregationBuilder> ordinalToSeriesIdsMap;
+ // HeadStats accumulator: tracks stats from LiveSeriesIndexLeafReader segments
+ private long headNumSeries;
+ private long headChunkCount;
+ private long headMinTime;
+ private long headMaxTime;
+ private boolean hasHeadStats;
+
/**
* Creates a TSDB stats aggregator.
*
@@ -74,6 +83,7 @@ public class TSDBStatsAggregator extends MetricsAggregator {
* @param minTimestamp The minimum timestamp for filtering
* @param maxTimestamp The maximum timestamp for filtering
* @param includeValueStats Whether to include per-value statistics
+ * @param includeHeadStats Whether to include head (in-memory) statistics
* @param metadata The aggregation metadata
* @throws IOException If an error occurs during initialization
*/
@@ -84,12 +94,14 @@ public TSDBStatsAggregator(
long minTimestamp,
long maxTimestamp,
boolean includeValueStats,
+ boolean includeHeadStats,
Map metadata
) throws IOException {
super(name, context, parent, metadata);
this.minTimestamp = minTimestamp;
this.maxTimestamp = maxTimestamp;
this.includeValueStats = includeValueStats;
+ this.includeHeadStats = includeHeadStats;
this.seenSeriesIds = new HashSet<>();
@@ -97,6 +109,13 @@ public TSDBStatsAggregator(
this.labelValuePairOrdinalMap = new BytesRefHash(pool);
this.ordinalToSeriesIdsMap = includeValueStats ? new HashMap<>() : null;
+
+ // Initialize HeadStats accumulator
+ this.headNumSeries = 0;
+ this.headChunkCount = 0;
+ this.headMinTime = Long.MAX_VALUE;
+ this.headMaxTime = Long.MIN_VALUE;
+ this.hasHeadStats = false;
}
@Override
@@ -111,6 +130,21 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
return sub;
}
+ // Collect HeadStats from LiveSeriesIndexLeafReader segments
+ if (includeHeadStats && tsdbLeafReader instanceof LiveSeriesIndexLeafReader) {
+ hasHeadStats = true;
+ // LiveSeriesIndex guarantees one doc per series, so numDocs == numSeries
+ headNumSeries += ctx.reader().numDocs();
+ long leafMinTime = tsdbLeafReader.getMinIndexTimestamp();
+ long leafMaxTime = tsdbLeafReader.getMaxIndexTimestamp();
+ if (leafMinTime < headMinTime) {
+ headMinTime = leafMinTime;
+ }
+ if (leafMaxTime > headMaxTime) {
+ headMaxTime = leafMaxTime;
+ }
+ }
+
return new TSDBStatsLeafBucketCollector(ctx, tsdbLeafReader, sub);
}
@@ -206,8 +240,13 @@ public InternalAggregation buildAggregation(long bucket) throws IOException {
includeValueStats // Pass global flag so coordinator knows if value stats were collected
);
+ // Build HeadStats if any LiveSeriesIndexLeafReader segments were encountered
+ InternalTSDBStats.HeadStats headStats = hasHeadStats
+ ? new InternalTSDBStats.HeadStats(headNumSeries, headChunkCount, headMinTime, headMaxTime)
+ : null;
+
// Return using factory method
- return InternalTSDBStats.forShardLevel(name, null, shardStats, metadata());
+ return InternalTSDBStats.forShardLevel(name, headStats, shardStats, metadata());
}
@Override
diff --git a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactory.java b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactory.java
index f3ecb1f60..d68318200 100644
--- a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactory.java
+++ b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactory.java
@@ -44,6 +44,7 @@ public class TSDBStatsAggregatorFactory extends AggregatorFactory {
private final long minTimestamp;
private final long maxTimestamp;
private final boolean includeValueStats;
+ private final boolean includeHeadStats;
/**
* Creates a TSDB stats aggregator factory.
@@ -56,6 +57,7 @@ public class TSDBStatsAggregatorFactory extends AggregatorFactory {
* @param minTimestamp The minimum timestamp for filtering
* @param maxTimestamp The maximum timestamp for filtering
* @param includeValueStats Whether to include per-value statistics
+ * @param includeHeadStats Whether to include head (in-memory) statistics
* @throws IOException If an error occurs during initialization
*/
public TSDBStatsAggregatorFactory(
@@ -66,12 +68,14 @@ public TSDBStatsAggregatorFactory(
Map metadata,
long minTimestamp,
long maxTimestamp,
- boolean includeValueStats
+ boolean includeValueStats,
+ boolean includeHeadStats
) throws IOException {
super(name, queryShardContext, parent, subFactoriesBuilder, metadata);
this.minTimestamp = minTimestamp;
this.maxTimestamp = maxTimestamp;
this.includeValueStats = includeValueStats;
+ this.includeHeadStats = includeHeadStats;
}
@Override
@@ -81,7 +85,16 @@ public Aggregator createInternal(
CardinalityUpperBound cardinality,
Map metadata
) throws IOException {
- return new TSDBStatsAggregator(name, searchContext, parent, minTimestamp, maxTimestamp, includeValueStats, metadata);
+ return new TSDBStatsAggregator(
+ name,
+ searchContext,
+ parent,
+ minTimestamp,
+ maxTimestamp,
+ includeValueStats,
+ includeHeadStats,
+ metadata
+ );
}
@Override
diff --git a/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java b/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java
index fba92f292..366c350aa 100644
--- a/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java
+++ b/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java
@@ -401,13 +401,21 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
// Determine what statistics to include
boolean includeValueStats = includeOptions.contains(TSDBStatsConstants.INCLUDE_ALL)
|| includeOptions.contains(TSDBStatsConstants.INCLUDE_VALUE_STATS);
+ boolean includeHeadStats = includeOptions.contains(TSDBStatsConstants.INCLUDE_ALL)
+ || includeOptions.contains(TSDBStatsConstants.INCLUDE_HEAD_STATS);
try {
// Build QueryBuilder from the already-parsed FetchPlanNode
QueryBuilder filter = buildQueryFromFetch(fetchPlan, startMs, endMs);
// Build aggregation
- TSDBStatsAggregationBuilder aggBuilder = new TSDBStatsAggregationBuilder(AGGREGATION_NAME, startMs, endMs, includeValueStats);
+ TSDBStatsAggregationBuilder aggBuilder = new TSDBStatsAggregationBuilder(
+ AGGREGATION_NAME,
+ startMs,
+ endMs,
+ includeValueStats,
+ includeHeadStats
+ );
// Build search request
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(filter).aggregation(aggBuilder).size(0);
diff --git a/src/test/java/org/opensearch/tsdb/query/aggregator/AggregatorTestUtils.java b/src/test/java/org/opensearch/tsdb/query/aggregator/AggregatorTestUtils.java
index 212a1ff20..163c18b43 100644
--- a/src/test/java/org/opensearch/tsdb/query/aggregator/AggregatorTestUtils.java
+++ b/src/test/java/org/opensearch/tsdb/query/aggregator/AggregatorTestUtils.java
@@ -9,6 +9,7 @@
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.document.Document;
+import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CompositeReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
@@ -20,7 +21,12 @@
import org.apache.lucene.index.TermVectors;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
import org.opensearch.tsdb.core.chunk.ChunkIterator;
+import org.opensearch.tsdb.core.index.live.LiveSeriesIndexLeafReader;
+import org.opensearch.tsdb.core.index.live.MemChunkReader;
+import org.opensearch.tsdb.core.mapping.Constants;
+import org.opensearch.tsdb.core.mapping.LabelStorageType;
import org.opensearch.tsdb.core.model.ByteLabels;
import org.opensearch.tsdb.core.model.Labels;
import org.opensearch.tsdb.core.reader.TSDBDocValues;
@@ -32,6 +38,7 @@
import java.util.Map;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Shared test utilities for aggregator tests.
@@ -239,4 +246,49 @@ public long getSumTotalTermFreq(String field) throws IOException {
}
};
}
+
+ public static TSDBLeafReaderWithContext createLiveSeriesIndexLeafReaderWithLabels(
+ long minTimestamp,
+ long maxTimestamp,
+ long seriesId,
+ Map labelPairs
+ ) throws IOException {
+ Directory directory = new ByteBuffersDirectory();
+ IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig());
+
+ Document doc = new Document();
+ // Add REFERENCE field (LiveSeriesIndex field)
+ doc.add(new NumericDocValuesField(Constants.IndexSchema.REFERENCE, seriesId));
+ // Add labels as binary doc values for LabelStorageType.BINARY
+ Labels labels = ByteLabels.fromMap(labelPairs);
+ doc.add(
+ new org.apache.lucene.document.BinaryDocValuesField(
+ Constants.IndexSchema.LABELS,
+ new BytesRef(((ByteLabels) labels).getRawBytes())
+ )
+ );
+ indexWriter.addDocument(doc);
+ indexWriter.commit();
+
+ DirectoryReader tempReader = DirectoryReader.open(indexWriter);
+ LeafReader baseReader = tempReader.leaves().get(0).reader();
+
+ // Create a real LiveSeriesIndexLeafReader with mocked chunk reader
+ MemChunkReader mockChunkReader = mock(MemChunkReader.class);
+ when(mockChunkReader.getChunks(org.mockito.ArgumentMatchers.anyLong())).thenReturn(List.of());
+
+ LiveSeriesIndexLeafReader liveReader = new LiveSeriesIndexLeafReader(
+ baseReader,
+ mockChunkReader,
+ LabelStorageType.BINARY,
+ minTimestamp,
+ java.util.Collections.emptyMap()
+ );
+
+ CompositeReader compositeReader = createCompositeReaderWrapper(liveReader);
+ LeafReaderContext context = compositeReader.leaves().get(0);
+
+ return new TSDBLeafReaderWithContext(liveReader, context, compositeReader, tempReader, indexWriter, directory);
+ }
+
}
diff --git a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilderTests.java b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilderTests.java
index bc91dc0cd..6a95d1c7b 100644
--- a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilderTests.java
+++ b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilderTests.java
@@ -35,12 +35,13 @@ public class TSDBStatsAggregationBuilderTests extends OpenSearchTestCase {
// ========== Constructor Tests ==========
public void testConstructorWithValidParameters() {
- TSDBStatsAggregationBuilder builder = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true);
+ TSDBStatsAggregationBuilder builder = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, true);
assertEquals(TEST_NAME, builder.getName());
assertEquals(MIN_TIMESTAMP, builder.getMinTimestamp());
assertEquals(MAX_TIMESTAMP, builder.getMaxTimestamp());
assertTrue(builder.isIncludeValueStats());
+ assertTrue(builder.isIncludeHeadStats());
assertEquals("tsdb_stats_agg", builder.getType());
assertEquals(AggregationBuilder.BucketCardinality.NONE, builder.bucketCardinality());
}
@@ -49,14 +50,14 @@ public void testConstructorRejectsInvalidTimeRange() {
// max < min
IllegalArgumentException ex1 = expectThrows(
IllegalArgumentException.class,
- () -> new TSDBStatsAggregationBuilder(TEST_NAME, 2000L, 1000L, true)
+ () -> new TSDBStatsAggregationBuilder(TEST_NAME, 2000L, 1000L, true, true)
);
assertTrue(ex1.getMessage().contains("maxTimestamp must be greater than minTimestamp"));
// max == min
IllegalArgumentException ex2 = expectThrows(
IllegalArgumentException.class,
- () -> new TSDBStatsAggregationBuilder(TEST_NAME, 1000L, 1000L, true)
+ () -> new TSDBStatsAggregationBuilder(TEST_NAME, 1000L, 1000L, true, true)
);
assertTrue(ex2.getMessage().contains("maxTimestamp must be greater than minTimestamp"));
}
@@ -64,8 +65,8 @@ public void testConstructorRejectsInvalidTimeRange() {
// ========== Serialization Tests ==========
public void testSerializationRoundTrip() throws IOException {
- // includeValueStats=true
- TSDBStatsAggregationBuilder original = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true);
+ // includeValueStats=true, includeHeadStats=true
+ TSDBStatsAggregationBuilder original = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, true);
TSDBStatsAggregationBuilder deserialized = serializeAndDeserialize(original);
assertEquals(original, deserialized);
@@ -74,16 +75,18 @@ public void testSerializationRoundTrip() throws IOException {
assertEquals(original.getMinTimestamp(), deserialized.getMinTimestamp());
assertEquals(original.getMaxTimestamp(), deserialized.getMaxTimestamp());
assertEquals(original.isIncludeValueStats(), deserialized.isIncludeValueStats());
+ assertEquals(original.isIncludeHeadStats(), deserialized.isIncludeHeadStats());
- // includeValueStats=false
- TSDBStatsAggregationBuilder original2 = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, false);
+ // includeValueStats=false, includeHeadStats=false
+ TSDBStatsAggregationBuilder original2 = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, false, false);
TSDBStatsAggregationBuilder deserialized2 = serializeAndDeserialize(original2);
assertEquals(original2, deserialized2);
assertFalse(deserialized2.isIncludeValueStats());
+ assertFalse(deserialized2.isIncludeHeadStats());
}
public void testSerializationWithEdgeCaseTimestamps() throws IOException {
- TSDBStatsAggregationBuilder original = new TSDBStatsAggregationBuilder(TEST_NAME, Long.MIN_VALUE, Long.MAX_VALUE, true);
+ TSDBStatsAggregationBuilder original = new TSDBStatsAggregationBuilder(TEST_NAME, Long.MIN_VALUE, Long.MAX_VALUE, true, true);
TSDBStatsAggregationBuilder deserialized = serializeAndDeserialize(original);
assertEquals(original, deserialized);
@@ -94,7 +97,7 @@ public void testSerializationWithEdgeCaseTimestamps() throws IOException {
// ========== XContent Tests ==========
public void testXContentGeneration() throws IOException {
- TSDBStatsAggregationBuilder builder = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true);
+ TSDBStatsAggregationBuilder builder = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, true);
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder();
builder.internalXContent(xContentBuilder, null);
@@ -103,13 +106,14 @@ public void testXContentGeneration() throws IOException {
assertTrue(json.contains("\"min_timestamp\":" + MIN_TIMESTAMP));
assertTrue(json.contains("\"max_timestamp\":" + MAX_TIMESTAMP));
assertTrue(json.contains("\"include_value_stats\":true"));
+ assertTrue(json.contains("\"include_head_stats\":true"));
}
public void testXContentParsing() throws IOException {
// Full round-trip: parse with all fields
String json = String.format(
Locale.ROOT,
- "{\"min_timestamp\":%d,\"max_timestamp\":%d,\"include_value_stats\":true}",
+ "{\"min_timestamp\":%d,\"max_timestamp\":%d,\"include_value_stats\":true,\"include_head_stats\":true}",
MIN_TIMESTAMP,
MAX_TIMESTAMP
);
@@ -122,12 +126,13 @@ public void testXContentParsing() throws IOException {
assertEquals(MIN_TIMESTAMP, parsed.getMinTimestamp());
assertEquals(MAX_TIMESTAMP, parsed.getMaxTimestamp());
assertTrue(parsed.isIncludeValueStats());
+ assertTrue(parsed.isIncludeHeadStats());
}
- // With include_value_stats=false
+ // With include_value_stats=false, include_head_stats=false
String json2 = String.format(
Locale.ROOT,
- "{\"min_timestamp\":%d,\"max_timestamp\":%d,\"include_value_stats\":false}",
+ "{\"min_timestamp\":%d,\"max_timestamp\":%d,\"include_value_stats\":false,\"include_head_stats\":false}",
MIN_TIMESTAMP,
MAX_TIMESTAMP
);
@@ -136,12 +141,18 @@ public void testXContentParsing() throws IOException {
parser.nextToken();
TSDBStatsAggregationBuilder parsed = TSDBStatsAggregationBuilder.parse(TEST_NAME, parser);
assertFalse(parsed.isIncludeValueStats());
+ assertFalse(parsed.isIncludeHeadStats());
}
}
public void testXContentParsingMissingRequiredFields() throws IOException {
// Missing min_timestamp
- try (XContentParser parser = createParser(XContentType.JSON.xContent(), "{\"max_timestamp\":2000,\"include_value_stats\":true}")) {
+ try (
+ XContentParser parser = createParser(
+ XContentType.JSON.xContent(),
+ "{\"max_timestamp\":2000,\"include_value_stats\":true,\"include_head_stats\":true}"
+ )
+ ) {
parser.nextToken();
IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
@@ -151,7 +162,12 @@ public void testXContentParsingMissingRequiredFields() throws IOException {
}
// Missing max_timestamp
- try (XContentParser parser = createParser(XContentType.JSON.xContent(), "{\"min_timestamp\":1000,\"include_value_stats\":true}")) {
+ try (
+ XContentParser parser = createParser(
+ XContentType.JSON.xContent(),
+ "{\"min_timestamp\":1000,\"include_value_stats\":true,\"include_head_stats\":true}"
+ )
+ ) {
parser.nextToken();
IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
@@ -161,7 +177,12 @@ public void testXContentParsingMissingRequiredFields() throws IOException {
}
// Missing include_value_stats
- try (XContentParser parser = createParser(XContentType.JSON.xContent(), "{\"min_timestamp\":1000,\"max_timestamp\":2000}")) {
+ try (
+ XContentParser parser = createParser(
+ XContentType.JSON.xContent(),
+ "{\"min_timestamp\":1000,\"max_timestamp\":2000,\"include_head_stats\":true}"
+ )
+ ) {
parser.nextToken();
IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
@@ -169,6 +190,21 @@ public void testXContentParsingMissingRequiredFields() throws IOException {
);
assertTrue(ex.getMessage().contains("Required parameter 'include_value_stats' is missing"));
}
+
+ // Missing include_head_stats
+ try (
+ XContentParser parser = createParser(
+ XContentType.JSON.xContent(),
+ "{\"min_timestamp\":1000,\"max_timestamp\":2000,\"include_value_stats\":true}"
+ )
+ ) {
+ parser.nextToken();
+ IllegalArgumentException ex = expectThrows(
+ IllegalArgumentException.class,
+ () -> TSDBStatsAggregationBuilder.parse(TEST_NAME, parser)
+ );
+ assertTrue(ex.getMessage().contains("Required parameter 'include_head_stats' is missing"));
+ }
}
public void testXContentParsingWithUnknownFields() throws IOException {
@@ -176,7 +212,7 @@ public void testXContentParsingWithUnknownFields() throws IOException {
String json = String.format(
Locale.ROOT,
"{\"min_timestamp\":%d,\"unknown_num\":99,\"max_timestamp\":%d,"
- + "\"unknown_bool\":false,\"include_value_stats\":true,"
+ + "\"unknown_bool\":false,\"include_value_stats\":true,\"include_head_stats\":true,"
+ "\"nested\":{\"a\":1},\"arr\":[1,2],\"unknown_str\":\"val\"}",
MIN_TIMESTAMP,
MAX_TIMESTAMP
@@ -195,15 +231,29 @@ public void testXContentParsingWithUnknownFields() throws IOException {
// ========== Equals and HashCode Tests ==========
public void testEqualsAndHashCode() {
- TSDBStatsAggregationBuilder builder1 = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true);
- TSDBStatsAggregationBuilder builder2 = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true);
- TSDBStatsAggregationBuilder differentValueStats = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, false);
- TSDBStatsAggregationBuilder differentMax = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, 3000L, true);
+ TSDBStatsAggregationBuilder builder1 = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, true);
+ TSDBStatsAggregationBuilder builder2 = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, true);
+ TSDBStatsAggregationBuilder differentValueStats = new TSDBStatsAggregationBuilder(
+ TEST_NAME,
+ MIN_TIMESTAMP,
+ MAX_TIMESTAMP,
+ false,
+ true
+ );
+ TSDBStatsAggregationBuilder differentHeadStats = new TSDBStatsAggregationBuilder(
+ TEST_NAME,
+ MIN_TIMESTAMP,
+ MAX_TIMESTAMP,
+ true,
+ false
+ );
+ TSDBStatsAggregationBuilder differentMax = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, 3000L, true, true);
// equals
assertEquals(builder1, builder2);
assertEquals(builder1, builder1);
assertNotEquals(builder1, differentValueStats);
+ assertNotEquals(builder1, differentHeadStats);
assertNotEquals(builder1, differentMax);
assertNotEquals(builder1, null);
assertNotEquals(builder1, new Object());
@@ -215,7 +265,7 @@ public void testEqualsAndHashCode() {
// ========== Shallow Copy Tests ==========
public void testShallowCopy() {
- TSDBStatsAggregationBuilder original = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true);
+ TSDBStatsAggregationBuilder original = new TSDBStatsAggregationBuilder(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, true);
org.opensearch.search.aggregations.AggregatorFactories.Builder subFactoriesBuilder =
new org.opensearch.search.aggregations.AggregatorFactories.Builder();
@@ -225,6 +275,7 @@ public void testShallowCopy() {
assertEquals(original.getMinTimestamp(), copy.getMinTimestamp());
assertEquals(original.getMaxTimestamp(), copy.getMaxTimestamp());
assertEquals(original.isIncludeValueStats(), copy.isIncludeValueStats());
+ assertEquals(original.isIncludeHeadStats(), copy.isIncludeHeadStats());
assertNotSame(original, copy);
// With metadata
diff --git a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactoryTests.java b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactoryTests.java
index 913ff0e86..860b8652e 100644
--- a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactoryTests.java
+++ b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactoryTests.java
@@ -146,7 +146,8 @@ private TSDBStatsAggregatorFactory createFactory(String name, long minTimestamp,
Map.of(), // metadata
minTimestamp,
maxTimestamp,
- includeValueStats
+ includeValueStats,
+ true // includeHeadStats - default to true for existing tests
);
} catch (Exception e) {
throw new RuntimeException("Failed to create factory for test", e);
diff --git a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorTests.java b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorTests.java
index aea3ca6cc..f21ad9257 100644
--- a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorTests.java
+++ b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorTests.java
@@ -57,7 +57,7 @@ public class TSDBStatsAggregatorTests extends OpenSearchTestCase {
public void testBuildEmptyAggregationBehavior() throws IOException {
// Test with includeValueStats=true
- TSDBStatsAggregator aggregator1 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true);
+ TSDBStatsAggregator aggregator1 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, false);
InternalAggregation result1 = aggregator1.buildEmptyAggregation();
assertNotNull("Empty aggregation should not be null", result1);
@@ -72,7 +72,7 @@ public void testBuildEmptyAggregationBehavior() throws IOException {
aggregator1.close();
// Test with includeValueStats=false
- TSDBStatsAggregator aggregator2 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, false);
+ TSDBStatsAggregator aggregator2 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, false, false);
InternalAggregation result2 = aggregator2.buildEmptyAggregation();
assertNotNull("Empty aggregation should not be null", result2);
@@ -81,7 +81,7 @@ public void testBuildEmptyAggregationBehavior() throws IOException {
aggregator2.close();
// Test multiple empty aggregations (idempotency)
- TSDBStatsAggregator aggregator3 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true);
+ TSDBStatsAggregator aggregator3 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, false);
InternalAggregation firstResult = aggregator3.buildEmptyAggregation();
InternalAggregation secondResult = aggregator3.buildEmptyAggregation();
assertEquals("Multiple calls should return equivalent results", firstResult, secondResult);
@@ -90,15 +90,15 @@ public void testBuildEmptyAggregationBehavior() throws IOException {
public void testResourceCleanup() throws IOException {
// Test close with includeValueStats=true
- TSDBStatsAggregator aggregator1 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true);
+ TSDBStatsAggregator aggregator1 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, false);
aggregator1.close();
// Test close with includeValueStats=false (ordinalFingerprintSets is null)
- TSDBStatsAggregator aggregator2 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, false);
+ TSDBStatsAggregator aggregator2 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, false, false);
aggregator2.close();
// Test close after buildEmptyAggregation
- TSDBStatsAggregator aggregator3 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true);
+ TSDBStatsAggregator aggregator3 = createAggregator(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, false);
aggregator3.buildEmptyAggregation();
aggregator3.close();
}
@@ -106,14 +106,14 @@ public void testResourceCleanup() throws IOException {
public void testAggregatorWithMetadata() throws IOException {
// Test with metadata
Map<String, Object> metadata = Map.of("key1", "value1", "key2", 42);
- TSDBStatsAggregator aggregator1 = createAggregatorWithMetadata(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, metadata);
+ TSDBStatsAggregator aggregator1 = createAggregatorWithMetadata(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, false, metadata);
InternalAggregation result1 = aggregator1.buildEmptyAggregation();
assertNotNull("Result should have metadata", result1.getMetadata());
assertEquals("Metadata should match", metadata, result1.getMetadata());
aggregator1.close();
// Test without metadata
- TSDBStatsAggregator aggregator2 = createAggregatorWithMetadata(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, null);
+ TSDBStatsAggregator aggregator2 = createAggregatorWithMetadata(TEST_NAME, MIN_TIMESTAMP, MAX_TIMESTAMP, true, false, null);
InternalAggregation result2 = aggregator2.buildEmptyAggregation();
assertNull("Result should have null metadata", result2.getMetadata());
aggregator2.close();
@@ -125,7 +125,7 @@ public void testLeafCollectorBehavior() throws IOException {
// Test 1: Non-TSDB LeafReader should throw exception
long minTimestamp = 1000L;
long maxTimestamp = 5000L;
- TSDBStatsAggregator aggregator1 = createAggregator("test", minTimestamp, maxTimestamp, true);
+ TSDBStatsAggregator aggregator1 = createAggregator("test", minTimestamp, maxTimestamp, true, false);
Directory directory = new ByteBuffersDirectory();
IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig());
@@ -147,7 +147,7 @@ public void testLeafCollectorBehavior() throws IOException {
// Test 2: Non-overlapping time range (should prune)
long queryMinTimestamp = 1000L;
long queryMaxTimestamp = 5000L;
- TSDBStatsAggregator aggregator2 = createAggregator("test", queryMinTimestamp, queryMaxTimestamp, true);
+ TSDBStatsAggregator aggregator2 = createAggregator("test", queryMinTimestamp, queryMaxTimestamp, true, false);
long leafMinTimestamp = 6000L;
long leafMaxTimestamp = 10000L;
@@ -165,7 +165,7 @@ public void testLeafCollectorBehavior() throws IOException {
aggregator2.close();
// Test 3: Overlapping time range (should return new collector)
- TSDBStatsAggregator aggregator3 = createAggregator("test", queryMinTimestamp, queryMaxTimestamp, true);
+ TSDBStatsAggregator aggregator3 = createAggregator("test", queryMinTimestamp, queryMaxTimestamp, true, false);
long leafMinTimestamp2 = 2000L;
long leafMaxTimestamp2 = 6000L;
@@ -184,7 +184,7 @@ public void testLeafCollectorBehavior() throws IOException {
aggregator3.close();
// Test 4: Exact boundary (no overlap - should prune)
- TSDBStatsAggregator aggregator4 = createAggregator("test", queryMinTimestamp, queryMaxTimestamp, true);
+ TSDBStatsAggregator aggregator4 = createAggregator("test", queryMinTimestamp, queryMaxTimestamp, true, false);
long leafMinTimestamp3 = 0L;
long leafMaxTimestamp3 = 999L;
@@ -208,7 +208,7 @@ public void testCollectWithSingleSeries() throws IOException {
// Arrange
long minTimestamp = 1000L;
long maxTimestamp = 5000L;
- TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true);
+ TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true, false);
Map<String, String> labels = new HashMap<>();
labels.put("service", "api");
@@ -250,7 +250,7 @@ public void testCollectWithDuplicateSeries() throws IOException {
// Arrange: two segments with the same labels → same stableHash → second should be deduped
long minTimestamp = 1000L;
long maxTimestamp = 5000L;
- TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true);
+ TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true, false);
// Same labels = same stableHash = dedup
Map<String, String> sameLabels = Map.of("service", "api");
@@ -296,7 +296,7 @@ public void testBuildAggregationWithValueStats() throws IOException {
// Arrange
long minTimestamp = 1000L;
long maxTimestamp = 5000L;
- TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true);
+ TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true, false);
Map<String, String> labels = Map.of("service", "api", "region", "us-west");
@@ -327,7 +327,7 @@ public void testBuildAggregationWithoutValueStats() throws IOException {
// Arrange
long minTimestamp = 1000L;
long maxTimestamp = 5000L;
- TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, false);
+ TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, false, false);
Map<String, String> labels = Map.of("service", "api");
@@ -359,7 +359,7 @@ public void testFullCollectionLifecycle() throws IOException {
// Arrange: two different series with different labels
long minTimestamp = 1000L;
long maxTimestamp = 5000L;
- TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true);
+ TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true, false);
Map<String, String> labels1 = Map.of("service", "api", "host", "server1");
Map<String, String> labels2 = Map.of("service", "web", "host", "server2");
@@ -411,7 +411,7 @@ public void testCollectWithDuplicateLabelValues() throws IOException {
// To get different stableHash (avoid dedup) but share a label value, use different labels overall.
long minTimestamp = 1000L;
long maxTimestamp = 5000L;
- TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true);
+ TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true, false);
// Two series that share "service:api" but differ on another label → different stableHash, same ordinal for "service:api"
Map<String, String> labels1 = Map.of("service", "api", "host", "server1");
@@ -449,6 +449,103 @@ public void testCollectWithDuplicateLabelValues() throws IOException {
}
}
+ // ========== HeadStats Collection Tests ==========
+
+ public void testCollectHeadStatsFromLiveSeriesIndexLeafReader() throws IOException {
+ // Arrange
+ long minTimestamp = 1000L;
+ long maxTimestamp = 5000L;
+ TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true, true);
+
+ Map<String, String> labels = Map.of("service", "api");
+
+ AggregatorTestUtils.TSDBLeafReaderWithContext readerCtx = AggregatorTestUtils.createLiveSeriesIndexLeafReaderWithLabels(
+ minTimestamp,
+ maxTimestamp,
+ 12345L,
+ labels
+ );
+
+ try {
+ // Act
+ LeafBucketCollector collector = aggregator.getLeafCollector(readerCtx.context, LeafBucketCollector.NO_OP_COLLECTOR);
+ collector.collect(0, 0);
+ InternalAggregation result = aggregator.buildAggregation(0);
+
+ // Assert
+ InternalTSDBStats stats = (InternalTSDBStats) result;
+ assertNotNull("Stats should not be null", stats);
+ // HeadStats should be populated since we used LiveSeriesIndexLeafReader with includeHeadStats=true
+ assertNotNull("HeadStats should be populated for LiveSeriesIndexLeafReader", stats.getHeadStats());
+ assertEquals("HeadStats numSeries should match numDocs", 1L, stats.getHeadStats().numSeries());
+ assertEquals("HeadStats minTime should match reader minTimestamp", minTimestamp, stats.getHeadStats().minTime());
+ // LiveSeriesIndexLeafReader sets maxTimestamp to Long.MAX_VALUE (live/open-ended)
+ assertEquals("HeadStats maxTime should match LiveSeriesIndexLeafReader max", Long.MAX_VALUE, stats.getHeadStats().maxTime());
+ } finally {
+ readerCtx.close();
+ aggregator.close();
+ }
+ }
+
+ public void testNoHeadStatsForClosedChunkReader() throws IOException {
+ // Arrange - ClosedChunkIndex reader should NOT populate HeadStats
+ long minTimestamp = 1000L;
+ long maxTimestamp = 5000L;
+ TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true, true);
+
+ Map<String, String> labels = Map.of("service", "api");
+
+ AggregatorTestUtils.TSDBLeafReaderWithContext readerCtx = AggregatorTestUtils.createMockTSDBLeafReaderWithLabels(
+ minTimestamp,
+ maxTimestamp,
+ labels
+ );
+
+ try {
+ LeafBucketCollector collector = aggregator.getLeafCollector(readerCtx.context, LeafBucketCollector.NO_OP_COLLECTOR);
+ collector.collect(0, 0);
+ InternalAggregation result = aggregator.buildAggregation(0);
+
+ InternalTSDBStats stats = (InternalTSDBStats) result;
+ assertNotNull("Stats should not be null", stats);
+ // HeadStats should NOT be populated for non-LiveSeriesIndex readers
+ assertNull("HeadStats should be null for ClosedChunkIndex reader", stats.getHeadStats());
+ } finally {
+ readerCtx.close();
+ aggregator.close();
+ }
+ }
+
+ public void testNoHeadStatsWhenIncludeHeadStatsFalse() throws IOException {
+ // Arrange - Even LiveSeriesIndexLeafReader should NOT populate HeadStats when includeHeadStats=false
+ long minTimestamp = 1000L;
+ long maxTimestamp = 5000L;
+ TSDBStatsAggregator aggregator = createAggregator("test", minTimestamp, maxTimestamp, true, false);
+
+ Map<String, String> labels = Map.of("service", "api");
+
+ AggregatorTestUtils.TSDBLeafReaderWithContext readerCtx = AggregatorTestUtils.createLiveSeriesIndexLeafReaderWithLabels(
+ minTimestamp,
+ maxTimestamp,
+ 12345L,
+ labels
+ );
+
+ try {
+ LeafBucketCollector collector = aggregator.getLeafCollector(readerCtx.context, LeafBucketCollector.NO_OP_COLLECTOR);
+ collector.collect(0, 0);
+ InternalAggregation result = aggregator.buildAggregation(0);
+
+ InternalTSDBStats stats = (InternalTSDBStats) result;
+ assertNotNull("Stats should not be null", stats);
+ // HeadStats should be null because includeHeadStats=false
+ assertNull("HeadStats should be null when includeHeadStats=false", stats.getHeadStats());
+ } finally {
+ readerCtx.close();
+ aggregator.close();
+ }
+ }
+
// ========== Helper Methods ==========
/**
@@ -468,9 +565,14 @@ private InternalTSDBStats reduceToCoordinator(InternalAggregation shardResult) {
return (InternalTSDBStats) shardResult.reduce(List.of(shardResult), reduceContext);
}
- private TSDBStatsAggregator createAggregator(String name, long minTimestamp, long maxTimestamp, boolean includeValueStats)
- throws IOException {
- return createAggregatorWithMetadata(name, minTimestamp, maxTimestamp, includeValueStats, Map.of());
+ private TSDBStatsAggregator createAggregator(
+ String name,
+ long minTimestamp,
+ long maxTimestamp,
+ boolean includeValueStats,
+ boolean includeHeadStats
+ ) throws IOException {
+ return createAggregatorWithMetadata(name, minTimestamp, maxTimestamp, includeValueStats, includeHeadStats, Map.of());
}
private TSDBStatsAggregator createAggregatorWithMetadata(
@@ -478,6 +580,7 @@ private TSDBStatsAggregator createAggregatorWithMetadata(
long minTimestamp,
long maxTimestamp,
boolean includeValueStats,
+ boolean includeHeadStats,
Map<String, Object> metadata
) throws IOException {
CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
@@ -486,6 +589,15 @@ private TSDBStatsAggregator createAggregatorWithMetadata(
SearchContext searchContext = mock(SearchContext.class);
when(searchContext.bigArrays()).thenReturn(bigArrays);
- return new TSDBStatsAggregator(name, searchContext, null, minTimestamp, maxTimestamp, includeValueStats, metadata);
+ return new TSDBStatsAggregator(
+ name,
+ searchContext,
+ null,
+ minTimestamp,
+ maxTimestamp,
+ includeValueStats,
+ includeHeadStats,
+ metadata
+ );
}
}
From 8331508ac9297d4877321e2ef80685e003196f22 Mon Sep 17 00:00:00 2001
From: Wenting Wang
Date: Wed, 4 Mar 2026 15:49:00 -0800
Subject: [PATCH 2/6] fix test coverage
Signed-off-by: Wenting Wang
---
.../integrationTests/TSDBStatsRestIT.java | 4 -
.../query/aggregator/InternalTSDBStats.java | 17 +--
.../TSDBStatsAggregationBuilder.java | 4 +-
.../query/aggregator/TSDBStatsAggregator.java | 4 +-
.../tsdb/query/rest/RestTSDBStatsAction.java | 2 -
.../query/rest/TSDBStatsResponseListener.java | 3 +-
.../aggregator/InternalTSDBStatsTests.java | 143 +++++++++++++++++-
.../TSDBStatsAggregationBuilderTests.java | 9 +-
.../TSDBStatsAggregatorFactoryTests.java | 45 +++++-
.../rest/TSDBStatsResponseListenerTests.java | 15 +-
10 files changed, 189 insertions(+), 57 deletions(-)
diff --git a/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java b/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java
index 9f9116914..dc4d097a1 100644
--- a/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java
+++ b/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java
@@ -57,7 +57,6 @@ public void testBasicEndpoint() throws Exception {
{
"headStats": {
"numSeries": 10,
- "chunkCount": 0,
"minTime": 0,
"maxTime": 9223372036854775807
},
@@ -282,7 +281,6 @@ public void testFormatOptions() throws Exception {
{
"headStats": {
"numSeries": 10,
- "chunkCount": 0,
"minTime": 0,
"maxTime": 9223372036854775807
},
@@ -362,7 +360,6 @@ public void testFormatOptions() throws Exception {
{
"headStats": {
"numSeries": 10,
- "chunkCount": 0,
"minTime": 0,
"maxTime": 9223372036854775807
},
@@ -430,7 +427,6 @@ public void testQueryFiltering() throws Exception {
{
"headStats": {
"numSeries": 10,
- "chunkCount": 0,
"minTime": 0,
"maxTime": 9223372036854775807
},
diff --git a/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java b/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java
index be4afcbf9..fd50a7442 100644
--- a/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java
+++ b/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java
@@ -61,11 +61,10 @@ public class InternalTSDBStats extends InternalAggregation {
* Statistics for the head (in-memory time series).
*
* @param numSeries the number of active time series in the head
- * @param chunkCount the total number of memory chunks currently held
* @param minTime the minimum sample timestamp present in the head
* @param maxTime the maximum sample timestamp present in the head
*/
- public record HeadStats(long numSeries, long chunkCount, long minTime, long maxTime) {
+ public record HeadStats(long numSeries, long minTime, long maxTime) {
/**
* Deserializes a {@code HeadStats} instance from a stream.
@@ -74,7 +73,7 @@ public record HeadStats(long numSeries, long chunkCount, long minTime, long maxT
* @throws IOException if an I/O error occurs during reading
*/
public HeadStats(StreamInput in) throws IOException {
- this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
+ this(in.readVLong(), in.readLong(), in.readLong());
}
/**
@@ -85,9 +84,8 @@ public HeadStats(StreamInput in) throws IOException {
*/
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(numSeries);
- out.writeVLong(chunkCount);
- out.writeVLong(minTime);
- out.writeVLong(maxTime);
+ out.writeLong(minTime);
+ out.writeLong(maxTime);
}
}
@@ -615,7 +613,7 @@ private static class LabelStatsBuilder {
}
/**
- * Merges HeadStats from multiple aggregations by summing numSeries and chunkCount,
+ * Merges HeadStats from multiple aggregations by summing numSeries,
* taking the minimum minTime, and taking the maximum maxTime.
*
* @param aggregations the list of aggregations containing HeadStats to merge
@@ -623,7 +621,6 @@ private static class LabelStatsBuilder {
*/
static HeadStats mergeHeadStats(List<InternalAggregation> aggregations) {
long totalNumSeries = 0;
- long totalChunkCount = 0;
long minTime = Long.MAX_VALUE;
long maxTime = Long.MIN_VALUE;
boolean hasAny = false;
@@ -633,7 +630,6 @@ static HeadStats mergeHeadStats(List aggregations) {
if (stats.headStats != null) {
hasAny = true;
totalNumSeries += stats.headStats.numSeries();
- totalChunkCount += stats.headStats.chunkCount();
if (stats.headStats.minTime() < minTime) {
minTime = stats.headStats.minTime();
}
@@ -643,7 +639,7 @@ static HeadStats mergeHeadStats(List aggregations) {
}
}
- return hasAny ? new HeadStats(totalNumSeries, totalChunkCount, minTime, maxTime) : null;
+ return hasAny ? new HeadStats(totalNumSeries, minTime, maxTime) : null;
}
/**
@@ -716,7 +712,6 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th
if (headStats != null) {
builder.startObject("headStats");
builder.field("numSeries", headStats.numSeries());
- builder.field("chunkCount", headStats.chunkCount());
builder.field("minTime", headStats.minTime());
builder.field("maxTime", headStats.maxTime());
builder.endObject();
diff --git a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilder.java b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilder.java
index ea24f8cc6..42a59838c 100644
--- a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilder.java
+++ b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilder.java
@@ -185,9 +185,7 @@ public static TSDBStatsAggregationBuilder parse(String aggregationName, XContent
);
}
if (includeHeadStats == null) {
- throw new IllegalArgumentException(
- "Required parameter 'include_head_stats' is missing for aggregation '" + aggregationName + "'"
- );
+ includeHeadStats = false;
}
return new TSDBStatsAggregationBuilder(aggregationName, minTimestamp, maxTimestamp, includeValueStats, includeHeadStats);
diff --git a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java
index add830de1..5029193e2 100644
--- a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java
+++ b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java
@@ -69,7 +69,6 @@ public class TSDBStatsAggregator extends MetricsAggregator {
// HeadStats accumulator: tracks stats from LiveSeriesIndexLeafReader segments
private long headNumSeries;
- private long headChunkCount;
private long headMinTime;
private long headMaxTime;
private boolean hasHeadStats;
@@ -112,7 +111,6 @@ public TSDBStatsAggregator(
// Initialize HeadStats accumulator
this.headNumSeries = 0;
- this.headChunkCount = 0;
this.headMinTime = Long.MAX_VALUE;
this.headMaxTime = Long.MIN_VALUE;
this.hasHeadStats = false;
@@ -242,7 +240,7 @@ public InternalAggregation buildAggregation(long bucket) throws IOException {
// Build HeadStats if any LiveSeriesIndexLeafReader segments were encountered
InternalTSDBStats.HeadStats headStats = hasHeadStats
- ? new InternalTSDBStats.HeadStats(headNumSeries, headChunkCount, headMinTime, headMaxTime)
+ ? new InternalTSDBStats.HeadStats(headNumSeries, headMinTime, headMaxTime)
: null;
// Return using factory method
diff --git a/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java b/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java
index 366c350aa..2c753bfb7 100644
--- a/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java
+++ b/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java
@@ -95,7 +95,6 @@
* {
* "headStats": {
* "numSeries": 508,
- * "chunkCount": 937,
* "minTime": 1591516800000,
* "maxTime": 1598896800143
* },
@@ -130,7 +129,6 @@
* {
* "headStats": {
* "numSeries": 508,
- * "chunkCount": 937,
* "minTime": 1591516800000,
* "maxTime": 1598896800143
* },
diff --git a/src/main/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListener.java b/src/main/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListener.java
index 53ea7e112..c90cab74c 100644
--- a/src/main/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListener.java
+++ b/src/main/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListener.java
@@ -42,7 +42,7 @@ public class TSDBStatsResponseListener extends RestToXContentListener labelStats = createTestLabelStats();
// Act
@@ -273,7 +273,7 @@ public void testForCoordinatorLevelXContent() throws IOException {
// Test all XContent variations in one test
// Variation 1: With all fields
- InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(508L, 937L, 1591516800000L, 1598896800143L);
+ InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(508L, 1591516800000L, 1598896800143L);
Map<String, InternalTSDBStats.CoordinatorLevelStats.LabelStats> labelStats = new HashMap<>();
labelStats.put("cluster", new InternalTSDBStats.CoordinatorLevelStats.LabelStats(100L, Map.of("prod", 80L, "staging", 15L)));
@@ -417,7 +417,7 @@ public void testGetPropertyBehavior() {
}
public void testEqualsAndHashCode() {
- InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(100L, 200L, 1000L, 2000L);
+ InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(100L, 1000L, 2000L);
Map<String, InternalTSDBStats.CoordinatorLevelStats.LabelStats> labelStats = createTestLabelStats();
InternalTSDBStats stats1 = InternalTSDBStats.forCoordinatorLevel(
@@ -460,7 +460,7 @@ public void testEqualsAndHashCode() {
stats1,
InternalTSDBStats.forCoordinatorLevel(
TEST_NAME,
- new InternalTSDBStats.HeadStats(999L, 200L, 1000L, 2000L),
+ new InternalTSDBStats.HeadStats(999L, 1000L, 2000L),
new InternalTSDBStats.CoordinatorLevelStats(500L, labelStats),
TEST_METADATA
)
@@ -515,7 +515,7 @@ public void testTopLevelSerializationRoundTrip() throws IOException {
assertNull(shardDeserialized.getHeadStats());
// Scenario 2: Coordinator-level with headStats
- InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(100L, 200L, 1000L, 2000L);
+ InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(100L, 1000L, 2000L);
InternalTSDBStats coordOriginal = InternalTSDBStats.forCoordinatorLevel(
TEST_NAME,
headStats,
@@ -983,6 +983,137 @@ public void testReduceShardLevelWithEmptySeriesIdsForValue() {
assertEquals(1L, resultLabelStats.get("cluster").valuesStats().get("staging").longValue());
}
+ // ============================================================================================
+ // Section 9: mergeHeadStats — unit tests and HeadStats propagation through reduce
+ // ============================================================================================
+
+ public void testMergeHeadStatsWithMultipleInputs() {
+ // All three aggregations have HeadStats; verify sum, min, and max are taken correctly.
+ InternalTSDBStats agg1 = InternalTSDBStats.forCoordinatorLevel(
+ TEST_NAME,
+ new InternalTSDBStats.HeadStats(100L, 1000L, 3000L),
+ new InternalTSDBStats.CoordinatorLevelStats(100L, new HashMap<>()),
+ TEST_METADATA
+ );
+ InternalTSDBStats agg2 = InternalTSDBStats.forCoordinatorLevel(
+ TEST_NAME,
+ new InternalTSDBStats.HeadStats(200L, 500L, 4000L), // smallest minTime, largest maxTime
+ new InternalTSDBStats.CoordinatorLevelStats(200L, new HashMap<>()),
+ TEST_METADATA
+ );
+ InternalTSDBStats agg3 = InternalTSDBStats.forCoordinatorLevel(
+ TEST_NAME,
+ new InternalTSDBStats.HeadStats(50L, 2000L, 2500L), // middle minTime, smallest maxTime
+ new InternalTSDBStats.CoordinatorLevelStats(50L, new HashMap<>()),
+ TEST_METADATA
+ );
+
+ InternalTSDBStats.HeadStats merged = InternalTSDBStats.mergeHeadStats(List.of(agg1, agg2, agg3));
+
+ assertNotNull("mergeHeadStats should be non-null when any input has HeadStats", merged);
+ assertEquals("numSeries should be sum of all inputs", 350L, merged.numSeries());
+ assertEquals("minTime should be minimum across all inputs", 500L, merged.minTime());
+ assertEquals("maxTime should be maximum across all inputs", 4000L, merged.maxTime());
+ }
+
+ public void testMergeHeadStatsWithMixedNullAndNonNull() {
+ // One aggregation has HeadStats, one does not; only non-null contributes.
+ InternalTSDBStats withHead = InternalTSDBStats.forCoordinatorLevel(
+ TEST_NAME,
+ new InternalTSDBStats.HeadStats(75L, 2000L, 5000L),
+ new InternalTSDBStats.CoordinatorLevelStats(75L, new HashMap<>()),
+ TEST_METADATA
+ );
+ InternalTSDBStats noHead = InternalTSDBStats.forCoordinatorLevel(
+ TEST_NAME,
+ null,
+ new InternalTSDBStats.CoordinatorLevelStats(100L, new HashMap<>()),
+ TEST_METADATA
+ );
+
+ InternalTSDBStats.HeadStats merged = InternalTSDBStats.mergeHeadStats(List.of(withHead, noHead));
+
+ assertNotNull("mergeHeadStats should be non-null when at least one input has HeadStats", merged);
+ assertEquals(75L, merged.numSeries());
+ assertEquals(2000L, merged.minTime());
+ assertEquals(5000L, merged.maxTime());
+ }
+
+ public void testMergeHeadStatsWithAllNull() {
+ InternalTSDBStats agg1 = InternalTSDBStats.forCoordinatorLevel(
+ TEST_NAME,
+ null,
+ new InternalTSDBStats.CoordinatorLevelStats(100L, new HashMap<>()),
+ TEST_METADATA
+ );
+ InternalTSDBStats agg2 = InternalTSDBStats.forCoordinatorLevel(
+ TEST_NAME,
+ null,
+ new InternalTSDBStats.CoordinatorLevelStats(200L, new HashMap<>()),
+ TEST_METADATA
+ );
+
+ InternalTSDBStats.HeadStats merged = InternalTSDBStats.mergeHeadStats(List.of(agg1, agg2));
+
+ assertNull("mergeHeadStats should return null when all inputs have null HeadStats", merged);
+ }
+
+ public void testMergeHeadStatsEmptyList() {
+ InternalTSDBStats.HeadStats merged = InternalTSDBStats.mergeHeadStats(List.of());
+ assertNull("mergeHeadStats should return null for empty list", merged);
+ }
+
+ public void testReduceShardLevelWithHeadStats() {
+ // HeadStats from shard-level aggregations should be merged (sum, min, max) in reduceShardLevel.
+ InternalTSDBStats agg1 = InternalTSDBStats.forShardLevel(
+ TEST_NAME,
+ new InternalTSDBStats.HeadStats(10L, 1000L, 5000L),
+ new InternalTSDBStats.ShardLevelStats(setOf(1L, 2L), new HashMap<>(), true),
+ TEST_METADATA
+ );
+ InternalTSDBStats agg2 = InternalTSDBStats.forShardLevel(
+ TEST_NAME,
+ new InternalTSDBStats.HeadStats(20L, 500L, 6000L),
+ new InternalTSDBStats.ShardLevelStats(setOf(3L, 4L), new HashMap<>(), true),
+ TEST_METADATA
+ );
+
+ InternalAggregation result = agg1.reduce(List.of(agg1, agg2), createPartialReduceContext());
+ InternalTSDBStats reducedStats = (InternalTSDBStats) result;
+
+ InternalTSDBStats.HeadStats headStats = reducedStats.getHeadStats();
+ assertNotNull("HeadStats should be merged from shard-level aggregations", headStats);
+ assertEquals("numSeries should be sum", 30L, headStats.numSeries());
+ assertEquals("minTime should be minimum", 500L, headStats.minTime());
+ assertEquals("maxTime should be maximum", 6000L, headStats.maxTime());
+ }
+
+ public void testReduceCoordinatorLevelWithHeadStats() {
+ // HeadStats should survive the coordinator-level reduce phase.
+ InternalTSDBStats agg1 = InternalTSDBStats.forCoordinatorLevel(
+ TEST_NAME,
+ new InternalTSDBStats.HeadStats(10L, 1000L, 5000L),
+ new InternalTSDBStats.CoordinatorLevelStats(100L, new HashMap<>()),
+ TEST_METADATA
+ );
+ InternalTSDBStats agg2 = InternalTSDBStats.forCoordinatorLevel(
+ TEST_NAME,
+ new InternalTSDBStats.HeadStats(20L, 500L, 6000L),
+ new InternalTSDBStats.CoordinatorLevelStats(200L, new HashMap<>()),
+ TEST_METADATA
+ );
+
+ InternalAggregation result = agg1.reduce(List.of(agg1, agg2), createFinalReduceContext());
+ InternalTSDBStats reducedStats = (InternalTSDBStats) result;
+
+ InternalTSDBStats.HeadStats headStats = reducedStats.getHeadStats();
+ assertNotNull("HeadStats should be preserved through coordinator-level reduce", headStats);
+ assertEquals(30L, headStats.numSeries());
+ assertEquals(500L, headStats.minTime());
+ assertEquals(6000L, headStats.maxTime());
+ assertEquals(300L, reducedStats.getNumSeries().longValue());
+ }
+
// ============================================================================================
// Helper Methods
// ============================================================================================
diff --git a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilderTests.java b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilderTests.java
index 6a95d1c7b..eea273ce7 100644
--- a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilderTests.java
+++ b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregationBuilderTests.java
@@ -191,7 +191,7 @@ public void testXContentParsingMissingRequiredFields() throws IOException {
assertTrue(ex.getMessage().contains("Required parameter 'include_value_stats' is missing"));
}
- // Missing include_head_stats
+ // Missing include_head_stats: should default to false (not throw)
try (
XContentParser parser = createParser(
XContentType.JSON.xContent(),
@@ -199,11 +199,8 @@ public void testXContentParsingMissingRequiredFields() throws IOException {
)
) {
parser.nextToken();
- IllegalArgumentException ex = expectThrows(
- IllegalArgumentException.class,
- () -> TSDBStatsAggregationBuilder.parse(TEST_NAME, parser)
- );
- assertTrue(ex.getMessage().contains("Required parameter 'include_head_stats' is missing"));
+ TSDBStatsAggregationBuilder parsed = TSDBStatsAggregationBuilder.parse(TEST_NAME, parser);
+ assertFalse("include_head_stats should default to false when not provided", parsed.isIncludeHeadStats());
}
}
diff --git a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactoryTests.java b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactoryTests.java
index 860b8652e..210ad582e 100644
--- a/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactoryTests.java
+++ b/src/test/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregatorFactoryTests.java
@@ -33,8 +33,8 @@ public class TSDBStatsAggregatorFactoryTests extends OpenSearchTestCase {
public void testSupportsConcurrentSegmentSearch() {
// Arrange - Test with both true and false for includeValueStats
- TSDBStatsAggregatorFactory factory1 = createFactory("test_css_true", 1000L, 2000L, true);
- TSDBStatsAggregatorFactory factory2 = createFactory("test_css_false", 1000L, 2000L, false);
+ TSDBStatsAggregatorFactory factory1 = createFactory("test_css_true", 1000L, 2000L, true, false);
+ TSDBStatsAggregatorFactory factory2 = createFactory("test_css_false", 1000L, 2000L, false, false);
// Act & Assert - CSS support should be unconditional
assertTrue("TSDBStatsAggregatorFactory should support CSS", factory1.supportsConcurrentSegmentSearch());
@@ -50,7 +50,7 @@ public void testFactoryConfiguration() {
long maxTimestamp = 10000L;
// Act
- TSDBStatsAggregatorFactory factory = createFactory("config_test", minTimestamp, maxTimestamp, true);
+ TSDBStatsAggregatorFactory factory = createFactory("config_test", minTimestamp, maxTimestamp, true, true);
// Assert - Verify factory was created successfully with correct name
assertNotNull("Factory should be created successfully", factory);
@@ -62,7 +62,7 @@ public void testCreateInternal() throws Exception {
long minTimestamp = 1000L;
long maxTimestamp = 5000L;
boolean includeValueStats = true;
- TSDBStatsAggregatorFactory factory = createFactory("test_create", minTimestamp, maxTimestamp, includeValueStats);
+ TSDBStatsAggregatorFactory factory = createFactory("test_create", minTimestamp, maxTimestamp, includeValueStats, false);
// Create mock SearchContext
SearchContext searchContext = mock(SearchContext.class);
@@ -86,7 +86,7 @@ public void testCreateInternal() throws Exception {
public void testCreateInternalWithDifferentParameters() throws Exception {
// Arrange - Test with includeValueStats=false and custom metadata
- TSDBStatsAggregatorFactory factory = createFactory("test_no_values", 2000L, 6000L, false);
+ TSDBStatsAggregatorFactory factory = createFactory("test_no_values", 2000L, 6000L, false, false);
SearchContext searchContext = mock(SearchContext.class);
when(searchContext.bigArrays()).thenReturn(new BigArrays(null, new NoneCircuitBreakerService(), "test"));
@@ -109,7 +109,7 @@ public void testCreateInternalWithDifferentParameters() throws Exception {
public void testCreateInternalWithExtremeTimestamps() throws Exception {
// Arrange - Test edge case with extreme timestamp values
- TSDBStatsAggregatorFactory factory = createFactory("test_extreme", Long.MIN_VALUE, Long.MAX_VALUE, true);
+ TSDBStatsAggregatorFactory factory = createFactory("test_extreme", Long.MIN_VALUE, Long.MAX_VALUE, true, true);
SearchContext searchContext = mock(SearchContext.class);
when(searchContext.bigArrays()).thenReturn(new BigArrays(null, new NoneCircuitBreakerService(), "test"));
@@ -129,11 +129,40 @@ public void testCreateInternalWithExtremeTimestamps() throws Exception {
aggregator.close();
}
+ public void testCreateInternalWithHeadStatsDisabled() throws Exception {
+ // Arrange - explicitly disable head stats collection
+ TSDBStatsAggregatorFactory factory = createFactory("test_no_head", 1000L, 5000L, true, false);
+
+ SearchContext searchContext = mock(SearchContext.class);
+ when(searchContext.bigArrays()).thenReturn(new BigArrays(null, new NoneCircuitBreakerService(), "test"));
+
+ // Act
+ TSDBStatsAggregator aggregator = (TSDBStatsAggregator) factory.createInternal(
+ searchContext,
+ null,
+ CardinalityUpperBound.NONE,
+ Map.of()
+ );
+
+ // Assert
+ assertNotNull("Should create aggregator with head stats disabled", aggregator);
+ assertEquals("test_no_head", aggregator.name());
+
+ // Cleanup
+ aggregator.close();
+ }
+
/**
* Helper method to create TSDBStatsAggregatorFactory with minimal parameters.
* Uses null for complex OpenSearch infrastructure components that aren't needed for basic tests.
*/
- private TSDBStatsAggregatorFactory createFactory(String name, long minTimestamp, long maxTimestamp, boolean includeValueStats) {
+ private TSDBStatsAggregatorFactory createFactory(
+ String name,
+ long minTimestamp,
+ long maxTimestamp,
+ boolean includeValueStats,
+ boolean includeHeadStats
+ ) {
try {
// Create an empty AggregatorFactories.Builder to satisfy the constructor
AggregatorFactories.Builder subFactoriesBuilder = new AggregatorFactories.Builder();
@@ -147,7 +176,7 @@ private TSDBStatsAggregatorFactory createFactory(String name, long minTimestamp,
minTimestamp,
maxTimestamp,
includeValueStats,
- true // includeHeadStats - default to true for existing tests
+ includeHeadStats
);
} catch (Exception e) {
throw new RuntimeException("Failed to create factory for test", e);
diff --git a/src/test/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListenerTests.java b/src/test/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListenerTests.java
index 98c1ba64c..48e1b1703 100644
--- a/src/test/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListenerTests.java
+++ b/src/test/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListenerTests.java
@@ -37,16 +37,11 @@ public class TSDBStatsResponseListenerTests extends OpenSearchTestCase {
// ========== Shared Test Fixtures ==========
- /** HeadStats: 508 series, 937 chunks */
- private static final InternalTSDBStats.HeadStats HEAD_STATS = new InternalTSDBStats.HeadStats(
- 508L,
- 937L,
- 1591516800000L,
- 1598896800143L
- );
+ /** HeadStats: 508 series */
+ private static final InternalTSDBStats.HeadStats HEAD_STATS = new InternalTSDBStats.HeadStats(508L, 1591516800000L, 1598896800143L);
/** HeadStats: small numbers for simple tests */
- private static final InternalTSDBStats.HeadStats SIMPLE_HEAD_STATS = new InternalTSDBStats.HeadStats(100L, 200L, 1000L, 2000L);
+ private static final InternalTSDBStats.HeadStats SIMPLE_HEAD_STATS = new InternalTSDBStats.HeadStats(100L, 1000L, 2000L);
/** Full stats: headStats + 2 labels (name, cluster) with multiple values, numSeries=25644 */
private static InternalTSDBStats createFullStats() {
@@ -149,7 +144,6 @@ public void testGroupedFormatWithAllFields() throws IOException {
{
"headStats": {
"numSeries": 508,
- "chunkCount": 937,
"minTime": 1591516800000,
"maxTime": 1598896800143
},
@@ -232,7 +226,6 @@ public void testFlatFormatWithAllFields() throws IOException {
{
"headStats": {
"numSeries": 508,
- "chunkCount": 937,
"minTime": 1591516800000,
"maxTime": 1598896800143
},
@@ -324,7 +317,6 @@ public void testIncludeOnlyHeadStats() throws IOException {
{
"headStats": {
"numSeries": 100,
- "chunkCount": 200,
"minTime": 1000,
"maxTime": 2000
}
@@ -357,7 +349,6 @@ public void testIncludeLabelStatsAndAll() throws IOException {
{
"headStats": {
"numSeries": 100,
- "chunkCount": 200,
"minTime": 1000,
"maxTime": 2000
},
From df14bdbe03937f94e6cec3794bc341654bc57915 Mon Sep 17 00:00:00 2001
From: Wenting Wang
Date: Thu, 5 Mar 2026 15:27:35 -0800
Subject: [PATCH 3/6] add numChunks and fix integrationTest
Signed-off-by: Wenting Wang
---
.../framework/RestTSDBEngineIngestor.java | 18 +++++++++-
.../integrationTests/TSDBStatsRestIT.java | 35 +++++++++++++++----
.../index/live/LiveSeriesIndexLeafReader.java | 19 ++++++++++
.../query/aggregator/InternalTSDBStats.java | 23 +++++-------
.../query/aggregator/TSDBStatsAggregator.java | 12 +++++--
.../query/rest/TSDBStatsResponseListener.java | 2 ++
.../aggregator/InternalTSDBStatsTests.java | 32 +++++++++--------
.../rest/TSDBStatsResponseListenerTests.java | 13 +++++--
8 files changed, 112 insertions(+), 42 deletions(-)
diff --git a/src/javaRestTest/java/org/opensearch/tsdb/framework/RestTSDBEngineIngestor.java b/src/javaRestTest/java/org/opensearch/tsdb/framework/RestTSDBEngineIngestor.java
index 83e8770a9..8fa4a190a 100644
--- a/src/javaRestTest/java/org/opensearch/tsdb/framework/RestTSDBEngineIngestor.java
+++ b/src/javaRestTest/java/org/opensearch/tsdb/framework/RestTSDBEngineIngestor.java
@@ -22,6 +22,9 @@
import org.opensearch.tsdb.utils.TSDBTestUtils;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -61,6 +64,7 @@ public record RestTSDBEngineIngestor(RestClient restClient) {
* Ingest time series data from multiple input data configurations into their respective indices.
* Each InputDataConfig specifies which index its data should go to.
* Uses bulk API with batching for efficient ingestion.
+ * Sorts all time series samples for the same index by timestamp, to avoid OOO-event-cutoff rejections
*
* @param inputDataConfigs List of input data configurations with their target indices
* @throws IOException If ingestion fails
@@ -70,12 +74,24 @@ public void ingestDataToMultipleIndices(List inputDataConfigs)
return;
}
+ // Collect all samples per index, then sort each index's samples by timestamp.
+ // Sorting ensures that samples across multiple time series are interleaved chronologically
+ // before ingestion. Without this, series are emitted one-at-a-time: a shard may process
+ // series1 up to its maxTime (e.g. 07:00) and then receive series2's first sample (e.g. 00:00),
+ // which falls outside a short OOO cutoff window and gets rejected.
+ Map> samplesByIndex = new HashMap<>();
for (InputDataConfig inputDataConfig : inputDataConfigs) {
if (inputDataConfig.indexName() == null || inputDataConfig.indexName().isEmpty()) {
throw new IllegalArgumentException("InputDataConfig must specify an index_name for data ingestion");
}
List samples = TimeSeriesSampleGenerator.generateSamples(inputDataConfig);
- ingestInBulk(samples, inputDataConfig.indexName(), DEFAULT_BULK_SIZE);
+ samplesByIndex.computeIfAbsent(inputDataConfig.indexName(), k -> new ArrayList<>()).addAll(samples);
+ }
+
+ for (Map.Entry> entry : samplesByIndex.entrySet()) {
+ List samples = entry.getValue();
+ samples.sort(Comparator.comparing(TimeSeriesSample::timestamp));
+ ingestInBulk(samples, entry.getKey(), DEFAULT_BULK_SIZE);
}
}
diff --git a/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java b/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java
index dc4d097a1..9a7a344eb 100644
--- a/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java
+++ b/src/javaRestTest/java/org/opensearch/tsdb/integrationTests/TSDBStatsRestIT.java
@@ -33,6 +33,19 @@ public class TSDBStatsRestIT extends RestTimeSeriesTestFramework {
private static final String TEST_DATA_YAML = "test_cases/tsdb_stats_rest_it.yaml";
private boolean dataLoaded = false;
+ // OVERRIDE max_closeable_chunks_per_chunk_range_percentage = 100, otherwise only 15% of closed chunks would be dropped
+ // This is to test numChunks in HeadStats
+ private static final String CUSTOMIZED_INDEX_SETTINGS_YAML = """
+ index.refresh_interval: "1s"
+ index.tsdb_engine.enabled: true
+ index.tsdb_engine.labels.storage_type: binary
+ index.tsdb_engine.lang.m3.default_step_size: "10s"
+ index.tsdb_engine.max_closeable_chunks_per_chunk_range_percentage: 100
+ index.queries.cache.enabled: false
+ index.requests.cache.enable: false
+ index.translog.durability: async
+ index.translog.sync_interval: "1s"
+ """;
/**
* Lazily initialize test data on first use for each test.
@@ -40,8 +53,11 @@ public class TSDBStatsRestIT extends RestTimeSeriesTestFramework {
*/
private void ensureDataLoaded() throws Exception {
if (!dataLoaded) {
- initializeTest(TEST_DATA_YAML);
+ initializeTest(TEST_DATA_YAML, CUSTOMIZED_INDEX_SETTINGS_YAML);
setupTest();
+ // Force CCI creation and MemChunk drop
+ Request flushRequest = new Request("POST", "/tsdb_stats_test/_flush?force=true");
+ client().performRequest(flushRequest);
dataLoaded = true;
}
}
@@ -53,11 +69,13 @@ private void ensureDataLoaded() throws Exception {
public void testBasicEndpoint() throws Exception {
ensureDataLoaded();
+ // "numChunks": 20: each of the 10 series holds 2 MemChunks — 1 within the 20m OOO cutoff window + 1 open MemChunk
String expectedJson = """
{
"headStats": {
"numSeries": 10,
- "minTime": 0,
+ "numChunks": 20,
+ "minTime": 1735713600000,
"maxTime": 9223372036854775807
},
"labelStats": {
@@ -133,7 +151,7 @@ public void testBasicEndpoint() throws Exception {
// Test POST request returns same result
Request postRequest = new Request("POST", "/_tsdb/stats");
postRequest.addParameter("start", "1735689600000");
- postRequest.addParameter("end", "1735707600000");
+ postRequest.addParameter("end", "1735714800000");
postRequest.setJsonEntity("{\"query\": \"fetch name:*\"}");
Response postResponse = client().performRequest(postRequest);
@@ -281,7 +299,8 @@ public void testFormatOptions() throws Exception {
{
"headStats": {
"numSeries": 10,
- "minTime": 0,
+ "numChunks": 20,
+ "minTime": 1735713600000,
"maxTime": 9223372036854775807
},
"labelStats": {
@@ -360,7 +379,8 @@ public void testFormatOptions() throws Exception {
{
"headStats": {
"numSeries": 10,
- "minTime": 0,
+ "numChunks": 20,
+ "minTime": 1735713600000,
"maxTime": 9223372036854775807
},
"seriesCountByMetricName": [
@@ -426,8 +446,9 @@ public void testQueryFiltering() throws Exception {
String expectedJson = """
{
"headStats": {
- "numSeries": 10,
- "minTime": 0,
+ "numSeries": 5,
+ "numChunks": 10,
+ "minTime": 1735713600000,
"maxTime": 9223372036854775807
},
"labelStats": {
diff --git a/src/main/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReader.java b/src/main/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReader.java
index 4d7148ab7..dded2ca78 100644
--- a/src/main/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReader.java
+++ b/src/main/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReader.java
@@ -135,6 +135,25 @@ public List chunksForDoc(int docId, TSDBDocValues tsdbDocValues)
return chunkIterators;
}
+ /**
+ * Returns the number of MemChunks (open, plus those within the OOO cutoff window) for the series corresponding to the given document.
+ *
+ * @param docId the document ID to look up
+ * @param tsdbDocValues the doc values reader for this leaf
+ * @return number of MemChunks for the series, or 0 if the doc has no series reference
+ * @throws IOException if an I/O error occurs
+ */
+ public int numChunksForDoc(int docId, TSDBDocValues tsdbDocValues) throws IOException {
+ NumericDocValues seriesRefValue = tsdbDocValues.getChunkRefDocValues();
+ if (!seriesRefValue.advanceExact(docId)) {
+ return 0;
+ }
+ long seriesRef = seriesRefValue.longValue();
+ int total = memChunkReader.getChunks(seriesRef).size();
+ int mmaped = mMappedChunks.getOrDefault(seriesRef, Collections.emptySet()).size();
+ return total - mmaped;
+ }
+
@Override
public Labels labelsForDoc(int docId, TSDBDocValues tsdbDocValues) throws IOException {
return tsdbDocValues.getLabelsStorage().readLabels(docId);
diff --git a/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java b/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java
index fd50a7442..37e01a358 100644
--- a/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java
+++ b/src/main/java/org/opensearch/tsdb/query/aggregator/InternalTSDBStats.java
@@ -61,29 +61,19 @@ public class InternalTSDBStats extends InternalAggregation {
* Statistics for the head (in-memory time series).
*
* @param numSeries the number of active time series in the head
+ * @param numChunks total MemChunks across all series (≥ numSeries due to OOO or not-yet-dropped chunks)
* @param minTime the minimum sample timestamp present in the head
* @param maxTime the maximum sample timestamp present in the head
*/
- public record HeadStats(long numSeries, long minTime, long maxTime) {
+ public record HeadStats(long numSeries, long numChunks, long minTime, long maxTime) {
- /**
- * Deserializes a {@code HeadStats} instance from a stream.
- *
- * @param in the stream input to read from
- * @throws IOException if an I/O error occurs during reading
- */
public HeadStats(StreamInput in) throws IOException {
- this(in.readVLong(), in.readLong(), in.readLong());
+ this(in.readVLong(), in.readVLong(), in.readLong(), in.readLong());
}
- /**
- * Serializes this {@code HeadStats} instance to a stream.
- *
- * @param out the stream output to write to
- * @throws IOException if an I/O error occurs during writing
- */
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(numSeries);
+ out.writeVLong(numChunks);
out.writeLong(minTime);
out.writeLong(maxTime);
}
@@ -621,6 +611,7 @@ private static class LabelStatsBuilder {
*/
static HeadStats mergeHeadStats(List aggregations) {
long totalNumSeries = 0;
+ long totalNumChunks = 0;
long minTime = Long.MAX_VALUE;
long maxTime = Long.MIN_VALUE;
boolean hasAny = false;
@@ -630,6 +621,7 @@ static HeadStats mergeHeadStats(List aggregations) {
if (stats.headStats != null) {
hasAny = true;
totalNumSeries += stats.headStats.numSeries();
+ totalNumChunks += stats.headStats.numChunks();
if (stats.headStats.minTime() < minTime) {
minTime = stats.headStats.minTime();
}
@@ -639,7 +631,7 @@ static HeadStats mergeHeadStats(List aggregations) {
}
}
- return hasAny ? new HeadStats(totalNumSeries, minTime, maxTime) : null;
+ return hasAny ? new HeadStats(totalNumSeries, totalNumChunks, minTime, maxTime) : null;
}
/**
@@ -712,6 +704,7 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th
if (headStats != null) {
builder.startObject("headStats");
builder.field("numSeries", headStats.numSeries());
+ builder.field("numChunks", headStats.numChunks());
builder.field("minTime", headStats.minTime());
builder.field("maxTime", headStats.maxTime());
builder.endObject();
diff --git a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java
index 5029193e2..38da2b138 100644
--- a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java
+++ b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java
@@ -69,6 +69,7 @@ public class TSDBStatsAggregator extends MetricsAggregator {
// HeadStats accumulator: tracks stats from LiveSeriesIndexLeafReader segments
private long headNumSeries;
+ private long headNumChunks;
private long headMinTime;
private long headMaxTime;
private boolean hasHeadStats;
@@ -111,6 +112,7 @@ public TSDBStatsAggregator(
// Initialize HeadStats accumulator
this.headNumSeries = 0;
+ this.headNumChunks = 0;
this.headMinTime = Long.MAX_VALUE;
this.headMaxTime = Long.MIN_VALUE;
this.hasHeadStats = false;
@@ -131,8 +133,6 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
// Collect HeadStats from LiveSeriesIndexLeafReader segments
if (includeHeadStats && tsdbLeafReader instanceof LiveSeriesIndexLeafReader) {
hasHeadStats = true;
- // LiveSeriesIndex guarantees one doc per series, so numDocs == numSeries
- headNumSeries += ctx.reader().numDocs();
long leafMinTime = tsdbLeafReader.getMinIndexTimestamp();
long leafMaxTime = tsdbLeafReader.getMaxIndexTimestamp();
if (leafMinTime < headMinTime) {
@@ -172,6 +172,12 @@ public void collect(int doc, long bucket) throws IOException {
return;
}
+ // Accumulate per-doc HeadStats for live series segments
+ if (includeHeadStats && tsdbLeafReader instanceof LiveSeriesIndexLeafReader) {
+ headNumSeries++;
+ headNumChunks += ((LiveSeriesIndexLeafReader) tsdbLeafReader).numChunksForDoc(doc, tsdbDocValues);
+ }
+
// TODO process each label using BytesRef directly (avoid String conversion)
BytesRef[] keyValuePairs = labels.toKeyValueBytesRefs();
for (BytesRef keyValue : keyValuePairs) {
@@ -240,7 +246,7 @@ public InternalAggregation buildAggregation(long bucket) throws IOException {
// Build HeadStats if any LiveSeriesIndexLeafReader segments were encountered
InternalTSDBStats.HeadStats headStats = hasHeadStats
- ? new InternalTSDBStats.HeadStats(headNumSeries, headMinTime, headMaxTime)
+ ? new InternalTSDBStats.HeadStats(headNumSeries, headNumChunks, headMinTime, headMaxTime)
: null;
// Return using factory method
diff --git a/src/main/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListener.java b/src/main/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListener.java
index c90cab74c..2f28a8bfe 100644
--- a/src/main/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListener.java
+++ b/src/main/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListener.java
@@ -42,6 +42,7 @@ public class TSDBStatsResponseListener extends RestToXContentListener labelStats = createTestLabelStats();
// Act
@@ -273,7 +273,7 @@ public void testForCoordinatorLevelXContent() throws IOException {
// Test all XContent variations in one test
// Variation 1: With all fields
- InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(508L, 1591516800000L, 1598896800143L);
+ InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(508L, 508L, 1591516800000L, 1598896800143L);
Map labelStats = new HashMap<>();
labelStats.put("cluster", new InternalTSDBStats.CoordinatorLevelStats.LabelStats(100L, Map.of("prod", 80L, "staging", 15L)));
@@ -417,7 +417,7 @@ public void testGetPropertyBehavior() {
}
public void testEqualsAndHashCode() {
- InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(100L, 1000L, 2000L);
+ InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(100L, 100L, 1000L, 2000L);
Map labelStats = createTestLabelStats();
InternalTSDBStats stats1 = InternalTSDBStats.forCoordinatorLevel(
@@ -460,7 +460,7 @@ public void testEqualsAndHashCode() {
stats1,
InternalTSDBStats.forCoordinatorLevel(
TEST_NAME,
- new InternalTSDBStats.HeadStats(999L, 1000L, 2000L),
+ new InternalTSDBStats.HeadStats(999L, 999L, 1000L, 2000L),
new InternalTSDBStats.CoordinatorLevelStats(500L, labelStats),
TEST_METADATA
)
@@ -515,7 +515,7 @@ public void testTopLevelSerializationRoundTrip() throws IOException {
assertNull(shardDeserialized.getHeadStats());
// Scenario 2: Coordinator-level with headStats
- InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(100L, 1000L, 2000L);
+ InternalTSDBStats.HeadStats headStats = new InternalTSDBStats.HeadStats(100L, 100L, 1000L, 2000L);
InternalTSDBStats coordOriginal = InternalTSDBStats.forCoordinatorLevel(
TEST_NAME,
headStats,
@@ -991,19 +991,19 @@ public void testMergeHeadStatsWithMultipleInputs() {
// All three aggregations have HeadStats; verify sum, min, and max are taken correctly.
InternalTSDBStats agg1 = InternalTSDBStats.forCoordinatorLevel(
TEST_NAME,
- new InternalTSDBStats.HeadStats(100L, 1000L, 3000L),
+ new InternalTSDBStats.HeadStats(100L, 50L, 1000L, 3000L),
new InternalTSDBStats.CoordinatorLevelStats(100L, new HashMap<>()),
TEST_METADATA
);
InternalTSDBStats agg2 = InternalTSDBStats.forCoordinatorLevel(
TEST_NAME,
- new InternalTSDBStats.HeadStats(200L, 500L, 4000L), // smallest minTime, largest maxTime
+ new InternalTSDBStats.HeadStats(200L, 100L, 500L, 4000L), // smallest minTime, largest maxTime
new InternalTSDBStats.CoordinatorLevelStats(200L, new HashMap<>()),
TEST_METADATA
);
InternalTSDBStats agg3 = InternalTSDBStats.forCoordinatorLevel(
TEST_NAME,
- new InternalTSDBStats.HeadStats(50L, 2000L, 2500L), // middle minTime, smallest maxTime
+ new InternalTSDBStats.HeadStats(50L, 25L, 2000L, 2500L), // middle minTime, smallest maxTime
new InternalTSDBStats.CoordinatorLevelStats(50L, new HashMap<>()),
TEST_METADATA
);
@@ -1012,6 +1012,7 @@ public void testMergeHeadStatsWithMultipleInputs() {
assertNotNull("mergeHeadStats should be non-null when any input has HeadStats", merged);
assertEquals("numSeries should be sum of all inputs", 350L, merged.numSeries());
+ assertEquals("numChunks should be sum of all inputs", 175L, merged.numChunks());
assertEquals("minTime should be minimum across all inputs", 500L, merged.minTime());
assertEquals("maxTime should be maximum across all inputs", 4000L, merged.maxTime());
}
@@ -1020,7 +1021,7 @@ public void testMergeHeadStatsWithMixedNullAndNonNull() {
// One aggregation has HeadStats, one does not; only non-null contributes.
InternalTSDBStats withHead = InternalTSDBStats.forCoordinatorLevel(
TEST_NAME,
- new InternalTSDBStats.HeadStats(75L, 2000L, 5000L),
+ new InternalTSDBStats.HeadStats(75L, 40L, 2000L, 5000L),
new InternalTSDBStats.CoordinatorLevelStats(75L, new HashMap<>()),
TEST_METADATA
);
@@ -1035,6 +1036,7 @@ public void testMergeHeadStatsWithMixedNullAndNonNull() {
assertNotNull("mergeHeadStats should be non-null when at least one input has HeadStats", merged);
assertEquals(75L, merged.numSeries());
+ assertEquals(40L, merged.numChunks());
assertEquals(2000L, merged.minTime());
assertEquals(5000L, merged.maxTime());
}
@@ -1067,13 +1069,13 @@ public void testReduceShardLevelWithHeadStats() {
// HeadStats from shard-level aggregations should be merged (sum, min, max) in reduceShardLevel.
InternalTSDBStats agg1 = InternalTSDBStats.forShardLevel(
TEST_NAME,
- new InternalTSDBStats.HeadStats(10L, 1000L, 5000L),
+ new InternalTSDBStats.HeadStats(10L, 10L, 1000L, 5000L),
new InternalTSDBStats.ShardLevelStats(setOf(1L, 2L), new HashMap<>(), true),
TEST_METADATA
);
InternalTSDBStats agg2 = InternalTSDBStats.forShardLevel(
TEST_NAME,
- new InternalTSDBStats.HeadStats(20L, 500L, 6000L),
+ new InternalTSDBStats.HeadStats(20L, 20L, 500L, 6000L),
new InternalTSDBStats.ShardLevelStats(setOf(3L, 4L), new HashMap<>(), true),
TEST_METADATA
);
@@ -1084,6 +1086,7 @@ public void testReduceShardLevelWithHeadStats() {
InternalTSDBStats.HeadStats headStats = reducedStats.getHeadStats();
assertNotNull("HeadStats should be merged from shard-level aggregations", headStats);
assertEquals("numSeries should be sum", 30L, headStats.numSeries());
+ assertEquals("numChunks should be sum", 30L, headStats.numChunks());
assertEquals("minTime should be minimum", 500L, headStats.minTime());
assertEquals("maxTime should be maximum", 6000L, headStats.maxTime());
}
@@ -1092,13 +1095,13 @@ public void testReduceCoordinatorLevelWithHeadStats() {
// HeadStats should survive the coordinator-level reduce phase.
InternalTSDBStats agg1 = InternalTSDBStats.forCoordinatorLevel(
TEST_NAME,
- new InternalTSDBStats.HeadStats(10L, 1000L, 5000L),
+ new InternalTSDBStats.HeadStats(10L, 10L, 1000L, 5000L),
new InternalTSDBStats.CoordinatorLevelStats(100L, new HashMap<>()),
TEST_METADATA
);
InternalTSDBStats agg2 = InternalTSDBStats.forCoordinatorLevel(
TEST_NAME,
- new InternalTSDBStats.HeadStats(20L, 500L, 6000L),
+ new InternalTSDBStats.HeadStats(20L, 20L, 500L, 6000L),
new InternalTSDBStats.CoordinatorLevelStats(200L, new HashMap<>()),
TEST_METADATA
);
@@ -1109,6 +1112,7 @@ public void testReduceCoordinatorLevelWithHeadStats() {
InternalTSDBStats.HeadStats headStats = reducedStats.getHeadStats();
assertNotNull("HeadStats should be preserved through coordinator-level reduce", headStats);
assertEquals(30L, headStats.numSeries());
+ assertEquals(30L, headStats.numChunks());
assertEquals(500L, headStats.minTime());
assertEquals(6000L, headStats.maxTime());
assertEquals(300L, reducedStats.getNumSeries().longValue());
diff --git a/src/test/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListenerTests.java b/src/test/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListenerTests.java
index 48e1b1703..f8ce57b42 100644
--- a/src/test/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListenerTests.java
+++ b/src/test/java/org/opensearch/tsdb/query/rest/TSDBStatsResponseListenerTests.java
@@ -38,10 +38,15 @@ public class TSDBStatsResponseListenerTests extends OpenSearchTestCase {
// ========== Shared Test Fixtures ==========
/** HeadStats: 508 series */
- private static final InternalTSDBStats.HeadStats HEAD_STATS = new InternalTSDBStats.HeadStats(508L, 1591516800000L, 1598896800143L);
+ private static final InternalTSDBStats.HeadStats HEAD_STATS = new InternalTSDBStats.HeadStats(
+ 508L,
+ 508L,
+ 1591516800000L,
+ 1598896800143L
+ );
/** HeadStats: small numbers for simple tests */
- private static final InternalTSDBStats.HeadStats SIMPLE_HEAD_STATS = new InternalTSDBStats.HeadStats(100L, 1000L, 2000L);
+ private static final InternalTSDBStats.HeadStats SIMPLE_HEAD_STATS = new InternalTSDBStats.HeadStats(100L, 100L, 1000L, 2000L);
/** Full stats: headStats + 2 labels (name, cluster) with multiple values, numSeries=25644 */
private static InternalTSDBStats createFullStats() {
@@ -144,6 +149,7 @@ public void testGroupedFormatWithAllFields() throws IOException {
{
"headStats": {
"numSeries": 508,
+ "numChunks": 508,
"minTime": 1591516800000,
"maxTime": 1598896800143
},
@@ -226,6 +232,7 @@ public void testFlatFormatWithAllFields() throws IOException {
{
"headStats": {
"numSeries": 508,
+ "numChunks": 508,
"minTime": 1591516800000,
"maxTime": 1598896800143
},
@@ -317,6 +324,7 @@ public void testIncludeOnlyHeadStats() throws IOException {
{
"headStats": {
"numSeries": 100,
+ "numChunks": 100,
"minTime": 1000,
"maxTime": 2000
}
@@ -349,6 +357,7 @@ public void testIncludeLabelStatsAndAll() throws IOException {
{
"headStats": {
"numSeries": 100,
+ "numChunks": 100,
"minTime": 1000,
"maxTime": 2000
},
From 8170d0ea7ebd6f63efd13d9b595ab1b3ed3b551f Mon Sep 17 00:00:00 2001
From: Wenting Wang
Date: Thu, 5 Mar 2026 15:52:28 -0800
Subject: [PATCH 4/6] triggers test again
Signed-off-by: Wenting Wang
From 5bea82aada2e019bc34def5575f8d9f52f71eb8b Mon Sep 17 00:00:00 2001
From: Wenting Wang
Date: Thu, 5 Mar 2026 16:07:37 -0800
Subject: [PATCH 5/6] refactor and add unit test
Signed-off-by: Wenting Wang
---
.../index/live/LiveSeriesIndexLeafReader.java | 12 +++--
.../live/LiveSeriesIndexLeafReaderTests.java | 44 +++++++++++++++++++
2 files changed, 53 insertions(+), 3 deletions(-)
diff --git a/src/main/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReader.java b/src/main/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReader.java
index dded2ca78..e5bc6bc91 100644
--- a/src/main/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReader.java
+++ b/src/main/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReader.java
@@ -149,9 +149,15 @@ public int numChunksForDoc(int docId, TSDBDocValues tsdbDocValues) throws IOExce
return 0;
}
long seriesRef = seriesRefValue.longValue();
- int total = memChunkReader.getChunks(seriesRef).size();
- int mmaped = mMappedChunks.getOrDefault(seriesRef, Collections.emptySet()).size();
- return total - mmaped;
+ int numChunks = 0;
+ Set chunksToFilter = mMappedChunks.getOrDefault(seriesRef, Collections.emptySet());
+ List memChunks = memChunkReader.getChunks(seriesRef); // get all MemChunks for the series
+ for (MemChunk memChunk : memChunks) {
+ if (!chunksToFilter.contains(memChunk)) {
+ numChunks++;
+ }
+ }
+ return numChunks;
}
@Override
diff --git a/src/test/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReaderTests.java b/src/test/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReaderTests.java
index 85eb252f5..664a98b75 100644
--- a/src/test/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReaderTests.java
+++ b/src/test/java/org/opensearch/tsdb/core/index/live/LiveSeriesIndexLeafReaderTests.java
@@ -622,6 +622,50 @@ public void testMMappedChunksFilteringEdgeCases() throws IOException {
}
}
+ public void testNumChunksForDocWithMMappedChunks() throws IOException {
+ // 3 chunks total, 2 mMapped → should count only the 1 remaining in-memory chunk
+ MemChunk chunk1 = new MemChunk(1L, 1000L, 2000L, null, org.opensearch.tsdb.core.chunk.Encoding.XOR);
+ MemChunk chunk2 = new MemChunk(2L, 2000L, 3000L, null, org.opensearch.tsdb.core.chunk.Encoding.XOR);
+ MemChunk chunk3 = new MemChunk(3L, 3000L, 4000L, null, org.opensearch.tsdb.core.chunk.Encoding.XOR);
+ referenceToChunkMap.put(810L, List.of(chunk1, chunk2, chunk3));
+ mMappedChunks.put(810L, java.util.Set.of(chunk1, chunk3)); // chunk1 and chunk3 are mMapped
+
+ createTestDocument(810L, "partial_mmapped_metric", "server2", "region2");
+ indexWriter.commit();
+
+ try (DirectoryReader reader = DirectoryReader.open(directory)) {
+ LiveSeriesIndexLeafReader leafReader = new LiveSeriesIndexLeafReader(
+ reader.leaves().getFirst().reader(),
+ memChunkReader,
+ mMappedChunks,
+ LabelStorageType.BINARY
+ );
+ TSDBDocValues tsdbDocValues = leafReader.getTSDBDocValues();
+
+ assertEquals("Should count only chunk2 (the one not mMapped)", 1, leafReader.numChunksForDoc(0, tsdbDocValues));
+ }
+ }
+
+ public void testNumChunksForDocNoChunks() throws IOException {
+ // Series reference exists but has no chunks (empty list from memChunkReader) → should return 0
+ referenceToChunkMap.put(830L, List.of());
+
+ createTestDocument(830L, "no_chunks_metric", "server4", "region4");
+ indexWriter.commit();
+
+ try (DirectoryReader reader = DirectoryReader.open(directory)) {
+ LiveSeriesIndexLeafReader leafReader = new LiveSeriesIndexLeafReader(
+ reader.leaves().getFirst().reader(),
+ memChunkReader,
+ mMappedChunks,
+ LabelStorageType.BINARY
+ );
+ TSDBDocValues tsdbDocValues = leafReader.getTSDBDocValues();
+
+ assertEquals("Should return 0 when series has no chunks", 0, leafReader.numChunksForDoc(0, tsdbDocValues));
+ }
+ }
+
private void createTestDocument(long reference, String metricName, String host, String region) throws IOException {
Document doc = new Document();
doc.add(new NumericDocValuesField(REFERENCE, reference));
From 43e06d4a7581eddfd1c3ce3aa0b0e720850aaa13 Mon Sep 17 00:00:00 2001
From: Wenting Wang
Date: Wed, 11 Mar 2026 09:51:57 -0700
Subject: [PATCH 6/6] address comments
Signed-off-by: Wenting Wang
---
.../tsdb/query/aggregator/TSDBStatsAggregator.java | 11 +++++------
.../tsdb/query/rest/RestTSDBStatsAction.java | 1 +
2 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java
index 38da2b138..22ef335fa 100644
--- a/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java
+++ b/src/main/java/org/opensearch/tsdb/query/aggregator/TSDBStatsAggregator.java
@@ -160,6 +160,11 @@ public TSDBStatsLeafBucketCollector(LeafReaderContext ctx, TSDBLeafReader tsdbLe
@Override
public void collect(int doc, long bucket) throws IOException {
+ // Accumulate per-doc HeadStats for live series segments
+ if (includeHeadStats && tsdbLeafReader instanceof LiveSeriesIndexLeafReader) {
+ headNumSeries++;
+ headNumChunks += ((LiveSeriesIndexLeafReader) tsdbLeafReader).numChunksForDoc(doc, tsdbDocValues);
+ }
Labels labels = tsdbLeafReader.labelsForDoc(doc, tsdbDocValues);
// We assume labels hash is the seriesId
// This need to be changed when we start accepting reference/seriesId from indexing
@@ -172,12 +177,6 @@ public void collect(int doc, long bucket) throws IOException {
return;
}
- // Accumulate per-doc HeadStats for live series segments
- if (includeHeadStats && tsdbLeafReader instanceof LiveSeriesIndexLeafReader) {
- headNumSeries++;
- headNumChunks += ((LiveSeriesIndexLeafReader) tsdbLeafReader).numChunksForDoc(doc, tsdbDocValues);
- }
-
// TODO process each label using BytesRef directly (avoid String conversion)
BytesRef[] keyValuePairs = labels.toKeyValueBytesRefs();
for (BytesRef keyValue : keyValuePairs) {
diff --git a/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java b/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java
index 2c753bfb7..f961076b2 100644
--- a/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java
+++ b/src/main/java/org/opensearch/tsdb/query/rest/RestTSDBStatsAction.java
@@ -95,6 +95,7 @@
* {
* "headStats": {
* "numSeries": 508,
+ * "numChunks": 937,
* "minTime": 1591516800000,
* "maxTime": 1598896800143
* },