Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.indices.recovery.FileChunkWriter;
import org.opensearch.indices.recovery.MultiChunkTransfer;
import org.opensearch.indices.replication.common.CopyState;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Transports;

Expand Down Expand Up @@ -104,16 +105,24 @@ class SegmentReplicationSourceHandler {
* @param listener {@link ActionListener} that completes with the list of files sent.
*/
public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
final ReplicationTimer timer = new ReplicationTimer();
if (isReplicating.compareAndSet(false, true) == false) {
throw new OpenSearchException("Replication to {} is already running.", shard.shardId());
}
future.addListener(listener, OpenSearchExecutors.newDirectExecutorService());
final Closeable releaseResources = () -> IOUtils.close(resources);
try {

timer.start();
final Consumer<Exception> onFailure = e -> {
assert Transports.assertNotTransportThread(SegmentReplicationSourceHandler.this + "[onFailure]");
IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));
timer.stop();
logger.trace(
"[replication id {}] Source node failed to send files to target node [{}], timing: {}",
request.getReplicationId(),
request.getTargetNode().getId(),
timer.time()
);
};

RunUnderPrimaryPermit.run(() -> {
Expand Down Expand Up @@ -151,6 +160,13 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene
future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata)));
} finally {
IOUtils.close(resources);
timer.stop();
logger.trace(
"[replication id {}] Source node completed sending files to target node [{}], timing: {}",
request.getReplicationId(),
request.getTargetNode().getId(),
timer.time()
);
}
}, onFailure);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
Expand All @@ -25,6 +26,7 @@
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RetryableTransportClient;
import org.opensearch.indices.replication.common.CopyState;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportChannel;
Expand Down Expand Up @@ -86,6 +88,8 @@ public SegmentReplicationSourceService(
private class CheckpointInfoRequestHandler implements TransportRequestHandler<CheckpointInfoRequest> {
@Override
public void messageReceived(CheckpointInfoRequest request, TransportChannel channel, Task task) throws Exception {
final ReplicationTimer timer = new ReplicationTimer();
timer.start();
final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = new RemoteSegmentFileChunkWriter(
request.getReplicationId(),
recoverySettings,
Expand All @@ -109,6 +113,16 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan
copyState.getPendingDeleteFiles()
)
);
timer.stop();
logger.trace(
new ParameterizedMessage(
"[replication id {}] Source node sent checkpoint info [{}] to target node [{}], timing: {}",
request.getReplicationId(),
copyState.getCheckpoint(),
request.getTargetNode().getId(),
timer.time()
)
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@

package org.opensearch.indices.replication;

import org.opensearch.common.collect.Tuple;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.indices.replication.common.ReplicationTimer;

import java.util.ArrayList;
import java.util.List;

/**
* ReplicationState implementation to track Segment Replication events.
*
Expand All @@ -26,10 +30,12 @@ public class SegmentReplicationState implements ReplicationState {
*/
public enum Stage {
DONE((byte) 0),

INIT((byte) 1),

REPLICATING((byte) 2);
REPLICATING((byte) 2),
GET_CHECKPOINT_INFO((byte) 3),
FILE_DIFF((byte) 4),
GET_FILES((byte) 5),
FINALIZE_REPLICATION((byte) 6);

private static final Stage[] STAGES = new Stage[Stage.values().length];

Expand Down Expand Up @@ -60,23 +66,45 @@ public static Stage fromId(byte id) {

private Stage stage;
private final ReplicationLuceneIndex index;
private final ReplicationTimer timer;
private final ReplicationTimer overallTimer;
private final ReplicationTimer stageTimer;
private final List<Tuple<String, Long>> timingData;
private long replicationId;

public SegmentReplicationState(ReplicationLuceneIndex index) {
stage = Stage.INIT;
this.index = index;
timer = new ReplicationTimer();
timer.start();
// Timing data will have as many entries as stages, plus one
// additional entry for the overall timer
timingData = new ArrayList<>(Stage.values().length + 1);
overallTimer = new ReplicationTimer();
stageTimer = new ReplicationTimer();
stageTimer.start();
// set an invalid value by default
this.replicationId = -1L;
}

/**
 * Creates a new state object tracking a specific replication event.
 *
 * @param index         {@link ReplicationLuceneIndex} used to track per-file progress
 * @param replicationId id of the replication event this state belongs to
 */
public SegmentReplicationState(ReplicationLuceneIndex index, long replicationId) {
this(index);
this.replicationId = replicationId;
}

/** Returns the {@link ReplicationLuceneIndex} tracking per-file progress for this replication. */
@Override
public ReplicationLuceneIndex getIndex() {
return index;
}

/**
 * Returns the id of the replication event being tracked, or {@code -1L} if no id
 * was supplied at construction time (see the single-arg constructor's default).
 */
public long getReplicationId() {
return replicationId;
}

@Override
public ReplicationTimer getTimer() {
return timer;
return overallTimer;
}

/**
 * Returns the per-stage timing data recorded so far: one (stage name, elapsed millis)
 * tuple per completed stage, plus a final "OVERALL" entry added when the replication
 * reaches {@code DONE}.
 *
 * @return a read-only live view of the timing entries; callers must not mutate it
 */
public List<Tuple<String, Long>> getTimingData() {
    // Wrap rather than return the internal list directly so external callers
    // cannot corrupt the state's timing bookkeeping.
    return Collections.unmodifiableList(timingData);
}

public Stage getStage() {
Expand All @@ -90,23 +118,42 @@ protected void validateAndSetStage(Stage expected, Stage next) {
"can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])"
);
}
// save the timing data for the current step
stageTimer.stop();
timingData.add(new Tuple<>(stage.name(), stageTimer.time()));
// restart the step timer
stageTimer.reset();
stageTimer.start();
stage = next;
}

public void setStage(Stage stage) {
switch (stage) {
case INIT:
this.stage = Stage.INIT;
getIndex().reset();
break;
case REPLICATING:
validateAndSetStage(Stage.INIT, stage);
getIndex().start();
// only start the overall timer once we've started replication
overallTimer.start();
break;
case DONE:
case GET_CHECKPOINT_INFO:
validateAndSetStage(Stage.REPLICATING, stage);
getIndex().stop();
getTimer().stop();
break;
case FILE_DIFF:
validateAndSetStage(Stage.GET_CHECKPOINT_INFO, stage);
break;
case GET_FILES:
validateAndSetStage(Stage.FILE_DIFF, stage);
break;
case FINALIZE_REPLICATION:
validateAndSetStage(Stage.GET_FILES, stage);
break;
case DONE:
validateAndSetStage(Stage.FINALIZE_REPLICATION, stage);
// add the overall timing data
overallTimer.stop();
timingData.add(new Tuple<>("OVERALL", overallTimer.time()));
break;
default:
throw new IllegalArgumentException("unknown SegmentReplicationState.Stage [" + stage + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public SegmentReplicationTarget(
super("replication_target", indexShard, new ReplicationLuceneIndex(), listener);
this.checkpoint = checkpoint;
this.source = source;
this.state = new SegmentReplicationState(stateIndex);
this.state = new SegmentReplicationState(stateIndex, getId());
this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, getPrefix(), logger, this::ensureRefCount);
}

Expand Down Expand Up @@ -139,7 +139,9 @@ public void startReplication(ActionListener<Void> listener) {
final StepListener<GetSegmentFilesResponse> getFilesListener = new StepListener<>();
final StepListener<Void> finalizeListener = new StepListener<>();

logger.trace("[shardId {}] Replica starting replication [id {}]", shardId().getId(), getId());
// Get list of files to copy from this checkpoint.
state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);

checkpointInfoListener.whenComplete(checkpointInfo -> getFiles(checkpointInfo, getFilesListener), listener::onFailure);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is an issue between stage demarcation and handling the unhappy path. @kartg please correct me if I am wrong but in case of failure checkpointInfoListener (and others) would delegate to listener::onFailure without marking the current stage as "finished", correct?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's a conscious choice. We're treating the DONE stage as a successful replication run. I opted to avoid an explicit FAILED stage since the target and state objects are local to each replication, and the replication stages are currently only relevant when tracking timing data

Copy link
Copy Markdown
Contributor

@reta reta Aug 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine (I think), the dangling stage is what concerns me: since we publish timing on success only, we do nothing on failure (we don't even know if stage was run, which could be even more important than success for troubleshooting). In any case, leaving the decision up to you.

Expand All @@ -152,14 +154,16 @@ public void startReplication(ActionListener<Void> listener) {

private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSegmentFilesResponse> getFilesListener)
throws IOException {
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot();
Store.MetadataSnapshot localMetadata = getMetadataSnapshot();
final Store.RecoveryDiff diff = snapshot.segmentReplicationDiff(localMetadata);
logger.debug("Replication diff {}", diff);
// Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming snapshot
// from
// source that means the local copy of the segment has been corrupted/changed in some way and we throw an IllegalStateException to
// fail the shard
logger.trace("Replication diff {}", diff);
Comment thread
kartg marked this conversation as resolved.
Outdated
/*
* Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming
* snapshot from source that means the local copy of the segment has been corrupted/changed in some way and we throw an
* IllegalStateException to fail the shard
*/
if (diff.different.isEmpty() == false) {
getFilesListener.onFailure(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Certainly existing code, but it seems like there is a bug here: we don't throw an exception but call the listener directly and move on ... @kartg wdyt?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@reta Good catch! We shouldn't be proceeding if the failure listener is invoked. Would you mind if I incorporated the fix for this as a follow-up PR? I'd like to keep the scope of this PR focused on the seg-rep stages and timing data.

new IllegalStateException(
Expand All @@ -177,15 +181,18 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSeg
.collect(Collectors.toSet());

filesToFetch.addAll(pendingDeleteFiles);
logger.trace("Files to fetch {}", filesToFetch);
Comment thread
kartg marked this conversation as resolved.
Outdated

for (StoreFileMetadata file : filesToFetch) {
state.getIndex().addFileDetail(file.name(), file.length(), false);
}
// always send a req even if not fetching files so the primary can clear the copyState for this shard.
state.setStage(SegmentReplicationState.Stage.GET_FILES);
source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, store, getFilesListener);
}

private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, ActionListener<Void> listener) {
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
ActionListener.completeWith(listener, () -> {
multiFileWriter.renameAllTempFiles();
final Store store = store();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
* @param replicaShard replica shard on which checkpoint is received
*/
public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) {

logger.trace(() -> new ParameterizedMessage("Replica received new replication checkpoint from primary [{}]", receivedCheckpoint));
// Checks if received checkpoint is already present and ahead then it replaces old received checkpoint
if (latestReceivedCheckpoint.get(replicaShard.shardId()) != null) {
if (receivedCheckpoint.isAheadOf(latestReceivedCheckpoint.get(replicaShard.shardId()))) {
Expand All @@ -139,6 +139,14 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication complete, timing data: {}",
replicaShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
// if we received a checkpoint during the copy event that is ahead of this
// try and process it.
if (latestReceivedCheckpoint.get(replicaShard.shardId()).isAheadOf(replicaShard.getLatestReplicationCheckpoint())) {
Expand All @@ -154,6 +162,14 @@ public void onReplicationDone(SegmentReplicationState state) {

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
replicaShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
if (sendShardFailure == true) {
logger.error("replication failure", e);
replicaShard.failShard("replication failure", e);
Expand All @@ -172,9 +188,9 @@ public void startReplication(
startReplication(new SegmentReplicationTarget(checkpoint, indexShard, sourceFactory.get(indexShard), listener));
}

public void startReplication(final SegmentReplicationTarget target) {
// pkg-private for integration tests
void startReplication(final SegmentReplicationTarget target) {
final long replicationId = onGoingReplications.start(target, recoverySettings.activityTimeout());
logger.trace(() -> new ParameterizedMessage("Starting replication {}", replicationId));
threadPool.generic().execute(new ReplicationRunner(replicationId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -103,7 +104,10 @@ final void publish(IndexShard indexShard) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
PublishCheckpointRequest request = new PublishCheckpointRequest(indexShard.getLatestReplicationCheckpoint());
final ReplicationCheckpoint checkpoint = request.getCheckpoint();
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request);
final ReplicationTimer timer = new ReplicationTimer();
timer.start();
transportService.sendChildRequest(
clusterService.localNode(),
transportPrimaryAction,
Expand All @@ -123,12 +127,23 @@ public String executor() {

@Override
public void handleResponse(ReplicationResponse response) {
timer.stop();
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] Completed publishing checkpoint [{}], timing: {}",
indexShard.shardId().getId(),
checkpoint,
timer.time()
)
);
task.setPhase("finished");
taskManager.unregister(task);
}

@Override
public void handleException(TransportException e) {
timer.stop();
logger.trace("[shardId {}] Failed to publish checkpoint, timing: {}", indexShard.shardId().getId(), timer.time());
task.setPhase("finished");
taskManager.unregister(task);
if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
Expand All @@ -151,6 +166,13 @@ public void handleException(TransportException e) {
}
}
);
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] Publishing replication checkpoint [{}]",
checkpoint.getShardId().getId(),
checkpoint
)
);
}
}

Expand All @@ -168,7 +190,7 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexSh
Objects.requireNonNull(request);
Objects.requireNonNull(replica);
ActionListener.completeWith(listener, () -> {
logger.trace("Checkpoint received on replica {}", request);
logger.trace(() -> new ParameterizedMessage("Checkpoint {} received on replica {}", request, replica.shardId()));
if (request.getCheckpoint().getShardId().equals(replica.shardId())) {
replicationService.onNewCheckpoint(request.getCheckpoint(), replica);
}
Expand Down
Loading