Skip to content

Commit 99dc1d3

Browse files
committed
Make finalize step of recovery source non-blocking.
Backport of elastic/elasticsearch#37388
1 parent 23a2b07 commit 99dc1d3

10 files changed

Lines changed: 309 additions & 43 deletions

File tree

es/es-server/src/main/java/org/elasticsearch/action/ActionListener.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.ExceptionsHelper;
2323
import org.elasticsearch.common.CheckedConsumer;
2424
import org.elasticsearch.common.CheckedFunction;
25+
import org.elasticsearch.common.CheckedSupplier;
2526

2627
import java.util.ArrayList;
2728
import java.util.List;
@@ -201,4 +202,17 @@ protected void innerOnFailure(Exception e) {
201202
}
202203
};
203204
}
205+
206+
/**
207+
* Completes the given listener with the result from the provided supplier accordingly.
208+
* This method is mainly used to complete a listener with a block of synchronous code.
209+
*/
210+
static <Response> void completeWith(ActionListener<Response> listener,
211+
CheckedSupplier<Response, ? extends Exception> supplier) {
212+
try {
213+
listener.onResponse(supplier.get());
214+
} catch (Exception e) {
215+
listener.onFailure(e);
216+
}
217+
}
204218
}

es/es-server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,19 @@ public class ActionListenerResponseHandler<Response extends TransportResponse> i
3737

3838
private final ActionListener<? super Response> listener;
3939
private final Writeable.Reader<Response> reader;
40+
private final String executor;
4041

41-
public ActionListenerResponseHandler(ActionListener<? super Response> listener, Writeable.Reader<Response> reader) {
42+
public ActionListenerResponseHandler(ActionListener<? super Response> listener,
43+
Writeable.Reader<Response> reader,
44+
String executor) {
4245
this.listener = Objects.requireNonNull(listener);
4346
this.reader = Objects.requireNonNull(reader);
47+
this.executor = Objects.requireNonNull(executor);
48+
}
49+
50+
public ActionListenerResponseHandler(ActionListener<? super Response> listener,
51+
Writeable.Reader<Response> reader) {
52+
this(listener, reader, ThreadPool.Names.SAME);
4453
}
4554

4655
@Override
@@ -60,6 +69,6 @@ public Response read(StreamInput in) throws IOException {
6069

6170
@Override
6271
public String executor() {
63-
return ThreadPool.Names.SAME;
72+
return executor;
6473
}
6574
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action;
21+
22+
import org.elasticsearch.common.CheckedConsumer;
23+
import org.elasticsearch.common.util.concurrent.EsExecutors;
24+
import org.elasticsearch.common.util.concurrent.FutureUtils;
25+
import org.elasticsearch.common.util.concurrent.ListenableFuture;
26+
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.function.Consumer;
29+
30+
/**
31+
* A {@link StepListener} provides a simple way to write a flow consisting of
32+
* multiple asynchronous steps without having nested callbacks. For example:
33+
*
34+
* <pre>{@code
35+
* void asyncFlowMethod(... ActionListener<R> flowListener) {
36+
* StepListener<R1> step1 = new StepListener<>();
37+
* asyncStep1(..., step1);
38+
39+
* StepListener<R2> step2 = new StepListener<>();
40+
* step1.whenComplete(r1 -> {
41+
* asyncStep2(r1, ..., step2);
42+
* }, flowListener::onFailure);
43+
*
44+
* step2.whenComplete(r2 -> {
45+
* R1 r1 = step1.result();
46+
* R r = combine(r1, r2);
47+
* flowListener.onResponse(r);
48+
* }, flowListener::onFailure);
49+
* }
50+
* }</pre>
51+
*/
52+
53+
public final class StepListener<Response> implements ActionListener<Response> {
54+
private final ListenableFuture<Response> delegate;
55+
56+
public StepListener() {
57+
this.delegate = new ListenableFuture<>();
58+
}
59+
60+
@Override
61+
public void onResponse(Response response) {
62+
delegate.onResponse(response);
63+
}
64+
65+
@Override
66+
public void onFailure(Exception e) {
67+
delegate.onFailure(e);
68+
}
69+
70+
/**
71+
* Registers the given actions which are called when this step is completed. If this step is completed successfully,
72+
* the {@code onResponse} is called with the result; otherwise the {@code onFailure} is called with the failure.
73+
*
74+
* @param onResponse is called when this step is completed successfully
75+
* @param onFailure is called when this step is completed with a failure
76+
*/
77+
public void whenComplete(CheckedConsumer<Response, Exception> onResponse, Consumer<Exception> onFailure) {
78+
delegate.addListener(ActionListener.wrap(onResponse, onFailure), EsExecutors.newDirectExecutorService(), null);
79+
}
80+
81+
/**
82+
* Gets the result of this step. This method will throw {@link IllegalStateException} if this step is not completed yet.
83+
*/
84+
public Response result() {
85+
if (delegate.isDone() == false) {
86+
throw new IllegalStateException("step is not completed yet");
87+
}
88+
return FutureUtils.get(delegate, 0L, TimeUnit.NANOSECONDS); // this future is done already - use a non-blocking method.
89+
}
90+
}

es/es-server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -501,11 +501,17 @@ class FinalizeRecoveryRequestHandler implements TransportRequestHandler<Recovery
501501

502502
@Override
503503
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception {
504-
try (RecoveryRef recoveryRef =
505-
onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
506-
recoveryRef.target().finalizeRecovery(request.globalCheckpoint());
504+
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
505+
final ActionListener<TransportResponse> listener =
506+
new HandledTransportAction.ChannelActionListener<>(channel, Actions.FINALIZE, request);
507+
recoveryRef.target().finalizeRecovery(
508+
request.globalCheckpoint(),
509+
ActionListener.wrap(
510+
nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE),
511+
listener::onFailure
512+
)
513+
);
507514
}
508-
channel.sendResponse(TransportResponse.Empty.INSTANCE);
509515
}
510516
}
511517

