Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/83290.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 83290
summary: Update YAML Rest tests to check for product header on all responses
area: Infra/REST API
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ protected void finishHim(Exception failure) {
*/
protected void finishHim(Exception failure, List<Failure> indexingFailures, List<SearchFailure> searchFailures, boolean timedOut) {
logger.debug("[{}]: finishing without any catastrophic failures", task.getId());
scrollSource.close(() -> {
scrollSource.close(threadPool.getThreadContext().preserveContext(() -> {
if (failure == null) {
BulkByScrollResponse response = buildResponse(
timeValueNanos(System.nanoTime() - startTime.get()),
Expand All @@ -605,7 +605,7 @@ protected void finishHim(Exception failure, List<Failure> indexingFailures, List
} else {
listener.onFailure(failure);
}
});
}));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
$/
---
"Test cat snapshots output":
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
repository: test_cat_snapshots_1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
Expand Down Expand Up @@ -61,6 +64,7 @@ setup:

---
"Get snapshot info when verbose is false":

- do:
indices.create:
index: test_index
Expand Down Expand Up @@ -198,7 +202,6 @@ setup:
- skip:
version: " - 7.12.99"
reason: "Introduced in 7.13.0"

- do:
indices.create:
index: test_index
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"
---
"Get repository returns UUID":
- skip:
version: " - 7.12.99"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
repository: test_repo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;

import java.util.function.Supplier;

import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;

public class RestoreClusterStateListener implements ClusterStateListener {
Expand All @@ -29,43 +32,48 @@ public class RestoreClusterStateListener implements ClusterStateListener {
private final ClusterService clusterService;
private final String uuid;
private final ActionListener<RestoreSnapshotResponse> listener;
private final Supplier<ThreadContext.StoredContext> contextSupplier;

private RestoreClusterStateListener(
ClusterService clusterService,
RestoreService.RestoreCompletionResponse response,
ActionListener<RestoreSnapshotResponse> listener
ActionListener<RestoreSnapshotResponse> listener,
Supplier<ThreadContext.StoredContext> contextSupplier
) {
this.clusterService = clusterService;
this.uuid = response.getUuid();
this.listener = listener;
this.contextSupplier = contextSupplier;
}

@Override
public void clusterChanged(ClusterChangedEvent changedEvent) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
// When there is a master failure after a restore has been started, this listener might not be registered
// on the current master and as such it might miss some intermediary cluster states due to batching.
// Clean up listener in that case and acknowledge completion of restore operation to client.
clusterService.removeListener(this);
listener.onResponse(new RestoreSnapshotResponse((RestoreInfo) null));
} else if (newEntry == null) {
clusterService.removeListener(this);
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state();
assert RestoreService.completed(shards) : "expected all restore entries to be completed";
RestoreInfo ri = new RestoreInfo(
prevEntry.snapshot().getSnapshotId().getName(),
prevEntry.indices(),
shards.size(),
shards.size() - RestoreService.failedShards(shards)
);
RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri);
logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId());
listener.onResponse(response);
} else {
// restore not completed yet, wait for next cluster state update
try (ThreadContext.StoredContext stored = contextSupplier.get()) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
// When there is a master failure after a restore has been started, this listener might not be registered
// on the current master and as such it might miss some intermediary cluster states due to batching.
// Clean up listener in that case and acknowledge completion of restore operation to client.
clusterService.removeListener(this);
listener.onResponse(new RestoreSnapshotResponse((RestoreInfo) null));
} else if (newEntry == null) {
clusterService.removeListener(this);
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state();
assert RestoreService.completed(shards) : "expected all restore entries to be completed";
RestoreInfo ri = new RestoreInfo(
prevEntry.snapshot().getSnapshotId().getName(),
prevEntry.indices(),
shards.size(),
shards.size() - RestoreService.failedShards(shards)
);
RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri);
logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId());
listener.onResponse(response);
} else {
// restore not completed yet, wait for next cluster state update
}
}
}

