Implement TimeSeriesStreamingAggregator for M3QL fetch+aggregation op… #43
Conversation
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

```
@@             Coverage Diff             @@
##               main      #43      +/- ##
============================================
- Coverage     89.15%   89.13%   -0.02%
- Complexity     5039     5181     +142
============================================
  Files           322      330       +8
  Lines         15532    16076     +544
  Branches       2338     2430      +92
============================================
+ Hits          13848    14330     +482
- Misses         1014     1045      +31
- Partials        670      701      +31
============================================
```
Force-pushed 835ae02 → ca32588
```java
if (!stageStack.isEmpty()) {
    return false;
}
```

Reviewer: This means there's no stage after the current stage? e.g. `fetch | a | b | c` should have a stack of `c -> b` when reaching `a`?

Author: Yes, you are right. I used the wrong planning logic. Let me update the planning logic.
```java
private static class NoTagStreamingState implements StreamingAggregationState {
    private final StreamingAggregationType aggregationType;
    private final double[] values;
    private final long[] counts; // Only used for AVG
```

Reviewer: I think `int[] counts` is good enough? (The upper bound should be the number of timeseries we retrieved?)

Author: Yes, `int[]` should be good enough.
```java
boolean pushdown = resolvePushdownParam(request, true);
boolean profile = request.paramAsBoolean(PROFILE_PARAM, false);
boolean includeMetadata = request.paramAsBoolean(INCLUDE_METADATA_PARAM, false);
boolean streaming = request.paramAsBoolean(STREAMING_PARAM, false);
```

Reviewer: Are we planning to enable it by default in the future? (Seems like there's no downside?)

Author: The downside is data duplication, which leads to double counting. Duplication might happen between different segments of the same shard, or between partitions (upstream double-writes during metrics migration).

Reviewer: Could you elaborate on how this works with stitching? IIUC we can use streaming for the pushdown-able parts but have to fall back for the stitched section in the middle?

Author: Right now, when stitching is enabled, this new feature will not be enabled. Enabling it in the stitching scenario can be a future optimization.
```java
}

// removeEmpty right after fetch is a no-op — strip it to enable streaming optimization
if (!unfoldStages.isEmpty() && unfoldStages.get(0) instanceof RemoveEmptyStage) {
```

Reviewer: For corner cases where someone writes `fetch | removeEmpty | removeEmpty | ...`, let's make this a while loop?

Author: That's a good callout. I do not want to make one PR too long, and I think this is good enough for most cases, but let me add a while loop.

Reviewer: Can you add a TODO here? IMO this logic belongs in an optimizer layer and shouldn't be part of the SourceBuilderVisitor — we should be removing extraneous functions and no-ops prior to trying to build the SourceBuilder.
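The while-loop version the reviewer suggests could look roughly like this. `UnaryPipelineStage` and `RemoveEmptyStage` are names from the diff; the standalone stub types and the list-mutation style are assumptions for illustration:

```java
import java.util.ArrayList;
import java.util.List;

interface UnaryPipelineStage {}
class RemoveEmptyStage implements UnaryPipelineStage {}
class SumStage implements UnaryPipelineStage {}

public class StripRemoveEmpty {
    // removeEmpty right after fetch is a no-op, and so is every repeated
    // removeEmpty after it — strip all leading occurrences, not just one.
    // TODO: move this into a dedicated optimizer pass, not SourceBuilderVisitor.
    static void stripLeadingRemoveEmpty(List<UnaryPipelineStage> unfoldStages) {
        while (!unfoldStages.isEmpty() && unfoldStages.get(0) instanceof RemoveEmptyStage) {
            unfoldStages.remove(0);
        }
    }
}
```

With this, `fetch | removeEmpty | removeEmpty | sum` reduces to the streaming-eligible `fetch | sum` shape.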
```java
 * List<TimeSeries> results = state.getFinalResults(minTimestamp, maxTimestamp, step);
 * }</pre>
 */
public interface StreamingAggregationState {
```

Reviewer: Should we eventually make the min/max/sum/avg stages use the same implementation as this one?

Author: I think we can rewrite them in a more efficient way. I do not want to make this PR too complicated.
```java
/**
 * Check if unfoldStages consists of exactly one supported streaming aggregation stage.
 */
private boolean isStreamingEligibleStages(List<UnaryPipelineStage> unfoldStages) {
    if (unfoldStages.size() != 1) {
```

Reviewer: Is this because all those stages should be global stages? I wonder whether for something like `fetch | a | b | c | sum` we are probably still able to apply this optimization theoretically? Since `a | b | c` being handleable on the data node means they do not care about the order in which samples are processed?

Author:
1. You cannot always optimize this. For example, if `a` is a moving function, or a derivative, we have to reconstruct the whole series and keep all the tags in memory. There might be some advanced approach to optimize that, but I think we should start from the most basic cases. Also, based on a chat with the Muttley team, optimizing `fetch | sum` is good enough.
2. There are optimization opportunities. For example, for `fetch | moving sum | sum` we can change the order and the result should be the same. We can apply this rule in the planning layer and try to push the aggregation stage as early as possible. This can also reduce the pressure of moving.
3. If all functions before the aggregation operate on the sample layer, we can apply the transformations in the collect() function and directly do the aggregation, e.g. `fetch | abs | sum`. This might need some code refactoring, and we can evaluate the frequency of those queries later.
```java
/**
 * Create streaming state for a bucket based on aggregation configuration.
 */
private StreamingAggregationState createStreamingState() {
```

Reviewer: nit: Can we move the streaming-state-related code to another file, or to the interface's file? This one feels a bit long.
```java
}

int timeIndex = (int) ((samples.getTimestamp(i) - minTimestamp) / step);
if (timeIndex >= 0 && timeIndex < timeArraySize) {
```

Reviewer: Maybe use SampleList#search to find the start index?
```java
private final double[] values;
private final int[] counts; // Only used for AVG
```

Reviewer: Can we use GroupTimeArrays here as well?
```java
    return SampleList.fromList(samples);
}

FloatSampleList.Builder builder = new FloatSampleList.Builder(values.length);
```

Reviewer: Instead of using values.length, we could probably track how many values were transformed from NaN to a real number, to get the exact count?
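A minimal sketch of the reviewer's suggestion — incrementing a non-NaN counter the first time a slot gets a real value, so the builder can be sized exactly instead of with `values.length`. The `nonNanCount` field and this standalone shape are assumptions; in the PR this lives inside the aggregation state:

```java
public class TimeArray {
    private final double[] values;
    private int nonNanCount; // bumped the first time a slot leaves NaN

    public TimeArray(int size) {
        values = new double[size];
        java.util.Arrays.fill(values, Double.NaN);
    }

    public void aggregateSum(int timeIndex, double value) {
        if (Double.isNaN(values[timeIndex])) {
            values[timeIndex] = value;
            nonNanCount++; // exact capacity hint for FloatSampleList.Builder
        } else {
            values[timeIndex] += value;
        }
    }

    public int nonNanCount() { return nonNanCount; }
}
```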
Force-pushed 0fa1fee → cadd188
```java
    return values.length;
}

void aggregate(int timeIndex, double value, StreamingAggregationType type) {
```

Reviewer: IMO it makes more sense to have StreamingAggregationType as an instance property set in the constructor instead of passing it into this method — you can't change the type per .aggregate call anyway.
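The suggested refactor, sketched: the aggregation type becomes a constructor argument, so `aggregate` no longer carries it per call. Class and enum names follow the diff; the method bodies (and AVG accumulating as a sum, with counts tracked elsewhere) are assumptions:

```java
enum StreamingAggregationType { SUM, MIN, MAX, AVG }

class GroupTimeArrays {
    private final StreamingAggregationType type; // fixed per instance, not per call
    private final double[] values;

    GroupTimeArrays(int size, StreamingAggregationType type) {
        this.type = type;
        this.values = new double[size];
        java.util.Arrays.fill(values, Double.NaN);
    }

    void aggregate(int timeIndex, double value) {
        double cur = values[timeIndex];
        if (Double.isNaN(cur)) { // first sample in this slot
            values[timeIndex] = value;
            return;
        }
        switch (type) {
            case SUM: case AVG: values[timeIndex] = cur + value; break; // AVG divides by count later
            case MIN: values[timeIndex] = Math.min(cur, value); break;
            case MAX: values[timeIndex] = Math.max(cur, value); break;
        }
    }

    double valueAt(int i) { return values[i]; }
}
```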
```java
private void processChunk(ChunkIterator chunk) throws IOException {
    SampleList samples = chunk.decodeSamples(this.minTimestamp, this.maxTimestamp).samples();
    int startIndex = GroupTimeArrays.findStartIndex(samples, this.minTimestamp);
```

Reviewer: What if startIndex < 0 / not found?

Author: Taking another look, calling findStartIndex is unnecessary: chunk.decodeSamples already filters the samples and only returns samples within the correct time range. Just iterating the sample list from index = 0 is enough.
```java
NoTagStreamingState(StreamingAggregationType aggregationType, int timeArraySize, long minTimestamp, long maxTimestamp, long step) {
    this.aggregationType = aggregationType;
    this.arrays = new GroupTimeArrays(timeArraySize, aggregationType.requiresCountTracking());
```

Reviewer: Under what case will timeArraySize != (maxTimestamp - minTimestamp) / step + 1?

Author: Use the correct formula: it should be (maxTimestamp - 1 - minTimestamp) / step + 1. I think I might have over-allocated the array before.
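To see why the corrected formula avoids over-allocation when the end boundary is step-aligned — assuming, as the fix implies, a half-open range `[minTimestamp, maxTimestamp)` — a quick check:

```java
public class TimeArraySize {
    // Number of time slots for the half-open range [minTimestamp, maxTimestamp)
    // at the given step: the last covered slot starts at
    // minTimestamp + ((maxTimestamp - 1 - minTimestamp) / step) * step.
    static int size(long minTimestamp, long maxTimestamp, long step) {
        return (int) ((maxTimestamp - 1 - minTimestamp) / step) + 1;
    }
}
```

For min=0, max=60, step=10 the slots are 0, 10, …, 50 — six entries; the earlier `(max - min) / step + 1` formula would have allocated seven.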
```java
Map<String, String> groupLabelMap = new HashMap<>();
for (String tagName : groupByTags) {
    if (allLabels != null && allLabels.has(tagName)) {
        groupLabelMap.put(tagName, allLabels.get(tagName));
```

Reviewer: IIRC this re-iterates all the tags on every .has and every .get. If we do this a lot, it will probably add up. Depending on how many labels `groupByTags` has, it may be faster to do toMapView once and access all the tags directly. And if we haven't already, we should probably update labels to accept a filter list of label names that collects only the required labels in one pass, so you can just do, e.g., `return allLabels.keepOnlyLabels(groupByTags);`

Author: Yep, this is on my radar. I do not think we need to convert the bytes into a Map; we can just have a new API that sorts labels internally and returns all values in one scan. I did not include it in this PR since I do not want to make one PR too complicated.
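`keepOnlyLabels` does not exist yet; a rough sketch of the single-pass filter being discussed, assuming labels are exposed as a list of name/value pairs (the `Label` record is hypothetical — the real implementation would scan the encoded label bytes directly):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

record Label(String name, String value) {}

public class LabelFilter {
    // One pass over all labels, keeping only the requested names —
    // avoids the per-tag has()/get() rescans the reviewer flagged.
    static List<Label> keepOnlyLabels(List<Label> allLabels, Set<String> wanted) {
        List<Label> kept = new ArrayList<>(wanted.size());
        for (Label l : allLabels) {
            if (wanted.contains(l.name())) {
                kept.add(l);
            }
        }
        return kept;
    }
}
```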
```java
// Streaming aggregations are CSS-safe for all supported types
// because they work with idempotent operations that can be
// safely merged across segment boundaries
return true;
```

Reviewer: Assuming there is never any duplicate sample, right? If that's the case, let's make that very clear here. And IIUC the operations are explicitly not idempotent, which is why we must ensure there are no duplicate samples when reading — so fix/remove that comment as well.
```java
 * @return true if groupByTags is null or empty, false otherwise
 */
public boolean isGlobalAggregation() {
    return groupByTags == null || groupByTags.isEmpty();
```

Reviewer: I keep getting confused by this — remind me again why an empty groupByTags means it's global, but otherwise not? Isn't sum/min/max/avg always a global aggregation?
Force-pushed cadd188 → 9edc6ca
```yaml
# for eligible queries (fetch | sum/min/max/avg)

test_setup:
  name: "Streaming Aggregation Integration Test"
```

Reviewer: Do these need to be separate from the other test cases, or can we run the same test cases in both streaming and non-streaming mode?
```yaml
data:
  - metric: { }
    values: [-2.5,5,12,-6,7.5,4,-3,9.5,6,5,3,2]
- name: "streaming - min across POST series"
```

Reviewer: Same question as above — can we just run all test cases with streaming enabled and disabled? This looks like a lot of duplication.
```java
/**
 * Check if unfoldStages consists of exactly one supported streaming aggregation stage.
 */
private boolean isStreamingEligibleStages(List<UnaryPipelineStage> unfoldStages) {
```

Reviewer: Should we have UnaryPipelineStage#isStreamingEligible?
```java
/**
 * Map a pipeline stage instance to the corresponding StreamingAggregationType.
 */
private StreamingAggregationType mapStageToStreamingType(UnaryPipelineStage stage) {
```

Reviewer: Same here — if we're already creating the stage and using it to construct the streaming agg builder, can we do that directly, or use a factory instead of adding the mapping logic here?
```java
state.processDocument(doc, tsdbDocValues, tsdbLeafReader);

// Track memory usage for circuit breaker
addCircuitBreakerBytes(state.getEstimatedMemoryUsage());
```

Reviewer: Are we supposed to add the delta, or the total used? If the breaker expects a delta, passing the total on every document seems like it will significantly overcount and trip early.
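One way to address this — and what a later commit message in this PR describes as "tracking memory deltas per bucket" — could look like the following sketch. `addCircuitBreakerBytes` is replaced here by a test stand-in; the 5 MB batching the PR later adds is omitted:

```java
class MemoryDeltaTracker {
    private long reportedBytes; // what we've told the circuit breaker so far
    private long breakerTotal;  // stand-in for the real breaker's running counter

    // Report only the growth since the last call, so the breaker sees the
    // current estimated footprint rather than a running sum of totals.
    void report(long estimatedTotalBytes) {
        long delta = estimatedTotalBytes - reportedBytes;
        if (delta > 0) {
            addCircuitBreakerBytes(delta);
            reportedBytes = estimatedTotalBytes;
        }
    }

    private void addCircuitBreakerBytes(long bytes) { breakerTotal += bytes; }

    long breakerTotal() { return breakerTotal; }
}
```

Calling `report` with the same total twice adds nothing the second time; without the delta, every document would re-add the full estimate.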
```java
GroupTimeArrays arrays = groupData.computeIfAbsent(groupLabels, k -> new GroupTimeArrays(timeArraySize, aggregationType));

// Process chunks for this group
List<ChunkIterator> chunks = reader.chunksForDoc(docId, docValues);
```

Reviewer: For LSI leaves this needs to be wrapped in a dedup iterator. We can guarantee no duplicates across MemChunks, but inner chunks within a MemChunk often have duplicates. (Let's also add some test coverage for this case.)

Author: Actually, can you be more specific? E.g., give a concrete case of duplications.
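A minimal sketch of the dedup the reviewer is asking for: merge the per-chunk, timestamp-sorted sample streams and keep one sample per timestamp, so overlapping inner chunks cannot be double-counted by a non-idempotent aggregation like sum. The `Sample` record, the list-based shape, and the first-occurrence-wins policy are assumptions; the PR's later commits refer to a `MergeIterator` plus `DedupIterator`:

```java
import java.util.ArrayList;
import java.util.List;

record Sample(long timestamp, double value) {}

public class DedupMerge {
    // Merge already-sorted chunk sample lists and drop repeated timestamps.
    static List<Sample> mergeAndDedup(List<List<Sample>> chunks) {
        List<Sample> merged = new ArrayList<>();
        for (List<Sample> chunk : chunks) merged.addAll(chunk);
        merged.sort((a, b) -> Long.compare(a.timestamp(), b.timestamp()));
        List<Sample> out = new ArrayList<>();
        long lastTs = Long.MIN_VALUE;
        for (Sample s : merged) {
            if (s.timestamp() != lastTs) { // first occurrence wins (assumption)
                out.add(s);
                lastTs = s.timestamp();
            }
        }
        return out;
    }
}
```

E.g. two inner chunks both containing timestamp 20 — `[(10, …), (20, …)]` and `[(20, …), (30, …)]` — would otherwise feed the aggregator the 20-slot twice.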
```java
boolean tsdbEnabled = TSDBPlugin.TSDB_ENGINE_ENABLED.get(queryShardContext.getIndexSettings().getSettings());
if (!tsdbEnabled) {
    throw new IllegalStateException(
        "Time Series Streaming Aggregator can only be used on indices where index.tsdb_engine.enabled is true"
    );
}
```

Reviewer: nit: Feels like we should think about a base TSDBAggBuilder class for the copy/pasted code.
```java
TSDBLeafReader tsdbLeafReader = TSDBLeafReader.unwrapLeafReader(ctx.reader());
if (tsdbLeafReader == null) {
    throw new IOException("Expected TSDBLeafReader but found: " + ctx.reader().getClass().getName());
}
if (!tsdbLeafReader.overlapsTimeRange(minTimestamp, maxTimestamp)) {
    // No matching data in this segment, skip it by returning the sub-collector
    return sub;
}
```

Reviewer: nit: Same here — feels like we should think about a base TSDBAgg class for the copy/pasted logic so we don't need to test it in multiple places.
```java
boolean pushdown = resolvePushdownParam(request, true);
boolean profile = request.paramAsBoolean(PROFILE_PARAM, false);
boolean includeMetadata = request.paramAsBoolean(INCLUDE_METADATA_PARAM, false);
boolean streaming = request.paramAsBoolean(STREAMING_PARAM, false);
```

Reviewer: Could you elaborate on how this works with stitching? IIUC we can use streaming for the pushdown-able parts but have to fall back for the stitched section in the middle?

Author: Follow-up TODOs:
- Move the logic of whether we should use streaming aggregation into a dedicated optimization layer. Also apply other optimizations there, such as removing all `removeEmpty` stages immediately after fetch.
- Refactor the aggregation type to reduce the duplicated mapping between states and stages.
- Apply streaming aggregation to the stitching scenario.
- Add a new interface for labels to avoid multiple scans.
```java
sampleList,
groupLabels,
this.minTimestamp,
this.maxTimestamp,
```

Reviewer: Seems like this needs to be `TimeSeries.calculateAlignedMaxTimestamp(this.minTimestamp, this.maxTimestamp, this.step)` instead.
```java
}

// Create single TimeSeries with empty labels (global aggregation)
TimeSeries timeSeries = new TimeSeries(sampleList, ByteLabels.emptyLabels(), this.minTimestamp, this.maxTimestamp, this.step, null);
```

Reviewer: Same here — maxTimestamp should be `TimeSeries.calculateAlignedMaxTimestamp(this.minTimestamp, this.maxTimestamp, this.step)`.
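The exact semantics of `TimeSeries.calculateAlignedMaxTimestamp` aren't shown in this thread. Assuming it snaps the raw query end down to the last step-aligned slot inside the half-open range `[minTimestamp, maxTimestamp)` — consistent with the `(max - 1 - min) / step + 1` sizing formula discussed earlier — a hypothetical stand-in would behave like:

```java
public class TimestampAlign {
    // Timestamp of the last step-aligned slot inside [minTimestamp, maxTimestamp).
    // Hypothetical stand-in for TimeSeries.calculateAlignedMaxTimestamp.
    static long alignedMaxTimestamp(long minTimestamp, long maxTimestamp, long step) {
        long slots = (maxTimestamp - 1 - minTimestamp) / step + 1;
        return minTimestamp + (slots - 1) * step;
    }
}
```

Passing the raw end boundary instead (e.g. 65 with min=0, step=10) would label the series with a max that no sample slot can occupy.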
…timization

This commit adds a complete streaming aggregator infrastructure to accelerate "fetch | sum/min/max/avg" queries by processing data in streaming fashion without reconstructing full time series in memory.

Key Features:
- Memory-efficient time-indexed arrays instead of HashMap operations
- Support for sum, min, max, avg aggregations with optional tag-based grouping
- NaN handling for missing data without boolean arrays
- Circuit breaker integration for memory tracking
- CSS-safe concurrent segment search support
- New configuration setting: index.tsdb.streaming_aggregator.enable (default: false)

Implementation:
- StreamingAggregationState interface for different aggregation strategies
- TimeSeriesStreamingAggregator with NoTag and Tag streaming implementations
- TimeSeriesStreamingAggregationBuilder with XContent parsing and serialization
- TimeSeriesStreamingAggregatorFactory with proper OpenSearch integration
- Planning layer integration in SourceBuilderVisitor (currently disabled)

Backward Compatibility:
- New aggregator registration alongside existing TimeSeriesUnfoldAggregator
- Setting defaults to false to avoid affecting existing functionality
- All existing tests pass without modification
- Pattern detection temporarily disabled pending proper setting validation

Co-Authored-By: Claude Sonnet 4 <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
…lder

Adapt TimeSeriesStreamingAggregator to the new FloatSampleList API:
- Use SampleList directly from decodeSamples() instead of List<Sample>
- Use indexed access (getValue/getTimestamp) instead of the iterator pattern
- Use FloatSampleList.Builder for non-AVG result construction
- Keep SampleList.fromList() only for the AVG path (SumCountSample)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>

- Add `boolean streaming` field to the M3OSTranslator.Params record
- Enable streaming aggregation eligibility when params.streaming() is true, stageStack is empty, and the aggregation type is SUM/AVG/MIN/MAX
- Remove the TSDB_ENGINE_ENABLE_STREAMING_AGGREGATOR index setting from TSDBPlugin and its check in TimeSeriesStreamingAggregationBuilder
- Wire up the `streaming` REST query parameter in RestM3QLAction
- Default streaming to false in all existing tests for compatibility

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>

- Add a streaming flag to QueryConfig and wire it through M3QLTranslator and RestQueryExecutor for e2e test support
- Fix registerUsage in TimeSeriesStreamingAggregationBuilder to register with AggregationUsageService
- Add StreamingAggregationIT with test cases for sum/min/max/avg validating that streaming results match unfold results
- Add TimeSeriesStreamingAggregationBenchmark comparing unfold+sum vs streaming+sum with production-like labels (12 tags from fixed pools), both global and grouped by zone+service

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>

The streaming aggregator was creating InternalTimeSeries with no reduce stage, causing cross-shard reduction to use the SampleMerger ANY_WINS policy instead of properly re-aggregating partial results. Fix by passing the corresponding pipeline stage (SumStage/MinStage/MaxStage/AvgStage) as the reduceStage. Also add 10 streaming REST integration test variants to verify correctness through the full REST API path.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>

…timization

removeEmpty right after fetch is a no-op since fetch already returns only non-empty series. Stripping it from the front of unfoldStages allows queries like `fetch | removeEmpty | sum` to qualify for streaming aggregation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>

… streaming e2e coverage

Add unit tests for TimeSeriesStreamingAggregator covering leaf pruning, buildEmptyAggregation, reduce stage creation for all types, time array size calculation, and factory configuration. Change counts arrays from long[] to int[] since per-time-index series counts won't exceed 2B, saving 4 bytes per element. Add streaming: true variants for eligible queries in the multi-index and empty-result e2e test files.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
…nd optimize

- Extract StreamingAggregationState, GroupTimeArrays, NoTagStreamingState, and TagStreamingState to a separate file with package-private visibility
- Refactor NoTagStreamingState to delegate to GroupTimeArrays instead of duplicating the values/counts arrays and the aggregateValue() switch
- Track nonNanCount in GroupTimeArrays for exact FloatSampleList.Builder capacity
- Use SampleList#search binary search for the start index instead of iterating from 0

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>

…dback

Use (maxTimestamp - 1 - minTimestamp) / step + 1 to avoid over-allocating by 1 when the end is step-aligned. Make StreamingAggregationType an instance field of GroupTimeArrays instead of a per-call parameter. Remove the redundant findStartIndex binary search since decodeSamples already filters to [minTimestamp, maxTimestamp).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>

…up iterator, deduplicate YAML tests

- Fix circuit breaker overcounting by tracking memory deltas per bucket instead of passing total memory on every document
- Add mergeAndDedup for LSI MemChunks with overlapping inner chunks to prevent duplicate timestamps from being double-counted during streaming aggregation
- Add a streaming_eligible YAML flag so the framework auto-runs queries with and without streaming, eliminating copy-paste test duplication
- Add TODO comments for future refactoring: RemoveEmptyStage optimizer pass, streaming eligibility on stage classes, and shared base TSDB aggregator classes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>

…aker batching, simplify test dedup

- Extract an AbstractTimeSeriesAggregationBuilder base class with shared time range fields, validation, the TSDB check, bucketCardinality, and serialization helpers
- Add CircuitBreakerBatcher and peak tracking to the streaming aggregator to match the unfold aggregator pattern (5 MB batch threshold, maxCircuitBreakerBytes metric)
- Remove the streaming_eligible flag; the framework now runs all success queries with streaming=true automatically to validate parity
- Refactor the dedup iterator out of the StreamingAggregationState interface; always apply DedupIterator even for single chunks
- Fix the RemoveEmptyStage TODO to reference the PlanNode optimization pass

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>

…overage

- Match the unfold aggregator pattern: only apply MergeIterator+DedupIterator when multiple chunks exist (LSI MemChunk inner chunks with overlapping timestamps); single chunks pass through directly via getFirst() without wrapping
- Add 4 LSI MemChunk dedup tests: partial overlap, dedup-then-aggregate across documents, tag-based grouping with dedup, and three inner chunks with progressive overlap

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>

…to O(1)

Rename all "streaming" references to "inplace" across the codebase. Optimize TagInplaceAggregationState.getEstimatedMemoryUsage() from O(G) to O(1) by precomputing the per-group memory cost. Add incremental consumeMemoryDelta() to eliminate per-document HashMap lookups in the hot path.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>

…tion

Both NoTagInplaceAggregationState and TagInplaceAggregationState were passing the raw query end boundary as maxTimestamp to the TimeSeries constructor instead of aligning it to step boundaries. This aligns with what TimeSeriesUnfoldAggregator already does.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>

…ation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>

Signed-off-by: Ziwen Wan <38177263+ZiwenWan@users.noreply.github.com>

…issing method argument

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
Benchmark

Configuration: 100 samples per series, 3 iterations, average time mode (ms/op)
- Sum (No Grouping)
- Sum By Tags (Grouped)
Issues Resolved

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.