es/es-server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.ExceptionsHelper;
3333
import org.elasticsearch.Version;
3434
import org.elasticsearch.action.ActionListener;
35+
import org.elasticsearch.action.StepListener;
3536
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
3637
import org.elasticsearch.cluster.routing.ShardRouting;
3738
import org.elasticsearch.common.StopWatch;
@@ -71,6 +72,7 @@
7172
import java.util.concurrent.CopyOnWriteArrayList;
7273
import java.util.concurrent.atomic.AtomicLong;
7374
import java.util.concurrent.atomic.AtomicReference;
75+
import java.util.function.Consumer;
7476
import java.util.function.Supplier;
7577
import java.util.stream.StreamSupport;
7678

@@ -143,6 +145,9 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
143145
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
144146
throw e;
145147
});
148+
final Consumer<Exception> onFailure = e ->
149+
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
150+
146151
runUnderPrimaryPermit(
147152
() -> {
148153
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
@@ -255,16 +260,29 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
255260
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
256261
}
257262

258-
finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint);
259-
final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
260-
assert resources.isEmpty() : "not every resource is released [" + resources + "]";
261-
IOUtils.close(resources);
262-
wrappedListener.onResponse(
263-
new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
264-
sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
265-
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime, prepareEngineTime.millis(),
266-
sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis())
267-
);
263+
final StepListener<Void> finalizeStep = new StepListener<>();
264+
finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint, finalizeStep);
265+
finalizeStep.whenComplete(r -> {
266+
assert resources.isEmpty() : "not every resource is released [" + resources + "]";
267+
final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
268+
final RecoveryResponse response = new RecoveryResponse(
269+
sendFileResult.phase1FileNames,
270+
sendFileResult.phase1FileSizes,
271+
sendFileResult.phase1ExistingFileNames,
272+
sendFileResult.phase1ExistingFileSizes,
273+
sendFileResult.totalSize,
274+
sendFileResult.existingTotalSize,
275+
sendFileResult.took.millis(),
276+
phase1ThrottlingWaitTime,
277+
prepareEngineTime.millis(),
278+
sendSnapshotResult.totalOperations,
279+
sendSnapshotResult.tookTime.millis());
280+
try {
281+
wrappedListener.onResponse(response);
282+
} finally {
283+
IOUtils.close(resources);
284+
}
285+
}, onFailure);
268286
} catch (Exception e) {
269287
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
270288
}
@@ -620,7 +638,7 @@ SendSnapshotResult phase2(long startingSeqNo,
620638
/*
621639
* finalizes the recovery process
622640
*/
623-
private void finalizeRecovery(final long targetLocalCheckpoint) throws IOException {
641+
void finalizeRecovery(final long targetLocalCheckpoint, final ActionListener<Void> listener) throws IOException {
624642
if (shard.state() == IndexShardState.CLOSED) {
625643
throw new IndexShardClosedException(request.shardId());
626644
}
@@ -636,21 +654,26 @@ private void finalizeRecovery(final long targetLocalCheckpoint) throws IOExcepti
636654
runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint),
637655
shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger);
638656
final long globalCheckpoint = shard.getGlobalCheckpoint();
639-
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint));
640-
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
641-
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);
642-
643-
if (request.isPrimaryRelocation()) {
644-
logger.trace("performing relocation hand-off");
645-
// this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
646-
cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext));
647-
/*
648-
* if the recovery process fails after disabling primary mode on the source shard, both relocation source and
649-
* target are failed (see {@link IndexShard#updateRoutingEntry}).
650-
*/
651-
}
652-
stopWatch.stop();
653-
logger.trace("finalizing recovery took [{}]", stopWatch.totalTime());
657+
final StepListener<Void> finalizeListener = new StepListener<>();
658+
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint, finalizeListener));
659+
finalizeListener.whenComplete(r -> {
660+
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
661+
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);
662+
663+
if (request.isPrimaryRelocation()) {
664+
logger.trace("performing relocation hand-off");
665+
// TODO: make relocated async
666+
// this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
667+
cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext));
668+
/*
669+
* if the recovery process fails after disabling primary mode on the source shard, both relocation source and
670+
* target are failed (see {@link IndexShard#updateRoutingEntry}).
671+
*/
672+
}
673+
stopWatch.stop();
674+
logger.trace("finalizing recovery took [{}]", stopWatch.totalTime());
675+
listener.onResponse(null);
676+
}, listener::onFailure);
654677
}
655678