Expand All @@ -76,8 +84,11 @@ public void clusterChanged(ClusterChangedEvent changedEvent) {
public static void createAndRegisterListener(
ClusterService clusterService,
RestoreService.RestoreCompletionResponse response,
ActionListener<RestoreSnapshotResponse> listener
ActionListener<RestoreSnapshotResponse> listener,
ThreadContext threadContext
) {
clusterService.addListener(new RestoreClusterStateListener(clusterService, response, listener));
clusterService.addListener(
new RestoreClusterStateListener(clusterService, response, listener, threadContext.newRestorableContext(true))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ protected void masterOperation(
) {
restoreService.restoreSnapshot(request, listener.delegateFailure((delegatedListener, restoreCompletionResponse) -> {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, delegatedListener);
RestoreClusterStateListener.createAndRegisterListener(
clusterService,
restoreCompletionResponse,
delegatedListener,
threadPool.getThreadContext()
);
} else {
delegatedListener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void removeApplier(ClusterStateApplier applier) {
}

/**
* Add a listener for updated cluster states
* Add a listener for updated cluster states. Listeners are executed in the system thread context.
*/
public void addListener(ClusterStateListener listener) {
clusterStateListeners.add(listener);
Expand All @@ -222,7 +222,7 @@ public void addListener(ClusterStateListener listener) {
/**
* Removes a listener for updated cluster states.
*/
public void removeListener(ClusterStateListener listener) {
public void removeListener(final ClusterStateListener listener) {
clusterStateListeners.remove(listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand Down Expand Up @@ -2957,7 +2958,8 @@ static Map<String, DataStreamAlias> filterDataStreamAliases(
* @param listener listener
*/
private void addListener(Snapshot snapshot, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
snapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>()).add(listener);
snapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>())
.add(ContextPreservingActionListener.wrapPreservingContext(listener, threadPool.getThreadContext()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,20 @@ public String getReasonPhrase() {
* Get a list of all of the values of all warning headers returned in the response.
*/
public List<String> getWarningHeaders() {
List<String> warningHeaders = new ArrayList<>();
return getHeaders("Warning");
}

/**
* Get a list of all the values of a given header returned in the response.
*/
public List<String> getHeaders(String name) {
List<String> headers = new ArrayList<>();
for (Header header : response.getHeaders()) {
if (header.getName().equals("Warning")) {
warningHeaders.add(header.getValue());
if (header.getName().equalsIgnoreCase(name)) {
headers.add(header.getValue());
}
}
return warningHeaders;
return headers;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ public void execute(ClientYamlTestExecutionContext executionContext) throws IOEx
final String testPath = executionContext.getClientYamlTestCandidate() != null
? executionContext.getClientYamlTestCandidate().getTestPath()
: null;
checkElasticProductHeader(response.getHeaders("X-elastic-product"));
checkWarningHeaders(response.getWarningHeaders(), testPath);
} catch (ClientYamlTestResponseException e) {
ClientYamlTestResponse restTestResponse = e.getRestTestResponse();
Expand All @@ -392,6 +393,31 @@ public void execute(ClientYamlTestExecutionContext executionContext) throws IOEx
}
}

void checkElasticProductHeader(final List<String> productHeaders) {
if (productHeaders.isEmpty()) {
fail("Response is missing required X-Elastic-Product response header");
}
boolean headerPresent = false;
final List<String> unexpected = new ArrayList<>();
for (String header : productHeaders) {
if (header.equals("Elasticsearch")) {
headerPresent = true;
break;
} else {
unexpected.add(header);
}
}
if (headerPresent == false) {
StringBuilder failureMessage = new StringBuilder();
appendBadHeaders(
failureMessage,
unexpected,
"did not get expected product header [Elasticsearch], found header" + (unexpected.size() > 1 ? "s" : "")
);
fail(failureMessage.toString());
}
}

void checkWarningHeaders(final List<String> warningHeaders) {
checkWarningHeaders(warningHeaders, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ public void testNodeSelectorByVersion() throws IOException {
doSection.getApiCallSection().getNodeSelector()
)
).thenReturn(mockResponse);
when(mockResponse.getHeaders("X-elastic-product")).thenReturn(List.of("Elasticsearch"));
doSection.execute(context);
verify(context).callApi(
"indices.get_field_mapping",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ public void onFailure(Exception e) {
assert restoreInfo.failedShards() > 0 : "Should have failed shards";
delegatedListener.onResponse(new PutFollowAction.Response(true, false, false));
}
})
}),
threadPool.getThreadContext()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
Expand Down Expand Up @@ -178,10 +179,16 @@ private void removeRetentionLeaseForShard(
) {
logger.trace("{} removing retention lease [{}] while unfollowing leader index", followerShardId, retentionLeaseId);
final ThreadContext threadContext = threadPool.getThreadContext();
// We're about to stash the thread context for this retention lease removal. The listener will be completed while the
// context is stashed. The context needs to be restored in the listener when it is completing or else it is simply wiped.
final ActionListener<ActionResponse.Empty> preservedListener = new ContextPreservingActionListener<>(
threadContext.newRestorableContext(true),
listener
);
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
// we have to execute under the system context so that if security is enabled the removal is authorized
threadContext.markAsSystemContext();
CcrRetentionLeases.asyncRemoveRetentionLease(leaderShardId, retentionLeaseId, remoteClient, listener);
CcrRetentionLeases.asyncRemoveRetentionLease(leaderShardId, retentionLeaseId, remoteClient, preservedListener);
}
}

Expand Down
Loading