Add task cancellation check in aggregation code paths#18386
Add task cancellation check in aggregation code paths#18386kaushalmahi12 wants to merge 20 commits intoopensearch-project:mainfrom
Conversation
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
|
❌ Gradle check result for 5fe20f9: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
jainankitk
left a comment
There was a problem hiding this comment.
@kaushalmahi12 - Thanks for raising this PR! This will significantly harden the cancellation mechanism for aggregation queries, especially at the coordinator level. I have added few comments to ensure maintainability and keep performance overhead minimal
| buckets = new LongObjectPagedHashMap<>(grid.buckets.size(), reduceContext.bigArrays()); | ||
| } | ||
| for (Object obj : grid.buckets) { | ||
| checkCancelled(reduceContext); |
There was a problem hiding this comment.
Does checkingCancelled for every bucket adds too much overhead? How about sampling after specific number of iterations or something?
|
|
||
| @Override | ||
| public void postCollection() throws IOException { | ||
| public void postCollection(Runnable checkCancelled) throws IOException { |
There was a problem hiding this comment.
This is something that might be overridden by subclass in plugins. We need to ensure this is not breaking those
| BooleanSupplier isRequestCancelled = () -> { | ||
| if (task instanceof CancellableTask) { | ||
| return ((CancellableTask) task).isCancelled(); | ||
| } | ||
| return false; | ||
| }; |
There was a problem hiding this comment.
This could be static lambda somewhere? Doesn't have any logic specific to this class?
There was a problem hiding this comment.
It is capturing the task instance from the enclosing scope
| Runnable checkCancelled = () -> { | ||
| if (isCancelled.getAsBoolean()) { | ||
| throw new OpenSearchRejectedExecutionException("Search was cancelled while post collection phase"); | ||
| } | ||
| }; |
There was a problem hiding this comment.
Again, this can be common Runnable for rejecting requests post cancellation
| if (reduceContext.isTaskCancelled()) { | ||
| throw new OpenSearchRejectedExecutionException("search is cancelled"); | ||
| } | ||
| reduceContext.consumeBucketsAndMaybeBreak(0); |
There was a problem hiding this comment.
Why do we need this as part of checkCancelled?
There was a problem hiding this comment.
this triggers the check for memory based CB underneath after every X calls to consumeBucketsAndMaybeBreak
| @Override | ||
| public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) { | ||
| boolean promoteToDouble = false; | ||
| checkCancelled(reduceContext); |
There was a problem hiding this comment.
This check looks redundant to me
|
|
||
| @Override | ||
| void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException {} | ||
| void collectZeroDocEntriesIfNeeded(long owningBucketOrd, Runnable checkCancelled) throws IOException {} |
There was a problem hiding this comment.
Do we really need to pass this checkCancelled runnable around? We should be able to use class level object or something?
|
|
||
| @Override | ||
| public AbstractInternalHDRPercentiles reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) { | ||
| checkCancelled(reduceContext); |
There was a problem hiding this comment.
Again, this check is redundant. Seeing this at few other places as well, which we can probably get rid of
|
Closing this PR in favor of : #18426 |
Description
This change adds cancellation checks for nested and bucket aggregations. This change will solve the long running cancelled queries stuck in aggregation code paths both at shard and co-ordinator level.
Related Issues
Resolves #15413
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.