Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.mapper;

import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.index.mapper.extrasource.BytesValue;
import org.opensearch.index.mapper.extrasource.ExtraFieldValues;
import org.opensearch.index.mapper.extrasource.ExtraFieldValuesMapperPlugin;
import org.opensearch.index.mapper.extrasource.FloatArrayValue;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;

import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.is;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 1)
public class ExtraFieldValuesRequestIT extends OpenSearchIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(ExtraFieldValuesMapperPlugin.class);
}

private static void assertTestMapping(OpenSearchIntegTestCase testCase, String index, Settings settings) throws Exception {
assertAcked(
testCase.prepareCreate(index)
.setSettings(settings)
.setMapping(
jsonBuilder().startObject()
.startObject("properties")

.startObject("field")
.field("type", ExtraFieldValuesMapperPlugin.EXTRA_FIELDS_TEST)
.endObject()

.startObject("field_type")
.field("type", "keyword")
.field("store", true)
.endObject()

.startObject("field_len")
.field("type", "integer")
.field("store", true)
.endObject()

.startObject("field_dim")
.field("type", "integer")
.field("store", true)
.endObject()

.startObject("field_f0")
.field("type", "float")
.field("store", true)
.endObject()

.endObject()
.endObject()
)
.get()
);
}

public void testIndexRequestExtraFieldValues() throws Exception {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build();

String index = "test";
assertTestMapping(this, index, settings);

ExtraFieldValues efv = new ExtraFieldValues(Map.of("field", new BytesValue(new BytesArray(new byte[] { 1, 2, 3, 4 }))));

IndexRequest req = new IndexRequest(index).id("1").source("{\"other\":\"x\"}", XContentType.JSON).extraFieldValues(efv);

DocWriteResponse resp = client().index(req).actionGet();
assertThat(resp.getResult(), anyOf(is(DocWriteResponse.Result.CREATED), is(DocWriteResponse.Result.UPDATED)));
refresh(index);

GetResponse get = client().prepareGet(index, "1").setStoredFields("field_type", "field_len").get();

assertThat(get.isExists(), is(true));
assertThat(get.getFields().keySet(), hasItems("field_type", "field_len"));
assertThat(get.getField("field_type").getValue().toString(), is("BYTES"));
assertThat(((Number) get.getField("field_len").getValue()).intValue(), is(4));
}

public void testUpdateRequestDocExtraFieldValues() throws Exception {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build();

String index = "test2";
assertTestMapping(this, index, settings);

// create doc without "field"
client().prepareIndex(index).setId("1").setSource("{\"other\":\"x\"}", XContentType.JSON).get();
refresh(index);

ExtraFieldValues efv = new ExtraFieldValues(Map.of("field", FloatArrayValue.fromFloatArray(new float[] { 10.5f, 20.25f })));

UpdateRequest ur = new UpdateRequest(index, "1").doc("{\"other\":\"y\"}", XContentType.JSON).docExtraFieldValues(efv);

client().update(ur).actionGet();
refresh(index);

GetResponse get = client().prepareGet(index, "1").setStoredFields("field_type", "field_dim", "field_f0").get();

assertThat(get.isExists(), is(true));
assertThat(get.getFields().keySet(), hasItems("field_type", "field_dim", "field_f0"));
assertThat(get.getField("field_type").getValue().toString(), is("FLOAT_ARRAY"));
assertThat(((Number) get.getField("field_dim").getValue()).intValue(), is(2));
assertEquals(10.5f, ((Number) get.getField("field_f0").getValue()).floatValue(), 0.0f);
}

public void testUpdateRequestUpsertExtraFieldValues() throws Exception {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build();

String index = "test3";
assertTestMapping(this, index, settings);

ExtraFieldValues efv = new ExtraFieldValues(Map.of("field", new BytesValue(new BytesArray(new byte[] { 9 }))));

UpdateRequest ur = new UpdateRequest(index, "1").doc("{\"other\":\"x\"}", XContentType.JSON)
.upsert("{\"other\":\"x\"}", XContentType.JSON)
.upsertExtraFieldValues(efv);

client().update(ur).actionGet();
refresh(index);

GetResponse get = client().prepareGet(index, "1").setStoredFields("field_type", "field_len").get();

assertThat(get.isExists(), is(true));
assertThat(get.getFields().keySet(), hasItems("field_type", "field_len"));
assertThat(get.getField("field_type").getValue().toString(), is("BYTES"));
assertThat(((Number) get.getField("field_len").getValue()).intValue(), is(1));
}

