-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Add task cancellation check in aggregation code paths #18386
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3ecf0f1
62f9eb2
981792d
606a836
79e075d
297ca5a
660623e
1d930b3
1f36b3d
84f45ef
d73dbb6
7cdcf08
c39afed
968e96f
a4e315e
c498cdf
b34734b
f39d96f
9ca1bc4
5fe20f9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -128,7 +128,7 @@ public void collect(int docId, long owningBucketOrd) throws IOException { | |
| } | ||
|
|
||
| @Override | ||
| public void postCollection() throws IOException { | ||
| public void postCollection(Runnable checkCancelled) throws IOException { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is something that might be overridden by subclass in plugins. We need to ensure this is not breaking those |
||
| // Delaying until beforeBuildingBuckets | ||
| } | ||
|
|
||
|
|
@@ -177,7 +177,7 @@ public float score() { | |
| } | ||
| } | ||
| } | ||
| super.postCollection(); // Run post collection after collecting the sub-aggs | ||
| super.postCollection(() -> {}); // Run post collection after collecting the sub-aggs | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -119,6 +119,7 @@ | |
| import java.util.concurrent.atomic.AtomicReference; | ||
| import java.util.function.BiConsumer; | ||
| import java.util.function.BiFunction; | ||
| import java.util.function.BooleanSupplier; | ||
| import java.util.function.Function; | ||
| import java.util.function.LongSupplier; | ||
| import java.util.stream.Collectors; | ||
|
|
@@ -514,6 +515,12 @@ private ActionListener<SearchSourceBuilder> buildRewriteListener( | |
| ); | ||
| } | ||
| OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); | ||
| BooleanSupplier isRequestCancelled = () -> { | ||
| if (task instanceof CancellableTask) { | ||
| return ((CancellableTask) task).isCancelled(); | ||
| } | ||
| return false; | ||
| }; | ||
|
Comment on lines
+518
to
+523
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could be static lambda somewhere? Doesn't have any logic specific to this class?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is capturing the task instance from the enclosing scope |
||
| if (remoteClusterIndices.isEmpty()) { | ||
| executeLocalSearch( | ||
| task, | ||
|
|
@@ -533,7 +540,7 @@ private ActionListener<SearchSourceBuilder> buildRewriteListener( | |
| localIndices, | ||
| remoteClusterIndices, | ||
| timeProvider, | ||
| searchService.aggReduceContextBuilder(searchRequest.source()), | ||
| searchService.aggReduceContextBuilder(searchRequest.source(), isRequestCancelled), | ||
| remoteClusterService, | ||
| threadPool, | ||
| listener, | ||
|
|
@@ -1265,7 +1272,8 @@ private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction | |
| task.getProgressListener(), | ||
| searchRequest, | ||
| shardIterators.size(), | ||
| exc -> cancelTask(task, exc) | ||
| exc -> cancelTask(task, exc), | ||
| task::isCancelled | ||
| ); | ||
| AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction; | ||
| switch (searchRequest.searchType()) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does checkingCancelled for every bucket adds too much overhead? How about sampling after specific number of iterations or something?