From d85601bdf405ce50f15733ece13306ed722bfdb4 Mon Sep 17 00:00:00 2001 From: Shashank Gowri Date: Thu, 12 Feb 2026 11:35:30 +0530 Subject: [PATCH] Introducing indexing & deletion strategy planner interfaces Signed-off-by: Shashank Gowri --- CHANGELOG.md | 1 + .../index/engine/DeletionStrategy.java | 110 ++++ .../index/engine/DeletionStrategyPlanner.java | 146 +++++ .../index/engine/IndexingStrategy.java | 122 ++++ .../index/engine/IndexingStrategyPlanner.java | 257 ++++++++ .../index/engine/InternalEngine.java | 553 +++--------------- .../index/engine/OpVsEngineDocStatus.java | 22 + .../index/engine/OperationStrategy.java | 58 ++ .../engine/OperationStrategyPlanner.java | 44 ++ .../engine/DeletionStrategyPlannerTests.java | 334 +++++++++++ .../engine/IndexingStrategyPlannerTests.java | 378 ++++++++++++ 11 files changed, 1547 insertions(+), 478 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/engine/DeletionStrategy.java create mode 100644 server/src/main/java/org/opensearch/index/engine/DeletionStrategyPlanner.java create mode 100644 server/src/main/java/org/opensearch/index/engine/IndexingStrategy.java create mode 100644 server/src/main/java/org/opensearch/index/engine/IndexingStrategyPlanner.java create mode 100644 server/src/main/java/org/opensearch/index/engine/OpVsEngineDocStatus.java create mode 100644 server/src/main/java/org/opensearch/index/engine/OperationStrategy.java create mode 100644 server/src/main/java/org/opensearch/index/engine/OperationStrategyPlanner.java create mode 100644 server/src/test/java/org/opensearch/index/engine/DeletionStrategyPlannerTests.java create mode 100644 server/src/test/java/org/opensearch/index/engine/IndexingStrategyPlannerTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 7aab93c3fd5f6..72f25f832a1dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Added TopN selection logic for streaming terms aggregations ([#20481](https://github.com/opensearch-project/OpenSearch/pull/20481)) - Added support for Intra Segment Search ([#19704](https://github.com/opensearch-project/OpenSearch/pull/19704)) - Introduce AdditionalCodecs and EnginePlugin::getAdditionalCodecs hook to allow additional Codec registration ([#20411](https://github.com/opensearch-project/OpenSearch/pull/20411)) +- Introduced strategy planner interfaces for indexing and deletion ([#20585](https://github.com/opensearch-project/OpenSearch/pull/20585)) ### Changed - Move Randomness from server to libs/common ([#20570](https://github.com/opensearch-project/OpenSearch/pull/20570)) diff --git a/server/src/main/java/org/opensearch/index/engine/DeletionStrategy.java b/server/src/main/java/org/opensearch/index/engine/DeletionStrategy.java new file mode 100644 index 0000000000000..bc7c1a2cfd734 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/DeletionStrategy.java @@ -0,0 +1,110 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.opensearch.common.lucene.uid.Versions; +import org.opensearch.index.seqno.SequenceNumbers; + +import java.util.Objects; + +/** + * The deletion strategy + * + * @opensearch.internal + */ +public final class DeletionStrategy extends OperationStrategy { + + public final boolean currentlyDeleted; + + private DeletionStrategy( + boolean deleteFromEngine, + boolean addStaleOpToEngine, + boolean currentlyDeleted, + long versionOfDeletion, + int reservedDocs, + Engine.DeleteResult earlyResultOnPreflightError + ) { + super(deleteFromEngine, addStaleOpToEngine, versionOfDeletion, earlyResultOnPreflightError, reservedDocs); + assert (deleteFromEngine && earlyResultOnPreflightError != null) == false + : "can only delete from engine or have a preflight result but not both." + + "deleteFromEngine: " + + deleteFromEngine + + " earlyResultOnPreFlightError:" + + earlyResultOnPreflightError; + assert reservedDocs == 0 || deleteFromEngine || addStaleOpToEngine : reservedDocs; + this.currentlyDeleted = currentlyDeleted; + } + + static DeletionStrategy skipDueToVersionConflict(VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) { + final Engine.DeleteResult deleteResult = new Engine.DeleteResult( + e, + currentVersion, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + SequenceNumbers.UNASSIGNED_SEQ_NO, + currentlyDeleted == false + ); + return new DeletionStrategy(false, false, currentlyDeleted, Versions.NOT_FOUND, 0, deleteResult); + } + + static DeletionStrategy processNormally(boolean currentlyDeleted, long versionOfDeletion, int reservedDocs) { + return new DeletionStrategy(true, false, currentlyDeleted, versionOfDeletion, reservedDocs, null); + + } + + static DeletionStrategy processButSkipEngine(boolean currentlyDeleted, long versionOfDeletion) { + return new DeletionStrategy(false, false, currentlyDeleted, versionOfDeletion, 0, null); + } + + static DeletionStrategy processAsStaleOp(long versionOfDeletion) { + return new DeletionStrategy(false, true, false, versionOfDeletion, 0, null); + } + + static DeletionStrategy failAsTooManyDocs(Exception e) { + final Engine.DeleteResult deleteResult = new Engine.DeleteResult( + e, + Versions.NOT_FOUND, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + SequenceNumbers.UNASSIGNED_SEQ_NO, + false + ); + return new DeletionStrategy(false, false, false, Versions.NOT_FOUND, 0, deleteResult); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + DeletionStrategy that = (DeletionStrategy) o; + return currentlyDeleted == that.currentlyDeleted; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), currentlyDeleted); + } + + @Override + public String toString() { + return "DeletionStrategy{" + + "currentlyDeleted=" + + currentlyDeleted + + ", executeOpOnEngine=" + + executeOpOnEngine + + ", addStaleOpToEngine=" + + addStaleOpToEngine + + ", version=" + + version + + ", earlyResultOnPreFlightError=" + + earlyResultOnPreFlightError + + ", reservedDocs=" + + reservedDocs + + '}'; + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/DeletionStrategyPlanner.java b/server/src/main/java/org/opensearch/index/engine/DeletionStrategyPlanner.java new file mode 100644 index 0000000000000..1fb78c52a0101 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/DeletionStrategyPlanner.java @@ -0,0 +1,146 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.opensearch.common.CheckedBiFunction; +import org.opensearch.common.CheckedFunction; +import org.opensearch.common.lucene.uid.Versions; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.seqno.SequenceNumbers; + +import java.io.IOException; +import java.util.function.BiFunction; +import java.util.function.Predicate; +import java.util.function.Supplier; + +/** + * Plans execution strategies for deletion operations. + * The planner produces {@link DeletionStrategy} instances that guide the engine's + * execution of delete operations on both primary and replica shards. + * + * @opensearch.internal + */ +public class DeletionStrategyPlanner implements OperationStrategyPlanner { + + private final IndexSettings indexSettings; + private final ShardId shardId; + private final Predicate hasBeenProcessedBefore; + private final CheckedFunction opVsEngineDocStatusFunction; + private final CheckedBiFunction docVersionSupplier; + private final BiFunction tryAcquireInFlightDocs; + private final Supplier incrementVersionLookup; + + public DeletionStrategyPlanner( + IndexSettings indexSettings, + ShardId shardId, + Predicate hasBeenProcessedBefore, + CheckedFunction opVsEngineDocStatusFunction, + CheckedBiFunction docVersionSupplier, + BiFunction tryAcquireInFlightDocs, + Supplier incrementVersionLookup + ) { + this.indexSettings = indexSettings; + this.shardId = shardId; + this.hasBeenProcessedBefore = hasBeenProcessedBefore; + this.opVsEngineDocStatusFunction = opVsEngineDocStatusFunction; + this.docVersionSupplier = docVersionSupplier; + this.tryAcquireInFlightDocs = tryAcquireInFlightDocs; + this.incrementVersionLookup = incrementVersionLookup; + } + + @Override + public DeletionStrategy planOperationAsPrimary(Engine.Delete delete) throws IOException { + assert delete.origin() == Engine.Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); + // resolve operation from external to internal + final VersionValue versionValue = docVersionSupplier.apply(delete, delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO); + assert incrementVersionLookup.get(); + final long currentVersion; + final boolean currentlyDeleted; + if (versionValue == null) { + currentVersion = Versions.NOT_FOUND; + currentlyDeleted = true; + } else { + currentVersion = versionValue.version; + currentlyDeleted = versionValue.isDelete(); + } + final DeletionStrategy plan; + if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && currentlyDeleted) { + final VersionConflictEngineException e = new VersionConflictEngineException( + shardId, + delete.id(), + delete.getIfSeqNo(), + delete.getIfPrimaryTerm(), + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ); + plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, true); + } else if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO + && (versionValue.seqNo != delete.getIfSeqNo() || versionValue.term != delete.getIfPrimaryTerm())) { + final VersionConflictEngineException e = new VersionConflictEngineException( + shardId, + delete.id(), + delete.getIfSeqNo(), + delete.getIfPrimaryTerm(), + versionValue.seqNo, + versionValue.term + ); + plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted); + } else if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) { + final VersionConflictEngineException e = new VersionConflictEngineException( + shardId, + delete, + currentVersion, + currentlyDeleted + ); + plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted); + } else { + final Exception reserveError = tryAcquireInFlightDocs.apply(delete, 1); + if (reserveError != null) { + plan = DeletionStrategy.failAsTooManyDocs(reserveError); + } else { + final long versionOfDeletion = delete.versionType().updateVersion(currentVersion, delete.version()); + plan = DeletionStrategy.processNormally(currentlyDeleted, versionOfDeletion, 1); + } + } + return plan; + } + + @Override + public DeletionStrategy planOperationAsNonPrimary(Engine.Delete delete) throws IOException { + assert assertNonPrimaryOrigin(delete); + final DeletionStrategy plan; + if (hasBeenProcessedBefore.test(delete)) { + // the operation seq# was processed thus this operation was already put into lucene + // this can happen during recovery where older operations are sent from the translog that are already + // part of the lucene commit (either from a peer recovery or a local translog) + // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in + // question may have been deleted in an out of order op that is not replayed. + // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery + // See testRecoveryWithOutOfOrderDelete for an example of peer recovery + plan = DeletionStrategy.processButSkipEngine(false, delete.version()); + } else { + boolean segRepEnabled = indexSettings.isSegRepEnabledOrRemoteNode(); + final OpVsEngineDocStatus opVsLucene = opVsEngineDocStatusFunction.apply(delete); + if (opVsLucene == OpVsEngineDocStatus.OP_STALE_OR_EQUAL) { + if (segRepEnabled) { + // For segrep based indices, we can't completely rely on localCheckpointTracker + // as the preserved checkpoint may not have all the operations present in lucene + // we don't need to index it again as stale op as it would create multiple documents for same seq no + plan = DeletionStrategy.processButSkipEngine(false, delete.version()); + } else { + plan = DeletionStrategy.processAsStaleOp(delete.version()); + } + } else { + plan = DeletionStrategy.processNormally(opVsLucene == OpVsEngineDocStatus.DOC_NOT_FOUND, delete.version(), 0); + } + } + return plan; + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/IndexingStrategy.java b/server/src/main/java/org/opensearch/index/engine/IndexingStrategy.java new file mode 100644 index 0000000000000..2258b1a1e136b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/IndexingStrategy.java @@ -0,0 +1,122 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.opensearch.common.lucene.uid.Versions; + +import java.util.Objects; + +/** + * The indexing strategy + * + * @opensearch.internal + */ +public final class IndexingStrategy extends OperationStrategy { + + public final boolean currentNotFoundOrDeleted; + public final boolean useUpdateDocument; + + private IndexingStrategy( + boolean currentNotFoundOrDeleted, + boolean useUpdateDocument, + boolean indexIntoEngine, + boolean addStaleOpToEngine, + long versionForIndexing, + int reservedDocs, + Engine.IndexResult earlyResultOnPreFlightError + ) { + super(indexIntoEngine, addStaleOpToEngine, versionForIndexing, earlyResultOnPreFlightError, reservedDocs); + assert useUpdateDocument == false || indexIntoEngine : "use update is set to true, but we're not indexing into engine"; + assert (indexIntoEngine && earlyResultOnPreFlightError != null) == false + : "can only index into engine or have a preflight result but not both." + + "indexIntoEngine: " + + indexIntoEngine + + " earlyResultOnPreFlightError:" + + earlyResultOnPreFlightError; + assert reservedDocs == 0 || indexIntoEngine || addStaleOpToEngine : reservedDocs; + this.currentNotFoundOrDeleted = currentNotFoundOrDeleted; + this.useUpdateDocument = useUpdateDocument; + } + + static IndexingStrategy optimizedAppendOnly(long versionForIndexing, int reservedDocs) { + return new IndexingStrategy(true, false, true, false, versionForIndexing, reservedDocs, null); + } + + static IndexingStrategy skipDueToVersionConflict( + VersionConflictEngineException e, + boolean currentNotFoundOrDeleted, + long currentVersion + ) { + final Engine.IndexResult result = new Engine.IndexResult(e, currentVersion); + return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, Versions.NOT_FOUND, 0, result); + } + + static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, long versionForIndexing, int reservedDocs) { + return new IndexingStrategy( + currentNotFoundOrDeleted, + currentNotFoundOrDeleted == false, + true, + false, + versionForIndexing, + reservedDocs, + null + ); + } + + static IndexingStrategy processButSkipEngine(boolean currentNotFoundOrDeleted, long versionForIndexing) { + return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, versionForIndexing, 0, null); + } + + static IndexingStrategy processAsStaleOp(long versionForIndexing) { + return new IndexingStrategy(false, false, false, true, versionForIndexing, 0, null); + } + + static IndexingStrategy failAsTooManyDocs(Exception e) { + final Engine.IndexResult result = new Engine.IndexResult(e, Versions.NOT_FOUND); + return new IndexingStrategy(false, false, false, false, Versions.NOT_FOUND, 0, result); + } + + static IndexingStrategy failAsIndexAppendOnly(Engine.IndexResult result, long versionForIndexing, int reservedDocs) { + return new IndexingStrategy(false, false, false, true, versionForIndexing, reservedDocs, result); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + IndexingStrategy that = (IndexingStrategy) o; + return currentNotFoundOrDeleted == that.currentNotFoundOrDeleted && useUpdateDocument == that.useUpdateDocument; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), currentNotFoundOrDeleted, useUpdateDocument); + } + + @Override + public String toString() { + return "IndexingStrategy{" + + "currentNotFoundOrDeleted=" + + currentNotFoundOrDeleted + + ", useUpdateDocument=" + + useUpdateDocument + + ", executeOpOnEngine=" + + executeOpOnEngine + + ", addStaleOpToEngine=" + + addStaleOpToEngine + + ", version=" + + version + + ", earlyResultOnPreFlightError=" + + earlyResultOnPreFlightError + + ", reservedDocs=" + + reservedDocs + + '}'; + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/IndexingStrategyPlanner.java b/server/src/main/java/org/opensearch/index/engine/IndexingStrategyPlanner.java new file mode 100644 index 0000000000000..93b534ba755e6 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/IndexingStrategyPlanner.java @@ -0,0 +1,257 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.opensearch.action.index.IndexRequest; +import org.opensearch.common.CheckedBiFunction; +import org.opensearch.common.CheckedFunction; +import org.opensearch.common.lucene.uid.Versions; +import org.opensearch.core.index.AppendOnlyIndexOperationRetryException; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.VersionType; +import org.opensearch.index.seqno.SequenceNumbers; + +import java.io.IOException; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Predicate; +import java.util.function.Supplier; + +/** + * Plans execution strategies for indexing operations. + * The planner produces {@link IndexingStrategy} instances that guide the engine's + * execution of index operations on both primary and replica shards. + * + * @opensearch.internal + */ +public class IndexingStrategyPlanner implements OperationStrategyPlanner { + + private final IndexSettings indexSettings; + private final ShardId shardId; + private final LiveVersionMap versionMap; + private final Supplier maxUnsafeAutoIdTimestampSupplier; + private final Supplier maxSeqNoOfUpdatesOrDeletesSupplier; + private final Supplier processedCheckpointSupplier; + private final Predicate hasBeenProcessedBefore; + private final CheckedFunction opVsEngineDocStatusFunction; + private final CheckedBiFunction docVersionSupplier; + private final BiConsumer updateAutoIdTimestampConsumer; + private final BiFunction tryAcquireInFlightDocs; + + public IndexingStrategyPlanner( + IndexSettings indexSettings, + ShardId shardId, + LiveVersionMap versionMap, + Supplier maxUnsafeAutoIdTimestampSupplier, + Supplier maxSeqNoOfUpdatesOrDeletesSupplier, + Supplier processedCheckpointSupplier, + Predicate hasBeenProcessedBefore, + CheckedFunction opVsEngineDocStatusFunction, + CheckedBiFunction docVersionSupplier, + BiConsumer updateAutoIdTimestampConsumer, + BiFunction tryAcquireInFlightDocs + ) { + this.indexSettings = indexSettings; + this.shardId = shardId; + this.versionMap = versionMap; + this.maxUnsafeAutoIdTimestampSupplier = maxUnsafeAutoIdTimestampSupplier; + this.maxSeqNoOfUpdatesOrDeletesSupplier = maxSeqNoOfUpdatesOrDeletesSupplier; + this.processedCheckpointSupplier = processedCheckpointSupplier; + this.hasBeenProcessedBefore = hasBeenProcessedBefore; + this.opVsEngineDocStatusFunction = opVsEngineDocStatusFunction; + this.docVersionSupplier = docVersionSupplier; + this.updateAutoIdTimestampConsumer = updateAutoIdTimestampConsumer; + this.tryAcquireInFlightDocs = tryAcquireInFlightDocs; + } + + @Override + public IndexingStrategy planOperationAsPrimary(Engine.Index index) throws IOException { + assert index.origin() == Engine.Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin(); + final int reservingDocs = index.parsedDoc().docs().size(); + final IndexingStrategy plan; + // resolve an external operation into an internal one which is safe to replay + final boolean canOptimizeAddDocument = canOptimizeAddDocument(index); + if (canOptimizeAddDocument && mayHaveBeenIndexedBefore(index) == false) { + final Exception reserveError = tryAcquireInFlightDocs.apply(index, reservingDocs); + if (reserveError != null) { + plan = IndexingStrategy.failAsTooManyDocs(reserveError); + } else { + plan = IndexingStrategy.optimizedAppendOnly(1L, reservingDocs); + } + } else { + versionMap.enforceSafeAccess(); + // resolves incoming version + final VersionValue versionValue = docVersionSupplier.apply(index, true); + final long currentVersion; + final boolean currentNotFoundOrDeleted; + if (versionValue == null) { + currentVersion = Versions.NOT_FOUND; + currentNotFoundOrDeleted = true; + } else { + currentVersion = versionValue.version; + currentNotFoundOrDeleted = versionValue.isDelete(); + } + if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && currentNotFoundOrDeleted) { + final VersionConflictEngineException e = new VersionConflictEngineException( + shardId, + index.id(), + index.getIfSeqNo(), + index.getIfPrimaryTerm(), + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ); + plan = IndexingStrategy.skipDueToVersionConflict(e, true, currentVersion); + } else if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO + && (versionValue.seqNo != index.getIfSeqNo() || versionValue.term != index.getIfPrimaryTerm())) { + final VersionConflictEngineException e = new VersionConflictEngineException( + shardId, + index.id(), + index.getIfSeqNo(), + index.getIfPrimaryTerm(), + versionValue.seqNo, + versionValue.term + ); + plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion); + } else if (index.versionType().isVersionConflictForWrites(currentVersion, index.version(), currentNotFoundOrDeleted)) { + final VersionConflictEngineException e = new VersionConflictEngineException( + shardId, + index, + currentVersion, + currentNotFoundOrDeleted + ); + plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion); + } else { + final Exception reserveError = tryAcquireInFlightDocs.apply(index, reservingDocs); + if (reserveError != null) { + plan = IndexingStrategy.failAsTooManyDocs(reserveError); + } else if (currentVersion >= 1 && indexSettings.getIndexMetadata().isAppendOnlyIndex()) { + // Retry happens for indexing requests for append only indices, since we are rejecting update requests + // at Transport layer itself. So for any retry, we are reconstructing response from already indexed + // document version for append only index. + AppendOnlyIndexOperationRetryException retryException = new AppendOnlyIndexOperationRetryException( + "Indexing operation retried for append only indices" + ); + final Engine.IndexResult result = new Engine.IndexResult( + retryException, + currentVersion, + versionValue.term, + versionValue.seqNo + ); + plan = IndexingStrategy.failAsIndexAppendOnly(result, currentVersion, 0); + } else { + plan = IndexingStrategy.processNormally( + currentNotFoundOrDeleted, + canOptimizeAddDocument ? 1L : index.versionType().updateVersion(currentVersion, index.version()), + reservingDocs + ); + } + } + } + return plan; + } + + @Override + public IndexingStrategy planOperationAsNonPrimary(Engine.Index index) throws IOException { + assert assertNonPrimaryOrigin(index); + // needs to maintain the auto_id timestamp in case this replica becomes primary + if (canOptimizeAddDocument(index)) { + mayHaveBeenIndexedBefore(index); + } + final IndexingStrategy plan; + // unlike the primary, replicas don't really care to about creation status of documents + // this allows to ignore the case where a document was found in the live version maps in + // a delete state and return false for the created flag in favor of code simplicity + final long maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletesSupplier.get(); + if (hasBeenProcessedBefore.test(index)) { + // the operation seq# was processed and thus the same operation was already put into lucene + // this can happen during recovery where older operations are sent from the translog that are already + // part of the lucene commit (either from a peer recovery or a local translog) + // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in + // question may have been deleted in an out of order op that is not replayed. + // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery + // See testRecoveryWithOutOfOrderDelete for an example of peer recovery + plan = IndexingStrategy.processButSkipEngine(false, index.version()); + } else if (maxSeqNoOfUpdatesOrDeletes <= processedCheckpointSupplier.get()) { + // see Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers + assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes; + plan = IndexingStrategy.optimizedAppendOnly(index.version(), 0); + } else { + boolean segRepEnabled = indexSettings.isSegRepEnabledOrRemoteNode(); + versionMap.enforceSafeAccess(); + final OpVsEngineDocStatus opVsEngine = opVsEngineDocStatusFunction.apply(index); + if (opVsEngine == OpVsEngineDocStatus.OP_STALE_OR_EQUAL) { + if (segRepEnabled) { + // For segrep based indices, we can't completely rely on localCheckpointTracker + // as the preserved checkpoint may not have all the operations present in lucene + // we don't need to index it again as stale op as it would create multiple documents for same seq no + plan = IndexingStrategy.processButSkipEngine(false, index.version()); + } else { + plan = IndexingStrategy.processAsStaleOp(index.version()); + } + } else { + plan = IndexingStrategy.processNormally(opVsEngine == OpVsEngineDocStatus.DOC_NOT_FOUND, index.version(), 0); + } + } + return plan; + } + + protected boolean canOptimizeAddDocument(Engine.Index index) { + if (index.getAutoGeneratedIdTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) { + assert index.getAutoGeneratedIdTimestamp() >= 0 : "autoGeneratedIdTimestamp must be positive but was: " + + index.getAutoGeneratedIdTimestamp(); + switch (index.origin()) { + case PRIMARY: + assert assertPrimaryCanOptimizeAddDocument(index); + return true; + case PEER_RECOVERY: + case REPLICA: + assert index.version() == 1 && index.versionType() == null : "version: " + + index.version() + + " type: " + + index.versionType(); + return true; + case LOCAL_TRANSLOG_RECOVERY: + case LOCAL_RESET: + assert index.isRetry(); + return true; // allow to optimize in order to update the max safe time stamp + default: + throw new IllegalArgumentException("unknown origin " + index.origin()); + } + } + return false; + } + + protected boolean assertPrimaryCanOptimizeAddDocument(final Engine.Index index) { + assert (index.version() == Versions.MATCH_DELETED || index.version() == Versions.MATCH_ANY) + && index.versionType() == VersionType.INTERNAL : "version: " + index.version() + " type: " + index.versionType(); + return true; + } + + /** + * returns true if the indexing operation may have already be processed by this engine. + * Note that it is OK to rarely return true even if this is not the case. However a `false` + * return value must always be correct. + * + */ + private boolean mayHaveBeenIndexedBefore(Engine.Index index) { + assert canOptimizeAddDocument(index); + final boolean mayHaveBeenIndexBefore; + if (index.isRetry()) { + mayHaveBeenIndexBefore = true; + updateAutoIdTimestampConsumer.accept(index.getAutoGeneratedIdTimestamp(), true); + assert maxUnsafeAutoIdTimestampSupplier.get() >= index.getAutoGeneratedIdTimestamp(); + } else { + // in this case we force + mayHaveBeenIndexBefore = maxUnsafeAutoIdTimestampSupplier.get() >= index.getAutoGeneratedIdTimestamp(); + updateAutoIdTimestampConsumer.accept(index.getAutoGeneratedIdTimestamp(), false); + } + return mayHaveBeenIndexBefore; + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 95cf0a05d1fa0..a9b672f20af77 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -63,7 +63,6 @@ import org.apache.lucene.util.BytesRef; import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; -import org.opensearch.action.index.IndexRequest; import org.opensearch.common.Nullable; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.concurrent.GatedCloseable; @@ -87,7 +86,6 @@ import org.opensearch.core.index.AppendOnlyIndexOperationRetryException; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexSettings; -import org.opensearch.index.VersionType; import org.opensearch.index.fieldvisitor.IdOnlyFieldVisitor; import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.ParseContext; @@ -217,6 +215,9 @@ public class InternalEngine extends Engine { private final int maxDocs; private final IndexWriterFactory nativeIndexWriterFactory; + private final IndexingStrategyPlanner indexingStrategyPlanner; + private final DeletionStrategyPlanner deletionStrategyPlanner; + public InternalEngine(EngineConfig engineConfig) { this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER); } @@ -348,6 +349,28 @@ public void onFailure(String reason, Exception ex) { } completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats")); this.externalReaderManager.addListener(completionStatsCache); + this.indexingStrategyPlanner = new IndexingStrategyPlanner( + engineConfig.getIndexSettings(), + engineConfig.getShardId(), + versionMap, + maxUnsafeAutoIdTimestamp::get, + maxSeqNoOfUpdatesOrDeletes::get, + localCheckpointTracker::getProcessedCheckpoint, + this::hasBeenProcessedBefore, + this::compareOpToLuceneDocBasedOnSeqNo, + this::resolveDocVersion, + this::updateAutoIdTimestamp, + this::tryAcquireInFlightDocs + ); + this.deletionStrategyPlanner = new DeletionStrategyPlanner( + engineConfig.getIndexSettings(), + engineConfig.getShardId(), + this::hasBeenProcessedBefore, + this::compareOpToLuceneDocBasedOnSeqNo, + this::resolveDocVersion, + this::tryAcquireInFlightDocs, + this::incrementVersionLookup + ); success = true; } finally { if (success == false) { @@ -707,23 +730,10 @@ public GetResult get(Get get, BiFunction } } - /** - * the status of the current doc version in lucene, compared to the version in an incoming - * operation - */ - enum OpVsLuceneDocStatus { - /** the op is more recent than the one that last modified the doc found in lucene*/ - OP_NEWER, - /** the op is older or the same as the one that last modified the doc found in lucene*/ - OP_STALE_OR_EQUAL, - /** no doc was found in lucene */ - LUCENE_DOC_NOT_FOUND - } - - private static OpVsLuceneDocStatus compareOpToVersionMapOnSeqNo(String id, long seqNo, long primaryTerm, VersionValue versionValue) { + private static OpVsEngineDocStatus compareOpToVersionMapOnSeqNo(String id, long seqNo, long primaryTerm, VersionValue versionValue) { Objects.requireNonNull(versionValue); if (seqNo > versionValue.seqNo) { - return OpVsLuceneDocStatus.OP_NEWER; + return OpVsEngineDocStatus.OP_NEWER; } else if (seqNo == versionValue.seqNo) { assert versionValue.term == primaryTerm : "primary term not matched; id=" + id @@ -733,15 +743,15 @@ private static OpVsLuceneDocStatus compareOpToVersionMapOnSeqNo(String id, long + primaryTerm + " existing_term=" + versionValue.term; - return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + return OpVsEngineDocStatus.OP_STALE_OR_EQUAL; } else { - return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + return OpVsEngineDocStatus.OP_STALE_OR_EQUAL; } } - private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { + private OpVsEngineDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; - final OpVsLuceneDocStatus status; + final OpVsEngineDocStatus status; VersionValue versionValue = getVersionFromMap(op.uid().bytes()); assert incrementVersionLookup(); boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode(); @@ -753,15 +763,15 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) { final DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.getIndexReader(), op.uid()); if (docAndSeqNo == null) { - status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND; + status = OpVsEngineDocStatus.DOC_NOT_FOUND; } else if (op.seqNo() > docAndSeqNo.seqNo) { - status = OpVsLuceneDocStatus.OP_NEWER; + status = OpVsEngineDocStatus.OP_NEWER; } else if (op.seqNo() == docAndSeqNo.seqNo) { assert localCheckpointTracker.hasProcessed(op.seqNo()) || segRepEnabled : "local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id(); - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + status = OpVsEngineDocStatus.OP_STALE_OR_EQUAL; } else { - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + status = OpVsEngineDocStatus.OP_STALE_OR_EQUAL; } } } @@ -830,36 +840,12 @@ private VersionValue getVersionFromMap(BytesRef id) { return versionMap.getUnderLock(id); } - private boolean canOptimizeAddDocument(Index index) { - if (index.getAutoGeneratedIdTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) { - assert index.getAutoGeneratedIdTimestamp() >= 0 : "autoGeneratedIdTimestamp must be positive but was: " - + index.getAutoGeneratedIdTimestamp(); - switch (index.origin()) { - case PRIMARY: - assert assertPrimaryCanOptimizeAddDocument(index); - return true; - case PEER_RECOVERY: - case REPLICA: - assert index.version() == 1 && index.versionType() == null : "version: " - + index.version() - + " type: " - + index.versionType(); - return true; - case LOCAL_TRANSLOG_RECOVERY: - case LOCAL_RESET: - assert index.isRetry(); - return true; // allow to optimize in order to update the max safe time stamp - default: - throw new IllegalArgumentException("unknown origin " + index.origin()); - } - } - return false; - } - + /** + * @deprecated This assertion has been moved to IndexingStrategyPlanner + */ + @Deprecated(forRemoval = true) protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) { - assert (index.version() == Versions.MATCH_DELETED || index.version() == Versions.MATCH_ANY) - && index.versionType() == VersionType.INTERNAL : "version: " + index.version() + " type: " + index.versionType(); - return true; + return indexingStrategyPlanner.assertPrimaryCanOptimizeAddDocument(index); } private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) { @@ -946,7 +932,7 @@ public IndexResult index(Index index) throws IOException { final IndexResult indexResult; if (plan.earlyResultOnPreFlightError.isPresent()) { assert index.origin() == Operation.Origin.PRIMARY : index.origin(); - indexResult = plan.earlyResultOnPreFlightError.get(); + indexResult = (IndexResult) plan.earlyResultOnPreFlightError.get(); assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType(); } else { // generate or register sequence number @@ -966,7 +952,7 @@ public IndexResult index(Index index) throws IOException { index.getIfPrimaryTerm() ); - final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false; + final boolean toAppend = plan.executeOpOnEngine && plan.useUpdateDocument == false; if (toAppend == false) { advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(index.seqNo()); } @@ -976,15 +962,10 @@ public IndexResult index(Index index) throws IOException { assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin(); - if (plan.indexIntoLucene || plan.addStaleOpToLucene) { + if (plan.executeOpOnEngine || plan.addStaleOpToEngine) { indexResult = indexIntoLucene(index, plan); } else { - indexResult = new IndexResult( - plan.versionForIndexing, - index.primaryTerm(), - index.seqNo(), - plan.currentNotFoundOrDeleted - ); + indexResult = new IndexResult(plan.version, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted); } } @@ -1009,11 +990,11 @@ public IndexResult index(Index index) throws IOException { } indexResult.setTranslogLocation(location); } - if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) { + if (plan.executeOpOnEngine && indexResult.getResultType() == Result.Type.SUCCESS) { final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null; versionMap.maybePutIndexUnderLock( index.uid().bytes(), - new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm()) + new IndexVersionValue(translogLocation, plan.version, index.seqNo(), index.primaryTerm()) ); } localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo()); @@ -1046,46 +1027,11 @@ public IndexResult index(Index index) throws IOException { protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException { assert assertNonPrimaryOrigin(index); - // needs to maintain the auto_id timestamp in case this replica becomes primary - if (canOptimizeAddDocument(index)) { - mayHaveBeenIndexedBefore(index); - } - final IndexingStrategy plan; - // unlike the primary, replicas don't really care to about creation status of documents - // this allows to ignore the case where a document was found in the live version maps in - // a delete state and return false for the created flag in favor of code simplicity - final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); - if (hasBeenProcessedBefore(index)) { - // the operation seq# was processed and thus the same operation was already put into lucene - // this can happen during recovery where older operations are sent from the translog that are already - // part of the lucene commit (either from a peer recovery or a local translog) - // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in - // question may have been deleted in an out of order op that is not replayed. - // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery - // See testRecoveryWithOutOfOrderDelete for an example of peer recovery - plan = IndexingStrategy.processButSkipLucene(false, index.version()); - } else if (maxSeqNoOfUpdatesOrDeletes <= localCheckpointTracker.getProcessedCheckpoint()) { - // see Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers - assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes; - plan = IndexingStrategy.optimizedAppendOnly(index.version(), 0); - } else { - boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode(); - versionMap.enforceSafeAccess(); - final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - if (segRepEnabled) { - // For segrep based indices, we can't completely rely on localCheckpointTracker - // as the preserved checkpoint may not have all the operations present in lucene - // we don't need to index it again as stale op as it would create multiple documents for same seq no - plan = IndexingStrategy.processButSkipLucene(false, index.version()); - } else { - plan = IndexingStrategy.processAsStaleOp(index.version()); - } - } else { - plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version(), 0); - } - } - return plan; + return indexingStrategyPlanner.planOperationAsNonPrimary(index); + } + + private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { + return indexingStrategyPlanner.planOperationAsPrimary(index); } protected IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException { @@ -1097,109 +1043,29 @@ protected IndexingStrategy indexingStrategyForOperation(final Index index) throw } } - private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { - assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin(); - final int reservingDocs = index.parsedDoc().docs().size(); - final IndexingStrategy plan; - // resolve an external operation into an internal one which is safe to replay - final boolean canOptimizeAddDocument = canOptimizeAddDocument(index); - if (canOptimizeAddDocument && mayHaveBeenIndexedBefore(index) == false) { - final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs); - if (reserveError != null) { - plan = IndexingStrategy.failAsTooManyDocs(reserveError); - } else { - plan = IndexingStrategy.optimizedAppendOnly(1L, reservingDocs); - } - } else { - versionMap.enforceSafeAccess(); - // resolves incoming version - final VersionValue versionValue = resolveDocVersion(index, true); - final long currentVersion; - final boolean currentNotFoundOrDeleted; - if (versionValue == null) { - currentVersion = Versions.NOT_FOUND; - currentNotFoundOrDeleted = true; - } else { - currentVersion = versionValue.version; - currentNotFoundOrDeleted = versionValue.isDelete(); - } - if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && currentNotFoundOrDeleted) { - final VersionConflictEngineException e = new VersionConflictEngineException( - shardId, - index.id(), - index.getIfSeqNo(), - index.getIfPrimaryTerm(), - SequenceNumbers.UNASSIGNED_SEQ_NO, - SequenceNumbers.UNASSIGNED_PRIMARY_TERM - ); - plan = IndexingStrategy.skipDueToVersionConflict(e, true, currentVersion); - } else if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO - && (versionValue.seqNo != index.getIfSeqNo() || versionValue.term != index.getIfPrimaryTerm())) { - final VersionConflictEngineException e = new VersionConflictEngineException( - shardId, - index.id(), - index.getIfSeqNo(), - index.getIfPrimaryTerm(), - versionValue.seqNo, - versionValue.term - ); - plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion); - } else if (index.versionType().isVersionConflictForWrites(currentVersion, index.version(), currentNotFoundOrDeleted)) { - final VersionConflictEngineException e = new VersionConflictEngineException( - shardId, - index, - currentVersion, - currentNotFoundOrDeleted - ); - plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion); - } else { - final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs); - if (reserveError != null) { - plan = IndexingStrategy.failAsTooManyDocs(reserveError); - } else if (currentVersion >= 1 && engineConfig.getIndexSettings().getIndexMetadata().isAppendOnlyIndex()) { - // Retry happens for indexing requests for append only indices, since we are rejecting update requests - // at Transport layer itself. So for any retry, we are reconstructing response from already indexed - // document version for append only index. - AppendOnlyIndexOperationRetryException retryException = new AppendOnlyIndexOperationRetryException( - "Indexing operation retried for append only indices" - ); - final IndexResult result = new IndexResult(retryException, currentVersion, versionValue.term, versionValue.seqNo); - plan = IndexingStrategy.failAsIndexAppendOnly(result, currentVersion, 0); - } else { - plan = IndexingStrategy.processNormally( - currentNotFoundOrDeleted, - canOptimizeAddDocument ? 1L : index.versionType().updateVersion(currentVersion, index.version()), - reservingDocs - ); - } - } - } - return plan; - } - private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws IOException { assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin(); - assert plan.versionForIndexing >= 0 : "version must be set. got " + plan.versionForIndexing; - assert plan.indexIntoLucene || plan.addStaleOpToLucene; + assert plan.version >= 0 : "version must be set. got " + plan.version; + assert plan.executeOpOnEngine || plan.addStaleOpToEngine; /* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence * number service if this is on the primary, or the existing document's sequence number if this is on the replica. The * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created. */ index.parsedDoc().updateSeqID(index.seqNo(), index.primaryTerm()); - index.parsedDoc().version().setLongValue(plan.versionForIndexing); + index.parsedDoc().version().setLongValue(plan.version); try { - if (plan.addStaleOpToLucene) { + if (plan.addStaleOpToEngine) { addStaleDocs(index.docs(), documentIndexWriter, index.uid()); - } else if (plan.useLuceneUpdateDocument) { + } else if (plan.useUpdateDocument) { assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), index.seqNo(), true, true); - updateDocs(index.uid(), index.docs(), documentIndexWriter, plan.versionForIndexing, index.seqNo(), index.primaryTerm()); + updateDocs(index.uid(), index.docs(), documentIndexWriter, plan.version, index.seqNo(), index.primaryTerm()); } else { // document does not exists, we can optimize for create, but double check if assertions are running - assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false); + assert assertDocDoesNotExist(index, indexingStrategyPlanner.canOptimizeAddDocument(index) == false); addDocs(index.docs(), documentIndexWriter, index.uid()); } - return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted); + return new IndexResult(plan.version, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted); } catch (Exception ex) { if (ex instanceof AlreadyClosedException == false // TODO: Check if isClose check in getTragicException will cause any issue here @@ -1237,27 +1103,6 @@ private boolean treatDocumentFailureAsTragicError(Index index) { || index.origin() == Operation.Origin.LOCAL_RESET; } - /** - * returns true if the indexing operation may have already be processed by this engine. - * Note that it is OK to rarely return true even if this is not the case. However a `false` - * return value must always be correct. - * - */ - private boolean mayHaveBeenIndexedBefore(Index index) { - assert canOptimizeAddDocument(index); - final boolean mayHaveBeenIndexBefore; - if (index.isRetry()) { - mayHaveBeenIndexBefore = true; - updateAutoIdTimestamp(index.getAutoGeneratedIdTimestamp(), true); - assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp(); - } else { - // in this case we force - mayHaveBeenIndexBefore = maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp(); - updateAutoIdTimestamp(index.getAutoGeneratedIdTimestamp(), false); - } - return mayHaveBeenIndexBefore; - } - private void addDocs(final List docs, final DocumentIndexWriter indexWriter, Term uid) throws IOException { if (docs.size() > 1) { indexWriter.addDocuments(docs, uid); @@ -1278,92 +1123,6 @@ private void addStaleDocs(final List docs, final Document } } - /** - * The indexing strategy - * - * @opensearch.internal - */ - protected static final class IndexingStrategy { - final boolean currentNotFoundOrDeleted; - final boolean useLuceneUpdateDocument; - final long versionForIndexing; - final boolean indexIntoLucene; - final boolean addStaleOpToLucene; - final int reservedDocs; - final Optional earlyResultOnPreFlightError; - - private IndexingStrategy( - boolean currentNotFoundOrDeleted, - boolean useLuceneUpdateDocument, - boolean indexIntoLucene, - boolean addStaleOpToLucene, - long versionForIndexing, - int reservedDocs, - IndexResult earlyResultOnPreFlightError - ) { - assert useLuceneUpdateDocument == false || indexIntoLucene - : "use lucene update is set to true, but we're not indexing into lucene"; - assert (indexIntoLucene && earlyResultOnPreFlightError != null) == false - : "can only index into lucene or have a preflight result but not both." - + "indexIntoLucene: " - + indexIntoLucene - + " earlyResultOnPreFlightError:" - + earlyResultOnPreFlightError; - assert reservedDocs == 0 || indexIntoLucene || addStaleOpToLucene : reservedDocs; - this.currentNotFoundOrDeleted = currentNotFoundOrDeleted; - this.useLuceneUpdateDocument = useLuceneUpdateDocument; - this.versionForIndexing = versionForIndexing; - this.indexIntoLucene = indexIntoLucene; - this.addStaleOpToLucene = addStaleOpToLucene; - this.reservedDocs = reservedDocs; - this.earlyResultOnPreFlightError = earlyResultOnPreFlightError == null - ? Optional.empty() - : Optional.of(earlyResultOnPreFlightError); - } - - static IndexingStrategy optimizedAppendOnly(long versionForIndexing, int reservedDocs) { - return new IndexingStrategy(true, false, true, false, versionForIndexing, reservedDocs, null); - } - - public static IndexingStrategy skipDueToVersionConflict( - VersionConflictEngineException e, - boolean currentNotFoundOrDeleted, - long currentVersion - ) { - final IndexResult result = new IndexResult(e, currentVersion); - return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, Versions.NOT_FOUND, 0, result); - } - - static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, long versionForIndexing, int reservedDocs) { - return new IndexingStrategy( - currentNotFoundOrDeleted, - currentNotFoundOrDeleted == false, - true, - false, - versionForIndexing, - reservedDocs, - null - ); - } - - public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long versionForIndexing) { - return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, versionForIndexing, 0, null); - } - - static IndexingStrategy processAsStaleOp(long versionForIndexing) { - return new IndexingStrategy(false, false, false, true, versionForIndexing, 0, null); - } - - static IndexingStrategy failAsTooManyDocs(Exception e) { - final IndexResult result = new IndexResult(e, Versions.NOT_FOUND); - return new IndexingStrategy(false, false, false, false, Versions.NOT_FOUND, 0, result); - } - - static IndexingStrategy failAsIndexAppendOnly(IndexResult result, long versionForIndexing, int reservedDocs) { - return new IndexingStrategy(false, false, false, true, versionForIndexing, reservedDocs, result); - } - } - /** * Asserts that the doc in the index operation really doesn't exist */ @@ -1423,9 +1182,9 @@ public DeleteResult delete(Delete delete) throws IOException { lastWriteNanos = delete.startTime(); final DeletionStrategy plan = deletionStrategyForOperation(delete); reservedDocs = plan.reservedDocs; - if (plan.earlyResultOnPreflightError.isPresent()) { + if (plan.earlyResultOnPreFlightError.isPresent()) { assert delete.origin() == Operation.Origin.PRIMARY : delete.origin(); - deleteResult = plan.earlyResultOnPreflightError.get(); + deleteResult = (DeleteResult) plan.earlyResultOnPreFlightError.get(); } else { // generate or register sequence number if (delete.origin() == Operation.Origin.PRIMARY) { @@ -1449,14 +1208,14 @@ public DeleteResult delete(Delete delete) throws IOException { assert delete.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + delete.origin(); - if (plan.deleteFromLucene || plan.addStaleOpToLucene) { + if (plan.executeOpOnEngine || plan.addStaleOpToEngine) { deleteResult = deleteInLucene(delete, plan); - if (plan.deleteFromLucene) { + if (plan.executeOpOnEngine) { numDocDeletes.inc(); versionMap.putDeleteUnderLock( delete.uid().bytes(), new DeleteVersionValue( - plan.versionOfDeletion, + plan.version, delete.seqNo(), delete.primaryTerm(), engineConfig.getThreadPool().relativeTimeInMillis() @@ -1464,12 +1223,7 @@ public DeleteResult delete(Delete delete) throws IOException { ); } } else { - deleteResult = new DeleteResult( - plan.versionOfDeletion, - delete.primaryTerm(), - delete.seqNo(), - plan.currentlyDeleted == false - ); + deleteResult = new DeleteResult(plan.version, delete.primaryTerm(), delete.seqNo(), plan.currentlyDeleted == false); } } if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) { @@ -1534,33 +1288,11 @@ protected DeletionStrategy deletionStrategyForOperation(final Delete delete) thr protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException { assert assertNonPrimaryOrigin(delete); - final DeletionStrategy plan; - if (hasBeenProcessedBefore(delete)) { - // the operation seq# was processed thus this operation was already put into lucene - // this can happen during recovery where older operations are sent from the translog that are already - // part of the lucene commit (either from a peer recovery or a local translog) - // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in - // question may have been deleted in an out of order op that is not replayed. - // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery - // See testRecoveryWithOutOfOrderDelete for an example of peer recovery - plan = DeletionStrategy.processButSkipLucene(false, delete.version()); - } else { - boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode(); - final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - if (segRepEnabled) { - // For segrep based indices, we can't completely rely on localCheckpointTracker - // as the preserved checkpoint may not have all the operations present in lucene - // we don't need to index it again as stale op as it would create multiple documents for same seq no - plan = DeletionStrategy.processButSkipLucene(false, delete.version()); - } else { - plan = DeletionStrategy.processAsStaleOp(delete.version()); - } - } else { - plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.version(), 0); - } - } - return plan; + return deletionStrategyPlanner.planOperationAsNonPrimary(delete); + } + + private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException { + return deletionStrategyPlanner.planOperationAsPrimary(delete); } protected boolean assertNonPrimaryOrigin(final Operation operation) { @@ -1568,69 +1300,13 @@ protected boolean assertNonPrimaryOrigin(final Operation operation) { return true; } - private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException { - assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); - // resolve operation from external to internal - final VersionValue versionValue = resolveDocVersion(delete, delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO); - assert incrementVersionLookup(); - final long currentVersion; - final boolean currentlyDeleted; - if (versionValue == null) { - currentVersion = Versions.NOT_FOUND; - currentlyDeleted = true; - } else { - currentVersion = versionValue.version; - currentlyDeleted = versionValue.isDelete(); - } - final DeletionStrategy plan; - if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && currentlyDeleted) { - final VersionConflictEngineException e = new VersionConflictEngineException( - shardId, - delete.id(), - delete.getIfSeqNo(), - delete.getIfPrimaryTerm(), - SequenceNumbers.UNASSIGNED_SEQ_NO, - SequenceNumbers.UNASSIGNED_PRIMARY_TERM - ); - plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, true); - } else if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO - && (versionValue.seqNo != delete.getIfSeqNo() || versionValue.term != delete.getIfPrimaryTerm())) { - final VersionConflictEngineException e = new VersionConflictEngineException( - shardId, - delete.id(), - delete.getIfSeqNo(), - delete.getIfPrimaryTerm(), - versionValue.seqNo, - versionValue.term - ); - plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted); - } else if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) { - final VersionConflictEngineException e = new VersionConflictEngineException( - shardId, - delete, - currentVersion, - currentlyDeleted - ); - plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted); - } else { - final Exception reserveError = tryAcquireInFlightDocs(delete, 1); - if (reserveError != null) { - plan = DeletionStrategy.failAsTooManyDocs(reserveError); - } else { - final long versionOfDeletion = delete.versionType().updateVersion(currentVersion, delete.version()); - plan = DeletionStrategy.processNormally(currentlyDeleted, versionOfDeletion, 1); - } - } - return plan; - } - private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws IOException { assert assertMaxSeqNoOfUpdatesIsAdvanced(delete.uid(), delete.seqNo(), false, false); try { final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.id()); assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]"; tombstone.updateSeqID(delete.seqNo(), delete.primaryTerm()); - tombstone.version().setLongValue(plan.versionOfDeletion); + tombstone.version().setLongValue(plan.version); final ParseContext.Document doc = tombstone.docs().get(0); assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : "Delete tombstone document but _tombstone field is not set [" + doc @@ -1638,14 +1314,14 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws doc.add(softDeletesField); documentIndexWriter.deleteDocument( delete.uid(), - plan.addStaleOpToLucene || plan.currentlyDeleted, + plan.addStaleOpToEngine || plan.currentlyDeleted, doc, - plan.versionOfDeletion, + plan.version, delete.seqNo(), delete.primaryTerm(), softDeletesField ); - return new DeleteResult(plan.versionOfDeletion, delete.primaryTerm(), delete.seqNo(), plan.currentlyDeleted == false); + return new DeleteResult(plan.version, delete.primaryTerm(), delete.seqNo(), plan.currentlyDeleted == false); } catch (final Exception ex) { /* * Document level failures when deleting are unexpected, we likely hit something fatal such as the Lucene index being corrupt, @@ -1665,85 +1341,6 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws } } - /** - * The deletion strategy - * - * @opensearch.internal - */ - protected static final class DeletionStrategy { - // of a rare double delete - final boolean deleteFromLucene; - final boolean addStaleOpToLucene; - final boolean currentlyDeleted; - final long versionOfDeletion; - final Optional earlyResultOnPreflightError; - final int reservedDocs; - - private DeletionStrategy( - boolean deleteFromLucene, - boolean addStaleOpToLucene, - boolean currentlyDeleted, - long versionOfDeletion, - int reservedDocs, - DeleteResult earlyResultOnPreflightError - ) { - assert (deleteFromLucene && earlyResultOnPreflightError != null) == false - : "can only delete from lucene or have a preflight result but not both." - + "deleteFromLucene: " - + deleteFromLucene - + " earlyResultOnPreFlightError:" - + earlyResultOnPreflightError; - this.deleteFromLucene = deleteFromLucene; - this.addStaleOpToLucene = addStaleOpToLucene; - this.currentlyDeleted = currentlyDeleted; - this.versionOfDeletion = versionOfDeletion; - this.reservedDocs = reservedDocs; - assert reservedDocs == 0 || deleteFromLucene || addStaleOpToLucene : reservedDocs; - this.earlyResultOnPreflightError = earlyResultOnPreflightError == null - ? Optional.empty() - : Optional.of(earlyResultOnPreflightError); - } - - public static DeletionStrategy skipDueToVersionConflict( - VersionConflictEngineException e, - long currentVersion, - boolean currentlyDeleted - ) { - final DeleteResult deleteResult = new DeleteResult( - e, - currentVersion, - SequenceNumbers.UNASSIGNED_PRIMARY_TERM, - SequenceNumbers.UNASSIGNED_SEQ_NO, - currentlyDeleted == false - ); - return new DeletionStrategy(false, false, currentlyDeleted, Versions.NOT_FOUND, 0, deleteResult); - } - - static DeletionStrategy processNormally(boolean currentlyDeleted, long versionOfDeletion, int reservedDocs) { - return new DeletionStrategy(true, false, currentlyDeleted, versionOfDeletion, reservedDocs, null); - - } - - public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, long versionOfDeletion) { - return new DeletionStrategy(false, false, currentlyDeleted, versionOfDeletion, 0, null); - } - - static DeletionStrategy processAsStaleOp(long versionOfDeletion) { - return new DeletionStrategy(false, true, false, versionOfDeletion, 0, null); - } - - static DeletionStrategy failAsTooManyDocs(Exception e) { - final DeleteResult deleteResult = new DeleteResult( - e, - Versions.NOT_FOUND, - SequenceNumbers.UNASSIGNED_PRIMARY_TERM, - SequenceNumbers.UNASSIGNED_SEQ_NO, - false - ); - return new DeletionStrategy(false, false, false, Versions.NOT_FOUND, 0, deleteResult); - } - } - @Override public void maybePruneDeletes() { // It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid so we only do it @@ -3058,7 +2655,7 @@ private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryRead try (Releasable ignored = versionMap.acquireLock(uid)) { final VersionValue curr = versionMap.getUnderLock(uid); if (curr == null - || compareOpToVersionMapOnSeqNo(idFieldVisitor.getId(), seqNo, primaryTerm, curr) == OpVsLuceneDocStatus.OP_NEWER) { + || compareOpToVersionMapOnSeqNo(idFieldVisitor.getId(), seqNo, primaryTerm, curr) == OpVsEngineDocStatus.OP_NEWER) { if (dv.isTombstone(docId)) { // use 0L for the start time so we can prune this delete tombstone quickly // when the local checkpoint advances (i.e., after a recovery completed). diff --git a/server/src/main/java/org/opensearch/index/engine/OpVsEngineDocStatus.java b/server/src/main/java/org/opensearch/index/engine/OpVsEngineDocStatus.java new file mode 100644 index 0000000000000..ed9e4e2993041 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/OpVsEngineDocStatus.java @@ -0,0 +1,22 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +/** + * the status of the current doc version in engine, compared to the version in an incoming + * operation + */ +public enum OpVsEngineDocStatus { + /** the op is more recent than the one that last modified the doc found in engine*/ + OP_NEWER, + /** the op is older or the same as the one that last modified the doc found in engine*/ + OP_STALE_OR_EQUAL, + /** no doc was found in engine */ + DOC_NOT_FOUND +} diff --git a/server/src/main/java/org/opensearch/index/engine/OperationStrategy.java b/server/src/main/java/org/opensearch/index/engine/OperationStrategy.java new file mode 100644 index 0000000000000..1bff055c41390 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/OperationStrategy.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import java.util.Objects; +import java.util.Optional; + +/** + * Encapsulates the execution strategy for an engine operation (e.g. indexing or deletion). + * + * @opensearch.internal + */ +public class OperationStrategy { + + public final boolean executeOpOnEngine; + public final boolean addStaleOpToEngine; + public final long version; + public final Optional earlyResultOnPreFlightError; + public final int reservedDocs; + + public OperationStrategy( + boolean executeOpOnEngine, + boolean addStaleOpToEngine, + long version, + Engine.Result earlyResultOnPreFlightError, + int reservedDocs + ) { + this.executeOpOnEngine = executeOpOnEngine; + this.addStaleOpToEngine = addStaleOpToEngine; + this.version = version; + this.reservedDocs = reservedDocs; + this.earlyResultOnPreFlightError = earlyResultOnPreFlightError == null + ? Optional.empty() + : Optional.of(earlyResultOnPreFlightError); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + OperationStrategy that = (OperationStrategy) o; + return executeOpOnEngine == that.executeOpOnEngine + && addStaleOpToEngine == that.addStaleOpToEngine + && version == that.version + && reservedDocs == that.reservedDocs; + } + + @Override + public int hashCode() { + return Objects.hash(executeOpOnEngine, addStaleOpToEngine, version, reservedDocs); + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/OperationStrategyPlanner.java b/server/src/main/java/org/opensearch/index/engine/OperationStrategyPlanner.java new file mode 100644 index 0000000000000..09e99339d41cd --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/OperationStrategyPlanner.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import java.io.IOException; + +/** + * Plans the execution strategy for engine operations based on the shard's role. + * This interface defines how operations (indexing, deletion) should be processed + * differently depending on whether the shard is acting as a primary or replica. + * + * @opensearch.internal + */ +public interface OperationStrategyPlanner { + + /** + * Plans the execution strategy for an operation on a primary shard. + * + * @param operation the operation to plan (index or delete) + * @return the strategy for executing this operation on the primary + * @throws IOException if an I/O error occurs during planning + */ + V planOperationAsPrimary(T operation) throws IOException; + + /** + * Plans the execution strategy for an operation on a non-primary (replica) shard. + * + * @param operation the operation to plan (index or delete) + * @return the strategy for executing this operation on the replica + * @throws IOException if an I/O error occurs during planning + */ + V planOperationAsNonPrimary(T operation) throws IOException; + + default boolean assertNonPrimaryOrigin(final T operation) { + assert operation.origin() != Engine.Operation.Origin.PRIMARY : "planing as primary but got " + operation.origin(); + return true; + } +} diff --git a/server/src/test/java/org/opensearch/index/engine/DeletionStrategyPlannerTests.java b/server/src/test/java/org/opensearch/index/engine/DeletionStrategyPlannerTests.java new file mode 100644 index 0000000000000..44102ec4cb33b --- /dev/null +++ b/server/src/test/java/org/opensearch/index/engine/DeletionStrategyPlannerTests.java @@ -0,0 +1,334 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.lucene.uid.Versions; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.VersionType; +import org.opensearch.test.IndexSettingsModule; + +import java.io.IOException; + +import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; + +public class DeletionStrategyPlannerTests extends EngineTestCase { + + public void testPlanOperationAsPrimary() throws IOException { + DeletionStrategyPlanner planner = constructDeletionStrategyPlanner(new DeleteVersionValue(1L, 5L, 1L, 1L)); + + Engine.Delete delete = new Engine.Delete( + "1", + newUid("1"), + UNASSIGNED_SEQ_NO, + primaryTerm.get(), + 1L, + VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, + System.currentTimeMillis(), + UNASSIGNED_SEQ_NO, + 0L + ); + DeletionStrategy plannedStrategy = planner.planOperationAsPrimary(delete); + DeletionStrategy expectedStrategy = DeletionStrategy.processNormally(true, 2, 1); + + assertEquals(expectedStrategy, plannedStrategy); + } + + public void testPlanOperationAsPrimaryFailAsTooManyDocs() throws IOException { + Exception reserveError = new IllegalStateException("Too many documents"); + DeletionStrategyPlanner planner = new DeletionStrategyPlanner( + engine.engineConfig.getIndexSettings(), + engine.shardId, + op -> false, + op -> OpVsEngineDocStatus.DOC_NOT_FOUND, + (op, refresh) -> new DeleteVersionValue(1L, 5L, 1L, 1L), + (op, docs) -> reserveError, + () -> true + ); + + Engine.Delete delete = new Engine.Delete( + "1", + newUid("1"), + UNASSIGNED_SEQ_NO, + primaryTerm.get(), + 1L, + VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, + System.currentTimeMillis(), + UNASSIGNED_SEQ_NO, + 0L + ); + DeletionStrategy plannedStrategy = planner.planOperationAsPrimary(delete); + DeletionStrategy expectedStrategy = DeletionStrategy.failAsTooManyDocs(reserveError); + + assertEquals(expectedStrategy, plannedStrategy); + assertTrue(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsPrimaryVersionConflict() throws IOException { + DeletionStrategyPlanner planner = constructDeletionStrategyPlanner(new IndexVersionValue(null, 1L, 1L, 1L)); + + Engine.Delete delete = new Engine.Delete( + "1", + newUid("1"), + UNASSIGNED_SEQ_NO, + primaryTerm.get(), + 2L, + VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, + System.currentTimeMillis(), + UNASSIGNED_SEQ_NO, + 1L + ); + DeletionStrategy plannedStrategy = planner.planOperationAsPrimary(delete); + + assertFalse(plannedStrategy.executeOpOnEngine); + assertFalse(plannedStrategy.currentlyDeleted); + assertFalse(plannedStrategy.addStaleOpToEngine); + assertEquals(Versions.NOT_FOUND, plannedStrategy.version); + assertEquals(0, plannedStrategy.reservedDocs); + assertTrue(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsPrimaryVersionConflictDocumentDeleted() throws IOException { + DeletionStrategyPlanner planner = constructDeletionStrategyPlanner(new DeleteVersionValue(1L, 5L, 1L, 1L)); + + Engine.Delete delete = new Engine.Delete( + "1", + newUid("1"), + UNASSIGNED_SEQ_NO, + primaryTerm.get(), + 2L, + VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, + System.currentTimeMillis(), + UNASSIGNED_SEQ_NO, + 1L + ); + DeletionStrategy plannedStrategy = planner.planOperationAsPrimary(delete); + + assertFalse(plannedStrategy.executeOpOnEngine); + assertTrue(plannedStrategy.currentlyDeleted); + assertFalse(plannedStrategy.addStaleOpToEngine); + assertEquals(Versions.NOT_FOUND, plannedStrategy.version); + assertEquals(0, plannedStrategy.reservedDocs); + assertTrue(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsPrimaryVersionConflictSeqNoMismatch() throws IOException { + VersionValue versionValue = new IndexVersionValue(null, 2L, 10L, 2L); + DeletionStrategyPlanner planner = constructDeletionStrategyPlanner(versionValue); + + Engine.Delete delete = new Engine.Delete( + "1", + newUid("1"), + 5L, + primaryTerm.get(), + 1L, + VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, + System.currentTimeMillis(), + 5L, + 1L + ); + DeletionStrategy plannedStrategy = planner.planOperationAsPrimary(delete); + + assertFalse(plannedStrategy.executeOpOnEngine); + assertFalse(plannedStrategy.currentlyDeleted); + assertFalse(plannedStrategy.addStaleOpToEngine); + assertEquals(Versions.NOT_FOUND, plannedStrategy.version); + assertEquals(0, plannedStrategy.reservedDocs); + assertTrue(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsPrimaryVersionConflictForWrites() throws IOException { + DeletionStrategyPlanner planner = constructDeletionStrategyPlanner(new DeleteVersionValue(1L, 5L, 1L, 1L)); + + Engine.Delete delete = new Engine.Delete( + "1", + newUid("1"), + 1L, + primaryTerm.get(), + 3L, + VersionType.EXTERNAL, + Engine.Operation.Origin.PRIMARY, + System.currentTimeMillis(), + 1L, + 0L + ); + DeletionStrategy plannedStrategy = planner.planOperationAsPrimary(delete); + + assertFalse(plannedStrategy.executeOpOnEngine); + assertTrue(plannedStrategy.currentlyDeleted); + assertFalse(plannedStrategy.addStaleOpToEngine); + assertEquals(Versions.NOT_FOUND, plannedStrategy.version); + assertEquals(0, plannedStrategy.reservedDocs); + assertTrue(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsNonPrimaryProcessedBefore() throws IOException { + DeletionStrategyPlanner planner = constructDeletionStrategyPlannerForNonPrimary(true, OpVsEngineDocStatus.DOC_NOT_FOUND); + + Engine.Delete delete = new Engine.Delete( + "1", + newUid("1"), + 10L, + primaryTerm.get(), + 1L, + null, + Engine.Operation.Origin.REPLICA, + System.currentTimeMillis(), + UNASSIGNED_SEQ_NO, + 0L + ); + DeletionStrategy plannedStrategy = planner.planOperationAsNonPrimary(delete); + DeletionStrategy expectedStrategy = DeletionStrategy.processButSkipEngine(false, delete.version()); + + assertEquals(expectedStrategy, plannedStrategy); + assertFalse(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsNonPrimaryProcessAsStaleOp() throws IOException { + DeletionStrategyPlanner planner = constructDeletionStrategyPlannerForNonPrimary(false, OpVsEngineDocStatus.OP_STALE_OR_EQUAL); + + Engine.Delete delete = new Engine.Delete( + "1", + newUid("1"), + UNASSIGNED_SEQ_NO, + primaryTerm.get(), + 1L, + null, + Engine.Operation.Origin.REPLICA, + System.currentTimeMillis(), + UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM + ); + DeletionStrategy plannedStrategy = planner.planOperationAsNonPrimary(delete); + DeletionStrategy expectedStrategy = DeletionStrategy.processAsStaleOp(delete.version()); + + assertEquals(expectedStrategy, plannedStrategy); + assertFalse(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsNonPrimaryProcessNormally() throws IOException { + DeletionStrategyPlanner planner = constructDeletionStrategyPlannerForNonPrimary(false, OpVsEngineDocStatus.DOC_NOT_FOUND); + + Engine.Delete delete = new Engine.Delete( + "1", + newUid("1"), + UNASSIGNED_SEQ_NO, + primaryTerm.get(), + 1L, + null, + Engine.Operation.Origin.REPLICA, + System.currentTimeMillis(), + UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM + ); + DeletionStrategy plannedStrategy = planner.planOperationAsNonPrimary(delete); + DeletionStrategy expectedStrategy = DeletionStrategy.processNormally(true, delete.version(), 0); + + assertEquals(expectedStrategy, plannedStrategy); + assertFalse(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsNonPrimaryProcessNormallyDocumentExists() throws IOException { + DeletionStrategyPlanner planner = new DeletionStrategyPlanner( + replicaEngine.engineConfig.getIndexSettings(), + replicaEngine.shardId, + op -> false, + op -> OpVsEngineDocStatus.OP_NEWER, + (op, refresh) -> null, + (op, docs) -> null, + () -> true + ); + + Engine.Delete delete = new Engine.Delete( + "1", + newUid("1"), + UNASSIGNED_SEQ_NO, + primaryTerm.get(), + 1L, + null, + Engine.Operation.Origin.REPLICA, + System.currentTimeMillis(), + UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM + ); + DeletionStrategy plannedStrategy = planner.planOperationAsNonPrimary(delete); + DeletionStrategy expectedStrategy = DeletionStrategy.processNormally(false, delete.version(), 0); + + assertEquals(expectedStrategy, plannedStrategy); + assertFalse(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsNonPrimaryWithSegRepEnabled() throws IOException { + Settings.Builder settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), "SEGMENT"); + final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); + EngineConfig engineConfig = config(indexSettings, store, createTempDir(), newMergePolicy(), null); + + DeletionStrategyPlanner planner = new DeletionStrategyPlanner( + engineConfig.getIndexSettings(), + replicaEngine.shardId, + op -> false, + op -> OpVsEngineDocStatus.OP_STALE_OR_EQUAL, + (op, refresh) -> null, + (op, docs) -> null, + () -> true + ); + + Engine.Delete delete = new Engine.Delete( + "1", + newUid("1"), + 10L, + primaryTerm.get(), + UNASSIGNED_SEQ_NO, + null, + Engine.Operation.Origin.REPLICA, + System.currentTimeMillis(), + UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM + ); + DeletionStrategy plannedStrategy = planner.planOperationAsNonPrimary(delete); + DeletionStrategy expectedStrategy = DeletionStrategy.processButSkipEngine(false, delete.version()); + + assertEquals(expectedStrategy, plannedStrategy); + assertFalse(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + private DeletionStrategyPlanner constructDeletionStrategyPlanner(VersionValue versionValue) { + return new DeletionStrategyPlanner( + engine.engineConfig.getIndexSettings(), + engine.shardId, + op -> false, + op -> OpVsEngineDocStatus.DOC_NOT_FOUND, + (op, refresh) -> versionValue, + (op, docs) -> null, + () -> true + ); + } + + private DeletionStrategyPlanner constructDeletionStrategyPlannerForNonPrimary(boolean processedBefore, OpVsEngineDocStatus docStatus) { + return new DeletionStrategyPlanner( + replicaEngine.engineConfig.getIndexSettings(), + replicaEngine.shardId, + op -> processedBefore, + op -> docStatus, + (op, refresh) -> null, + (op, docs) -> null, + () -> true + ); + } +} diff --git a/server/src/test/java/org/opensearch/index/engine/IndexingStrategyPlannerTests.java b/server/src/test/java/org/opensearch/index/engine/IndexingStrategyPlannerTests.java new file mode 100644 index 0000000000000..8a7e283ed2881 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/engine/IndexingStrategyPlannerTests.java @@ -0,0 +1,378 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.opensearch.action.index.IndexRequest; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.CheckedBiFunction; +import org.opensearch.common.lucene.uid.Versions; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.VersionType; +import org.opensearch.index.mapper.ParsedDocument; +import org.opensearch.test.IndexSettingsModule; + +import java.io.IOException; +import java.util.function.BiFunction; +import java.util.function.Predicate; + +import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; + +public class IndexingStrategyPlannerTests extends EngineTestCase { + + public void testPlanOperationAsPrimaryWithNewDocumentProcessedNormally() throws IOException { + IndexingStrategyPlanner planner = constructDefaultIndexingStrategyPlanner(); + + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index index = new Engine.Index(newUid(doc), primaryTerm.get(), doc); + IndexingStrategy plannedStrategy = planner.planOperationAsPrimary(index); + IndexingStrategy expectedStrategy = IndexingStrategy.processNormally(true, 1, 1); + + assertEquals(expectedStrategy, plannedStrategy); + assertFalse(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsPrimaryWithExistingDocumentProcessedNormally() throws IOException { + VersionValue versionValue = new IndexVersionValue(null, randomLong(), 5L, 1L); + IndexingStrategyPlanner planner = constructIndexingStrategyPlanner(versionValue); + + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index index = new Engine.Index(newUid(doc), primaryTerm.get(), doc); + IndexingStrategy plannedStrategy = planner.planOperationAsPrimary(index); + IndexingStrategy expectedStrategy = IndexingStrategy.processNormally(false, versionValue.version + 1, 1); + + assertEquals(expectedStrategy, plannedStrategy); + assertFalse(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsPrimaryOptimizedAppendOnly() throws IOException { + IndexingStrategyPlanner planner = constructDefaultIndexingStrategyPlanner(); + + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index index = indexForDoc(doc); + IndexingStrategy plannedStrategy = planner.planOperationAsPrimary(index); + IndexingStrategy expectedStrategy = IndexingStrategy.optimizedAppendOnly(1L, 1); + + assertEquals(expectedStrategy, plannedStrategy); + assertFalse(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsPrimaryFailingWithTooManyDocs() throws IOException { + Exception tooManyDocsException = new RuntimeException("too many docs"); + IndexingStrategyPlanner planner = constructIndexingStrategyPlanner(tooManyDocsException); + + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index index = indexForDoc( + doc, + UNASSIGNED_SEQ_NO, + Versions.MATCH_ANY, + VersionType.INTERNAL, + randomLongBetween(1, Long.MAX_VALUE), + 0 + ); + IndexingStrategy plannedStrategy = planner.planOperationAsPrimary(index); + IndexingStrategy expectedStrategy = IndexingStrategy.failAsTooManyDocs(tooManyDocsException); + + assertEquals(expectedStrategy, plannedStrategy); + assertTrue(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsPrimaryVersionConflictDocumentNotFound() throws IOException { + IndexingStrategyPlanner planner = constructDefaultIndexingStrategyPlanner(); + + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index index = indexForDoc(doc, 5L, 1L, VersionType.INTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, 1L); + IndexingStrategy plannedStrategy = planner.planOperationAsPrimary(index); + + assertFalse(plannedStrategy.executeOpOnEngine); + assertFalse(plannedStrategy.useUpdateDocument); + assertFalse(plannedStrategy.addStaleOpToEngine); + assertEquals(Versions.NOT_FOUND, plannedStrategy.version); + assertTrue(plannedStrategy.currentNotFoundOrDeleted); + assertEquals(0, plannedStrategy.reservedDocs); + assertTrue(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsPrimaryVersionConflictSeqNoMismatch() throws IOException { + IndexingStrategyPlanner planner = constructIndexingStrategyPlanner(new IndexVersionValue(null, 2L, 10L, 2L)); + + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index index = indexForDoc(doc, 5L, 1L, VersionType.INTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, 1L); + IndexingStrategy plannedStrategy = planner.planOperationAsPrimary(index); + + assertFalse(plannedStrategy.executeOpOnEngine); + assertFalse(plannedStrategy.useUpdateDocument); + assertFalse(plannedStrategy.addStaleOpToEngine); + assertEquals(Versions.NOT_FOUND, plannedStrategy.version); + assertFalse(plannedStrategy.currentNotFoundOrDeleted); + assertEquals(0, plannedStrategy.reservedDocs); + assertTrue(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsPrimaryVersionConflictForWrites() throws IOException { + IndexingStrategyPlanner planner = constructIndexingStrategyPlanner(new IndexVersionValue(null, 5L, 10L, 1L)); + + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index index = indexForDoc(doc, UNASSIGNED_SEQ_NO, 3L, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, 0L); + IndexingStrategy plannedStrategy = planner.planOperationAsPrimary(index); + + assertFalse(plannedStrategy.executeOpOnEngine); + assertFalse(plannedStrategy.useUpdateDocument); + assertFalse(plannedStrategy.addStaleOpToEngine); + assertEquals(Versions.NOT_FOUND, plannedStrategy.version); + assertFalse(plannedStrategy.currentNotFoundOrDeleted); + assertEquals(0, plannedStrategy.reservedDocs); + assertTrue(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsPrimaryAppendOnlyIndexRetry() throws IOException { + Settings.Builder settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey(), "true"); + final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); + EngineConfig engineConfig = config(indexSettings, store, createTempDir(), newMergePolicy(), null); + + IndexingStrategyPlanner planner = constructIndexingStrategyPlanner(engineConfig, new IndexVersionValue(null, 5L, 10L, 1L)); + + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index index = indexForDoc( + doc, + UNASSIGNED_SEQ_NO, + Versions.MATCH_ANY, + VersionType.INTERNAL, + IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, + 0L + ); + IndexingStrategy plannedStrategy = planner.planOperationAsPrimary(index); + + assertFalse(plannedStrategy.executeOpOnEngine); + assertFalse(plannedStrategy.useUpdateDocument); + assertTrue(plannedStrategy.addStaleOpToEngine); + assertEquals(5L, plannedStrategy.version); + assertFalse(plannedStrategy.currentNotFoundOrDeleted); + assertEquals(0, plannedStrategy.reservedDocs); + assertTrue(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsNonPrimaryProcessedBefore() throws IOException { + IndexingStrategyPlanner planner = constructIndexingStrategyPlanner( + operation -> true, + (operation, aBoolean) -> null, + (operation, integer) -> null + ); + + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index index = replicaIndexForDoc(doc, 1L, 10L, false); + IndexingStrategy plannedStrategy = planner.planOperationAsNonPrimary(index); + IndexingStrategy expectedStrategy = IndexingStrategy.processButSkipEngine(false, index.version()); + + assertEquals(expectedStrategy, plannedStrategy); + assertFalse(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsNonPrimaryOptimizedAppendOnly() throws IOException { + IndexingStrategyPlanner planner = new IndexingStrategyPlanner( + engine.engineConfig.getIndexSettings(), + engine.shardId, + engine.versionMap, + () -> 0L, + () -> 5L, + () -> 10L, + op -> false, + op -> OpVsEngineDocStatus.DOC_NOT_FOUND, + (op, refresh) -> null, + (timestamp, append) -> {}, + (op, docs) -> null + ); + + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index index = replicaIndexForDoc(doc, 1L, 15L, false); + IndexingStrategy plannedStrategy = planner.planOperationAsNonPrimary(index); + IndexingStrategy expectedStrategy = IndexingStrategy.optimizedAppendOnly(index.version(), 0); + + assertEquals(expectedStrategy, plannedStrategy); + assertFalse(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsNonPrimaryProcessAsStaleOp() throws IOException { + IndexingStrategyPlanner planner = new IndexingStrategyPlanner( + engine.engineConfig.getIndexSettings(), + engine.shardId, + engine.versionMap, + () -> 0L, + () -> 15L, + () -> 10L, + op -> false, + op -> OpVsEngineDocStatus.OP_STALE_OR_EQUAL, + (op, refresh) -> null, + (timestamp, append) -> {}, + (op, docs) -> null + ); + + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index index = replicaIndexForDoc(doc, 1L, 12L, false); + IndexingStrategy plannedStrategy = planner.planOperationAsNonPrimary(index); + IndexingStrategy expectedStrategy = IndexingStrategy.processAsStaleOp(index.version()); + + assertEquals(expectedStrategy, plannedStrategy); + assertFalse(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsNonPrimaryProcessNormally() throws IOException { + IndexingStrategyPlanner planner = new IndexingStrategyPlanner( + engine.engineConfig.getIndexSettings(), + engine.shardId, + engine.versionMap, + () -> 0L, + () -> 15L, + () -> 10L, + op -> false, + op -> OpVsEngineDocStatus.DOC_NOT_FOUND, + (op, refresh) -> null, + (timestamp, append) -> {}, + (op, docs) -> null + ); + + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index index = replicaIndexForDoc(doc, 1L, 12L, false); + IndexingStrategy plannedStrategy = planner.planOperationAsNonPrimary(index); + IndexingStrategy expectedStrategy = IndexingStrategy.processNormally(true, index.version(), 0); + + assertEquals(expectedStrategy, plannedStrategy); + assertFalse(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsNonPrimaryProcessNormallyDocumentExists() throws IOException { + IndexingStrategyPlanner planner = new IndexingStrategyPlanner( + engine.engineConfig.getIndexSettings(), + engine.shardId, + engine.versionMap, + () -> 0L, + () -> 15L, + () -> 10L, + op -> false, + op -> OpVsEngineDocStatus.OP_NEWER, + (op, refresh) -> null, + (timestamp, append) -> {}, + (op, docs) -> null + ); + + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index index = replicaIndexForDoc(doc, 1L, 12L, false); + IndexingStrategy plannedStrategy = planner.planOperationAsNonPrimary(index); + IndexingStrategy expectedStrategy = IndexingStrategy.processNormally(false, index.version(), 0); + + assertEquals(expectedStrategy, plannedStrategy); + assertFalse(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + public void testPlanOperationAsNonPrimaryWithSegRepEnabled() throws IOException { + Settings.Builder settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), "SEGMENT"); + final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); + EngineConfig engineConfig = config(indexSettings, store, createTempDir(), newMergePolicy(), null); + + IndexingStrategyPlanner planner = new IndexingStrategyPlanner( + engineConfig.getIndexSettings(), + engine.shardId, + engine.versionMap, + () -> 0L, + () -> 15L, + () -> 10L, + op -> false, + op -> OpVsEngineDocStatus.OP_STALE_OR_EQUAL, + (op, refresh) -> null, + (timestamp, append) -> {}, + (op, docs) -> null + ); + + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index index = replicaIndexForDoc(doc, 1L, 12L, false); + IndexingStrategy plannedStrategy = planner.planOperationAsNonPrimary(index); + IndexingStrategy expectedStrategy = IndexingStrategy.processButSkipEngine(false, index.version()); + + assertEquals(expectedStrategy, plannedStrategy); + assertFalse(plannedStrategy.earlyResultOnPreFlightError.isPresent()); + } + + private Engine.Index indexForDoc( + ParsedDocument doc, + long seqNo, + long version, + VersionType versionType, + long autoGeneratedIdTimestamp, + long ifPrimaryTerm + ) { + return new Engine.Index( + newUid(doc), + doc, + seqNo, + primaryTerm.get(), + version, + versionType, + Engine.Operation.Origin.PRIMARY, + System.nanoTime(), + autoGeneratedIdTimestamp, + false, + seqNo, + ifPrimaryTerm + ); + } + + private IndexingStrategyPlanner constructDefaultIndexingStrategyPlanner() { + return constructIndexingStrategyPlanner(operation -> false, (operation, aBoolean) -> null, (op, docs) -> null); + } + + private IndexingStrategyPlanner constructIndexingStrategyPlanner(VersionValue versionValue) { + return constructIndexingStrategyPlanner(operation -> false, (operation, aBoolean) -> versionValue, (op, docs) -> null); + } + + private IndexingStrategyPlanner constructIndexingStrategyPlanner(Exception e) { + return constructIndexingStrategyPlanner(operation -> false, (operation, aBoolean) -> null, (op, docs) -> e); + } + + private IndexingStrategyPlanner constructIndexingStrategyPlanner(EngineConfig engineConfig, VersionValue versionValue) { + return new IndexingStrategyPlanner( + engineConfig.getIndexSettings(), + engine.shardId, + engine.versionMap, + () -> 0L, + () -> 0L, + () -> 0L, + operation -> false, + op -> OpVsEngineDocStatus.DOC_NOT_FOUND, + (operation, aBoolean) -> versionValue, + (timestamp, append) -> {}, + (op, docs) -> null + ); + } + + private IndexingStrategyPlanner constructIndexingStrategyPlanner( + Predicate hasBeenProcessedBefore, + CheckedBiFunction docVersionSupplier, + BiFunction tryAcquireInFlightDocs + ) { + return new IndexingStrategyPlanner( + engine.engineConfig.getIndexSettings(), + engine.shardId, + engine.versionMap, + () -> 0L, + () -> 0L, + () -> 0L, + hasBeenProcessedBefore, + op -> OpVsEngineDocStatus.DOC_NOT_FOUND, + docVersionSupplier, + (timestamp, append) -> {}, + tryAcquireInFlightDocs + ); + } +}