Skip to content

Commit b202fd1

Browse files
opensearch-trigger-bot[bot], kartg, and mch2
authored
[Backport 2.x] Added timing data and more granular stages to SegmentReplicationState (#4367)
* Added timing data and more granular stages to SegmentReplicationState (#4222)

  * Added timing data and more granular stages to SegmentReplicationState

    This change introduces instrumentation logging that measures the latency of the various stages of segment replication as seen by each replica. Logs have also been added to the source node for checkpoint publishing and checkpoint metadata responses. All logging is currently at the TRACE level.

    Signed-off-by: Kartik Ganesh <gkart@amazon.com>

  * Fixing SegmentReplicationTarget tests

    Signed-off-by: Kartik Ganesh <gkart@amazon.com>

  * Incorporated PR feedback

    Signed-off-by: Kartik Ganesh <gkart@amazon.com>

  * Fixing SegmentReplicationTargetService tests

    Signed-off-by: Kartik Ganesh <gkart@amazon.com>

  Signed-off-by: Kartik Ganesh <gkart@amazon.com>
  (cherry picked from commit a2ba3a8)

* Update changelog for backport pr.

  Signed-off-by: Marc Handalian <handalm@amazon.com>

Signed-off-by: Marc Handalian <handalm@amazon.com>
Co-authored-by: Kartik Ganesh <gkart@amazon.com>
Co-authored-by: Marc Handalian <handalm@amazon.com>
1 parent f98340e commit b202fd1

9 files changed

Lines changed: 176 additions & 34 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
2727
## [2.x]
2828
### Added
2929
- Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085))
30+
- Add timing data and more granular stages to SegmentReplicationState ([#4367](https://github.com/opensearch-project/OpenSearch/pull/4367))
3031

3132
### Changed
3233

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java

Lines changed: 17 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@
2727
import org.opensearch.indices.recovery.FileChunkWriter;
2828
import org.opensearch.indices.recovery.MultiChunkTransfer;
2929
import org.opensearch.indices.replication.common.CopyState;
30+
import org.opensearch.indices.replication.common.ReplicationTimer;
3031
import org.opensearch.threadpool.ThreadPool;
3132
import org.opensearch.transport.Transports;
3233

@@ -104,16 +105,24 @@ class SegmentReplicationSourceHandler {
104105
* @param listener {@link ActionListener} that completes with the list of files sent.
105106
*/
106107
public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
108+
final ReplicationTimer timer = new ReplicationTimer();
107109
if (isReplicating.compareAndSet(false, true) == false) {
108110
throw new OpenSearchException("Replication to {} is already running.", shard.shardId());
109111
}
110112
future.addListener(listener, OpenSearchExecutors.newDirectExecutorService());
111113
final Closeable releaseResources = () -> IOUtils.close(resources);
112114
try {
113-
115+
timer.start();
114116
final Consumer<Exception> onFailure = e -> {
115117
assert Transports.assertNotTransportThread(SegmentReplicationSourceHandler.this + "[onFailure]");
116118
IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));
119+
timer.stop();
120+
logger.trace(
121+
"[replication id {}] Source node failed to send files to target node [{}], timing: {}",
122+
request.getReplicationId(),
123+
request.getTargetNode().getId(),
124+
timer.time()
125+
);
117126
};
118127

119128
RunUnderPrimaryPermit.run(() -> {
@@ -151,6 +160,13 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene
151160
future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata)));
152161
} finally {
153162
IOUtils.close(resources);
163+
timer.stop();
164+
logger.trace(
165+
"[replication id {}] Source node completed sending files to target node [{}], timing: {}",
166+
request.getReplicationId(),
167+
request.getTargetNode().getId(),
168+
timer.time()
169+
);
154170
}
155171
}, onFailure);
156172
} catch (Exception e) {

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@
1010

1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
13+
import org.apache.logging.log4j.message.ParameterizedMessage;
1314
import org.opensearch.action.support.ChannelActionListener;
1415
import org.opensearch.cluster.ClusterChangedEvent;
1516
import org.opensearch.cluster.ClusterStateListener;
@@ -25,6 +26,7 @@
2526
import org.opensearch.indices.recovery.RecoverySettings;
2627
import org.opensearch.indices.recovery.RetryableTransportClient;
2728
import org.opensearch.indices.replication.common.CopyState;
29+
import org.opensearch.indices.replication.common.ReplicationTimer;
2830
import org.opensearch.tasks.Task;
2931
import org.opensearch.threadpool.ThreadPool;
3032
import org.opensearch.transport.TransportChannel;
@@ -86,6 +88,8 @@ public SegmentReplicationSourceService(
8688
private class CheckpointInfoRequestHandler implements TransportRequestHandler<CheckpointInfoRequest> {
8789
@Override
8890
public void messageReceived(CheckpointInfoRequest request, TransportChannel channel, Task task) throws Exception {
91+
final ReplicationTimer timer = new ReplicationTimer();
92+
timer.start();
8993
final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = new RemoteSegmentFileChunkWriter(
9094
request.getReplicationId(),
9195
recoverySettings,
@@ -109,6 +113,16 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan
109113
copyState.getPendingDeleteFiles()
110114
)
111115
);
116+
timer.stop();
117+
logger.trace(
118+
new ParameterizedMessage(
119+
"[replication id {}] Source node sent checkpoint info [{}] to target node [{}], timing: {}",
120+
request.getReplicationId(),
121+
copyState.getCheckpoint(),
122+
request.getTargetNode().getId(),
123+
timer.time()
124+
)
125+
);
112126
}
113127
}
114128

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java

Lines changed: 59 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -8,10 +8,14 @@
88

99
package org.opensearch.indices.replication;
1010

11+
import org.opensearch.common.collect.Tuple;
1112
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
1213
import org.opensearch.indices.replication.common.ReplicationState;
1314
import org.opensearch.indices.replication.common.ReplicationTimer;
1415

16+
import java.util.ArrayList;
17+
import java.util.List;
18+
1519
/**
1620
* ReplicationState implementation to track Segment Replication events.
1721
*
@@ -26,10 +30,12 @@ public class SegmentReplicationState implements ReplicationState {
2630
*/
2731
public enum Stage {
2832
DONE((byte) 0),
29-
3033
INIT((byte) 1),
31-
32-
REPLICATING((byte) 2);
34+
REPLICATING((byte) 2),
35+
GET_CHECKPOINT_INFO((byte) 3),
36+
FILE_DIFF((byte) 4),
37+
GET_FILES((byte) 5),
38+
FINALIZE_REPLICATION((byte) 6);
3339

3440
private static final Stage[] STAGES = new Stage[Stage.values().length];
3541

@@ -60,23 +66,45 @@ public static Stage fromId(byte id) {
6066

6167
private Stage stage;
6268
private final ReplicationLuceneIndex index;
63-
private final ReplicationTimer timer;
69+
private final ReplicationTimer overallTimer;
70+
private final ReplicationTimer stageTimer;
71+
private final List<Tuple<String, Long>> timingData;
72+
private long replicationId;
6473

6574
public SegmentReplicationState(ReplicationLuceneIndex index) {
6675
stage = Stage.INIT;
6776
this.index = index;
68-
timer = new ReplicationTimer();
69-
timer.start();
77+
// Timing data will have as many entries as stages, plus one
78+
// additional entry for the overall timer
79+
timingData = new ArrayList<>(Stage.values().length + 1);
80+
overallTimer = new ReplicationTimer();
81+
stageTimer = new ReplicationTimer();
82+
stageTimer.start();
83+
// set an invalid value by default
84+
this.replicationId = -1L;
85+
}
86+
87+
public SegmentReplicationState(ReplicationLuceneIndex index, long replicationId) {
88+
this(index);
89+
this.replicationId = replicationId;
7090
}
7191

7292
@Override
7393
public ReplicationLuceneIndex getIndex() {
7494
return index;
7595
}
7696

97+
public long getReplicationId() {
98+
return replicationId;
99+
}
100+
77101
@Override
78102
public ReplicationTimer getTimer() {
79-
return timer;
103+
return overallTimer;
104+
}
105+
106+
public List<Tuple<String, Long>> getTimingData() {
107+
return timingData;
80108
}
81109

82110
public Stage getStage() {
@@ -90,23 +118,42 @@ protected void validateAndSetStage(Stage expected, Stage next) {
90118
"can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])"
91119
);
92120
}
121+
// save the timing data for the current step
122+
stageTimer.stop();
123+
timingData.add(new Tuple<>(stage.name(), stageTimer.time()));
124+
// restart the step timer
125+
stageTimer.reset();
126+
stageTimer.start();
93127
stage = next;
94128
}
95129

96130
public void setStage(Stage stage) {
97131
switch (stage) {
98132
case INIT:
99133
this.stage = Stage.INIT;
100-
getIndex().reset();
101134
break;
102135
case REPLICATING:
103136
validateAndSetStage(Stage.INIT, stage);
104-
getIndex().start();
137+
// only start the overall timer once we've started replication
138+
overallTimer.start();
105139
break;
106-
case DONE:
140+
case GET_CHECKPOINT_INFO:
107141
validateAndSetStage(Stage.REPLICATING, stage);
108-
getIndex().stop();
109-
getTimer().stop();
142+
break;
143+
case FILE_DIFF:
144+
validateAndSetStage(Stage.GET_CHECKPOINT_INFO, stage);
145+
break;
146+
case GET_FILES:
147+
validateAndSetStage(Stage.FILE_DIFF, stage);
148+
break;
149+
case FINALIZE_REPLICATION:
150+
validateAndSetStage(Stage.GET_FILES, stage);
151+
break;
152+
case DONE:
153+
validateAndSetStage(Stage.FINALIZE_REPLICATION, stage);
154+
// add the overall timing data
155+
overallTimer.stop();
156+
timingData.add(new Tuple<>("OVERALL", overallTimer.time()));
110157
break;
111158
default:
112159
throw new IllegalArgumentException("unknown SegmentReplicationState.Stage [" + stage + "]");

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java

Lines changed: 13 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -64,7 +64,7 @@ public SegmentReplicationTarget(
6464
super("replication_target", indexShard, new ReplicationLuceneIndex(), listener);
6565
this.checkpoint = checkpoint;
6666
this.source = source;
67-
this.state = new SegmentReplicationState(stateIndex);
67+
this.state = new SegmentReplicationState(stateIndex, getId());
6868
this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, getPrefix(), logger, this::ensureRefCount);
6969
}
7070

@@ -139,7 +139,9 @@ public void startReplication(ActionListener<Void> listener) {
139139
final StepListener<GetSegmentFilesResponse> getFilesListener = new StepListener<>();
140140
final StepListener<Void> finalizeListener = new StepListener<>();
141141

142+
logger.trace("[shardId {}] Replica starting replication [id {}]", shardId().getId(), getId());
142143
// Get list of files to copy from this checkpoint.
144+
state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
143145
source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);
144146

145147
checkpointInfoListener.whenComplete(checkpointInfo -> getFiles(checkpointInfo, getFilesListener), listener::onFailure);
@@ -152,14 +154,16 @@ public void startReplication(ActionListener<Void> listener) {
152154

153155
private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSegmentFilesResponse> getFilesListener)
154156
throws IOException {
157+
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
155158
final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot();
156159
Store.MetadataSnapshot localMetadata = getMetadataSnapshot();
157160
final Store.RecoveryDiff diff = snapshot.segmentReplicationDiff(localMetadata);
158-
logger.debug("Replication diff {}", diff);
159-
// Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming snapshot
160-
// from
161-
// source that means the local copy of the segment has been corrupted/changed in some way and we throw an IllegalStateException to
162-
// fail the shard
161+
logger.trace("Replication diff {}", diff);
162+
/*
163+
* Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming
164+
* snapshot from source that means the local copy of the segment has been corrupted/changed in some way and we throw an
165+
* IllegalStateException to fail the shard
166+
*/
163167
if (diff.different.isEmpty() == false) {
164168
getFilesListener.onFailure(
165169
new IllegalStateException(
@@ -177,15 +181,18 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSeg
177181
.collect(Collectors.toSet());
178182

179183
filesToFetch.addAll(pendingDeleteFiles);
184+
logger.trace("Files to fetch {}", filesToFetch);
180185

181186
for (StoreFileMetadata file : filesToFetch) {
182187
state.getIndex().addFileDetail(file.name(), file.length(), false);
183188
}
184189
// always send a req even if not fetching files so the primary can clear the copyState for this shard.
190+
state.setStage(SegmentReplicationState.Stage.GET_FILES);
185191
source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, store, getFilesListener);
186192
}
187193

188194
private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, ActionListener<Void> listener) {
195+
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
189196
ActionListener.completeWith(listener, () -> {
190197
multiFileWriter.renameAllTempFiles();
191198
final Store store = store();

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

Lines changed: 19 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -116,7 +116,7 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
116116
* @param replicaShard replica shard on which checkpoint is received
117117
*/
118118
public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) {
119-
119+
logger.trace(() -> new ParameterizedMessage("Replica received new replication checkpoint from primary [{}]", receivedCheckpoint));
120120
// Checks if received checkpoint is already present and ahead then it replaces old received checkpoint
121121
if (latestReceivedCheckpoint.get(replicaShard.shardId()) != null) {
122122
if (receivedCheckpoint.isAheadOf(latestReceivedCheckpoint.get(replicaShard.shardId()))) {
@@ -139,6 +139,14 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
139139
startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() {
140140
@Override
141141
public void onReplicationDone(SegmentReplicationState state) {
142+
logger.trace(
143+
() -> new ParameterizedMessage(
144+
"[shardId {}] [replication id {}] Replication complete, timing data: {}",
145+
replicaShard.shardId().getId(),
146+
state.getReplicationId(),
147+
state.getTimingData()
148+
)
149+
);
142150
// if we received a checkpoint during the copy event that is ahead of this
143151
// try and process it.
144152
if (latestReceivedCheckpoint.get(replicaShard.shardId()).isAheadOf(replicaShard.getLatestReplicationCheckpoint())) {
@@ -154,6 +162,14 @@ public void onReplicationDone(SegmentReplicationState state) {
154162

155163
@Override
156164
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
165+
logger.trace(
166+
() -> new ParameterizedMessage(
167+
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
168+
replicaShard.shardId().getId(),
169+
state.getReplicationId(),
170+
state.getTimingData()
171+
)
172+
);
157173
if (sendShardFailure == true) {
158174
logger.error("replication failure", e);
159175
replicaShard.failShard("replication failure", e);
@@ -172,9 +188,9 @@ public void startReplication(
172188
startReplication(new SegmentReplicationTarget(checkpoint, indexShard, sourceFactory.get(indexShard), listener));
173189
}
174190

175-
public void startReplication(final SegmentReplicationTarget target) {
191+
// pkg-private for integration tests
192+
void startReplication(final SegmentReplicationTarget target) {
176193
final long replicationId = onGoingReplications.start(target, recoverySettings.activityTimeout());
177-
logger.trace(() -> new ParameterizedMessage("Starting replication {}", replicationId));
178194
threadPool.generic().execute(new ReplicationRunner(replicationId));
179195
}
180196

0 commit comments

Comments (0)