diff --git a/docs/changelog/83290.yaml b/docs/changelog/83290.yaml new file mode 100644 index 0000000000000..9b3bb8ef056e5 --- /dev/null +++ b/docs/changelog/83290.yaml @@ -0,0 +1,5 @@ +pr: 83290 +summary: Update YAML Rest tests to check for product header on all responses +area: Infra/REST API +type: enhancement +issues: [] diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java index bab93e56b653f..beac9ab88c78c 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java @@ -593,7 +593,7 @@ protected void finishHim(Exception failure) { */ protected void finishHim(Exception failure, List indexingFailures, List searchFailures, boolean timedOut) { logger.debug("[{}]: finishing without any catastrophic failures", task.getId()); - scrollSource.close(() -> { + scrollSource.close(threadPool.getThreadContext().preserveContext(() -> { if (failure == null) { BulkByScrollResponse response = buildResponse( timeValueNanos(System.nanoTime() - startTime.get()), @@ -605,7 +605,7 @@ protected void finishHim(Exception failure, List indexingFailures, List } else { listener.onFailure(failure); } - }); + })); } /** diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cat.snapshots/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cat.snapshots/10_basic.yml index f7d60671c7e88..23860cb412722 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cat.snapshots/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cat.snapshots/10_basic.yml @@ -23,6 +23,10 @@ $/ --- "Test cat snapshots output": + - skip: + version: " - 8.1.99" + reason: "Pause BWC tests until #83290 is backported" + - do: snapshot.create_repository: repository: test_cat_snapshots_1 diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.clone/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.clone/10_basic.yml index fb289355e08fb..80e7139cd8df3 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.clone/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.clone/10_basic.yml @@ -1,5 +1,8 @@ --- setup: + - skip: + version: " - 8.1.99" + reason: "Pause BWC tests until #83290 is backported" - do: snapshot.create_repository: diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.create/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.create/10_basic.yml index f7c522b712244..e060e7dff5bda 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.create/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.create/10_basic.yml @@ -1,5 +1,8 @@ --- setup: + - skip: + version: " - 8.1.99" + reason: "Pause BWC tests until #83290 is backported" - do: snapshot.create_repository: diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.get/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.get/10_basic.yml index b50ece87e9f88..08753e4e732bf 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.get/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.get/10_basic.yml @@ -1,5 +1,8 @@ --- setup: + - skip: + version: " - 8.1.99" + reason: "Pause BWC tests until #83290 is backported" - do: snapshot.create_repository: @@ -61,6 +64,7 @@ setup: --- "Get snapshot info when verbose is false": + - do: indices.create: index: test_index @@ -198,7 +202,6 @@ setup: - skip: version: " - 7.12.99" reason: "Introduced in 7.13.0" - - do: indices.create: index: test_index diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.get_repository/20_repository_uuid.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.get_repository/20_repository_uuid.yml index 0532d208d0cba..503c6cc7133de 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.get_repository/20_repository_uuid.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.get_repository/20_repository_uuid.yml @@ -1,4 +1,9 @@ --- +setup: + - skip: + version: " - 8.1.99" + reason: "Pause BWC tests until #83290 is backported" +--- "Get repository returns UUID": - skip: version: " - 7.12.99" diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.restore/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.restore/10_basic.yml index 1ea5b542625e8..e91f38e985e43 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.restore/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.restore/10_basic.yml @@ -1,5 +1,8 @@ --- setup: + - skip: + version: " - 8.1.99" + reason: "Pause BWC tests until #83290 is backported" - do: snapshot.create_repository: diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.status/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.status/10_basic.yml index c35f2419bdc91..2c4573ccd58b8 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.status/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.status/10_basic.yml @@ -1,5 +1,8 @@ --- setup: + - skip: + version: " - 8.1.99" + reason: "Pause BWC tests until #83290 is backported" - do: snapshot.create_repository: diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/30_snapshot.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/30_snapshot.yml index 48986cfe82d74..c8270d3b9cc2c 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/30_snapshot.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/30_snapshot.yml @@ -1,5 +1,9 @@ --- setup: + - skip: + version: " - 8.1.99" + reason: "Pause BWC tests until #83290 is backported" + - do: snapshot.create_repository: repository: test_repo diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java index 2f3e92d2f55a9..c2931714e72a7 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java @@ -16,10 +16,13 @@ import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.RestoreInfo; import org.elasticsearch.snapshots.RestoreService; +import java.util.function.Supplier; + import static org.elasticsearch.snapshots.RestoreService.restoreInProgress; public class RestoreClusterStateListener implements ClusterStateListener { @@ -29,43 +32,48 @@ public class RestoreClusterStateListener implements ClusterStateListener { private final ClusterService clusterService; private final String uuid; private final ActionListener listener; + private final Supplier contextSupplier; private RestoreClusterStateListener( ClusterService clusterService, RestoreService.RestoreCompletionResponse response, - ActionListener listener + ActionListener listener, + Supplier contextSupplier ) { this.clusterService = clusterService; this.uuid = response.getUuid(); this.listener = listener; + this.contextSupplier = contextSupplier; } @Override public void clusterChanged(ClusterChangedEvent changedEvent) { - final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid); - final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid); - if (prevEntry == null) { - // When there is a master failure after a restore has been started, this listener might not be registered - // on the current master and as such it might miss some intermediary cluster states due to batching. - // Clean up listener in that case and acknowledge completion of restore operation to client. - clusterService.removeListener(this); - listener.onResponse(new RestoreSnapshotResponse((RestoreInfo) null)); - } else if (newEntry == null) { - clusterService.removeListener(this); - ImmutableOpenMap shards = prevEntry.shards(); - assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state(); - assert RestoreService.completed(shards) : "expected all restore entries to be completed"; - RestoreInfo ri = new RestoreInfo( - prevEntry.snapshot().getSnapshotId().getName(), - prevEntry.indices(), - shards.size(), - shards.size() - RestoreService.failedShards(shards) - ); - RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri); - logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId()); - listener.onResponse(response); - } else { - // restore not completed yet, wait for next cluster state update + try (ThreadContext.StoredContext stored = contextSupplier.get()) { + final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid); + final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid); + if (prevEntry == null) { + // When there is a master failure after a restore has been started, this listener might not be registered + // on the current master and as such it might miss some intermediary cluster states due to batching. + // Clean up listener in that case and acknowledge completion of restore operation to client. + clusterService.removeListener(this); + listener.onResponse(new RestoreSnapshotResponse((RestoreInfo) null)); + } else if (newEntry == null) { + clusterService.removeListener(this); + ImmutableOpenMap shards = prevEntry.shards(); + assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state(); + assert RestoreService.completed(shards) : "expected all restore entries to be completed"; + RestoreInfo ri = new RestoreInfo( + prevEntry.snapshot().getSnapshotId().getName(), + prevEntry.indices(), + shards.size(), + shards.size() - RestoreService.failedShards(shards) + ); + RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri); + logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId()); + listener.onResponse(response); + } else { + // restore not completed yet, wait for next cluster state update + } } } @@ -76,8 +84,11 @@ public void clusterChanged(ClusterChangedEvent changedEvent) { public static void createAndRegisterListener( ClusterService clusterService, RestoreService.RestoreCompletionResponse response, - ActionListener listener + ActionListener listener, + ThreadContext threadContext ) { - clusterService.addListener(new RestoreClusterStateListener(clusterService, response, listener)); + clusterService.addListener( + new RestoreClusterStateListener(clusterService, response, listener, threadContext.newRestorableContext(true)) + ); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java index 7b247f1b14a42..73b66fa5d1bb5 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java @@ -72,7 +72,12 @@ protected void masterOperation( ) { restoreService.restoreSnapshot(request, listener.delegateFailure((delegatedListener, restoreCompletionResponse) -> { if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) { - RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, delegatedListener); + RestoreClusterStateListener.createAndRegisterListener( + clusterService, + restoreCompletionResponse, + delegatedListener, + threadPool.getThreadContext() + ); } else { delegatedListener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo())); } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index 66d5428d5d135..122659c64422e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -213,7 +213,7 @@ public void removeApplier(ClusterStateApplier applier) { } /** - * Add a listener for updated cluster states + * Add a listener for updated cluster states. Listeners are executed in the system thread context. */ public void addListener(ClusterStateListener listener) { clusterStateListeners.add(listener); @@ -222,7 +222,7 @@ public void addListener(ClusterStateListener listener) { /** * Removes a listener for updated cluster states. */ - public void removeListener(ClusterStateListener listener) { + public void removeListener(final ClusterStateListener listener) { clusterStateListeners.remove(listener); } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 77885fb624e4c..b22967eb7b626 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -2957,7 +2958,8 @@ static Map filterDataStreamAliases( * @param listener listener */ private void addListener(Snapshot snapshot, ActionListener> listener) { - snapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>()).add(listener); + snapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>()) + .add(ContextPreservingActionListener.wrapPreservingContext(listener, threadPool.getThreadContext())); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ClientYamlTestResponse.java b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ClientYamlTestResponse.java index bdd8ba9dab1df..86121fa0d7da0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ClientYamlTestResponse.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ClientYamlTestResponse.java @@ -87,13 +87,20 @@ public String getReasonPhrase() { * Get a list of all of the values of all warning headers returned in the response. */ public List getWarningHeaders() { - List warningHeaders = new ArrayList<>(); + return getHeaders("Warning"); + } + + /** + * Get a list of all the values of a given header returned in the response. + */ + public List getHeaders(String name) { + List headers = new ArrayList<>(); for (Header header : response.getHeaders()) { - if (header.getName().equals("Warning")) { - warningHeaders.add(header.getValue()); + if (header.getName().equalsIgnoreCase(name)) { + headers.add(header.getValue()); } } - return warningHeaders; + return headers; } /** diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/section/DoSection.java b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/section/DoSection.java index 23a7146561da9..efc53b08fad27 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/section/DoSection.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/section/DoSection.java @@ -367,6 +367,7 @@ public void execute(ClientYamlTestExecutionContext executionContext) throws IOEx final String testPath = executionContext.getClientYamlTestCandidate() != null ? executionContext.getClientYamlTestCandidate().getTestPath() : null; + checkElasticProductHeader(response.getHeaders("X-elastic-product")); checkWarningHeaders(response.getWarningHeaders(), testPath); } catch (ClientYamlTestResponseException e) { ClientYamlTestResponse restTestResponse = e.getRestTestResponse(); @@ -392,6 +393,31 @@ public void execute(ClientYamlTestExecutionContext executionContext) throws IOEx } } + void checkElasticProductHeader(final List productHeaders) { + if (productHeaders.isEmpty()) { + fail("Response is missing required X-Elastic-Product response header"); + } + boolean headerPresent = false; + final List unexpected = new ArrayList<>(); + for (String header : productHeaders) { + if (header.equals("Elasticsearch")) { + headerPresent = true; + break; + } else { + unexpected.add(header); + } + } + if (headerPresent == false) { + StringBuilder failureMessage = new StringBuilder(); + appendBadHeaders( + failureMessage, + unexpected, + "did not get expected product header [Elasticsearch], found header" + (unexpected.size() > 1 ? "s" : "") + ); + fail(failureMessage.toString()); + } + } + void checkWarningHeaders(final List warningHeaders) { checkWarningHeaders(warningHeaders, null); } diff --git a/test/framework/src/test/java/org/elasticsearch/test/rest/yaml/section/DoSectionTests.java b/test/framework/src/test/java/org/elasticsearch/test/rest/yaml/section/DoSectionTests.java index fdd3451012d5c..b7238588ffe36 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/rest/yaml/section/DoSectionTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/rest/yaml/section/DoSectionTests.java @@ -605,6 +605,7 @@ public void testNodeSelectorByVersion() throws IOException { doSection.getApiCallSection().getNodeSelector() ) ).thenReturn(mockResponse); + when(mockResponse.getHeaders("X-elastic-product")).thenReturn(List.of("Elasticsearch")); doSection.execute(context); verify(context).callApi( "indices.get_field_mapping", diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index 1661585b5062f..e6053ce1ff818 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -268,7 +268,8 @@ public void onFailure(Exception e) { assert restoreInfo.failedShards() > 0 : "Should have failed shards"; delegatedListener.onResponse(new PutFollowAction.Response(true, false, false)); } - }) + }), + threadPool.getThreadContext() ); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java index 1f775b97ee4d0..e76154ee5f470 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; @@ -178,10 +179,16 @@ private void removeRetentionLeaseForShard( ) { logger.trace("{} removing retention lease [{}] while unfollowing leader index", followerShardId, retentionLeaseId); final ThreadContext threadContext = threadPool.getThreadContext(); + // We're about to stash the thread context for this retention lease removal. The listener will be completed while the + // context is stashed. The context needs to be restored in the listener when it is completing or else it is simply wiped. + final ActionListener preservedListener = new ContextPreservingActionListener<>( + threadContext.newRestorableContext(true), + listener + ); try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { // we have to execute under the system context so that if security is enabled the removal is authorized threadContext.markAsSystemContext(); - CcrRetentionLeases.asyncRemoveRetentionLease(leaderShardId, retentionLeaseId, remoteClient, listener); + CcrRetentionLeases.asyncRemoveRetentionLease(leaderShardId, retentionLeaseId, remoteClient, preservedListener); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportXPackUsageAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportXPackUsageAction.java index 959ffc448f548..6a9d00e62e975 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportXPackUsageAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportXPackUsageAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.internal.node.NodeClient; @@ -20,15 +21,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackFeatureSet; -import org.elasticsearch.xpack.core.XPackFeatureSet.Usage; -import org.elasticsearch.xpack.core.common.IteratingActionListener; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReferenceArray; -import java.util.function.BiConsumer; public class TransportXPackUsageAction extends TransportMasterNodeAction { @@ -66,32 +61,28 @@ protected List usageActions() { @Override protected void masterOperation(Task task, XPackUsageRequest request, ClusterState state, ActionListener listener) { - final ActionListener> usageActionListener = listener.delegateFailure( - (l, usages) -> l.onResponse(new XPackUsageResponse(usages)) - ); - final AtomicReferenceArray featureSetUsages = new AtomicReferenceArray<>(usageActions.size()); - final AtomicInteger position = new AtomicInteger(0); - final BiConsumer>> consumer = (featureUsageAction, iteratingListener) -> { - // Since we're executing the actions locally we should create a new request - // to avoid mutating the original request and setting the wrong parent task, - // since it is possible that the parent task gets cancelled and new child tasks are banned. - final XPackUsageRequest childRequest = new XPackUsageRequest(); - childRequest.setParentTask(request.getParentTask()); - client.executeLocally(featureUsageAction, childRequest, iteratingListener.delegateFailure((l, usageResponse) -> { - featureSetUsages.set(position.getAndIncrement(), usageResponse.getUsage()); - // the value sent back doesn't matter since our predicate keeps iterating - l.onResponse(Collections.emptyList()); - })); - }; - IteratingActionListener, XPackUsageFeatureAction> iteratingActionListener = - new IteratingActionListener<>(usageActionListener, consumer, usageActions, threadPool.getThreadContext(), (ignore) -> { - final List usageList = new ArrayList<>(featureSetUsages.length()); - for (int i = 0; i < featureSetUsages.length(); i++) { - usageList.add(featureSetUsages.get(i)); + new ActionRunnable<>(listener) { + final List responses = new ArrayList<>(usageActions.size()); + + @Override + protected void doRun() { + if (responses.size() < usageActions().size()) { + final var childRequest = new XPackUsageRequest(); + childRequest.setParentTask(request.getParentTask()); + client.executeLocally( + usageActions.get(responses.size()), + childRequest, + listener.delegateFailure((delegate, response) -> { + responses.add(response.getUsage()); + run(); // XPackUsageFeatureTransportAction always forks to MANAGEMENT so no risk of stack overflow here + }) + ); + } else { + assert responses.size() == usageActions.size() : responses.size() + " vs " + usageActions.size(); + listener.onResponse(new XPackUsageResponse(responses)); } - return usageList; - }, (ignore) -> true); - iteratingActionListener.run(); + } + }.run(); } @Override diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherUsageTransportAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherUsageTransportAction.java index cf4a178ba85fa..97f47e13abb7d 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherUsageTransportAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherUsageTransportAction.java @@ -8,6 +8,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -75,6 +76,10 @@ protected void masterOperation( ActionListener listener ) { if (enabled) { + ActionListener preservingListener = ContextPreservingActionListener.wrapPreservingContext( + listener, + client.threadPool().getThreadContext() + ); try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) { WatcherStatsRequest statsRequest = new WatcherStatsRequest(); statsRequest.includeStats(true); @@ -91,8 +96,8 @@ protected void masterOperation( true, mergedCounters.toNestedMap() ); - listener.onResponse(new XPackUsageFeatureResponse(usage)); - }, listener::onFailure)); + preservingListener.onResponse(new XPackUsageFeatureResponse(usage)); + }, preservingListener::onFailure)); } } else { WatcherFeatureSetUsage usage = new WatcherFeatureSetUsage(