Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
872ef99
Support sub agg in filter rewrite optimization
bowenlan-amzn Jan 30, 2025
7936c05
Clean unused code
bowenlan-amzn Feb 25, 2025
4808924
remove singleton DV related change
bowenlan-amzn Feb 25, 2025
f5267eb
let aggregator decide whether to support sub agg
bowenlan-amzn Mar 24, 2025
0c88c59
refactor range collector
bowenlan-amzn Mar 25, 2025
b211be3
prevent NPE
bowenlan-amzn Mar 25, 2025
9420ec7
handle tryPrecomputeAggregationForLeaf interface
bowenlan-amzn Mar 25, 2025
02251f1
Merge branch 'main' into sub-agg-pr
bowenlan-amzn Mar 26, 2025
3b2e507
clean up for review
bowenlan-amzn Mar 26, 2025
d5fe988
add changelog
bowenlan-amzn Mar 26, 2025
edfc0f2
Merge branch 'main' into sub-agg-pr
bowenlan-amzn Mar 27, 2025
e6ff1f1
Merge branch 'main' into sub-agg-pr
bowenlan-amzn Apr 2, 2025
2584455
Merge branch 'main' into sub-agg-pr
bowenlan-amzn Apr 4, 2025
173bcb4
Merge branch 'main' into sub-agg-pr
bowenlan-amzn Apr 7, 2025
9207592
add segment level check
bowenlan-amzn Apr 8, 2025
6fc79eb
Merge branch 'main' into sub-agg-pr
bowenlan-amzn Apr 8, 2025
0e074f4
improvements
bowenlan-amzn Apr 8, 2025
f9a730d
experiment annotation
bowenlan-amzn Apr 8, 2025
51569de
Merge branch 'main' into sub-agg-pr
bowenlan-amzn Apr 8, 2025
cfe5df6
Merge branch 'main' into sub-agg-pr
bowenlan-amzn Apr 9, 2025
38b6c71
Merge branch 'main' into sub-agg-pr
bowenlan-amzn Apr 9, 2025
ae7214e
Merge branch 'main' into sub-agg-pr
bowenlan-amzn Apr 9, 2025
0b5e86c
Update server/src/main/java/org/opensearch/search/SearchService.java
bowenlan-amzn Apr 9, 2025
29fbea6
Merge branch 'main' into sub-agg-pr
bowenlan-amzn Apr 9, 2025
955a9cc
Collect sub agg after each bucket
bowenlan-amzn Apr 9, 2025
80fde5e
Merge branch 'main' into sub-agg-pr
bowenlan-amzn Apr 10, 2025
6a00855
Merge branch 'main' into sub-agg-pr
bowenlan-amzn Apr 10, 2025
8961d7d
Merge branch 'main' into sub-agg-pr
bowenlan-amzn Apr 10, 2025
c95088a
try fixed bit set
bowenlan-amzn Apr 10, 2025
ac8e372
address comments
bowenlan-amzn Apr 11, 2025
e751779
Merge branch 'main' into sub-agg-pr
bowenlan-amzn Apr 11, 2025
90b481b
Merge branch 'main' into sub-agg-pr
bowenlan-amzn Apr 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Star Tree] [Search] Add query changes to support unsigned-long in star tree ([#17275](https://github.com/opensearch-project/OpenSearch/pull/17275))
- Add `ApproximateMatchAllQuery` that targets match_all queries and approximates sorts ([#17772](https://github.com/opensearch-project/OpenSearch/pull/17772))
- Add TermsQuery support to Search GRPC endpoint ([#17888](https://github.com/opensearch-project/OpenSearch/pull/17888))
- Support sub agg in filter rewrite optimization ([#17447](https://github.com/opensearch-project/OpenSearch/pull/17447)

### Changed
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.MAX_OPEN_PIT_CONTEXT,
SearchService.MAX_PIT_KEEPALIVE_SETTING,
SearchService.MAX_AGGREGATION_REWRITE_FILTERS,
SearchService.AGGREGATION_REWRITE_FILTER_SEGMENT_THRESHOLD,
SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING,
SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD,
SearchService.KEYWORD_INDEX_OR_DOC_VALUES_ENABLED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import java.util.function.Function;
import java.util.function.LongSupplier;

import static org.opensearch.search.SearchService.AGGREGATION_REWRITE_FILTER_SEGMENT_THRESHOLD;
import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
Expand Down Expand Up @@ -207,6 +208,7 @@
private final String concurrentSearchMode;
private final SetOnce<Boolean> requestShouldUseConcurrentSearch = new SetOnce<>();
private final int maxAggRewriteFilters;
private final int filterRewriteSegmentThreshold;
private final int cardinalityAggregationPruningThreshold;
private final boolean keywordIndexOrDocValuesEnabled;

Expand Down Expand Up @@ -267,6 +269,7 @@
this.requestToAggReduceContextBuilder = requestToAggReduceContextBuilder;

this.maxAggRewriteFilters = evaluateFilterRewriteSetting();
this.filterRewriteSegmentThreshold = evaluateAggRewriteFilterSegThreshold();
this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold();
this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories;
this.keywordIndexOrDocValuesEnabled = evaluateKeywordIndexOrDocValuesEnabled();
Expand Down Expand Up @@ -1124,6 +1127,18 @@
return 0;
}

@Override
public int filterRewriteSegmentThreshold() {
return filterRewriteSegmentThreshold;

Check warning on line 1132 in server/src/main/java/org/opensearch/search/DefaultSearchContext.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/DefaultSearchContext.java#L1132

Added line #L1132 was not covered by tests
}

private int evaluateAggRewriteFilterSegThreshold() {
if (clusterService != null) {
return clusterService.getClusterSettings().get(AGGREGATION_REWRITE_FILTER_SEGMENT_THRESHOLD);
}
return 0;
}

@Override
public int cardinalityAggregationPruningThreshold() {
return cardinalityAggregationPruningThreshold;
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedSupplier;
import org.opensearch.common.UUIDs;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
Expand Down Expand Up @@ -309,6 +310,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
);

// value 0 means rewrite filters optimization in aggregations will be disabled
@ExperimentalApi
public static final Setting<Integer> MAX_AGGREGATION_REWRITE_FILTERS = Setting.intSetting(
"search.max_aggregation_rewrite_filters",
3000,
Expand All @@ -317,6 +319,16 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.NodeScope
);

// only do optimization when there's enough docs per range at segment level and sub agg exists
@ExperimentalApi
public static final Setting<Integer> AGGREGATION_REWRITE_FILTER_SEGMENT_THRESHOLD = Setting.intSetting(
"search.aggregation_rewrite_filters.segment_threshold.docs_per_bucket",
1000,
0,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Integer> INDICES_MAX_CLAUSE_COUNT_SETTING = Setting.intSetting(
"indices.query.bool.max_clause_count",
1024,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
import java.util.stream.Collectors;

import static org.opensearch.search.aggregations.MultiBucketConsumerService.MAX_BUCKET_SETTING;
import static org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge.segmentMatchAll;
import static org.opensearch.search.aggregations.bucket.filterrewrite.AggregatorBridge.segmentMatchAll;

/**
* Main aggregator that aggregates docs from multiple aggregations
Expand Down Expand Up @@ -173,6 +173,9 @@ public final class CompositeAggregator extends BucketsAggregator {

@Override
protected boolean canOptimize() {
if (subAggregators.length > 0) {
return false;
}
if (canOptimize(sourceConfigs)) {
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
if (rawAfterKey != null) {
Expand Down Expand Up @@ -566,7 +569,12 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t
@Override
protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
finishLeaf(); // May need to wrap up previous leaf if it could not be precomputed
return filterRewriteOptimizationContext.tryOptimize(ctx, this::incrementBucketDocCount, segmentMatchAll(context, ctx));
return filterRewriteOptimizationContext.tryOptimize(
ctx,
this::incrementBucketDocCount,
segmentMatchAll(context, ctx),
collectableSubAggregators
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,23 @@

package org.opensearch.search.aggregations.bucket.filterrewrite;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.search.aggregations.bucket.filterrewrite.rangecollector.RangeCollector;
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.opensearch.search.aggregations.bucket.filterrewrite.PointTreeTraversal.createCollector;
import static org.opensearch.search.aggregations.bucket.filterrewrite.PointTreeTraversal.multiRangesTraverse;

/**
* This interface provides a bridge between an aggregator and the optimization context, allowing
Expand All @@ -35,6 +42,8 @@
*/
public abstract class AggregatorBridge {

static final Logger logger = LogManager.getLogger(Helper.loggerName);

/**
* The field type associated with this aggregator bridge.
*/
Expand Down Expand Up @@ -75,16 +84,51 @@
/**
* Attempts to build aggregation results for a segment
*
* @param values the point values (index structure for numeric values) for a segment
* @param incrementDocCount a consumer to increment the document count for a range bucket. The First parameter is document count, the second is the key of the bucket
* @param values the point values (index structure for numeric values) for a segment
* @param incrementDocCount a consumer to increment the document count for a range bucket. The First parameter is document count, the second is the key of the bucket
* @param ranges
* @param subAggCollectorParam
*/
abstract FilterRewriteOptimizationContext.DebugInfo tryOptimize(
abstract FilterRewriteOptimizationContext.OptimizeResult tryOptimize(
PointValues values,
BiConsumer<Long, Long> incrementDocCount,
Ranges ranges
Ranges ranges,
FilterRewriteOptimizationContext.SubAggCollectorParam subAggCollectorParam
) throws IOException;

static FilterRewriteOptimizationContext.OptimizeResult getResult(
PointValues values,
BiConsumer<Long, Long> incrementDocCount,
Ranges ranges,
Function<Integer, Long> getBucketOrd,
int size,
FilterRewriteOptimizationContext.SubAggCollectorParam subAggCollectorParam
) throws IOException {
BiConsumer<Integer, Integer> incrementFunc = (activeIndex, docCount) -> {
long bucketOrd = getBucketOrd.apply(activeIndex);
incrementDocCount.accept(bucketOrd, (long) docCount);
};

PointValues.PointTree tree = values.getPointTree();
FilterRewriteOptimizationContext.OptimizeResult optimizeResult = new FilterRewriteOptimizationContext.OptimizeResult();
int activeIndex = ranges.firstRangeIndex(tree.getMinPackedValue(), tree.getMaxPackedValue());
if (activeIndex < 0) {
logger.debug("No ranges match the query, skip the fast filter optimization");
return optimizeResult;

Check warning on line 117 in server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/AggregatorBridge.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/AggregatorBridge.java#L116-L117

Added lines #L116 - L117 were not covered by tests
}
RangeCollector collector = createCollector(
ranges,
incrementFunc,
size,
activeIndex,
getBucketOrd,
optimizeResult,
subAggCollectorParam
);

return multiRangesTraverse(tree, collector);
}

/**
* Checks whether the top level query matches all documents on the segment
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.function.BiConsumer;
import java.util.function.Function;

import static org.opensearch.search.aggregations.bucket.filterrewrite.PointTreeTraversal.multiRangesTraverse;

/**
* For date histogram aggregation
*/
Expand Down Expand Up @@ -127,27 +125,31 @@ private DateFieldMapper.DateFieldType getFieldType() {
return (DateFieldMapper.DateFieldType) fieldType;
}

/**
* Get the size of buckets to stop early
*/
protected int getSize() {
return Integer.MAX_VALUE;
}

@Override
final FilterRewriteOptimizationContext.DebugInfo tryOptimize(
final FilterRewriteOptimizationContext.OptimizeResult tryOptimize(
PointValues values,
BiConsumer<Long, Long> incrementDocCount,
Ranges ranges
Ranges ranges,
FilterRewriteOptimizationContext.SubAggCollectorParam subAggCollectorParam
) throws IOException {
int size = getSize();

DateFieldMapper.DateFieldType fieldType = getFieldType();
BiConsumer<Integer, Integer> incrementFunc = (activeIndex, docCount) -> {

Function<Integer, Long> getBucketOrd = (activeIndex) -> {
long rangeStart = LongPoint.decodeDimension(ranges.lowers[activeIndex], 0);
rangeStart = fieldType.convertNanosToMillis(rangeStart);
long bucketOrd = getBucketOrd(bucketOrdProducer().apply(rangeStart));
incrementDocCount.accept(bucketOrd, (long) docCount);
return getBucketOrd(bucketOrdProducer().apply(rangeStart));
};

return multiRangesTraverse(values.getPointTree(), ranges, incrementFunc, size);
return getResult(values, incrementDocCount, ranges, getBucketOrd, size, subAggCollectorParam);
}

private static long getBucketOrd(long bucketOrd) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.util.DocIdSetBuilder;
import org.opensearch.index.mapper.DocCountFieldMapper;
import org.opensearch.search.aggregations.BucketCollector;
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
Expand Down Expand Up @@ -42,12 +44,16 @@

private Ranges ranges; // built at shard level

private boolean hasSubAgg;

// debug info related fields
private final AtomicInteger leafNodeVisited = new AtomicInteger();
private final AtomicInteger innerNodeVisited = new AtomicInteger();
private final AtomicInteger segments = new AtomicInteger();
private final AtomicInteger optimizedSegments = new AtomicInteger();

private int segmentThreshold = 0;

public FilterRewriteOptimizationContext(
AggregatorBridge aggregatorBridge,
final Object parent,
Expand All @@ -65,7 +71,8 @@
private boolean canOptimize(final Object parent, final int subAggLength, SearchContext context) throws IOException {
if (context.maxAggRewriteFilters() == 0) return false;

if (parent != null || subAggLength != 0) return false;
if (parent != null) return false;
this.hasSubAgg = subAggLength > 0;

boolean canOptimize = aggregatorBridge.canOptimize();
if (canOptimize) {
Expand All @@ -81,6 +88,7 @@
}
logger.debug("Fast filter rewriteable: {} for shard {}", canOptimize, shardId);

segmentThreshold = context.filterRewriteSegmentThreshold();
return canOptimize;
}

Expand All @@ -94,10 +102,14 @@
* Usage: invoked at segment level — in getLeafCollector of aggregator
*
* @param incrementDocCount consume the doc_count results for certain ordinal
* @param segmentMatchAll if your optimization can prepareFromSegment, you should pass in this flag to decide whether to prepareFromSegment
* @param segmentMatchAll we can always tryOptimize for match all scenario
*/
public boolean tryOptimize(final LeafReaderContext leafCtx, final BiConsumer<Long, Long> incrementDocCount, boolean segmentMatchAll)
throws IOException {
public boolean tryOptimize(
final LeafReaderContext leafCtx,
final BiConsumer<Long, Long> incrementDocCount,
boolean segmentMatchAll,
BucketCollector collectableSubAggregators
) throws IOException {
segments.incrementAndGet();
if (!canOptimize) {
return false;
Expand All @@ -123,7 +135,25 @@
Ranges ranges = getRanges(leafCtx, segmentMatchAll);
if (ranges == null) return false;

consumeDebugInfo(aggregatorBridge.tryOptimize(values, incrementDocCount, ranges));
if (hasSubAgg && this.segmentThreshold > leafCtx.reader().maxDoc() / ranges.getSize()) {
// comparing with a rough estimate of docs per range in this segment
return false;

Check warning on line 140 in server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java#L140

Added line #L140 was not covered by tests
}

OptimizeResult optimizeResult;
SubAggCollectorParam subAggCollectorParam;
if (hasSubAgg) {
subAggCollectorParam = new SubAggCollectorParam(collectableSubAggregators, leafCtx);
} else {
subAggCollectorParam = null;
}
try {
optimizeResult = aggregatorBridge.tryOptimize(values, incrementDocCount, ranges, subAggCollectorParam);
consumeDebugInfo(optimizeResult);
} catch (AbortFilterRewriteOptimizationException e) {
logger.error("Abort filter rewrite optimization, fall back to default path");
return false;

Check warning on line 155 in server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java#L153-L155

Added lines #L153 - L155 were not covered by tests
}

optimizedSegments.incrementAndGet();
logger.debug("Fast filter optimization applied to shard {} segment {}", shardId, leafCtx.ord);
Expand All @@ -132,6 +162,18 @@
return true;
}

/**
* Parameters for {@link org.opensearch.search.aggregations.bucket.filterrewrite.rangecollector.SubAggRangeCollector}
*/
public record SubAggCollectorParam(BucketCollector collectableSubAggregators, LeafReaderContext leafCtx) {
}

static class AbortFilterRewriteOptimizationException extends RuntimeException {
AbortFilterRewriteOptimizationException(String message, Exception e) {
super(message, e);
}

Check warning on line 174 in server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java#L173-L174

Added lines #L173 - L174 were not covered by tests
}

Ranges getRanges(LeafReaderContext leafCtx, boolean segmentMatchAll) {
if (!preparedAtShardLevel) {
try {
Expand Down Expand Up @@ -160,20 +202,22 @@
/**
* Contains debug info of BKD traversal to show in profile
*/
static class DebugInfo {
public static class OptimizeResult {
private final AtomicInteger leafNodeVisited = new AtomicInteger(); // leaf node visited
private final AtomicInteger innerNodeVisited = new AtomicInteger(); // inner node visited

void visitLeaf() {
public DocIdSetBuilder[] builders = new DocIdSetBuilder[0];

public void visitLeaf() {
leafNodeVisited.incrementAndGet();
}

void visitInner() {
public void visitInner() {
innerNodeVisited.incrementAndGet();
}
}

void consumeDebugInfo(DebugInfo debug) {
void consumeDebugInfo(OptimizeResult debug) {
leafNodeVisited.addAndGet(debug.leafNodeVisited.get());
innerNodeVisited.addAndGet(debug.innerNodeVisited.get());
}
Expand Down
Loading
Loading