Skip to content

Commit 6157ef7

Browse files
ashking94nknize
authored andcommitted
[Remote Translog] Handle translog upload during primary relocation for remote-backed indexes (opensearch-project#5804)
* Upload translog only if primaryMode is true Signed-off-by: Ashish Singh <ssashish@amazon.com>
1 parent b7a9ff2 commit 6157ef7

18 files changed

+190
-32
lines changed

server/src/main/java/org/opensearch/index/engine/EngineConfig.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161

6262
import java.util.List;
6363
import java.util.Objects;
64+
import java.util.function.BooleanSupplier;
6465
import java.util.function.LongSupplier;
6566
import java.util.function.Supplier;
6667

@@ -100,6 +101,7 @@ public final class EngineConfig {
100101
private final LongSupplier globalCheckpointSupplier;
101102
private final Supplier<RetentionLeases> retentionLeasesSupplier;
102103
private final boolean isReadOnlyReplica;
104+
private final BooleanSupplier primaryModeSupplier;
103105

104106
/**
105107
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
@@ -200,6 +202,7 @@ private EngineConfig(Builder builder) {
200202
this.primaryTermSupplier = builder.primaryTermSupplier;
201203
this.tombstoneDocSupplier = builder.tombstoneDocSupplier;
202204
this.isReadOnlyReplica = builder.isReadOnlyReplica;
205+
this.primaryModeSupplier = builder.primaryModeSupplier;
203206
this.translogFactory = builder.translogFactory;
204207
}
205208

@@ -405,6 +408,14 @@ public boolean isReadOnlyReplica() {
405408
return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
406409
}
407410

411+
/**
412+
* Returns the underlying primaryModeSupplier.
413+
* @return the primary mode supplier.
414+
*/
415+
public BooleanSupplier getPrimaryModeSupplier() {
416+
return primaryModeSupplier;
417+
}
418+
408419
/**
409420
* Returns the underlying translog factory
410421
* @return the translog factory
@@ -470,6 +481,7 @@ public static class Builder {
470481
private TombstoneDocSupplier tombstoneDocSupplier;
471482
private TranslogDeletionPolicyFactory translogDeletionPolicyFactory;
472483
private boolean isReadOnlyReplica;
484+
private BooleanSupplier primaryModeSupplier;
473485
private TranslogFactory translogFactory = new InternalTranslogFactory();
474486

475487
public Builder shardId(ShardId shardId) {
@@ -592,6 +604,11 @@ public Builder readOnlyReplica(boolean isReadOnlyReplica) {
592604
return this;
593605
}
594606

607+
public Builder primaryModeSupplier(BooleanSupplier primaryModeSupplier) {
608+
this.primaryModeSupplier = primaryModeSupplier;
609+
return this;
610+
}
611+
595612
public Builder translogFactory(TranslogFactory translogFactory) {
596613
this.translogFactory = translogFactory;
597614
return this;

server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.Collections;
3939
import java.util.List;
4040
import java.util.Optional;
41+
import java.util.function.BooleanSupplier;
4142
import java.util.function.LongSupplier;
4243
import java.util.function.Supplier;
4344

@@ -149,6 +150,7 @@ public EngineConfig newEngineConfig(
149150
LongSupplier primaryTermSupplier,
150151
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
151152
boolean isReadOnlyReplica,
153+
BooleanSupplier primaryModeSupplier,
152154
TranslogFactory translogFactory
153155
) {
154156
CodecService codecServiceToUse = codecService;
@@ -180,6 +182,7 @@ public EngineConfig newEngineConfig(
180182
.primaryTermSupplier(primaryTermSupplier)
181183
.tombstoneDocSupplier(tombstoneDocSupplier)
182184
.readOnlyReplica(isReadOnlyReplica)
185+
.primaryModeSupplier(primaryModeSupplier)
183186
.translogFactory(translogFactory)
184187
.build();
185188
}

server/src/main/java/org/opensearch/index/engine/InternalEngine.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,11 +286,12 @@ public void onFailure(String reason, Exception ex) {
286286
translogDeletionPolicy,
287287
shardId,
288288
readLock,
289-
() -> getLocalCheckpointTracker(),
289+
this::getLocalCheckpointTracker,
290290
translogUUID,
291291
new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId),
292292
this::ensureOpen,
293-
engineConfig.getTranslogFactory()
293+
engineConfig.getTranslogFactory(),
294+
engineConfig.getPrimaryModeSupplier()
294295
);
295296
this.translogManager = translogManagerRef;
296297
this.softDeletesPolicy = newSoftDeletesPolicy();

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ public void onAfterTranslogSync() {
107107
}
108108
},
109109
this,
110-
engineConfig.getTranslogFactory()
110+
engineConfig.getTranslogFactory(),
111+
engineConfig.getPrimaryModeSupplier()
111112
);
112113
this.translogManager = translogManagerRef;
113114
} catch (IOException e) {

server/src/main/java/org/opensearch/index/engine/NoOpEngine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,8 @@ public void trimUnreferencedTranslogFiles() throws TranslogException {
202202
translogDeletionPolicy,
203203
engineConfig.getGlobalCheckpointSupplier(),
204204
engineConfig.getPrimaryTermSupplier(),
205-
seqNo -> {}
205+
seqNo -> {},
206+
engineConfig.getPrimaryModeSupplier()
206207
)
207208
) {
208209
translog.trimUnreferencedReaders();

server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,8 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
277277
translogDeletionPolicy,
278278
config.getGlobalCheckpointSupplier(),
279279
config.getPrimaryTermSupplier(),
280-
seqNo -> {}
280+
seqNo -> {},
281+
config.getPrimaryModeSupplier()
281282
)
282283
) {
283284
return translog.stats();

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@
220220
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
221221
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
222222
import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX;
223+
import static org.opensearch.index.translog.Translog.Durability;
223224

224225
/**
225226
* An OpenSearch index shard
@@ -768,6 +769,14 @@ public void relocated(
768769
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
769770
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
770771
forceRefreshes.close();
772+
773+
boolean syncTranslog = isRemoteTranslogEnabled() && Durability.ASYNC == indexSettings.getTranslogDurability();
774+
// Since all the index permits are acquired at this point, the translog buffer will not change.
775+
// It is safe to perform sync of translogs now as this will ensure for remote-backed indexes, the
776+
// translogs has been uploaded to the remote store.
777+
if (syncTranslog) {
778+
maybeSync();
779+
}
771780
// no shard operation permits are being held here, move state from started to relocated
772781
assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
773782
: "in-flight operations in progress while moving shard state to relocated";
@@ -808,6 +817,16 @@ public void relocated(
808817
}
809818
}
810819

820+
private void maybeSync() {
821+
try {
822+
if (isSyncNeeded()) {
823+
sync();
824+
}
825+
} catch (IOException e) {
826+
logger.warn("failed to sync translog", e);
827+
}
828+
}
829+
811830
private void verifyRelocatingState() {
812831
if (state != IndexShardState.STARTED) {
813832
throw new IndexShardNotStartedException(shardId, state);
@@ -2918,7 +2937,7 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
29182937
assert assertPrimaryMode();
29192938
// only sync if there are no operations in flight, or when using async durability
29202939
final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
2921-
final boolean asyncDurability = indexSettings().getTranslogDurability() == Translog.Durability.ASYNC;
2940+
final boolean asyncDurability = indexSettings().getTranslogDurability() == Durability.ASYNC;
29222941
if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint() || asyncDurability) {
29232942
final ObjectLongMap<String> globalCheckpoints = getInSyncGlobalCheckpoints();
29242943
final long globalCheckpoint = replicationTracker.getGlobalCheckpoint();
@@ -3011,7 +3030,7 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p
30113030
+ routingEntry()
30123031
+ "]";
30133032
assert getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint()
3014-
|| indexSettings().getTranslogDurability() == Translog.Durability.ASYNC : "local checkpoint ["
3033+
|| indexSettings().getTranslogDurability() == Durability.ASYNC : "local checkpoint ["
30153034
+ getLocalCheckpoint()
30163035
+ "] does not match checkpoint from primary context ["
30173036
+ primaryContext
@@ -3431,6 +3450,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
34313450
() -> getOperationPrimaryTerm(),
34323451
tombstoneDocSupplier(),
34333452
isReadOnlyReplica,
3453+
replicationTracker::isPrimaryMode,
34343454
translogFactorySupplier.apply(indexSettings, shardRouting)
34353455
);
34363456
}
@@ -3836,7 +3856,7 @@ public boolean isSyncNeeded() {
38363856
/**
38373857
* Returns the current translog durability mode
38383858
*/
3839-
public Translog.Durability getTranslogDurability() {
3859+
public Durability getTranslogDurability() {
38403860
return indexSettings.getTranslogDurability();
38413861
}
38423862

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public void beforeRefresh() throws IOException {
8989
public void afterRefresh(boolean didRefresh) {
9090
synchronized (this) {
9191
try {
92-
if (indexShard.shardRouting.primary()) {
92+
if (indexShard.getReplicationTracker().isPrimaryMode()) {
9393
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
9494
this.primaryTerm = indexShard.getOperationPrimaryTerm();
9595
this.remoteDirectory.init();

server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.opensearch.index.translog;
1010

1111
import java.io.IOException;
12+
import java.util.function.BooleanSupplier;
1213
import java.util.function.LongConsumer;
1314
import java.util.function.LongSupplier;
1415

@@ -26,7 +27,8 @@ public Translog newTranslog(
2627
TranslogDeletionPolicy translogDeletionPolicy,
2728
LongSupplier globalCheckpointSupplier,
2829
LongSupplier primaryTermSupplier,
29-
LongConsumer persistedSequenceNumberConsumer
30+
LongConsumer persistedSequenceNumberConsumer,
31+
BooleanSupplier primaryModeSupplier
3032
) throws IOException {
3133

3234
return new LocalTranslog(

server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.Closeable;
2323
import java.io.IOException;
2424
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.function.BooleanSupplier;
2526
import java.util.function.LongConsumer;
2627
import java.util.function.LongSupplier;
2728
import java.util.function.Supplier;
@@ -55,7 +56,8 @@ public InternalTranslogManager(
5556
String translogUUID,
5657
TranslogEventListener translogEventListener,
5758
LifecycleAware engineLifeCycleAware,
58-
TranslogFactory translogFactory
59+
TranslogFactory translogFactory,
60+
BooleanSupplier primaryModeSupplier
5961
) throws IOException {
6062
this.shardId = shardId;
6163
this.readLock = readLock;
@@ -68,7 +70,7 @@ public InternalTranslogManager(
6870
if (tracker != null) {
6971
tracker.markSeqNoAsPersisted(seqNo);
7072
}
71-
}, translogUUID, translogFactory);
73+
}, translogUUID, translogFactory, primaryModeSupplier);
7274
assert translog.getGeneration() != null;
7375
this.translog = translog;
7476
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
@@ -345,16 +347,17 @@ protected Translog openTranslog(
345347
LongSupplier globalCheckpointSupplier,
346348
LongConsumer persistedSequenceNumberConsumer,
347349
String translogUUID,
348-
TranslogFactory translogFactory
350+
TranslogFactory translogFactory,
351+
BooleanSupplier primaryModeSupplier
349352
) throws IOException {
350-
351353
return translogFactory.newTranslog(
352354
translogConfig,
353355
translogUUID,
354356
translogDeletionPolicy,
355357
globalCheckpointSupplier,
356358
primaryTermSupplier,
357-
persistedSequenceNumberConsumer
359+
persistedSequenceNumberConsumer,
360+
primaryModeSupplier
358361
);
359362
}
360363

0 commit comments

Comments
 (0)