public void testBulkIndexRequestCarriesExtraFieldValues() throws Exception {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build();

String index = "test4";
assertTestMapping(this, index, settings);

BulkRequest bulk = new BulkRequest();

bulk.add(
new IndexRequest(index).id("1")
.source("{\"other\":\"x\"}", XContentType.JSON)
.extraFieldValues(new ExtraFieldValues(Map.of("field", new BytesValue(new BytesArray(new byte[] { 1, 2 })))))
);

bulk.add(
new IndexRequest(index).id("2")
.source("{\"other\":\"y\"}", XContentType.JSON)
.extraFieldValues(new ExtraFieldValues(Map.of("field", FloatArrayValue.fromFloatArray(new float[] { 3.0f }))))
);

BulkResponse resp = client().bulk(bulk).actionGet();
assertThat(resp.buildFailureMessage(), resp.hasFailures(), is(false));
refresh(index);

GetResponse g1 = client().prepareGet(index, "1").setStoredFields("field_type", "field_len").get();
assertThat(g1.getFields().keySet(), hasItems("field_type", "field_len"));
assertThat(g1.getField("field_type").getValue().toString(), is("BYTES"));
assertThat(((Number) g1.getField("field_len").getValue()).intValue(), is(2));

GetResponse g2 = client().prepareGet(index, "2").setStoredFields("field_type", "field_dim", "field_f0").get();
assertThat(g2.getFields().keySet(), hasItems("field_type", "field_dim", "field_f0"));
assertThat(g2.getField("field_type").getValue().toString(), is("FLOAT_ARRAY"));
assertThat(((Number) g2.getField("field_dim").getValue()).intValue(), is(1));
assertEquals(3.0f, ((Number) g2.getField("field_f0").getValue()).floatValue(), 0.0f);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.geometry.utils.Geohash;
import org.opensearch.index.MergePolicyProvider;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.index.mapper.extrasource.BytesValue;
import org.opensearch.index.mapper.extrasource.ExtraFieldValues;
import org.opensearch.plugins.Plugin;
import org.opensearch.script.MockScriptPlugin;
import org.opensearch.script.Script;
Expand Down Expand Up @@ -544,6 +547,22 @@ public void testUpdateRequestWithScriptAndShouldUpsertDoc() throws Exception {
}
}

public void testScriptWithExtraFieldValuesRejected() throws Exception {
createTestIndex();
ensureGreen();

Script script = new Script(ScriptType.INLINE, UPDATE_SCRIPTS, FIELD_INC_SCRIPT, Collections.singletonMap("field", "field"));

UpdateRequest req = new UpdateRequest(indexOrAlias(), "1").script(script)
.upsert(XContentFactory.jsonBuilder().startObject().field("field", 1).endObject());

// attach extra field values to the upsert IndexRequest (since script+doc is invalid)
req.upsertRequest().extraFieldValues(new ExtraFieldValues(Map.of("k", new BytesValue(new BytesArray(new byte[] { 1, 2, 3 })))));

ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class, () -> client().update(req).actionGet());
assertThat(e.getMessage(), containsString("ExtraFieldValues are not supported with scripted updates"));
}

