From a32852893c5f9ba9b1a3745402da480a80e5bed3 Mon Sep 17 00:00:00 2001 From: zheliu2 <770120041@qq.com> Date: Mon, 9 Mar 2026 12:21:18 -0400 Subject: [PATCH 1/6] Fix bulk upsert pipeline inconsistency (#10864) When a bulk update with doc_as_upsert=true targets an existing document, the ingest pipeline was running on the partial doc before it was merged with the existing document. This caused the pipeline to operate on incomplete data, producing incorrect results compared to a single upsert operation where the pipeline runs on the full merged document. The fix saves the original (pre-pipeline) doc source on the UpdateRequest before pipeline processing. When UpdateHelper.prepareUpdateIndexRequest() merges the doc into the existing document, it uses the saved original source instead of the pipeline-modified source, preventing the pipeline from corrupting the partial doc before merge. Signed-off-by: zheliu2 <770120041@qq.com> --- .../action/bulk/TransportBulkAction.java | 6 +++ .../action/update/UpdateHelper.java | 18 +++++++- .../action/update/UpdateRequest.java | 41 +++++++++++++++++++ 3 files changed, 64 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index 0068b9d3da172..d994a621fcff6 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -426,6 +426,12 @@ private boolean resolvePipelinesForActionRequests( } if (docRequest != null) { if (updateRequest.docAsUpsert()) { + // Save the original doc source before pipeline processing. + // When the document already exists, the merge in UpdateHelper should use the + // original user-provided source rather than the pipeline-modified source. + // This fixes the inconsistency between single upsert and bulk upsert behavior + // with ingest pipelines (see #10864). + updateRequest.saveRawDocSource(); indexRequestHasPipeline |= ingestService.resolvePipelines(actionRequest, docRequest, metadata); } else { // In the case when doc as upsert is false or not defined, we only resolve system ingest pipelines. diff --git a/server/src/main/java/org/opensearch/action/update/UpdateHelper.java b/server/src/main/java/org/opensearch/action/update/UpdateHelper.java index 37d77649fadd6..fc8fef3d40776 100644 --- a/server/src/main/java/org/opensearch/action/update/UpdateHelper.java +++ b/server/src/main/java/org/opensearch/action/update/UpdateHelper.java @@ -220,7 +220,23 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu final XContentType updateSourceContentType = sourceAndContent.v1(); final Map updatedSourceAsMap = sourceAndContent.v2(); - final boolean noop = !XContentHelper.update(updatedSourceAsMap, currentRequest.sourceAsMap(), detectNoop); + // When docAsUpsert is true and the document exists, use the original (pre-pipeline) doc source + // for merging. This ensures that the ingest pipeline does not corrupt the partial doc before + // it is merged with the existing document, matching the behavior of single upsert operations + // where the pipeline runs on the full merged document. See #10864. + final Map docSourceForMerge; + if (request.docAsUpsert() && request.rawDocSource() != null) { + Tuple> rawSourceAndContent = XContentHelper.convertToMap( + request.rawDocSource(), + true, + request.rawDocSourceContentType() + ); + docSourceForMerge = rawSourceAndContent.v2(); + } else { + docSourceForMerge = currentRequest.sourceAsMap(); + } + + final boolean noop = !XContentHelper.update(updatedSourceAsMap, docSourceForMerge, detectNoop); // We can only actually turn the update into a noop if detectNoop is true to preserve backwards compatibility and to handle cases // where users repopulating multi-fields or adding synonyms, etc. diff --git a/server/src/main/java/org/opensearch/action/update/UpdateRequest.java b/server/src/main/java/org/opensearch/action/update/UpdateRequest.java index 9fe894a5c2db0..ebddb88c98fec 100644 --- a/server/src/main/java/org/opensearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/opensearch/action/update/UpdateRequest.java @@ -48,6 +48,7 @@ import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.ParseField; import org.opensearch.core.common.Strings; +import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.index.shard.ShardId; @@ -158,6 +159,18 @@ public class UpdateRequest extends InstanceShardOperationRequest @Nullable private IndexRequest doc; + /** + * Stores the original (pre-pipeline) source of the doc when docAsUpsert is true. + * This is used to ensure that when a bulk update with doc_as_upsert merges the doc + * into an existing document, the merge uses the original user-provided source rather + * than the pipeline-modified source. This is transient and not serialized. + */ + @Nullable + private transient BytesReference rawDocSource; + + @Nullable + private transient MediaType rawDocSourceContentType; + public UpdateRequest() {} public UpdateRequest(StreamInput in) throws IOException { @@ -839,6 +852,34 @@ public UpdateRequest docAsUpsert(boolean shouldUpsertDoc) { return this; } + /** + * Saves the original (pre-pipeline) source of the doc for use during update merging. + * When docAsUpsert is true and the document already exists, the merge should use the + * original source rather than the pipeline-modified source. + */ + public void saveRawDocSource() { + if (this.doc != null) { + this.rawDocSource = this.doc.source(); + this.rawDocSourceContentType = this.doc.getContentType(); + } + } + + /** + * Returns the original (pre-pipeline) source of the doc, or null if not saved. + */ + @Nullable + public BytesReference rawDocSource() { + return this.rawDocSource; + } + + /** + * Returns the content type of the original (pre-pipeline) doc source. + */ + @Nullable + public MediaType rawDocSourceContentType() { + return this.rawDocSourceContentType; + } + public boolean scriptedUpsert() { return this.scriptedUpsert; } From c9c2af0f05a7096c7760a98f82f3e6f4852fe7ae Mon Sep 17 00:00:00 2001 From: zheliu2 <770120041@qq.com> Date: Mon, 9 Mar 2026 13:29:29 -0400 Subject: [PATCH 2/6] Retrigger CI (attempt 2) Signed-off-by: zheliu2 <770120041@qq.com> From a496a8d9c02c70f253a8ba6bbe8a8c2e98c70ee3 Mon Sep 17 00:00:00 2001 From: zheliu2 <770120041@qq.com> Date: Mon, 9 Mar 2026 14:57:18 -0400 Subject: [PATCH 3/6] Retrigger CI Signed-off-by: zheliu2 <770120041@qq.com> From 644271b2b53fe059a4cb126f404454d1597e2551 Mon Sep 17 00:00:00 2001 From: zheliu2 <770120041@qq.com> Date: Mon, 9 Mar 2026 15:02:31 -0400 Subject: [PATCH 4/6] Fix rawDocSource serialization for distributed correctness Serialize rawDocSource and rawDocSourceContentType fields in UpdateRequest so they survive node-to-node transport. Previously these were transient, meaning the pre-pipeline source fix would silently fall back to pipeline-modified source when the request was forwarded to a different node. Signed-off-by: zheliu2 <770120041@qq.com> --- .../opensearch/action/update/UpdateHelper.java | 2 +- .../action/update/UpdateRequest.java | 18 +++++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/update/UpdateHelper.java b/server/src/main/java/org/opensearch/action/update/UpdateHelper.java index fc8fef3d40776..7a0d723fd60f4 100644 --- a/server/src/main/java/org/opensearch/action/update/UpdateHelper.java +++ b/server/src/main/java/org/opensearch/action/update/UpdateHelper.java @@ -225,7 +225,7 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu // it is merged with the existing document, matching the behavior of single upsert operations // where the pipeline runs on the full merged document. See #10864. final Map docSourceForMerge; - if (request.docAsUpsert() && request.rawDocSource() != null) { + if (request.docAsUpsert() && request.rawDocSource() != null && request.rawDocSourceContentType() != null) { Tuple> rawSourceAndContent = XContentHelper.convertToMap( request.rawDocSource(), true, diff --git a/server/src/main/java/org/opensearch/action/update/UpdateRequest.java b/server/src/main/java/org/opensearch/action/update/UpdateRequest.java index ebddb88c98fec..39d50d02becb6 100644 --- a/server/src/main/java/org/opensearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/opensearch/action/update/UpdateRequest.java @@ -163,13 +163,13 @@ public class UpdateRequest extends InstanceShardOperationRequest * Stores the original (pre-pipeline) source of the doc when docAsUpsert is true. * This is used to ensure that when a bulk update with doc_as_upsert merges the doc * into an existing document, the merge uses the original user-provided source rather - * than the pipeline-modified source. This is transient and not serialized. + * than the pipeline-modified source. */ @Nullable - private transient BytesReference rawDocSource; + private BytesReference rawDocSource; @Nullable - private transient MediaType rawDocSourceContentType; + private MediaType rawDocSourceContentType; public UpdateRequest() {} @@ -204,6 +204,12 @@ public UpdateRequest(@Nullable ShardId shardId, StreamInput in) throws IOExcepti detectNoop = in.readBoolean(); scriptedUpsert = in.readBoolean(); requireAlias = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_3_6_0)) { + rawDocSource = in.readOptionalBytesReference(); + if (rawDocSource != null) { + rawDocSourceContentType = in.readMediaType(); + } + } } public UpdateRequest(String index, String id) { @@ -958,6 +964,12 @@ private void doWrite(StreamOutput out, boolean thin) throws IOException { out.writeBoolean(detectNoop); out.writeBoolean(scriptedUpsert); out.writeBoolean(requireAlias); + if (out.getVersion().onOrAfter(Version.V_3_6_0)) { + out.writeOptionalBytesReference(rawDocSource); + if (rawDocSource != null) { + rawDocSourceContentType.writeTo(out); + } + } } @Override From d524da573bb179a4a7c32dad294d3f934e1043bf Mon Sep 17 00:00:00 2001 From: zheliu2 <770120041@qq.com> Date: Mon, 9 Mar 2026 16:28:38 -0400 Subject: [PATCH 5/6] Retrigger CI (attempt 2) Signed-off-by: zheliu2 <770120041@qq.com> From fcfa50d622bca3be3f6bf045ce823a9b6161a53c Mon Sep 17 00:00:00 2001 From: zheliu2 <770120041@qq.com> Date: Mon, 9 Mar 2026 17:42:05 -0400 Subject: [PATCH 6/6] Retrigger CI (attempt 3) Signed-off-by: zheliu2 <770120041@qq.com>