
[Concurrent Segment Search] Support parent-join aggregations #9316

@jed326

Description


The join field is a special field type that is used to create a relationship between documents in the same index. Parent and child documents must be indexed into the same shard to maintain the lineage relationship for aggregation purposes. Ref: https://opensearch.org/docs/latest/field-types/supported-field-types/join/

For example, if there is a child document on shard_0 and a parent document on shard_1, then a parent aggregation would not collect any buckets because the relationship between the documents would not be found. In the case of concurrent segment search, this shard routing consideration leads to similar problems at the segment slice level.
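To make the routing constraint concrete, here is a minimal sketch (plain Java, not OpenSearch code; hashing a routing key modulo the shard count is a simplification of the real routing function) of how a child document routed by its own id can land on a different shard than its parent:

```java
// Simplified illustration: documents are routed to a shard by hashing a
// routing key. If a child document is routed by its own id instead of its
// parent's id, the two can land on different shards, and a per-shard
// aggregation cannot see the join relationship.
public class ShardRoutingSketch {
    static int shardFor(String routingKey, int numShards) {
        return Math.floorMod(routingKey.hashCode(), numShards);
    }

    public static void main(String[] args) {
        int numShards = 4;
        int parentShard = shardFor("parent-1", numShards);
        // Correct: child routed by its parent's id -> same shard as parent.
        int colocatedChildShard = shardFor("parent-1", numShards);
        // Incorrect: child routed by its own id -> may land on another shard.
        int misroutedChildShard = shardFor("child-7", numShards);
        System.out.println("parent on shard " + parentShard
            + ", colocated child on shard " + colocatedChildShard
            + ", misrouted child on shard " + misroutedChildShard);
    }
}
```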

Focusing on the ParentJoinAggregator, this is a special type of aggregator:

/**
 * An aggregator that joins documents based on global ordinals.
 * Global ordinals that match the main query and the <code>inFilter</code> query are replayed
 * with documents matching the <code>outFilter</code> query.
 */

Specifically for parent aggregations, the inFilter set is the child documents, while the outFilter set is the parent documents; this is reversed for children aggregations. Unlike other aggregators, ParentJoinAggregator does not collect buckets during LeafBucketCollector::collect. Instead, it iterates through the inFilter set and saves the globalOrdinal we are aggregating into a CollectionStrategy object.

return new LeafBucketCollector() {
    @Override
    public void collect(int docId, long owningBucketOrd) throws IOException {
        if (parentDocs.get(docId) && globalOrdinals.advanceExact(docId)) {
            int globalOrdinal = (int) globalOrdinals.nextOrd();
            assert globalOrdinal != -1 && globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
            collectionStrategy.add(owningBucketOrd, globalOrdinal);
        }
    }
};

Later, at the beginning of the reduce phase, ParentJoinAggregator::beforeBuildingBuckets is called. It iterates through the outFilter set and collects buckets for those documents whose global ordinals were saved in the CollectionStrategy.

for (long owningBucketOrd : ordsToCollect) {
    if (collectionStrategy.exists(owningBucketOrd, globalOrdinal)) {
        collectBucket(sub, docId, owningBucketOrd);
    }
}
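Putting the two snippets together, the overall two-phase pattern can be sketched in plain Java (hypothetical names; a map of sets stands in for the real CollectionStrategy, and collectBucket is reduced to a counter):

```java
import java.util.*;

// Conceptual sketch of the two-phase join pattern described above. Phase 1
// records the join-field global ordinals seen on the inFilter ("child") side;
// phase 2 replays the outFilter ("parent") side and collects a bucket only
// where a matching ordinal was recorded.
public class TwoPhaseJoinSketch {
    // Simulated CollectionStrategy: which global ordinals were seen per bucket.
    static final Map<Long, Set<Integer>> collectionStrategy = new HashMap<>();
    static long collectedBuckets = 0;

    // Phase 1: stands in for LeafBucketCollector::collect on the inFilter set.
    static void collectChild(long owningBucketOrd, int globalOrdinal) {
        collectionStrategy.computeIfAbsent(owningBucketOrd, k -> new HashSet<>())
                          .add(globalOrdinal);
    }

    // Phase 2: stands in for the replay loop in beforeBuildingBuckets.
    static void replayParent(long owningBucketOrd, int globalOrdinal) {
        Set<Integer> seen = collectionStrategy.get(owningBucketOrd);
        if (seen != null && seen.contains(globalOrdinal)) {
            collectedBuckets++; // stands in for collectBucket(sub, docId, owningBucketOrd)
        }
    }

    public static void main(String[] args) {
        collectChild(0L, 42);  // phase 1: child doc with ordinal 42
        replayParent(0L, 42);  // phase 2: parent with same ordinal -> collected
        replayParent(0L, 99);  // no child recorded ordinal 99 -> skipped
        System.out.println("buckets collected: " + collectedBuckets);
    }
}
```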

In the concurrent segment search path, there are two main problems with this approach.

  1. CollectionStrategy is created per-Aggregator instance, so it will not be shared across segment slices. Since the CollectionStrategy is what stores the relationship between parent and child documents, not sharing it between segment slices means that relationships spanning two slices will not be found.
  2. ParentJoinAggregator::beforeBuildingBuckets iterates over all of the segments of the entire index.
    IndexReader indexReader = context().searcher().getIndexReader();
    for (LeafReaderContext ctx : indexReader.leaves()) {

    This means that collectBucket will be called once by every segment slice whose aggregator found the relationship in its CollectionStrategy, leading to overcounting in most cases.
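A rough illustration of the overcounting, with assumed numbers: if each of N slice-level aggregators replays every leaf of the index in beforeBuildingBuckets, a matching parent document is counted once per slice rather than once overall.

```java
// Back-of-the-envelope sketch of problem 2 (assumed numbers, not real code):
// with the replay walking every leaf of the index, each slice's aggregator
// independently counts every matching parent.
public class OvercountSketch {
    static long countMatches(int slices, int matchingParents, boolean replayAllLeavesPerSlice) {
        // Buggy behavior: every slice counts every matching parent.
        // Correct behavior: each matching parent is counted exactly once.
        return replayAllLeavesPerSlice ? (long) slices * matchingParents : matchingParents;
    }

    public static void main(String[] args) {
        System.out.println("correct:     " + countMatches(4, 10, false)); // 10
        System.out.println("overcounted: " + countMatches(4, 10, true));  // 40
    }
}
```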

Here are a few potential changes we can make:
To address problem 1:

  1. Create CollectionStrategy as a shared object between threads. This will lead to contention between the threads, especially if we implement it as a concurrent hash map and the data set is very low cardinality (many threads updating the same few entries).
  2. Since LeafBucketCollector::collect is called concurrently in index_searcher threads but ParentJoinAggregator::beforeBuildingBuckets is called sequentially for each aggregator in the reduce phase, we could merge together the CollectionStrategy objects across the aggregators before we call aggregator.buildTopLevel() in AggregationCollectorManager.
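A minimal sketch of option 2, with hypothetical names and a map of sets standing in for the real CollectionStrategy: each slice aggregator accumulates its own state during concurrent collection, and the collector manager merges the per-slice strategies on a single thread before buildTopLevel() runs.

```java
import java.util.*;

// Sketch of merging per-slice CollectionStrategy state in the reduce phase.
// Because the merge happens sequentially after all slices finish collecting,
// no synchronization is needed on the per-slice maps.
public class MergeStrategySketch {
    static Map<Long, Set<Integer>> merge(List<Map<Long, Set<Integer>>> perSlice) {
        Map<Long, Set<Integer>> merged = new HashMap<>();
        for (Map<Long, Set<Integer>> slice : perSlice) {
            for (Map.Entry<Long, Set<Integer>> e : slice.entrySet()) {
                merged.computeIfAbsent(e.getKey(), k -> new HashSet<>())
                      .addAll(e.getValue());
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Slice 0 saw ordinal 1 for bucket 0; slice 1 saw ordinal 2 for bucket 0.
        Map<Long, Set<Integer>> s0 = Map.of(0L, Set.of(1));
        Map<Long, Set<Integer>> s1 = Map.of(0L, Set.of(2));
        System.out.println(merge(List.of(s0, s1))); // bucket 0 -> ordinals {1, 2}
    }
}
```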

To address problem 2:

  1. Each aggregator instance could track every LeafReaderContext that it has collected, and in the reduce phase it would only evaluate those same LeafReaderContexts instead of all the leaves of the entire index.
  2. At the reduce level, we only call buildTopLevel for a single ParentJoinAggregator instance. This would work similarly to how InternalAggregations::reduce only calls reduce once here:
    reducedAggregations.add(first.reduce(aggregations, context));
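A sketch of solution 1 for problem 2, with hypothetical names and integer leaf ordinals standing in for LeafReaderContext instances: each slice aggregator records the leaves it actually collected and replays only those, instead of indexReader.leaves() for the whole index.

```java
import java.util.*;

// Sketch of per-slice leaf tracking. Each slice-level aggregator remembers
// which leaves it collected; the replay phase then visits only those leaves.
public class TrackLeavesSketch {
    final Set<Integer> collectedLeafOrds = new HashSet<>();

    // Stands in for getLeafCollector(LeafReaderContext): record the leaf.
    void onLeafCollected(int leafOrd) {
        collectedLeafOrds.add(leafOrd);
    }

    // Replay only the leaves this slice actually collected.
    List<Integer> leavesToReplay(List<Integer> allLeafOrds) {
        List<Integer> mine = new ArrayList<>();
        for (int ord : allLeafOrds) {
            if (collectedLeafOrds.contains(ord)) mine.add(ord);
        }
        return mine;
    }

    public static void main(String[] args) {
        TrackLeavesSketch sliceAgg = new TrackLeavesSketch();
        sliceAgg.onLeafCollected(0);  // this slice collected leaves 0 and 2
        sliceAgg.onLeafCollected(2);
        System.out.println(sliceAgg.leavesToReplay(List.of(0, 1, 2, 3))); // [0, 2]
    }
}
```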

Taking a step back, I think there is a broader problem: not all aggregation plugins may support concurrent search, due to the nature of how they are implemented, and a user with a custom plugin may want a way to opt that plugin out of concurrent search. Off the top of my head, there could be other aggregation types with shard routing considerations similar to this parent-join one, or an aggregation could have arithmetic optimizations that specifically work better in the sequential search case. To address that, I propose introducing a dynamic cluster setting that takes a comma-separated list of aggregator names for which concurrent search would be disabled.
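A sketch of how such a setting could behave, in plain Java (the setting key and its registration through OpenSearch's Setting API are omitted; the names here are hypothetical):

```java
import java.util.*;

// Sketch of parsing a comma-separated deny list of aggregator names and
// consulting it before choosing the concurrent search path.
public class ConcurrentSearchDenyList {
    static Set<String> parse(String settingValue) {
        Set<String> names = new HashSet<>();
        for (String name : settingValue.split(",")) {
            if (!name.isBlank()) names.add(name.trim());
        }
        return names;
    }

    static boolean supportsConcurrentSearch(String aggregatorName, Set<String> denyList) {
        return !denyList.contains(aggregatorName);
    }

    public static void main(String[] args) {
        Set<String> denied = parse("parent, children");
        System.out.println(supportsConcurrentSearch("parent", denied)); // false
        System.out.println(supportsConcurrentSearch("terms", denied));  // true
    }
}
```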

In summary, these are the changes I think we should make:

  • Dynamic cluster setting to disable concurrent segment search for certain types of aggregators. See [Concurrent Segment Search] Dynamic cluster setting to disable concurrent segment search for a given aggregation type #9446
  • Solution 2 for problem 1 above. This seems like a pretty involved change that needs to happen in one of the Aggregator base classes to provide a new interface for doing the merge.
  • Solution 1 for problem 2 above.
  • A new test case to cover the issue described in problem 1 above. This should be fairly straightforward: indexing a child document and a parent document into separate segments and then performing a concurrent segment search with one segment per slice should trigger the failure.