public void testContextVariables() throws Exception {
assertAcked(prepareCreate("test").addAlias(new Alias("alias")));
ensureGreen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,14 @@ static boolean executeBulkItemRequest(
result = primary.applyIndexOperationOnPrimary(
version,
request.versionType(),
new SourceToParse(request.index(), request.id(), request.source(), request.getContentType(), request.routing()),
new SourceToParse(
request.index(),
request.id(),
request.source(),
request.getContentType(),
request.routing(),
request.extraFieldValues()
),
request.ifSeqNo(),
request.ifPrimaryTerm(),
request.getAutoGeneratedTimestamp(),
Expand Down Expand Up @@ -909,7 +916,8 @@ private static Engine.Result performOpOnReplica(
indexRequest.id(),
indexRequest.source(),
indexRequest.getContentType(),
indexRequest.routing()
indexRequest.routing(),
indexRequest.extraFieldValues()
);
result = replica.applyIndexOperationOnReplica(
primaryResponse.getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.VersionType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.extrasource.ExtraFieldValues;
import org.opensearch.transport.client.Client;
import org.opensearch.transport.client.Requests;

Expand Down Expand Up @@ -112,6 +113,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
private String routing;

private BytesReference source;
private ExtraFieldValues extraFieldValues = ExtraFieldValues.EMPTY;

private OpType opType = OpType.INDEX;

Expand Down Expand Up @@ -153,6 +155,11 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
id = in.readOptionalString();
routing = in.readOptionalString();
source = in.readBytesReference();
if (in.getVersion().onOrAfter(Version.V_3_7_0)) {
extraFieldValues = Objects.requireNonNullElse(in.readOptionalWriteable(ExtraFieldValues::new), ExtraFieldValues.EMPTY);
} else {
extraFieldValues = ExtraFieldValues.EMPTY;
}
opType = OpType.fromId(in.readByte());
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
Expand Down Expand Up @@ -496,6 +503,23 @@ public IndexRequest source(byte[] source, int offset, int length, MediaType medi
return source(new BytesArray(source, offset, length), mediaType);
}

/**
* Sets extra field values to be ingested outside of {@code _source}.
* <p>
* {@code null} clears the values and resets to {@link ExtraFieldValues#EMPTY}.
*/
public IndexRequest extraFieldValues(ExtraFieldValues values) {
this.extraFieldValues = values == null ? ExtraFieldValues.EMPTY : values;
return this;
}

/**
* Returns the extra field values associated with this request, or {@link ExtraFieldValues#EMPTY} if none.
*/
public ExtraFieldValues extraFieldValues() {
return extraFieldValues;
}

/**
* Sets the type of operation to perform.
*/
Expand Down Expand Up @@ -677,6 +701,9 @@ private void writeBody(StreamOutput out) throws IOException {
out.writeOptionalString(id);
out.writeOptionalString(routing);
out.writeBytesReference(source);
if (out.getVersion().onOrAfter(Version.V_3_7_0)) {
out.writeOptionalWriteable(extraFieldValues.isEmpty() ? null : extraFieldValues);
}
out.writeByte(opType.getId());
out.writeLong(version);
out.writeByte(versionType.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu
.id(request.id())
.routing(routing)
.source(updatedSourceAsMap, updateSourceContentType)
.extraFieldValues(currentRequest.extraFieldValues())
.setIfSeqNo(getResult.getSeqNo())
.setIfPrimaryTerm(getResult.getPrimaryTerm())
.waitForActiveShards(request.waitForActiveShards())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.VersionType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.extrasource.ExtraFieldValues;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
import org.opensearch.search.fetch.subphase.FetchSourceContext;
Expand Down Expand Up @@ -235,6 +236,13 @@ public ActionRequestValidationException validate() {
if (doc == null && docAsUpsert) {
validationException = addValidationError("doc must be specified if doc_as_upsert is enabled", validationException);
}
// ExtraFieldValues not supported for scripted updates for now
if (script != null) {
if ((doc != null && !doc.extraFieldValues().isEmpty())
|| (upsertRequest != null && !upsertRequest.extraFieldValues().isEmpty())) {
validationException = addValidationError("ExtraFieldValues are not supported with scripted updates", validationException);
}
}

validationException = DocWriteRequest.validateDocIdLength(id, validationException);

Expand Down Expand Up @@ -713,6 +721,17 @@ public UpdateRequest doc(MediaType mediaType, Object... source) {
return this;
}

/**
* Sets extra field values for the partial document update ({@code doc}).
* <p>
* These values are applied only when the update is executed using {@code doc} (i.e., no script).
* {@code null} clears the values and resets to {@link ExtraFieldValues#EMPTY}.
*/
public UpdateRequest docExtraFieldValues(ExtraFieldValues values) {
safeDoc().extraFieldValues(values);
return this;
}
Comment thread
msfroh marked this conversation as resolved.

public IndexRequest doc() {
return this.doc;
}
Expand Down Expand Up @@ -799,6 +818,16 @@ public UpdateRequest upsert(MediaType mediaType, Object... source) {
return this;
}

/**
* Sets extra field values for the upsert document ({@code upsert}).
* <p>
* {@code null} clears the values and resets to {@link ExtraFieldValues#EMPTY}.
*/
public UpdateRequest upsertExtraFieldValues(ExtraFieldValues values) {
safeUpsertRequest().extraFieldValues(values);
return this;
}

public IndexRequest upsertRequest() {
return this.upsertRequest;
}
Expand Down
Loading
Loading