
Implement TimeSeriesStreamingAggregator for M3QL fetch+aggregation optimization #43

Merged
philiplhchan merged 20 commits into opensearch-project:main from ZiwenWan:streamingAgg
Mar 13, 2026
Conversation

@ZiwenWan (Contributor) commented Feb 8, 2026


This commit adds a complete streaming aggregator infrastructure to accelerate "fetch | sum/min/max/avg" queries by processing data in streaming fashion without reconstructing full time series in memory.

Key Features:

  • Memory-efficient time-indexed arrays instead of HashMap operations
  • Support for sum, min, max, avg aggregations with optional tag-based grouping
  • NaN handling for missing data without boolean arrays
  • Circuit breaker integration for memory tracking
  • CSS-safe concurrent segment search support
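The key features above center on replacing per-timestamp HashMap lookups with a flat, time-indexed array whose slots start as NaN. A minimal sketch of that technique follows (a standalone illustration; `StreamingSumSketch`, `streamSum`, and the array-of-arrays input are hypothetical stand-ins, not the PR's classes):

```java
// Sketch of streaming sum into a time-indexed array (hypothetical helper,
// not the PR's actual implementation).
public class StreamingSumSketch {
    // Map each sample timestamp to an array slot and accumulate in place.
    // NaN marks "no data yet", so no separate boolean occupancy array is needed.
    static double[] streamSum(long[][] timestamps, double[][] values,
                              long minTs, long maxTs, long step) {
        int size = (int) ((maxTs - 1 - minTs) / step) + 1; // sizing discussed in review below
        double[] out = new double[size];
        java.util.Arrays.fill(out, Double.NaN);
        for (int s = 0; s < timestamps.length; s++) {      // one pass per series
            for (int i = 0; i < timestamps[s].length; i++) {
                int idx = (int) ((timestamps[s][i] - minTs) / step);
                if (idx < 0 || idx >= size) continue;      // outside the query range
                double v = values[s][i];
                out[idx] = Double.isNaN(out[idx]) ? v : out[idx] + v;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] ts = { {0, 10, 20}, {0, 20} };
        double[][] vs = { {1, 2, 3}, {4, 5} };
        System.out.println(java.util.Arrays.toString(streamSum(ts, vs, 0, 30, 10)));
    }
}
```

Min, max, and avg follow the same pattern; avg additionally tracks an int count per slot.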

Implementation:

  • StreamingAggregationState interface for different aggregation strategies
  • TimeSeriesStreamingAggregator with NoTag and Tag streaming implementations
  • TimeSeriesStreamingAggregationBuilder with XContent parsing and serialization
  • TimeSeriesStreamingAggregatorFactory with proper OpenSearch integration
  • Planning layer integration in SourceBuilderVisitor (currently disabled)

Backward Compatibility:

  • New aggregator registration alongside existing TimeSeriesUnfoldAggregator
  • Setting defaults to false to avoid affecting existing functionality
  • All existing tests pass without modification
  • Pattern detection temporarily disabled pending proper setting validation

Benchmark

Configuration: 100 samples per series, 3 iterations, average time mode (ms/op)

Sum (No Grouping)

| Cardinality | Streaming (ms/op) | Unfold (ms/op) | Speedup |
|---|---|---|---|
| 10,000 | 20.073 ± 1.259 | 191.662 ± 12.569 | ~9.5x |
| 100,000 | 201.215 ± 11.874 | 33,165.460 ± 2,461.329 | ~165x |

Sum By Tags (Grouped)

| Cardinality | Streaming (ms/op) | Unfold (ms/op) | Speedup |
|---|---|---|---|
| 10,000 | 28.119 ± 3.205 | 200.983 ± 30.603 | ~7x |
| 100,000 | 261.259 ± 24.963 | 33,995.305 ± 16,925.169 | ~130x |


By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@codecov bot commented Feb 8, 2026

Codecov Report

❌ Patch coverage is 87.50000% with 55 lines in your changes missing coverage. Please review.
✅ Project coverage is 89.13%. Comparing base (fa710d9) to head (13dd011).
⚠️ Report is 2 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| .../query/aggregator/TimeSeriesInplaceAggregator.java | 79.79% | 14 Missing and 6 partials ⚠️ |
| ...tsdb/query/aggregator/InplaceAggregationState.java | 86.61% | 8 Missing and 11 partials ⚠️ |
| ...ggregator/TimeSeriesInplaceAggregationBuilder.java | 92.47% | 1 Missing and 6 partials ⚠️ |
| ...nsearch/tsdb/lang/m3/dsl/SourceBuilderVisitor.java | 85.18% | 2 Missing and 2 partials ⚠️ |
| ...org/opensearch/tsdb/query/rest/BaseTSDBAction.java | 50.00% | 3 Missing ⚠️ |
| ...rg/opensearch/tsdb/lang/m3/dsl/M3OSTranslator.java | 0.00% | 1 Missing ⚠️ |
| ...aggregator/TimeSeriesUnfoldAggregationBuilder.java | 83.33% | 0 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##               main      #43      +/-   ##
============================================
- Coverage     89.15%   89.13%   -0.02%     
- Complexity     5039     5181     +142     
============================================
  Files           322      330       +8     
  Lines         15532    16076     +544     
  Branches       2338     2430      +92     
============================================
+ Hits          13848    14330     +482     
- Misses         1014     1045      +31     
- Partials        670      701      +31     
| Flag | Coverage Δ |
|---|---|
| unittests | 89.13% <87.50%> (-0.02%) ⬇️ |

Flags with carried forward coverage won't be shown.

| Files with missing lines | Coverage Δ |
|---|---|
| src/main/java/org/opensearch/tsdb/TSDBPlugin.java | 83.66% <100.00%> (+0.16%) ⬆️ |
| ...gregator/AbstractTimeSeriesAggregationBuilder.java | 100.00% <100.00%> (ø) |
| .../tsdb/query/aggregator/InplaceAggregationType.java | 100.00% <100.00%> (ø) |
| ...aggregator/TimeSeriesInplaceAggregatorFactory.java | 100.00% <100.00%> (ø) |
| ...b/query/aggregator/TimeSeriesUnfoldAggregator.java | 89.14% <ø> (+0.18%) ⬆️ |
| ...org/opensearch/tsdb/query/rest/RestM3QLAction.java | 92.35% <100.00%> (+0.14%) ⬆️ |
| ...rg/opensearch/tsdb/lang/m3/dsl/M3OSTranslator.java | 64.70% <0.00%> (ø) |
| ...aggregator/TimeSeriesUnfoldAggregationBuilder.java | 88.88% <83.33%> (-2.42%) ⬇️ |
| ...org/opensearch/tsdb/query/rest/BaseTSDBAction.java | 90.32% <50.00%> (-9.68%) ⬇️ |
| ...nsearch/tsdb/lang/m3/dsl/SourceBuilderVisitor.java | 92.27% <85.18%> (-0.47%) ⬇️ |

... and 3 more

... and 3 files with indirect coverage changes


@ZiwenWan ZiwenWan marked this pull request as draft February 8, 2026 06:54
Comment on lines +843 to +845
if (!stageStack.isEmpty()) {
return false;
}
Contributor:

This means there's no stage after the current stage? e.g.

fetch | a | b | c

should have a stack of

c -> b

when reaching a?

Contributor Author:

Yes, you are right; I used the wrong planning logic. Let me update it.
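The invariant discussed in this thread can be modeled with a small sketch. Stages are plain strings here and `lastStageIsAggregation` is a hypothetical name; the real SourceBuilderVisitor operates on stage objects. The point is that the aggregation is streaming-eligible only when the stack of downstream stages is empty by the time it is reached:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Sketch of the "no stage after the aggregation" check.
public class EligibilitySketch {
    // For "fetch | a | b | c", walking stages right-to-left, the stack still
    // holds c -> b when "a" is reached, so "a" is not the final stage.
    static boolean lastStageIsAggregation(List<String> stages, String agg) {
        Deque<String> stack = new ArrayDeque<>();
        for (int i = stages.size() - 1; i >= 0; i--) {
            String stage = stages.get(i);
            if (stage.equals(agg)) {
                return stack.isEmpty(); // eligible only if nothing follows it
            }
            stack.push(stage);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(lastStageIsAggregation(List.of("sum"), "sum"));        // sum is last
        System.out.println(lastStageIsAggregation(List.of("sum", "abs"), "sum")); // abs follows sum
    }
}
```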

private static class NoTagStreamingState implements StreamingAggregationState {
private final StreamingAggregationType aggregationType;
private final double[] values;
private final long[] counts; // Only used for AVG
Contributor:

I think int[] counts is good enough? (The upper bound is the number of time series we retrieved.)

Contributor Author:

Yes, int[] should be good enough.

@ZiwenWan ZiwenWan closed this Feb 19, 2026
@ZiwenWan ZiwenWan reopened this Feb 19, 2026
boolean pushdown = resolvePushdownParam(request, true);
boolean profile = request.paramAsBoolean(PROFILE_PARAM, false);
boolean includeMetadata = request.paramAsBoolean(INCLUDE_METADATA_PARAM, false);
boolean streaming = request.paramAsBoolean(STREAMING_PARAM, false);
Contributor:

Are we planning to enable it by default in the future? (There seems to be no downside?)

Contributor Author:

The downside is data duplication, which leads to double counting. Duplication can happen between different segments of the same shard, or between partitions (upstream double-writes during metrics migration).

Collaborator:

Could you elaborate on how this works with stitching? IIUC we can use streaming for the pushdown-able parts but have to fall back for the stitched section in the middle?

Contributor Author:

Right now, when stitching is enabled, this new feature is not enabled. Enabling it in the stitching scenario can be a future optimization.

}

// removeEmpty right after fetch is a no-op — strip it to enable streaming optimization
if (!unfoldStages.isEmpty() && unfoldStages.get(0) instanceof RemoveEmptyStage) {
Contributor:

For corner cases where someone writes fetch | removeEmpty | removeEmpty | ..., let's make this a while loop?

Contributor Author:

That's a good callout. I do not want to make one PR too long, and I think this covers most cases, but let me add a while loop.

Collaborator:

can you add a TODO here? imo this logic belongs in an optimizer layer and shouldn't be part of the SourceBuilderVisitor - we should be removing extraneous functions and no-ops prior to trying to build the SourceBuilder
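The while loop discussed above might look like this sketch, which models stages as strings; the real code operates on UnaryPipelineStage instances and `stripLeadingRemoveEmpty` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of stripping all leading removeEmpty stages with a while loop.
public class StripRemoveEmpty {
    static List<String> stripLeadingRemoveEmpty(List<String> stages) {
        List<String> result = new ArrayList<>(stages);
        // removeEmpty right after fetch is a no-op; drop every consecutive one
        // so "fetch | removeEmpty | removeEmpty | sum" still qualifies.
        while (!result.isEmpty() && result.get(0).equals("removeEmpty")) {
            result.remove(0);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(stripLeadingRemoveEmpty(List.of("removeEmpty", "removeEmpty", "sum")));
    }
}
```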

* List<TimeSeries> results = state.getFinalResults(minTimestamp, maxTimestamp, step);
* }</pre>
*/
public interface StreamingAggregationState {
Contributor:

Should we eventually make the min/max/sum/avg stages use the same implementation as this one?

Contributor Author:

I think we can rewrite them in a more efficient way. I do not want to make this PR too complicated.

* Check if unfoldStages consists of exactly one supported streaming aggregation stage.
*/
private boolean isStreamingEligibleStages(List<UnaryPipelineStage> unfoldStages) {
if (unfoldStages.size() != 1) {
Contributor:

Is this because all of those stages would need to be global stages?

I wonder if, for something like fetch | a | b | c | sum, we could theoretically still apply this optimization? If a | b | c can be handled on the data node, that means they do not care about the order in which samples are processed.

Contributor Author:

  1. You cannot always optimize this. For example, if a is a moving function or a derivative, we have to reconstruct the whole series and keep all the tags in memory. There might be some advanced approach to optimize that, but I think we should start with the most basic cases. Also, based on a chat with the Muttley team, optimizing fetch | sum is good enough.
  2. There are optimization opportunities. For example, for fetch | moving sum | sum, we can swap the order and the result is the same. We can apply this rule in the planning layer and try to push the aggregation stage as early as possible; this also reduces the cost of the moving function.
  3. If all functions before the aggregation operate at the sample level, we can apply the transformations in the collect() function and aggregate directly, e.g. fetch | abs | sum. That might need some code refactoring; we can evaluate the frequency of those queries later.
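The third point above can be sketched as follows; the shapes are hypothetical (the real collect() works on doc values, not a double[]), but it shows how a per-sample transform folds into a single streaming pass:

```java
import java.util.function.DoubleUnaryOperator;

// Sketch: when pre-aggregation stages operate per sample (e.g. abs), the
// transform can be applied inside the collection loop before aggregation,
// so "fetch | abs | sum" needs no series reconstruction.
public class PerSampleTransformSketch {
    static double streamAggregate(double[] samples, DoubleUnaryOperator transform) {
        double sum = 0;
        for (double v : samples) {
            sum += transform.applyAsDouble(v); // transform and aggregate in one pass
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(streamAggregate(new double[]{-2.5, 5, -6}, Math::abs)); // 13.5
    }
}
```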

/**
* Create streaming state for a bucket based on aggregation configuration.
*/
private StreamingAggregationState createStreamingState() {
Contributor:

nit: Can we move the streaming-state-related code to another file, or into the interface's file? This one feels a bit long.

}

int timeIndex = (int) ((samples.getTimestamp(i) - minTimestamp) / step);
if (timeIndex >= 0 && timeIndex < timeArraySize) {
Contributor:

Maybe use SampleList#search to find the start index?

Comment on lines +299 to +300
private final double[] values;
private final int[] counts; // Only used for AVG
Contributor:

Can we use GroupTimesArray as well?

return SampleList.fromList(samples);
}

FloatSampleList.Builder builder = new FloatSampleList.Builder(values.length);
Contributor:

Instead of using values.length, we could track how many values are transformed from NaN to a real number, to get the exact count?

return values.length;
}

void aggregate(int timeIndex, double value, StreamingAggregationType type) {
Collaborator @philiplhchan commented Feb 24, 2026:

IMO it makes more sense to have StreamingAggregationType as an instance field set in the constructor instead of passing it into this method; you can't change the type per .aggregate call anyway.


private void processChunk(ChunkIterator chunk) throws IOException {
SampleList samples = chunk.decodeSamples(this.minTimestamp, this.maxTimestamp).samples();
int startIndex = GroupTimeArrays.findStartIndex(samples, this.minTimestamp);
Collaborator:

what if startIndex < 0/not found?

Contributor Author:

Taking another look, calling findStartIndex is unnecessary: chunk.decodeSamples already filters the samples and only returns those within the correct time range, so just iterating the sample list from index 0 is enough.


NoTagStreamingState(StreamingAggregationType aggregationType, int timeArraySize, long minTimestamp, long maxTimestamp, long step) {
this.aggregationType = aggregationType;
this.arrays = new GroupTimeArrays(timeArraySize, aggregationType.requiresCountTracking());
Collaborator:

Under what case will timeArraySize != (maxTimestamp - minTimestamp) / step + 1?

Contributor Author:

You are right, I should use the correct formula: it should be (maxTimestamp - 1 - minTimestamp) / step + 1. I think I was over-allocating the array before.
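The off-by-one discussed here can be checked with a worked example. This assumes decodeSamples returns samples in the end-exclusive range [minTimestamp, maxTimestamp), so the last reachable slot belongs to timestamp maxTimestamp - 1; `naiveSize`/`correctSize` are illustrative names:

```java
// Worked example of the corrected array-size formula from this thread.
public class TimeArraySize {
    static int naiveSize(long min, long max, long step)   { return (int) ((max - min) / step) + 1; }
    static int correctSize(long min, long max, long step) { return (int) ((max - 1 - min) / step) + 1; }

    public static void main(String[] args) {
        // Step-aligned end: the naive formula allocates one unused slot.
        System.out.println(naiveSize(0, 30, 10));   // 4
        System.out.println(correctSize(0, 30, 10)); // 3 (slots for t = 0, 10, 20)
        // Non-aligned end: both formulas agree.
        System.out.println(correctSize(0, 25, 10)); // 3 (slots for t = 0, 10, 20)
    }
}
```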

Map<String, String> groupLabelMap = new HashMap<>();
for (String tagName : groupByTags) {
if (allLabels != null && allLabels.has(tagName)) {
groupLabelMap.put(tagName, allLabels.get(tagName));
Collaborator:

IIRC this will re-iterate all the tags on every .has and every .get; if we do this a lot, it will probably add up. Depending on how many labels groupByTags has, it may be faster to do toMapView once and access all the tags directly. And if we haven't already, we should probably update labels to accept a filter list of label names that collects only the required labels in one pass, so you can just do, e.g.:

return allLabels.keepOnlyLabels(groupByTags);

Contributor Author:

Yep, this is on my radar. I do not think we need to convert the bytes into a Map; we can add a new API that sorts labels internally and returns all values in one scan. I did not include it in this PR since I do not want to make one PR too complicated.
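The one-pass idea might look like this sketch. Note keepOnlyLabels is the reviewer's proposed, not-yet-existing API, and the flat name/value array is a stand-in for the byte-encoded labels:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// One-pass label filtering: collect only the requested label names in a
// single scan, instead of re-iterating the labels per .has/.get call.
public class LabelFilterSketch {
    // Labels laid out as [name0, value0, name1, value1, ...].
    static Map<String, String> keepOnlyLabels(String[] flat, Set<String> wanted) {
        Map<String, String> out = new LinkedHashMap<>();
        for (int i = 0; i + 1 < flat.length; i += 2) { // single scan over all labels
            if (wanted.contains(flat[i])) out.put(flat[i], flat[i + 1]);
        }
        return out;
    }

    public static void main(String[] args) {
        String[] labels = {"zone", "us-east", "service", "api", "host", "h1"};
        System.out.println(keepOnlyLabels(labels, Set.of("zone", "service")));
    }
}
```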

Comment on lines +117 to +120
// Streaming aggregations are CSS-safe for all supported types
// because they work with idempotent operations that can be
// safely merged across segment boundaries
return true;
Collaborator @philiplhchan commented Feb 24, 2026:

This assumes there is never any duplicate sample, right? If that's the case, let's make that very clear here.

And IIUC, it is explicitly not idempotent, which is why we must ensure there are no duplicate samples when reading; so fix/remove that comment as well.
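A tiny worked example of why this matters (standalone arithmetic, not the PR's code): min/max are idempotent under duplication, while sum (and avg) are not, so dedup must happen before streaming aggregation:

```java
// Duplicate samples (e.g. from overlapping chunks) are harmless for max
// but double-count for sum.
public class DedupMatters {
    static double sum(double[] xs) { double s = 0; for (double v : xs) s += v; return s; }
    static double max(double[] xs) { double m = Double.NEGATIVE_INFINITY; for (double v : xs) m = Math.max(m, v); return m; }

    public static void main(String[] args) {
        double[] samples = {3.0, 5.0};
        double[] withDup = {3.0, 5.0, 5.0}; // same sample seen twice
        System.out.println(max(withDup));   // 5.0: idempotent, duplicate is harmless
        System.out.println(sum(samples));   // 8.0
        System.out.println(sum(withDup));   // 13.0: the duplicate is double-counted
    }
}
```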

* @return true if groupByTags is null or empty, false otherwise
*/
public boolean isGlobalAggregation() {
return groupByTags == null || groupByTags.isEmpty();
Collaborator:

I keep getting confused by this: remind me again why an empty groupByTags means it's global, but otherwise not global?

Isn't sum/min/max/avg always a global aggregation?
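A small standalone illustration of the distinction (simplified data shapes, not the PR's types): with a non-empty groupByTags each distinct tag value yields its own output series, while "global" collapses every series into one:

```java
import java.util.Map;
import java.util.TreeMap;

// "sum by(tag)" vs global sum over a single time slot.
public class GroupingSketch {
    // seriesTag[i] is the grouping-tag value of series i; values[i] its sample.
    static Map<String, Double> sumBy(String[] seriesTag, double[] values) {
        Map<String, Double> out = new TreeMap<>(); // sorted for deterministic output
        for (int i = 0; i < values.length; i++) {
            out.merge(seriesTag[i], values[i], Double::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        String[] tags = {"us-east", "us-east", "us-west"};
        double[] vals = {1.0, 2.0, 4.0};
        System.out.println(sumBy(tags, vals));                     // one series per tag value
        System.out.println(sumBy(new String[]{"", "", ""}, vals)); // global: a single group
    }
}
```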

# for eligible queries (fetch | sum/min/max/avg)

test_setup:
name: "Streaming Aggregation Integration Test"
Collaborator:

Do these need to be separate from the other test cases, or can we run the same test cases in both streaming and non-streaming mode?

data:
- metric: { }
values: [-2.5,5,12,-6,7.5,4,-3,9.5,6,5,3,2]
- name: "streaming - min across POST series"
Collaborator:

Same question as above: can we just run all test cases with streaming enabled and disabled? This looks like a lot of duplication.


/**
* Check if unfoldStages consists of exactly one supported streaming aggregation stage.
*/
private boolean isStreamingEligibleStages(List<UnaryPipelineStage> unfoldStages) {
Collaborator:

Should we have UnaryPipelineStage#isStreamingEligible?

/**
* Map a pipeline stage instance to the corresponding StreamingAggregationType.
*/
private StreamingAggregationType mapStageToStreamingType(UnaryPipelineStage stage) {
Collaborator:

Same here: if we're already creating the stage and using it to construct the streaming agg builder, can we do that directly? Or use a factory instead of adding the logic here?

state.processDocument(doc, tsdbDocValues, tsdbLeafReader);

// Track memory usage for circuit breaker
addCircuitBreakerBytes(state.getEstimatedMemoryUsage());
Collaborator:

Are we supposed to add the delta, or the total used? If this expects a delta, then passing the total estimate on every document will significantly overcount and trip the breaker early.
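Delta-based accounting can be sketched like this; `BreakerDeltaSketch` is a hypothetical stand-in, and the real OpenSearch CircuitBreaker API differs:

```java
// Sketch of delta-based circuit-breaker accounting.
public class BreakerDeltaSketch {
    long reportedBytes = 0;  // bytes already charged to the breaker
    long breakerTotal = 0;   // stand-in for the breaker's running total

    // Charge only the growth since the last report, not the full estimate;
    // otherwise per-document calls would overcount and trip the breaker early.
    void reportMemoryUsage(long currentEstimate) {
        long delta = currentEstimate - reportedBytes;
        if (delta > 0) {
            breakerTotal += delta;
            reportedBytes = currentEstimate;
        }
    }

    public static void main(String[] args) {
        BreakerDeltaSketch b = new BreakerDeltaSketch();
        b.reportMemoryUsage(1000);
        b.reportMemoryUsage(1000); // no growth, nothing charged
        b.reportMemoryUsage(1500);
        System.out.println(b.breakerTotal); // 1500, not 3500
    }
}
```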

GroupTimeArrays arrays = groupData.computeIfAbsent(groupLabels, k -> new GroupTimeArrays(timeArraySize, aggregationType));

// Process chunks for this group
List<ChunkIterator> chunks = reader.chunksForDoc(docId, docValues);
Collaborator:

For LSI leaves this needs to be wrapped in a dedup iterator.

We can guarantee no duplicates across MemChunks, but inner chunks within a MemChunk often have duplicates (let's also add some test coverage for this case).

Contributor Author:

Actually, can you be more specific? E.g., give a concrete case of duplication.

Comment on lines +184 to +189
boolean tsdbEnabled = TSDBPlugin.TSDB_ENGINE_ENABLED.get(queryShardContext.getIndexSettings().getSettings());
if (!tsdbEnabled) {
throw new IllegalStateException(
"Time Series Streaming Aggregator can only be used on indices where index.tsdb_engine.enabled is true"
);
}
Collaborator:

nit: feels like we should think about a base TSDBAggBuilder class for the copy/pasted code

Comment on lines +160 to +167
TSDBLeafReader tsdbLeafReader = TSDBLeafReader.unwrapLeafReader(ctx.reader());
if (tsdbLeafReader == null) {
throw new IOException("Expected TSDBLeafReader but found: " + ctx.reader().getClass().getName());
}
if (!tsdbLeafReader.overlapsTimeRange(minTimestamp, maxTimestamp)) {
// No matching data in this segment, skip it by returning the sub-collector
return sub;
}
Collaborator:

nit: same here, feels like we should think about a base TSDBAgg class for the copy/pasted logic so we don't need to test in multiple places


Contributor Author @ZiwenWan left a review:

Follow-up TODOs:

  1. Move the decision of whether to use streaming aggregation into a dedicated optimization layer, and apply other optimizations there, such as removing every removeEmpty immediately after fetch.
  2. Refactor the aggregation type to remove the duplicated mapping between states and stages.
  3. Apply streaming aggregation in the stitching scenario.
  4. Add a new interface for labels to avoid multiple scans.


@ZiwenWan ZiwenWan marked this pull request as ready for review February 27, 2026 23:58
Comment thread src/main/java/org/opensearch/tsdb/query/rest/BaseTSDBAction.java Outdated
Comment thread src/main/java/org/opensearch/tsdb/query/rest/BaseTSDBAction.java Outdated
sampleList,
groupLabels,
this.minTimestamp,
this.maxTimestamp,
Collaborator:

Seems like this needs to be TimeSeries.calculateAlignedMaxTimestamp(this.minTimestamp, this.maxTimestamp, this.step) instead.

}

// Create single TimeSeries with empty labels (global aggregation)
TimeSeries timeSeries = new TimeSeries(sampleList, ByteLabels.emptyLabels(), this.minTimestamp, this.maxTimestamp, this.step, null);
Collaborator:

Same here: maxTimestamp should be TimeSeries.calculateAlignedMaxTimestamp(this.minTimestamp, this.maxTimestamp, this.step).
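The alignment can be illustrated with a sketch. `alignedMax` is a hypothetical helper capturing the idea under the assumption of an end-exclusive [min, max) range snapped down to the last step boundary; the real TimeSeries.calculateAlignedMaxTimestamp may differ:

```java
// Worked example of step-aligning the raw query end boundary.
public class AlignedMaxSketch {
    // Snap the raw end down to the start of the last slot within [min, max).
    static long alignedMax(long min, long max, long step) {
        return min + ((max - 1 - min) / step) * step;
    }

    public static void main(String[] args) {
        System.out.println(alignedMax(0, 30, 10)); // 20: last slot for an end-exclusive 30
        System.out.println(alignedMax(0, 25, 10)); // 20: non-aligned end snaps down too
    }
}
```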

ZiwenWan and others added 4 commits March 11, 2026 11:40
…timization

This commit adds a complete streaming aggregator infrastructure to accelerate
"fetch | sum/min/max/avg" queries by processing data in streaming fashion
without reconstructing full time series in memory.

Key Features:
- Memory efficient time-indexed arrays instead of HashMap operations
- Support for sum, min, max, avg aggregations with optional tag-based grouping
- NaN handling for missing data without boolean arrays
- Circuit breaker integration for memory tracking
- CSS-safe concurrent segment search support
- New configuration setting: index.tsdb.streaming_aggregator.enable (default: false)

Implementation:
- StreamingAggregationState interface for different aggregation strategies
- TimeSeriesStreamingAggregator with NoTag and Tag streaming implementations
- TimeSeriesStreamingAggregationBuilder with XContent parsing and serialization
- TimeSeriesStreamingAggregatorFactory with proper OpenSearch integration
- Planning layer integration in SourceBuilderVisitor (currently disabled)

Backward Compatibility:
- New aggregator registration alongside existing TimeSeriesUnfoldAggregator
- Setting defaults to false to avoid affecting existing functionality
- All existing tests pass without modification
- Pattern detection temporarily disabled pending proper setting validation

Co-Authored-By: Claude Sonnet 4 <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
…lder

Adapt TimeSeriesStreamingAggregator to the new FloatSampleList API:
- Use SampleList directly from decodeSamples() instead of List<Sample>
- Use indexed access (getValue/getTimestamp) instead of iterator pattern
- Use FloatSampleList.Builder for non-AVG result construction
- Keep SampleList.fromList() only for AVG path (SumCountSample)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
- Add `boolean streaming` field to M3OSTranslator.Params record
- Enable streaming aggregation eligibility when params.streaming() is
  true, stageStack is empty, and aggregation type is SUM/AVG/MIN/MAX
- Remove TSDB_ENGINE_ENABLE_STREAMING_AGGREGATOR index setting from
  TSDBPlugin and its check in TimeSeriesStreamingAggregationBuilder
- Wire up `streaming` REST query parameter in RestM3QLAction
- Default streaming to false in all existing tests for compatibility

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
- Add streaming flag to QueryConfig and wire through M3QLTranslator
  and RestQueryExecutor for e2e test support
- Fix registerUsage in TimeSeriesStreamingAggregationBuilder to
  register with AggregationUsageService
- Add StreamingAggregationIT with test cases for sum/min/max/avg
  validating streaming results match unfold results
- Add TimeSeriesStreamingAggregationBenchmark comparing unfold+sum
  vs streaming+sum with production-like labels (12 tags from fixed
  pools), both global and grouped by zone+service

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
ZiwenWan and others added 13 commits March 11, 2026 11:40
The streaming aggregator was creating InternalTimeSeries with no reduce
stage, causing cross-shard reduction to use SampleMerger ANY_WINS policy
instead of properly re-aggregating partial results. Fix by passing the
corresponding pipeline stage (SumStage/MinStage/MaxStage/AvgStage) as
the reduceStage. Also add 10 streaming REST integration test variants
to verify correctness through the full REST API path.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
…timization

removeEmpty right after fetch is a no-op since fetch already returns only
non-empty series. Stripping it from the front of unfoldStages allows queries
like `fetch | removeEmpty | sum` to qualify for streaming aggregation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
… streaming e2e coverage

Add unit tests for TimeSeriesStreamingAggregator covering leaf pruning,
buildEmptyAggregation, reduce stage creation for all types, time array
size calculation, and factory configuration. Change counts arrays from
long[] to int[] since per-time-index series counts won't exceed 2B,
saving 4 bytes per element. Add streaming: true variants for eligible
queries in multi-index and empty-result e2e test files.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
…nd optimize

- Extract StreamingAggregationState, GroupTimeArrays, NoTagStreamingState, and
  TagStreamingState to separate file with package-private visibility
- Refactor NoTagStreamingState to delegate to GroupTimeArrays instead of
  duplicating values/counts arrays and aggregateValue() switch
- Track nonNanCount in GroupTimeArrays for exact FloatSampleList.Builder capacity
- Use SampleList#search binary search for start index instead of iterating from 0

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
…dback

Use (maxTimestamp - 1 - minTimestamp) / step + 1 to avoid over-allocating
by 1 when end is step-aligned. Make StreamingAggregationType an instance
field of GroupTimeArrays instead of a per-call parameter. Remove redundant
findStartIndex binary search since decodeSamples already filters to
[minTimestamp, maxTimestamp).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
…up iterator, deduplicate YAML tests

- Fix circuit breaker overcounting by tracking memory deltas per bucket instead of
  passing total memory on every document
- Add mergeAndDedup for LSI MemChunks with overlapping inner chunks to prevent
  duplicate timestamps from being double-counted during streaming aggregation
- Add streaming_eligible YAML flag so the framework auto-runs queries with and
  without streaming, eliminating copy-paste test duplication
- Add TODO comments for future refactoring: RemoveEmptyStage optimizer pass,
  streaming eligibility on stage classes, and shared base TSDB aggregator classes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
…aker batching, simplify test dedup

- Extract AbstractTimeSeriesAggregationBuilder base class with shared time range
  fields, validation, TSDB check, bucketCardinality, and serialization helpers
- Add CircuitBreakerBatcher and peak tracking to streaming aggregator to match
  unfold aggregator pattern (5 MB batch threshold, maxCircuitBreakerBytes metric)
- Remove streaming_eligible flag; framework now runs all success queries with
  streaming=true automatically to validate parity
- Refactor dedup iterator out of StreamingAggregationState interface; always
  apply DedupIterator even for single chunks
- Fix RemoveEmptyStage TODO to reference PlanNode optimization pass

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
…overage

- Match unfold aggregator pattern: only apply MergeIterator+DedupIterator when
  multiple chunks exist (LSI MemChunk inner chunks with overlapping timestamps).
  Single chunks pass through directly via getFirst() without wrapping.
- Add 4 LSI MemChunk dedup tests: partial overlap, dedup-then-aggregate across
  documents, tag-based grouping with dedup, and three inner chunks with
  progressive overlap.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
…to O(1)

Rename all "streaming" references to "inplace" across the codebase. Optimize
TagInplaceAggregationState.getEstimatedMemoryUsage() from O(G) to O(1) by
precomputing per-group memory cost. Add incremental consumeMemoryDelta() to
eliminate per-document HashMap lookups in the hot path.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
…tion

Both NoTagInplaceAggregationState and TagInplaceAggregationState were
passing the raw query end boundary as maxTimestamp to the TimeSeries
constructor instead of aligning it to step boundaries. This aligns with
what TimeSeriesUnfoldAggregator already does.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
…ation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
ZiwenWan and others added 2 commits March 12, 2026 16:01
Signed-off-by: Ziwen Wan <38177263+ZiwenWan@users.noreply.github.com>
…issing method argument

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ziwen Wan <ziwen.wan@uber.com>
@philiplhchan philiplhchan merged commit 3ddc337 into opensearch-project:main Mar 13, 2026
9 checks passed

5 participants