diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java
index 0068b9d3da172..d994a621fcff6 100644
--- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java
@@ -426,6 +426,12 @@ private boolean resolvePipelinesForActionRequests(
                 }
                 if (docRequest != null) {
                     if (updateRequest.docAsUpsert()) {
+                        // Save the original doc source before pipeline processing.
+                        // When the document already exists, the merge in UpdateHelper should use the
+                        // original user-provided source rather than the pipeline-modified source.
+                        // This fixes the inconsistency between single upsert and bulk upsert behavior
+                        // with ingest pipelines (see #10864).
+                        updateRequest.saveRawDocSource();
                         indexRequestHasPipeline |= ingestService.resolvePipelines(actionRequest, docRequest, metadata);
                     } else {
                         // In the case when doc as upsert is false or not defined, we only resolve system ingest pipelines.
diff --git a/server/src/main/java/org/opensearch/action/update/UpdateHelper.java b/server/src/main/java/org/opensearch/action/update/UpdateHelper.java
index 37d77649fadd6..7a0d723fd60f4 100644
--- a/server/src/main/java/org/opensearch/action/update/UpdateHelper.java
+++ b/server/src/main/java/org/opensearch/action/update/UpdateHelper.java
@@ -220,7 +220,22 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu
         final XContentType updateSourceContentType = sourceAndContent.v1();
         final Map<String, Object> updatedSourceAsMap = sourceAndContent.v2();
 
-        final boolean noop = !XContentHelper.update(updatedSourceAsMap, currentRequest.sourceAsMap(), detectNoop);
+        // When docAsUpsert is true and the document exists, use the original (pre-pipeline) doc source
+        // for merging. This ensures that the ingest pipeline does not corrupt the partial doc before
+        // it is merged with the existing document, matching the behavior of single upsert operations
+        // where the pipeline runs on the full merged document. See #10864.
+        final Map<String, Object> docSourceForMerge;
+        if (request.docAsUpsert() && request.rawDocSource() != null && request.rawDocSourceContentType() != null) {
+            docSourceForMerge = XContentHelper.convertToMap(
+                request.rawDocSource(),
+                true,
+                request.rawDocSourceContentType()
+            ).v2();
+        } else {
+            docSourceForMerge = currentRequest.sourceAsMap();
+        }
+
+        final boolean noop = !XContentHelper.update(updatedSourceAsMap, docSourceForMerge, detectNoop);
 
         // We can only actually turn the update into a noop if detectNoop is true to preserve backwards compatibility and to handle cases
         // where users repopulating multi-fields or adding synonyms, etc.
diff --git a/server/src/main/java/org/opensearch/action/update/UpdateRequest.java b/server/src/main/java/org/opensearch/action/update/UpdateRequest.java
index 9fe894a5c2db0..39d50d02becb6 100644
--- a/server/src/main/java/org/opensearch/action/update/UpdateRequest.java
+++ b/server/src/main/java/org/opensearch/action/update/UpdateRequest.java
@@ -48,6 +48,7 @@
 import org.opensearch.common.xcontent.XContentHelper;
 import org.opensearch.core.ParseField;
 import org.opensearch.core.common.Strings;
+import org.opensearch.core.common.bytes.BytesReference;
 import org.opensearch.core.common.io.stream.StreamInput;
 import org.opensearch.core.common.io.stream.StreamOutput;
 import org.opensearch.core.index.shard.ShardId;
@@ -158,6 +159,18 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
     @Nullable
     private IndexRequest doc;
 
+    /**
+     * Stores the original (pre-pipeline) source of the doc when docAsUpsert is true.
+     * This is used to ensure that when a bulk update with doc_as_upsert merges the doc
+     * into an existing document, the merge uses the original user-provided source rather
+     * than the pipeline-modified source.
+     */
+    @Nullable
+    private BytesReference rawDocSource;
+
+    @Nullable
+    private MediaType rawDocSourceContentType;
+
     public UpdateRequest() {}
 
     public UpdateRequest(StreamInput in) throws IOException {
@@ -191,6 +204,12 @@ public UpdateRequest(@Nullable ShardId shardId, StreamInput in) throws IOExcepti
         detectNoop = in.readBoolean();
         scriptedUpsert = in.readBoolean();
         requireAlias = in.readBoolean();
+        if (in.getVersion().onOrAfter(Version.V_3_6_0)) {
+            rawDocSource = in.readOptionalBytesReference();
+            if (rawDocSource != null) {
+                rawDocSourceContentType = in.readMediaType();
+            }
+        }
     }
 
     public UpdateRequest(String index, String id) {
@@ -839,6 +858,37 @@ public UpdateRequest docAsUpsert(boolean shouldUpsertDoc) {
         return this;
     }
 
+    /**
+     * Saves the original (pre-pipeline) source of the doc for use during update merging.
+     * When docAsUpsert is true and the document already exists, the merge should use the
+     * original source rather than the pipeline-modified source.
+     * <p>
+     * Nothing is saved unless both the source and its content type are present, so a saved
+     * source always has a content type to be serialized alongside it.
+     */
+    public void saveRawDocSource() {
+        if (this.doc != null && this.doc.source() != null && this.doc.getContentType() != null) {
+            this.rawDocSource = this.doc.source();
+            this.rawDocSourceContentType = this.doc.getContentType();
+        }
+    }
+
+    /**
+     * Returns the original (pre-pipeline) source of the doc, or null if not saved.
+     */
+    @Nullable
+    public BytesReference rawDocSource() {
+        return this.rawDocSource;
+    }
+
+    /**
+     * Returns the content type of the original (pre-pipeline) doc source.
+     */
+    @Nullable
+    public MediaType rawDocSourceContentType() {
+        return this.rawDocSourceContentType;
+    }
+
     public boolean scriptedUpsert() {
         return this.scriptedUpsert;
     }
@@ -917,6 +967,12 @@ private void doWrite(StreamOutput out, boolean thin) throws IOException {
         out.writeBoolean(detectNoop);
         out.writeBoolean(scriptedUpsert);
         out.writeBoolean(requireAlias);
+        if (out.getVersion().onOrAfter(Version.V_3_6_0)) {
+            out.writeOptionalBytesReference(rawDocSource);
+            if (rawDocSource != null) {
+                rawDocSourceContentType.writeTo(out);
+            }
+        }
     }
 
     @Override