Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,12 @@ private boolean resolvePipelinesForActionRequests(
}
if (docRequest != null) {
if (updateRequest.docAsUpsert()) {
// Save the original doc source before pipeline processing.
// When the document already exists, the merge in UpdateHelper should use the
// original user-provided source rather than the pipeline-modified source.
// This fixes the inconsistency between single upsert and bulk upsert behavior
// with ingest pipelines (see #10864).
updateRequest.saveRawDocSource();
indexRequestHasPipeline |= ingestService.resolvePipelines(actionRequest, docRequest, metadata);
} else {
// In the case when doc as upsert is false or not defined, we only resolve system ingest pipelines.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,23 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu
final XContentType updateSourceContentType = sourceAndContent.v1();
final Map<String, Object> updatedSourceAsMap = sourceAndContent.v2();

final boolean noop = !XContentHelper.update(updatedSourceAsMap, currentRequest.sourceAsMap(), detectNoop);
// When docAsUpsert is true and the document exists, use the original (pre-pipeline) doc source
// for merging. This ensures that the ingest pipeline does not corrupt the partial doc before
// it is merged with the existing document, matching the behavior of single upsert operations
// where the pipeline runs on the full merged document. See #10864.
final Map<String, Object> docSourceForMerge;
if (request.docAsUpsert() && request.rawDocSource() != null && request.rawDocSourceContentType() != null) {
Tuple<? extends MediaType, Map<String, Object>> rawSourceAndContent = XContentHelper.convertToMap(
request.rawDocSource(),
true,
request.rawDocSourceContentType()
);
docSourceForMerge = rawSourceAndContent.v2();
} else {
docSourceForMerge = currentRequest.sourceAsMap();
}

final boolean noop = !XContentHelper.update(updatedSourceAsMap, docSourceForMerge, detectNoop);

// We can only actually turn the update into a noop if detectNoop is true to preserve backwards compatibility and to handle cases
// where users repopulating multi-fields or adding synonyms, etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.ParseField;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.shard.ShardId;
Expand Down Expand Up @@ -158,6 +159,18 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
@Nullable
private IndexRequest doc;

/**
* Stores the original (pre-pipeline) source of the doc when docAsUpsert is true.
* This is used to ensure that when a bulk update with doc_as_upsert merges the doc
* into an existing document, the merge uses the original user-provided source rather
* than the pipeline-modified source.
*/
@Nullable
private BytesReference rawDocSource;

@Nullable
private MediaType rawDocSourceContentType;

public UpdateRequest() {}

public UpdateRequest(StreamInput in) throws IOException {
Expand Down Expand Up @@ -191,6 +204,12 @@ public UpdateRequest(@Nullable ShardId shardId, StreamInput in) throws IOExcepti
detectNoop = in.readBoolean();
scriptedUpsert = in.readBoolean();
requireAlias = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_3_6_0)) {
rawDocSource = in.readOptionalBytesReference();
if (rawDocSource != null) {
rawDocSourceContentType = in.readMediaType();
}
}
}

public UpdateRequest(String index, String id) {
Expand Down Expand Up @@ -839,6 +858,34 @@ public UpdateRequest docAsUpsert(boolean shouldUpsertDoc) {
return this;
}

/**
* Saves the original (pre-pipeline) source of the doc for use during update merging.
* When docAsUpsert is true and the document already exists, the merge should use the
* original source rather than the pipeline-modified source.
*/
public void saveRawDocSource() {
if (this.doc != null) {
this.rawDocSource = this.doc.source();
this.rawDocSourceContentType = this.doc.getContentType();
}
}

/**
* Returns the original (pre-pipeline) source of the doc, or null if not saved.
*/
@Nullable
public BytesReference rawDocSource() {
return this.rawDocSource;
}

/**
* Returns the content type of the original (pre-pipeline) doc source.
*/
@Nullable
public MediaType rawDocSourceContentType() {
return this.rawDocSourceContentType;
}

public boolean scriptedUpsert() {
return this.scriptedUpsert;
}
Expand Down Expand Up @@ -917,6 +964,12 @@ private void doWrite(StreamOutput out, boolean thin) throws IOException {
out.writeBoolean(detectNoop);
out.writeBoolean(scriptedUpsert);
out.writeBoolean(requireAlias);
if (out.getVersion().onOrAfter(Version.V_3_6_0)) {
out.writeOptionalBytesReference(rawDocSource);
if (rawDocSource != null) {
rawDocSourceContentType.writeTo(out);
}
}
}

@Override
Expand Down