656679
static final class SendSnapshotResult {

es/es-server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -371,12 +371,15 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra
371371
}
372372

373373
@Override
374-
public void finalizeRecovery(final long globalCheckpoint) throws IOException {
375-
final IndexShard indexShard = indexShard();
376-
indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
377-
// Persist the global checkpoint.
378-
indexShard.sync();
379-
indexShard.finalizeRecovery();
374+
public void finalizeRecovery(final long globalCheckpoint, ActionListener<Void> listener) {
375+
ActionListener.completeWith(listener, () -> {
376+
final IndexShard indexShard = indexShard();
377+
indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
378+
// Persist the global checkpoint.
379+
indexShard.sync();
380+
indexShard.finalizeRecovery();
381+
return null;
382+
});
380383
}
381384

382385
@Override

es/es-server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ public interface RecoveryTargetHandler {
4444
* updates the global checkpoint.
4545
*
4646
* @param globalCheckpoint the global checkpoint on the recovery source
47+
* @param listener the listener which will be notified when this method is completed
4748
*/
48-
void finalizeRecovery(long globalCheckpoint) throws IOException;
49+
void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener);
4950

5051
/**
5152
* Blockingly waits for cluster state with at least clusterStateVersion to be available

es/es-server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.index.store.Store;
3131
import org.elasticsearch.index.store.StoreFileMetaData;
3232
import org.elasticsearch.index.translog.Translog;
33+
import org.elasticsearch.threadpool.ThreadPool;
3334
import org.elasticsearch.transport.EmptyTransportResponseHandler;
3435
import org.elasticsearch.transport.TransportFuture;
3536
import org.elasticsearch.transport.TransportRequestOptions;
@@ -87,11 +88,17 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra
8788
}
8889

8990
@Override
90-
public void finalizeRecovery(final long globalCheckpoint) {
91-
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE,
91+
public void finalizeRecovery(final long globalCheckpoint, final ActionListener<Void> listener) {
92+
transportService.submitRequest(
93+
targetNode,
94+
PeerRecoveryTargetService.Actions.FINALIZE,
9295
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint),
9396
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
94-
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
97+
new ActionListenerResponseHandler<>(
98+
ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure),
99+
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC
100+
)
101+
);
95102
}
96103

97104
@Override

0 commit comments

Comments
 (0)