Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@
import org.opensearch.search.aggregations.MultiBucketCollector;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper;
import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper.AbstractDateHistogramAggregationType;
import org.opensearch.search.aggregations.bucket.filterrewrite.CompositeAggregatorBridge;
import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.internal.SearchContext;
Expand All @@ -89,13 +89,15 @@
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongUnaryOperator;
import java.util.stream.Collectors;

import static org.opensearch.search.aggregations.MultiBucketConsumerService.MAX_BUCKET_SETTING;
import static org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge.segmentMatchAll;

/**
* Main aggregator that aggregates docs from mulitple aggregations
* Main aggregator that aggregates docs from multiple aggregations
*
* @opensearch.internal
*/
Expand All @@ -118,9 +120,8 @@ public final class CompositeAggregator extends BucketsAggregator {

private boolean earlyTerminated;

private final FastFilterRewriteHelper.FastFilterContext fastFilterContext;
private LongKeyedBucketOrds bucketOrds = null;
private Rounding.Prepared preparedRounding = null;
private final FilterRewriteOptimizationContext filterRewriteOptimizationContext;
private LongKeyedBucketOrds bucketOrds;

CompositeAggregator(
String name,
Expand Down Expand Up @@ -166,57 +167,62 @@ public final class CompositeAggregator extends BucketsAggregator {
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.rawAfterKey = rawAfterKey;

fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context);
if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) {
return;
}
fastFilterContext.setAggregationType(new CompositeAggregationType());
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
// bucketOrds is used for saving date histogram results
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
preparedRounding = ((CompositeAggregationType) fastFilterContext.getAggregationType()).getRoundingPrepared();
fastFilterContext.buildRanges(sourceConfigs[0].fieldType());
}
}
CompositeAggregatorBridge bridge = new CompositeAggregatorBridge() {
private RoundingValuesSource valuesSource;
private long afterKey = -1L;

/**
* Currently the filter rewrite is only supported for date histograms
*/
public class CompositeAggregationType extends AbstractDateHistogramAggregationType {
private final RoundingValuesSource valuesSource;
private long afterKey = -1L;

public CompositeAggregationType() {
super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript());
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
if (rawAfterKey != null) {
assert rawAfterKey.size() == 1 && formats.size() == 1;
this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
@Override
protected boolean canOptimize() {
if (canOptimize(sourceConfigs)) {
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
if (rawAfterKey != null) {
assert rawAfterKey.size() == 1 && formats.size() == 1;
this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
}

// bucketOrds is used for saving the date histogram results got from the optimization path
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
return true;
}
return false;
}
}

public Rounding getRounding(final long low, final long high) {
return valuesSource.getRounding();
}
@Override
protected void prepare() throws IOException {
buildRanges(context);
}

public Rounding.Prepared getRoundingPrepared() {
return valuesSource.getPreparedRounding();
}
protected Rounding getRounding(final long low, final long high) {
return valuesSource.getRounding();
}

@Override
protected void processAfterKey(long[] bound, long interval) {
// afterKey is the last bucket key in previous response, and the bucket key
// is the minimum of all values in the bucket, so need to add the interval
if (afterKey != -1L) {
bound[0] = afterKey + interval;
protected Rounding.Prepared getRoundingPrepared() {
return valuesSource.getPreparedRounding();
}
}

public int getSize() {
return size;
}
@Override
protected long[] processAfterKey(long[] bounds, long interval) {
// afterKey is the last bucket key in previous response, and the bucket key
// is the minimum of all values in the bucket, so need to add the interval
if (afterKey != -1L) {
bounds[0] = afterKey + interval;
}
return bounds;
}

@Override
protected int getSize() {
return size;
}

@Override
protected Function<Long, Long> bucketOrdProducer() {
return (key) -> bucketOrds.add(0, getRoundingPrepared().round((long) key));
}
};
filterRewriteOptimizationContext = new FilterRewriteOptimizationContext(bridge, parent, subAggregators.length, context);
}

@Override
Expand Down Expand Up @@ -368,7 +374,7 @@ private boolean isMaybeMultivalued(LeafReaderContext context, SortField sortFiel
return v2 != null && DocValues.unwrapSingleton(v2) == null;

default:
// we have no clue whether the field is multi-valued or not so we assume it is.
// we have no clue whether the field is multivalued or not so we assume it is.
return true;
}
}
Expand Down Expand Up @@ -551,11 +557,7 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t

