Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions src/main/kotlin/org/opensearch/replication/ReplicationEngine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

package org.opensearch.replication

import org.opensearch.index.engine.DeletionStrategy
import org.opensearch.index.engine.EngineConfig
import org.opensearch.index.engine.IndexingStrategy
import org.opensearch.index.engine.InternalEngine
import org.opensearch.index.seqno.SequenceNumbers

Expand All @@ -28,12 +30,30 @@ class ReplicationEngine(config: EngineConfig) : InternalEngine(config) {
return operation.seqNo()
}

/**
* Route all index operations through the non-primary planning path. CCR replays operations with
* PRIMARY origin but they need non-primary planning to avoid version conflict checks and in-flight
* doc acquisition. We create a copy with REPLICA origin to satisfy the assertions in both the engine
* and the new OperationStrategyPlanner.
*/
override fun indexingStrategyForOperation(index: Index): IndexingStrategy {
return planIndexingAsNonPrimary(index)
val replicaIndex = Index(
index.uid(), index.parsedDoc(), index.seqNo(), index.primaryTerm(),
index.version(), null, Operation.Origin.REPLICA,
index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry,
SequenceNumbers.UNASSIGNED_SEQ_NO, 0
)
return planIndexingAsNonPrimary(replicaIndex)
}

// Same as above for delete operations.
override fun deletionStrategyForOperation(delete: Delete): DeletionStrategy {
return planDeletionAsNonPrimary(delete)
Comment thread
mohit10011999 marked this conversation as resolved.
val replicaDelete = Delete(
delete.id(), delete.uid(), delete.seqNo(), delete.primaryTerm(),
delete.version(), null, Operation.Origin.REPLICA,
delete.startTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0
)
return planDeletionAsNonPrimary(replicaDelete)
}

override fun assertNonPrimaryOrigin(operation: Operation): Boolean {
Expand Down
Loading