Description
The join field is a special field type that is used to create a relationship between documents in the same index. Parent and child documents must be indexed into the same shard to maintain the lineage relationship for aggregation purposes. Ref: https://opensearch.org/docs/latest/field-types/supported-field-types/join/
For example, if there is a child document on shard_0 and a parent document on shard_1, then a parent aggregation would not collect any buckets because the relationship between the documents would not be found. In the case of concurrent segment search, this shard routing consideration leads to some similar problems.
Focusing on the ParentJoinAggregator, this is a special type of aggregator:
Lines 62 to 66 in bf10ff7
    /**
     * An aggregator that joins documents based on global ordinals.
     * Global ordinals that match the main query and the <code>inFilter</code> query are replayed
     * with documents matching the <code>outFilter</code> query.
     */
Specifically for parent aggregations, the inFilter set is the child documents, while the outFilter set is the parent documents. This is reversed for children aggregations. Unlike other aggregators, ParentJoinAggregator does not collect buckets during LeafBucketCollector::collect. Instead, it iterates through the inFilter set and saves the globalOrdinal of each matching document into a CollectionStrategy object.
Lines 118 to 127 in bf10ff7
    return new LeafBucketCollector() {
        @Override
        public void collect(int docId, long owningBucketOrd) throws IOException {
            if (parentDocs.get(docId) && globalOrdinals.advanceExact(docId)) {
                int globalOrdinal = (int) globalOrdinals.nextOrd();
                assert globalOrdinal != -1 && globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
                collectionStrategy.add(owningBucketOrd, globalOrdinal);
            }
        }
    };
Later, at the beginning of the reduce phase, ParentJoinAggregator::beforeBuildingBuckets is called. It iterates through the outFilter set and collects buckets for those outFilter documents whose global ordinal was saved in CollectionStrategy.
Lines 178 to 182 in bf10ff7
    for (long owningBucketOrd : ordsToCollect) {
        if (collectionStrategy.exists(owningBucketOrd, globalOrdinal)) {
            collectBucket(sub, docId, owningBucketOrd);
        }
    }
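The two phases described above can be sketched in isolation. This is a toy model and not the OpenSearch implementation: `Doc`, `collectOrdinals`, and `replay` are hypothetical names, a plain `Set` stands in for CollectionStrategy, and only a single owning bucket is modelled.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the two-phase join; these are not the real OpenSearch classes.
final class TwoPhaseJoinSketch {

    // A document reduced to what the join needs (hypothetical type).
    static final class Doc {
        final boolean inFilter;  // true: matches inFilter; false: matches outFilter
        final int globalOrdinal; // join ordinal shared by a parent and all of its children

        Doc(boolean inFilter, int globalOrdinal) {
            this.inFilter = inFilter;
            this.globalOrdinal = globalOrdinal;
        }
    }

    // Phase 1 (collect): record the ordinal of every inFilter doc; no bucket is collected yet.
    static Set<Integer> collectOrdinals(Doc[] docsMatchingQuery) {
        Set<Integer> seen = new HashSet<>();
        for (Doc doc : docsMatchingQuery) {
            if (doc.inFilter) {
                seen.add(doc.globalOrdinal);
            }
        }
        return seen;
    }

    // Phase 2 (replay): count each outFilter doc whose ordinal was recorded in phase 1.
    static int replay(Doc[] allDocs, Set<Integer> seen) {
        int bucketDocCount = 0;
        for (Doc doc : allDocs) {
            if (!doc.inFilter && seen.contains(doc.globalOrdinal)) {
                bucketDocCount++;
            }
        }
        return bucketDocCount;
    }
}
```

The point of the sketch is that phase 2 depends entirely on the ordinals gathered in phase 1, which is why it matters who owns that set once collection is split across slices.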
In the concurrent segment search path, there are 2 main problems with this approach.
1. CollectionStrategy is created per Aggregator instance, so it is not shared across segment slices. CollectionStrategy is what stores the relationship between parent and child docs, so without sharing it we will not find relationships that span segment slices.
2. ParentJoinAggregator::beforeBuildingBuckets iterates over all of the segments of the entire index.
Lines 137 to 138 in bf10ff7
    IndexReader indexReader = context().searcher().getIndexReader();
    for (LeafReaderContext ctx : indexReader.leaves()) {
This means that we will call collectBucket for every segment slice where the relationship in CollectionStrategy was found, which leads to overcounting in most cases.
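To make the overcounting in problem 2 concrete, here is a toy model of a parent aggregation. It assumes one aggregator per slice whose ordinal set is private to that slice while the replay walks every segment, and it assumes the per-slice doc counts are summed afterwards. `Segment` and `parentCount` are hypothetical names, not OpenSearch types.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of a parent aggregation under concurrent segment search (hypothetical types).
final class SliceOvercountSketch {

    // One segment: the join ordinals of its child docs and of its parent docs.
    static final class Segment {
        final int[] childOrds;
        final int[] parentOrds;

        Segment(int[] childOrds, int[] parentOrds) {
            this.childOrds = childOrds;
            this.parentOrds = parentOrds;
        }
    }

    // One aggregator per slice: ordinals are recorded per slice, replay visits the whole index.
    static int parentCount(List<Segment> index, List<List<Segment>> slices) {
        int total = 0;
        for (List<Segment> slice : slices) {
            Set<Integer> seen = new HashSet<>(); // stand-in for a per-slice CollectionStrategy
            for (Segment segment : slice) {
                for (int ord : segment.childOrds) {
                    seen.add(ord);
                }
            }
            for (Segment segment : index) { // all leaves, not only this slice's leaves
                for (int ord : segment.parentOrds) {
                    if (seen.contains(ord)) {
                        total++;
                    }
                }
            }
        }
        return total;
    }
}
```

With one parent and two of its children spread over two segments, a single slice covering both segments counts the parent once, while two slices of one segment each count it twice, because both slices recorded the same ordinal and both replayed the segment holding the parent.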
Here are a few potential changes we can make:
To address problem 1:
- Create CollectionStrategy as a shared object between threads. This will lead to contention between the threads, especially if we implement it as a concurrent hash map and the data set is very low cardinality.
- Since LeafBucketCollector::collect is called concurrently in index_searcher threads but ParentJoinAggregator::beforeBuildingBuckets is called sequentially for each aggregator in the reduce phase, we could merge together the CollectionStrategy objects across the aggregators before we call aggregator.buildTopLevel() in AggregationCollectorManager.
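A minimal sketch of the merge option, assuming each slice's CollectionStrategy can be viewed as a map from owningBucketOrd to the set of global ordinals it recorded. The `merge` helper is hypothetical, and the sketch further assumes that owning bucket ordinals are comparable across aggregator instances, which would need to be verified for nested aggregations.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical merge step run once, single-threaded, before buckets are built.
final class StrategyMergeSketch {

    // Each map is a stand-in for one slice's CollectionStrategy:
    // owningBucketOrd -> global ordinals recorded during collect.
    static Map<Long, Set<Integer>> merge(List<Map<Long, Set<Integer>>> perSlice) {
        Map<Long, Set<Integer>> merged = new HashMap<>();
        for (Map<Long, Set<Integer>> strategy : perSlice) {
            for (Map.Entry<Long, Set<Integer>> entry : strategy.entrySet()) {
                // Union the ordinals recorded for the same owning bucket.
                merged.computeIfAbsent(entry.getKey(), k -> new HashSet<>())
                    .addAll(entry.getValue());
            }
        }
        return merged;
    }
}
```

Because the merge happens after the concurrent collect phase has finished, no synchronization is needed here, which is the advantage over the shared-object option.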
To address problem 2:
- Each aggregator instance could track every LeafReaderContext that it has collected, and in the reduce phase evaluate only those same LeafReaderContexts instead of all the leaves of the entire index.
- At the reduce level, we only call buildTopLevel for a single ParentJoinAggregator instance. This would work similarly to how InternalAggregations::reduce only calls reduce once here:
OpenSearch/server/src/main/java/org/opensearch/search/aggregations/InternalAggregations.java
Line 186 in bf10ff7
    reducedAggregations.add(first.reduce(aggregations, context));
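The leaf-tracking option can be sketched as follows, combined with a union of the per-slice ordinal sets so that relationships spanning slices are still found. This is a toy model: `Segment`, `SliceAggregator`, and `parentCount` are hypothetical names, and a real implementation would track LeafReaderContext instances rather than these stand-ins.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model: each per-slice aggregator replays only the leaves it collected.
final class TrackedLeavesSketch {

    // One segment: the join ordinals of its child docs and of its parent docs.
    static final class Segment {
        final int[] childOrds;
        final int[] parentOrds;

        Segment(int[] childOrds, int[] parentOrds) {
            this.childOrds = childOrds;
            this.parentOrds = parentOrds;
        }
    }

    // Stand-in for one per-slice aggregator instance.
    static final class SliceAggregator {
        final List<Segment> collectedLeaves = new ArrayList<>();
        final Set<Integer> seen = new HashSet<>();

        // Collect phase: remember the leaf and record its child ordinals.
        void collect(Segment leaf) {
            collectedLeaves.add(leaf);
            for (int ord : leaf.childOrds) {
                seen.add(ord);
            }
        }

        // Replay phase: visit only this instance's leaves, using the merged ordinals.
        int replay(Set<Integer> mergedOrds) {
            int count = 0;
            for (Segment leaf : collectedLeaves) {
                for (int ord : leaf.parentOrds) {
                    if (mergedOrds.contains(ord)) {
                        count++;
                    }
                }
            }
            return count;
        }
    }

    static int parentCount(List<List<Segment>> slices) {
        List<SliceAggregator> aggregators = new ArrayList<>();
        Set<Integer> mergedOrds = new HashSet<>();
        for (List<Segment> slice : slices) {
            SliceAggregator aggregator = new SliceAggregator();
            for (Segment leaf : slice) {
                aggregator.collect(leaf);
            }
            mergedOrds.addAll(aggregator.seen); // union of ordinals across slices
            aggregators.add(aggregator);
        }
        int total = 0;
        for (SliceAggregator aggregator : aggregators) {
            total += aggregator.replay(mergedOrds);
        }
        return total;
    }
}
```

In this model every parent document lives in exactly one leaf and every leaf belongs to exactly one slice, so each parent is counted at most once regardless of how the segments are sliced.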
Taking a step back, I think there is a broader problem: not all aggregation plugins may support concurrent search, due to the nature of how they are implemented. A user who has created a custom plugin may want a way to opt that plugin out of concurrent search. Off the top of my head, there could be other aggregation types that have shard routing considerations similar to this parent-join one, or there could be arithmetic optimizations in an aggregation that work better with sequential searching. To address that, I propose introducing a dynamic cluster setting that takes a comma-separated list of aggregator names for which concurrent search would be disabled.
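As a rough illustration of how such a setting value might be consumed, the sketch below parses a comma-separated list and decides whether a request may use concurrent search. The class and method names are hypothetical, and the sketch deliberately does not use the OpenSearch settings API; how the real setting is registered and read is left to the implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper; not part of OpenSearch.
final class DisabledAggregatorsSketch {

    // Parse the comma-separated setting value, ignoring whitespace and empty entries.
    static Set<String> parse(String commaSeparated) {
        return Arrays.stream(commaSeparated.split(","))
            .map(String::trim)
            .filter(name -> !name.isEmpty())
            .collect(Collectors.toSet());
    }

    // Fall back to sequential search if any aggregator type in the request is disabled.
    static boolean useConcurrentSearch(Set<String> disabled, List<String> requestAggregatorTypes) {
        for (String type : requestAggregatorTypes) {
            if (disabled.contains(type)) {
                return false;
            }
        }
        return true;
    }
}
```

One design choice shown here is that a single disabled aggregator anywhere in the request turns off concurrent search for the whole request, which is the simplest behavior to reason about.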
In summary, these are the changes I think we should make:
- Dynamic cluster setting to disable concurrent segment search for certain types of aggregators. See [Concurrent Segment Search] Dynamic cluster setting to disable concurrent segment search for a given aggregation type #9446
- Solution 2 for problem 1 above. This seems like a pretty involved change that needs to happen in one of the Aggregator base classes to provide a new interface for doing the merge.
- Solution 1 for problem 2 above.
- A new test case to cover the issue described in problem 1 above. This should be fairly straightforward: indexing a child document and a parent document into separate segments and then performing concurrent segment search with 1 segment per slice should trigger this failure.