Skip to content

Add task cancellation check in aggregation code paths#18386

Closed
kaushalmahi12 wants to merge 20 commits intoopensearch-project:mainfrom
kaushalmahi12:task_cancellation_check
Closed

Add task cancellation check in aggregation code paths#18386
kaushalmahi12 wants to merge 20 commits intoopensearch-project:mainfrom
kaushalmahi12:task_cancellation_check

Conversation

@kaushalmahi12
Copy link
Copy Markdown
Contributor

Description

This change adds cancellation checks for nested and bucket aggregations. This change will solve the long running cancelled queries stuck in aggregation code paths both at shard and co-ordinator level.

Related Issues

Resolves #15413

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
@kaushalmahi12 kaushalmahi12 added backport 2.x Backport to 2.x branch backport 3.0 labels May 28, 2025
@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 5fe20f9: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Copy link
Copy Markdown
Contributor

@jainankitk jainankitk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kaushalmahi12 - Thanks for raising this PR! This will significantly harden the cancellation mechanism for aggregation queries, especially at the coordinator level. I have added few comments to ensure maintainability and keep performance overhead minimal

buckets = new LongObjectPagedHashMap<>(grid.buckets.size(), reduceContext.bigArrays());
}
for (Object obj : grid.buckets) {
checkCancelled(reduceContext);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does checkingCancelled for every bucket adds too much overhead? How about sampling after specific number of iterations or something?


@Override
public void postCollection() throws IOException {
public void postCollection(Runnable checkCancelled) throws IOException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something that might be overridden by subclass in plugins. We need to ensure this is not breaking those

Comment on lines +518 to +523
BooleanSupplier isRequestCancelled = () -> {
if (task instanceof CancellableTask) {
return ((CancellableTask) task).isCancelled();
}
return false;
};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be static lambda somewhere? Doesn't have any logic specific to this class?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is capturing the task instance from the enclosing scope

Comment on lines +64 to +68
Runnable checkCancelled = () -> {
if (isCancelled.getAsBoolean()) {
throw new OpenSearchRejectedExecutionException("Search was cancelled while post collection phase");
}
};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, this can be common Runnable for rejecting requests post cancellation

if (reduceContext.isTaskCancelled()) {
throw new OpenSearchRejectedExecutionException("search is cancelled");
}
reduceContext.consumeBucketsAndMaybeBreak(0);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this as part of checkCancelled?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this triggers the check for memory based CB underneath after every X calls to consumeBucketsAndMaybeBreak

@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
boolean promoteToDouble = false;
checkCancelled(reduceContext);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check looks redundant to me


@Override
void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException {}
void collectZeroDocEntriesIfNeeded(long owningBucketOrd, Runnable checkCancelled) throws IOException {}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to pass this checkCancelled runnable around? We should be able to use class level object or something?


@Override
public AbstractInternalHDRPercentiles reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
checkCancelled(reduceContext);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, this check is redundant. Seeing this at few other places as well, which we can probably get rid of

@kaushalmahi12
Copy link
Copy Markdown
Contributor Author

Closing this PR in favor of : #18426

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backport 2.x Backport to 2.x branch backport 3.0 bug Something isn't working Search:Aggregations

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BUG] Deeply nested aggregations are not terminable by any mechanism and cause Out of Memory errors in data nodes.

2 participants