Populate HeadStats in _tsdb/stats endpoint #70
wentingwang wants to merge 6 commits into opensearch-project:main
Conversation
Codecov Report
@@ Coverage Diff @@
## main #70 +/- ##
============================================
+ Coverage 89.09% 89.11% +0.01%
- Complexity 5009 5026 +17
============================================
Files 321 321
Lines 15421 15486 +65
Branches 2328 2345 +17
============================================
+ Hits 13740 13800 +60
- Misses 1014 1015 +1
- Partials 667 671 +4
    @Override
    public InternalAggregation buildEmptyAggregation() {
        // Return empty coordinator-level stats
        InternalTSDBStats.CoordinatorLevelStats emptyStats = new InternalTSDBStats.CoordinatorLevelStats(null, Map.of());
        return InternalTSDBStats.forCoordinatorLevel(name, null, emptyStats, metadata());
    }
shouldn't this return shardLevel stats?
what happens when there's no data on a shard? or is that not possible?
buildEmptyAggregation() returns coordinator-level stats, but shard reduce rejects them

TSDBStatsAggregator.java:236-240 — buildEmptyAggregation() returns CoordinatorLevelStats:

    public InternalAggregation buildEmptyAggregation() {
        InternalTSDBStats.CoordinatorLevelStats emptyStats = ...;
        return InternalTSDBStats.forCoordinatorLevel(name, null, emptyStats, metadata());
    }

But InternalTSDBStats.java:413-414 — the shard-level reduce throws when it encounters coordinator-level stats:

    if (stats.shardStats == null) {
        throw new IllegalStateException("Expected shard-level stats but got coordinator-level stats in shard reduce");
    }

OpenSearch calls buildEmptyAggregation() for shards with zero matching documents. In a multi-shard deployment where some shards have data and others don't, the shard-level reduce will mix buildAggregation() results (shard-level) with buildEmptyAggregation() results (coordinator-level) and crash with IllegalStateException.
If TSDBStatsAggregator is used as a sub-aggregation under a bucket aggregator (e.g., a filters or terms agg), then any bucket with zero docs will call buildEmptyAggregation(), producing coordinator-level stats. Those get mixed with shard-level stats from non-empty buckets during reduceShardLevel(), triggering the crash.
Is this possible in TSDB, or will this never happen?
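To make the failure mode concrete, here is a minimal, self-contained sketch. The class and field names (`LevelMixDemo`, `Stats`) are hypothetical stand-ins, not the actual OpenSearch types; only the guard's behavior mirrors the code quoted above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the two-level stats described in this thread.
public class LevelMixDemo {
    // Exactly one of the two maps is non-null, like shardStats/coordinatorStats.
    public record Stats(Map<String, Long> shardStats, Map<String, Long> coordinatorStats) {}

    // Mirrors the shard-level reduce guard: coordinator-level inputs are rejected.
    public static Stats reduceShardLevel(List<Stats> inputs) {
        Map<String, Long> merged = new HashMap<>();
        for (Stats s : inputs) {
            if (s.shardStats() == null) {
                throw new IllegalStateException("Expected shard-level stats but got coordinator-level stats in shard reduce");
            }
            s.shardStats().forEach((k, v) -> merged.merge(k, v, Long::sum));
        }
        return new Stats(merged, null);
    }

    public static void main(String[] args) {
        Stats withData = new Stats(Map.of("numSeries", 3L), null);
        Stats emptyCoordinator = new Stats(null, Map.of()); // what buildEmptyAggregation() produces today
        Stats emptyShard = new Stats(Map.of(), null);       // what an empty-shard result could look like

        try {
            reduceShardLevel(List.of(withData, emptyCoordinator));
        } catch (IllegalStateException e) {
            System.out.println("mixing levels crashes: " + e.getMessage());
        }
        // An empty *shard-level* result reduces cleanly:
        System.out.println(reduceShardLevel(List.of(withData, emptyShard)).shardStats()); // {numSeries=3}
    }
}
```

The sketch suggests why returning an empty shard-level result from buildEmptyAggregation() would sidestep the crash.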
what happens when there's no data on a shard? or is that not possible?
It still goes through buildAggregation, which builds an empty ShardLevelStats. TSDBStatsAggregator is a top-level aggregation for now, so in practice we never call buildEmptyAggregation(), but we can change it to return shard-level stats.
Also fixed reduce(), so it won't rely on isFinalReduce for the partialReduce case.
all the changes are fixed in #57, so it will be easier to review this PR later
    if (seriesFingerprintSet != null) {
        out.writeBoolean(true);
        out.writeVLong(numSeries);
        out.writeCollection(seriesFingerprintSet, StreamOutput::writeVLong);
writeVLong used for fingerprint serialization — crashes on negative values

InternalTSDBStats.java:128,146 — ShardLevelStats serializes fingerprint sets using writeVLong/readVLong:

    out.writeCollection(seriesFingerprintSet, StreamOutput::writeVLong); // line 128
    out.writeCollection(fingerprintSet, StreamOutput::writeVLong);       // line 146

writeVLong throws IllegalStateException for negative values. The fingerprints come from TSDBStatsAggregator.java:148:

    long seriesId = seriesIdDocValues.longValue();

This reads from LABELS_HASH (a hash field) whose values can absolutely be negative. Any negative hash will crash serialization during shard-to-coordinator transport.

Fix: Use writeLong/readLong (or writeZLong/readZLong for variable-length encoding that supports negatives) instead of writeVLong/readVLong for fingerprint sets.
please double-check if this is a real issue; I do remember the stable hash will produce negative values
good catch, changed it to ZLong. but interestingly, when I tested on ts-poc even for a 1.6M query, it didn't have exceptions.
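For context on why ZLong works where VLong does not: zig-zag encoding (the scheme behind ZLong-style variable-length serialization) maps signed longs to non-negative values before the variable-length bytes are written, so negative hashes survive. A standalone sketch of the encoding follows; it is not the actual StreamOutput implementation.

```java
// Standalone sketch of zig-zag encoding; not the OpenSearch StreamOutput code.
public class ZigZagDemo {
    // Maps ... -2 -> 3, -1 -> 1, 0 -> 0, 1 -> 2, 2 -> 4 ...
    public static long encode(long v) {
        return (v << 1) ^ (v >> 63);
    }

    public static long decode(long z) {
        return (z >>> 1) ^ -(z & 1);
    }

    public static void main(String[] args) {
        long negativeHash = -123456789L;                      // e.g. a negative LABELS_HASH value
        long encoded = encode(negativeHash);
        System.out.println(encoded >= 0);                     // true: safe for unsigned vlong bytes
        System.out.println(decode(encoded) == negativeHash);  // true: lossless round-trip
    }
}
```

Small-magnitude values (positive or negative) stay small after encoding, so the variable-length size advantage is preserved.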
    public record ShardLevelStats(Set<Long> seriesFingerprintSet, Map<String, Map<String, Set<Long>>> labelStats,
                                  boolean includeValueStats) {
can you document what each of the fields is, with an example?
    (valueOutput, fingerprintSet) -> { // Value writer: Set<Long> or null
        if (fingerprintSet != null) {
            valueOutput.writeBoolean(true);
            valueOutput.writeCollection(fingerprintSet, StreamOutput::writeVLong);
what's the difference between this fingerprintSet and seriesFingerprintSet above?
seriesFingerprintSet is all the fingerprints we've seen so far (gives the total numSeries for the "fetch" statement); fingerprintSet is the fingerprint set per label value (gives the numSeries per label value)
    // Sanity check: exactly one of shardStats or coordinatorStats must be non-null
    if ((shardStats == null) == (coordinatorStats == null)) {
        throw new IllegalArgumentException("Exactly one of shardStats or coordinatorStats must be non-null");
    }
this feels more like something for an assert than IllegalArgumentException?
this is not something controlled by the caller, right? if this happens, it's a bug in our code?
yes, if it happens, it is a bug. fixed
    long seriesId = seriesIdDocValues.longValue();

    // Already processed this series - skip entire document
    if (!seenSeriesIdentifiers.add(seriesId)) {
iirc LSI.reference and labels_hash are not the same value, so this check would not be able to detect that the same series was already accounted for.
Does this affect correctness/count values (e.g. result in double counting)? Or would it just be less optimal?
i think this can potentially affect correctness. e.g. if a labels hash is between 0 and 1,000,000 (which will be in the series Ref range, which is just the count of the number of series in LSI), this will incorrectly skip processing. and vice versa.
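A toy illustration of that collision concern (values are made up, and it assumes the two id spaces could overlap, which the rest of the thread debates):

```java
import java.util.HashSet;
import java.util.Set;

// Toy illustration: if LSI references were small counters while CCI ids were
// 64-bit hashes, a shared Set<Long> dedup could collide across the two spaces.
public class DedupCollisionDemo {
    // Returns true if the id has not been seen before (same pattern as
    // seenSeriesIdentifiers.add(seriesId) in the aggregator).
    public static boolean isNewSeries(Set<Long> seen, long id) {
        return seen.add(id);
    }

    public static void main(String[] args) {
        Set<Long> seen = new HashSet<>();
        long cciLabelsHash = 5L;  // hash of some series, happens to be small
        long lsiReference = 5L;   // unrelated series: the 5th one in the live index
        System.out.println(isNewSeries(seen, cciLabelsHash)); // true: processed
        System.out.println(isNewSeries(seen, lsiReference));  // false: wrongly skipped
    }
}
```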
no, they are the same: https://github.com/opensearch-project/time-series-db/blob/main/src/main/java/org/opensearch/index/engine/TSDBEngine.java#L432. It is just that later we save it in the translog and call it seriesReference. We should use the same name to store it in LSI and CCI.
ok i see, can you add a comment to the constructor where this.seriesIdDocValues is initialized, and mention we assume LSI.reference == CCI.labels_hash and that it's required for this to function properly.
And let's add a test case to HeadTests similar to HeadTests#testHeadLifecycle, but instead assert that if you scan through all the docs across LSI and CCI to get the seriesId, they are all equal (since that test only has 1 series). You can mention in the javadoc/comments that the stats aggregator requires this behavior.
synced offline with @itschrispeck; we'd still need to decode labels from CCI and LSI to always fully dedup across both. changing it back to decoding labels instead of fetching reference/labels_hash
    try {
        labelValuePairOrdinalMap.close();
    } catch (Exception e) {
        // Log and continue - BytesRefHash may fail on multiple close calls
says log, but doesn't log
already fixed upstream, will rebase.
        new CachedWildcardQueryBuilder(org.opensearch.tsdb.core.mapping.Constants.IndexSchema.LABELS, labelFilter)
    );
    } else {
        // Exact term query on labels field
        labelQuery.should(
            QueryBuilders.termQuery(org.opensearch.tsdb.core.mapping.Constants.IndexSchema.LABELS, labelFilter)
    }

    // ========== Grouped Format Tests ==========
    // ========== Grouped Format Tests (Combined: 3 → 2) ==========
what does (Combined: 3 → 2) mean?
at one point, Claude generated many more unit tests and I asked it to consolidate them; probably from that.
    index_configs:
      - name: "tsdb_stats_test"
        shards: 1
tests should have multiple shards to verify coordinator reduce
    time_config:
      min_timestamp: "2025-01-01T00:00:00Z"
      max_timestamp: "2025-01-01T01:00:00Z"
      step: "5m"
this will only test the LSI/Head block without CCI, right?
we should use sparser timestamps to ensure that behavior is also tested, e.g. a > 6 hour range to test 2+ blocks
fixed in the upstream diff
|
others I'll leave here; you can take a look and see if these are actual issues. if not, you can ignore them
What it said is true. This works for TSDBStatsResponseListener but not for other callers. Will fix later.
This is intentional. But maybe because of include=labelStats the naming is confusing. Originally I wanted a mode where we only return label values per key, to cover the /search m3 endpoint use case. Renamed the param to include=labelValues.
a12c508 to b6b8342
    // Accumulate per-doc HeadStats for live series segments
    if (includeHeadStats && tsdbLeafReader instanceof LiveSeriesIndexLeafReader) {
        headNumSeries++;
        headNumChunks += ((LiveSeriesIndexLeafReader) tsdbLeafReader).numChunksForDoc(doc, tsdbDocValues);
HeadStats accumulation sits after the seenSeriesIds dedup check. If a series exists in both a ClosedChunkIndex segment and a LiveSeriesIndex segment (same stableHash), and the closed chunk doc is processed first, the live doc gets deduped, and HeadStats skips it — undercounting headNumSeries/headNumChunks.
We can move this block before the dedup check, since HeadStats should count all series in the head independently of label-stats dedup.
makes sense, fixed
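A minimal sketch of the ordering fix, assuming a simplified per-doc loop (Doc and collectHeadStats are illustrative stand-ins for the real reader plumbing):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-in for the aggregator's per-doc loop: HeadStats accumulation
// runs BEFORE the series dedup check, so a live-series doc still counts even if
// the same series was already seen in a closed-chunk segment.
public class HeadStatsOrderDemo {
    public record Doc(long seriesId, boolean liveSegment, long numChunks) {}

    // Returns {headNumSeries, headNumChunks}.
    public static long[] collectHeadStats(List<Doc> docs) {
        Set<Long> seenSeriesIds = new HashSet<>();
        long headNumSeries = 0, headNumChunks = 0;
        for (Doc doc : docs) {
            if (doc.liveSegment()) {         // accumulate before the dedup check
                headNumSeries++;
                headNumChunks += doc.numChunks();
            }
            if (!seenSeriesIds.add(doc.seriesId())) {
                continue;                    // dedup applies to label stats only
            }
            // ... label stats collection would happen here ...
        }
        return new long[] { headNumSeries, headNumChunks };
    }

    public static void main(String[] args) {
        // Series 7 exists in a closed-chunk segment (processed first) and the head.
        List<Doc> docs = List.of(new Doc(7, false, 0), new Doc(7, true, 4));
        long[] stats = collectHeadStats(docs);
        System.out.println(stats[0]); // 1 head series, not 0
        System.out.println(stats[1]); // 4 head chunks
    }
}
```

With the accumulation after the dedup check, the same input would report 0 head series, which is the undercount described above.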
    * "headStats": {
    *   "numSeries": 508,
    *   "chunkCount": 937,
    *   "minTime": 1591516800000,
chunkCount was removed from this javadoc example, but numChunks wasn't added to replace it. Same with the flat-format example below.
Collect HeadStats from LiveSeriesIndexLeafReader in TSDBStatsAggregator, merge across shards by summing numSeries/chunkCount and taking min/max of time bounds, and include in the endpoint response.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Wenting Wang <wenting.wang@uber.com>

rebase for reviewing stacked diff, unit tests failed, will fix in the next commit

Signed-off-by: Wenting Wang <wenting.wang@uber.com>

fix unit test due to rebase

Signed-off-by: Wenting Wang <wenting.wang@uber.com>

add includeHeadStats in TSDBStatsAggregator

Signed-off-by: Wenting Wang <wenting.wang@uber.com>
43aa46f to 43e06d4
Summary
- Collect HeadStats from LiveSeriesIndexLeafReader in TSDBStatsAggregator (numSeries, numChunks, minTime, maxTime)
- Add numChunksForDoc in LiveSeriesIndexLeafReader to read numChunks in the Head
- Merge HeadStats across shards in InternalTSDBStats.reduce() by summing numSeries/chunkCount and taking min/max of time bounds
- Updated integration tests to include HeadStats in expected responses
- Change integration tests to insert data based on timestamp to avoid OOO cutoff

Test plan
- ./gradlew check passes
- ./gradlew javaRestTest passes

🤖 Generated with Claude Code
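The cross-shard merge described in the summary can be sketched as follows. This is an illustration, not the actual InternalTSDBStats.reduce() code; the field names are assumed from the response example quoted earlier.

```java
// Illustrative sketch of merging per-shard HeadStats: counts are summed,
// time bounds take the min/max across shards.
public class HeadStatsMergeDemo {
    public record HeadStats(long numSeries, long numChunks, long minTime, long maxTime) {
        public HeadStats merge(HeadStats other) {
            return new HeadStats(
                numSeries + other.numSeries,
                numChunks + other.numChunks,
                Math.min(minTime, other.minTime),
                Math.max(maxTime, other.maxTime)
            );
        }
    }

    public static void main(String[] args) {
        HeadStats shard1 = new HeadStats(300, 500, 1_000L, 5_000L);
        HeadStats shard2 = new HeadStats(208, 437, 500L, 6_000L);
        HeadStats merged = shard1.merge(shard2);
        System.out.println(merged.numSeries()); // 508
        System.out.println(merged.numChunks()); // 937
        System.out.println(merged.minTime());   // 500
        System.out.println(merged.maxTime());   // 6000
    }
}
```

Summing series counts across shards assumes a series lives on one shard; if the same series could appear on multiple shards, the coordinator would need fingerprint sets (as elsewhere in this PR) rather than plain sums.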