@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
boolean optimized = fastFilterContext.tryFastFilterAggregation(
ctx,
this::incrementBucketDocCount,
(key) -> bucketOrds.add(0, preparedRounding.round((long) key))
);
boolean optimized = filterRewriteOptimizationContext.tryOptimize(ctx, this::incrementBucketDocCount, segmentMatchAll(context, ctx));
if (optimized) throw new CollectionTerminatedException();

finishLeaf();
Expand Down Expand Up @@ -709,11 +711,6 @@ private static class Entry {

@Override
public void collectDebugInfo(BiConsumer<String, Object> add) {
if (fastFilterContext.optimizedSegments > 0) {
add.accept("optimized_segments", fastFilterContext.optimizedSegments);
add.accept("unoptimized_segments", fastFilterContext.segments - fastFilterContext.optimizedSegments);
add.accept("leaf_visited", fastFilterContext.leaf);
add.accept("inner_visited", fastFilterContext.inner);
}
filterRewriteOptimizationContext.populateDebugInfo(add);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.bucket.filterrewrite;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.opensearch.index.mapper.MappedFieldType;

import java.io.IOException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
* This interface provides a bridge between an aggregator and the optimization context, allowing
* the aggregator to provide data and optimize the aggregation process.
*
* <p>The main purpose of this interface is to encapsulate the aggregator-specific optimization
* logic and provide access to the data in Aggregator that is required for optimization, while keeping the optimization
* business logic separate from the aggregator implementation.
*
* <p>To use this interface to optimize an aggregator, you should subclass this interface in this package
* and put any specific optimization business logic in it. Then implement this subclass in the aggregator
* to provide data that is needed for doing the optimization
*
* @opensearch.internal
*/
public abstract class AggregatorBridge {

/**
* The field type associated with this aggregator bridge.
*/
MappedFieldType fieldType;

Consumer<Ranges> setRanges;

void setRangesConsumer(Consumer<Ranges> setRanges) {
this.setRanges = setRanges;
}

/**
* Checks whether the aggregator can be optimized.
* <p>
* This method is supposed to be implemented in a specific aggregator to take in fields from there
*
* @return {@code true} if the aggregator can be optimized, {@code false} otherwise.
* The result will be saved in the optimization context.
*/
protected abstract boolean canOptimize();

/**
* Prepares the optimization at shard level after checking aggregator is optimizable.
* <p>
* For example, figure out what are the ranges from the aggregation to do the optimization later
* <p>
* This method is supposed to be implemented in a specific aggregator to take in fields from there
*/
protected abstract void prepare() throws IOException;

/**
* Prepares the optimization for a specific segment when the segment is functionally matching all docs
*
* @param leaf the leaf reader context for the segment
*/
abstract Ranges tryBuildRangesFromSegment(LeafReaderContext leaf) throws IOException;

/**
* Attempts to build aggregation results for a segment
*
* @param values the point values (index structure for numeric values) for a segment
* @param incrementDocCount a consumer to increment the document count for a range bucket. The First parameter is document count, the second is the key of the bucket
* @param ranges
*/
abstract FilterRewriteOptimizationContext.DebugInfo tryOptimize(
PointValues values,
BiConsumer<Long, Long> incrementDocCount,
Ranges ranges
) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.bucket.filterrewrite;

import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig;
import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource;

/**
* For composite aggregation to do optimization when it only has a single date histogram source
*/
public abstract class CompositeAggregatorBridge extends DateHistogramAggregatorBridge {
protected boolean canOptimize(CompositeValuesSourceConfig[] sourceConfigs) {
if (sourceConfigs.length != 1 || !(sourceConfigs[0].valuesSource() instanceof RoundingValuesSource)) return false;
return canOptimize(sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript(), sourceConfigs[0].fieldType());
}

private boolean canOptimize(boolean missing, boolean hasScript, MappedFieldType fieldType) {
if (!missing && !hasScript) {
if (fieldType instanceof DateFieldMapper.DateFieldType) {
if (fieldType.isSearchable()) {
this.fieldType = fieldType;
return true;
}
}
}
return false;
}
}
Loading