Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 3.x]
### Added
- Upgrade opensearch-protobufs dependency to 0.7.0 and update transport-grpc module compatibility ([#19003](https://github.com/opensearch-project/OpenSearch/pull/19003))

### Changed

Expand Down
2 changes: 1 addition & 1 deletion modules/transport-grpc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dependencies {
implementation "io.grpc:grpc-stub:${versions.grpc}"
implementation "io.grpc:grpc-util:${versions.grpc}"
implementation "io.perfmark:perfmark-api:0.27.0"
implementation "org.opensearch:protobufs:0.6.0"
implementation "org.opensearch:protobufs:0.7.0"
testImplementation project(':test:framework')
}

Expand Down
1 change: 0 additions & 1 deletion modules/transport-grpc/licenses/protobufs-0.6.0.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions modules/transport-grpc/licenses/protobufs-0.7.0.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
d83e08f04da8ec6a5b8925934e5a599b7592aba0
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ public void testDocumentServiceBulk() throws Exception {
DocumentServiceGrpc.DocumentServiceBlockingStub documentStub = DocumentServiceGrpc.newBlockingStub(channel);

// Create a bulk request with an index operation
IndexOperation indexOp = IndexOperation.newBuilder().setIndex(indexName).setId("1").build();
IndexOperation indexOp = IndexOperation.newBuilder().setUnderscoreIndex(indexName).setUnderscoreId("1").build();

BulkRequestBody requestBody = BulkRequestBody.newBuilder()
.setIndex(indexOp)
.setDoc(com.google.protobuf.ByteString.copyFromUtf8(DEFAULT_DOCUMENT_SOURCE))
.setOperationContainer(org.opensearch.protobufs.OperationContainer.newBuilder().setIndex(indexOp).build())
.setObject(com.google.protobuf.ByteString.copyFromUtf8(DEFAULT_DOCUMENT_SOURCE))
.build();

BulkRequest bulkRequest = BulkRequest.newBuilder().addRequestBody(requestBody).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,25 @@ public static FetchSourceContext parseFromProtoRequest(org.opensearch.protobufs.
String[] sourceIncludes = null;

// Set up source context if source parameters are provided
if (request.hasSource()) {
switch (request.getSource().getSourceConfigParamCase()) {
case SourceConfigParam.SourceConfigParamCase.BOOL_VALUE:
fetchSource = request.getSource().getBoolValue();
if (request.hasUnderscoreSource()) {
switch (request.getUnderscoreSource().getSourceConfigParamCase()) {
case BOOL_VALUE:
fetchSource = request.getUnderscoreSource().getBoolValue();
break;
case SourceConfigParam.SourceConfigParamCase.STRING_ARRAY:
sourceIncludes = request.getSource().getStringArray().getStringArrayList().toArray(new String[0]);
case STRING_ARRAY:
sourceIncludes = request.getUnderscoreSource().getStringArray().getStringArrayList().toArray(new String[0]);
break;
default:
throw new UnsupportedOperationException("Invalid sourceConfig provided.");
}
}

if (request.getSourceIncludesList().size() > 0) {
sourceIncludes = request.getSourceIncludesList().toArray(new String[0]);
if (request.getUnderscoreSourceIncludesCount() > 0) {
sourceIncludes = request.getUnderscoreSourceIncludesList().toArray(new String[0]);
}

if (request.getSourceExcludesList().size() > 0) {
sourceExcludes = request.getSourceExcludesList().toArray(new String[0]);
if (request.getUnderscoreSourceExcludesCount() > 0) {
sourceExcludes = request.getUnderscoreSourceExcludesList().toArray(new String[0]);
}
if (fetchSource != null || sourceIncludes != null || sourceExcludes != null) {
return new FetchSourceContext(fetchSource == null ? true : fetchSource, sourceIncludes, sourceExcludes);
Expand Down Expand Up @@ -118,22 +118,16 @@ public static FetchSourceContext fromProto(SourceConfig sourceConfig) {
String[] excludes = Strings.EMPTY_ARRAY;
if (sourceConfig.getSourceConfigCase() == SourceConfig.SourceConfigCase.FETCH) {
fetchSource = sourceConfig.getFetch();
} else if (sourceConfig.hasIncludes()) {
ArrayList<String> list = new ArrayList<>();
for (String string : sourceConfig.getIncludes().getStringArrayList()) {
list.add(string);
}
includes = list.toArray(new String[0]);
} else if (sourceConfig.hasFilter()) {
SourceFilter sourceFilter = sourceConfig.getFilter();
if (!sourceFilter.getIncludesList().isEmpty()) {
if (sourceFilter.getIncludesCount() > 0) {
List<String> includesList = new ArrayList<>();
for (String s : sourceFilter.getIncludesList()) {
includesList.add(s);
}
includes = includesList.toArray(new String[0]);
}
if (!sourceFilter.getExcludesList().isEmpty()) {
if (sourceFilter.getExcludesCount() > 0) {
List<String> excludesList = new ArrayList<>();
for (String s : sourceFilter.getExcludesList()) {
excludesList.add(s);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ private OpTypeProtoUtils() {
*/
public static DocWriteRequest.OpType fromProto(OpType opType) {

switch (opType) {
switch (opType.getOpTypeCase()) {
case OP_TYPE_CREATE:
return DocWriteRequest.OpType.CREATE;
case OP_TYPE_INDEX:
return DocWriteRequest.OpType.INDEX;
case OPTYPE_NOT_SET:
default:
throw new UnsupportedOperationException("Invalid optype: " + opType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ private RefreshProtoUtils() {
* @return The corresponding OpenSearch refresh policy string value
*/
public static String getRefreshPolicy(org.opensearch.protobufs.Refresh refresh) {
switch (refresh) {
switch (refresh.getRefreshCase()) {
case REFRESH_TRUE:
return WriteRequest.RefreshPolicy.IMMEDIATE.getValue();
case REFRESH_WAIT_FOR:
return WriteRequest.RefreshPolicy.WAIT_UNTIL.getValue();
case REFRESH_FALSE:
case REFRESH_UNSPECIFIED:
case REFRESH_NOT_SET:
default:
return WriteRequest.RefreshPolicy.NONE.getValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ public static Script parseFromProtoRequest(org.opensearch.protobufs.Script scrip
private static Script parseFromProtoRequest(org.opensearch.protobufs.Script script, String defaultLang) {
Objects.requireNonNull(defaultLang);

if (script.hasInlineScript()) {
return parseInlineScript(script.getInlineScript(), defaultLang);
} else if (script.hasStoredScriptId()) {
return parseStoredScriptId(script.getStoredScriptId());
if (script.hasInline()) {
return parseInlineScript(script.getInline(), defaultLang);
} else if (script.hasStored()) {
return parseStoredScriptId(script.getStored());
} else {
throw new UnsupportedOperationException("No valid script type detected");
}
Expand Down Expand Up @@ -118,10 +118,10 @@ public static Script parseStoredScriptId(StoredScriptId storedScriptId) {
* @throws UnsupportedOperationException if no language was specified
*/
public static String parseScriptLanguage(ScriptLanguage language, String defaultLang) {
if (language.hasStringValue()) {
return language.getStringValue();
if (language.hasCustom()) {
return language.getCustom();
}
switch (language.getBuiltinScriptLanguage()) {
switch (language.getBuiltin()) {
case BUILTIN_SCRIPT_LANGUAGE_EXPRESSION:
return "expression";
case BUILTIN_SCRIPT_LANGUAGE_JAVA:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,18 @@ private ActiveShardCountProtoUtils() {
public static ActiveShardCount parseProto(WaitForActiveShards waitForActiveShards) {

switch (waitForActiveShards.getWaitForActiveShardsCase()) {
case WaitForActiveShards.WaitForActiveShardsCase.WAIT_FOR_ACTIVE_SHARD_OPTIONS:
switch (waitForActiveShards.getWaitForActiveShardOptions()) {
case WAIT_FOR_ACTIVE_SHARD_OPTIONS_UNSPECIFIED:
throw new UnsupportedOperationException("No mapping for WAIT_FOR_ACTIVE_SHARD_OPTIONS_UNSPECIFIED");
case WAIT_FOR_ACTIVE_SHARD_OPTIONS:
switch (waitForActiveShards.getWaitForActiveShardOptions().getWaitForActiveShardOptionsCase()) {
case WAIT_FOR_ACTIVE_SHARD_OPTIONS_ALL:
return ActiveShardCount.ALL;
case NULL_VALUE:
case WAITFORACTIVESHARDOPTIONS_NOT_SET:
default:
return ActiveShardCount.DEFAULT;
}
case WaitForActiveShards.WaitForActiveShardsCase.INT32_VALUE:
case INT32_VALUE:
return ActiveShardCount.from(waitForActiveShards.getInt32Value());
case WAITFORACTIVESHARDS_NOT_SET:
default:
return ActiveShardCount.DEFAULT;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.protobufs.BulkRequest;
import org.opensearch.protobufs.BulkRequestBody;
import org.opensearch.protobufs.CreateOperation;
import org.opensearch.protobufs.DeleteOperation;
import org.opensearch.protobufs.IndexOperation;
import org.opensearch.protobufs.OpType;
import org.opensearch.protobufs.OperationContainer;
import org.opensearch.protobufs.UpdateAction;
import org.opensearch.protobufs.UpdateOperation;
import org.opensearch.protobufs.WriteOperation;
import org.opensearch.script.Script;
import org.opensearch.search.fetch.subphase.FetchSourceContext;
import org.opensearch.transport.grpc.proto.request.common.FetchSourceContextProtoUtils;
Expand Down Expand Up @@ -121,12 +123,12 @@ public static DocWriteRequest<?>[] getDocWriteRequests(
String pipeline = valueOrDefault(defaultPipeline, request.getPipeline());
Boolean requireAlias = valueOrDefault(defaultRequireAlias, request.getRequireAlias());

// Parse the operation type: create, index, update, delete, or none provided (which is invalid).
switch (bulkRequestBodyEntry.getOperationContainerCase()) {
OperationContainer operationContainer = bulkRequestBodyEntry.getOperationContainer();
switch (operationContainer.getOperationContainerCase()) {
case CREATE:
docWriteRequest = buildCreateRequest(
bulkRequestBodyEntry.getCreate(),
bulkRequestBodyEntry.getDoc().toByteArray(),
operationContainer.getCreate(),
bulkRequestBodyEntry.getObject().toByteArray(),
index,
id,
routing,
Expand All @@ -140,8 +142,8 @@ public static DocWriteRequest<?>[] getDocWriteRequests(
break;
case INDEX:
docWriteRequest = buildIndexRequest(
bulkRequestBodyEntry.getIndex(),
bulkRequestBodyEntry.getDoc().toByteArray(),
operationContainer.getIndex(),
bulkRequestBodyEntry.getObject().toByteArray(),
opType,
index,
id,
Expand All @@ -156,8 +158,8 @@ public static DocWriteRequest<?>[] getDocWriteRequests(
break;
case UPDATE:
docWriteRequest = buildUpdateRequest(
bulkRequestBodyEntry.getUpdate(),
bulkRequestBodyEntry.getDoc().toByteArray(),
operationContainer.getUpdate(),
bulkRequestBodyEntry.getObject().toByteArray(),
bulkRequestBodyEntry,
index,
id,
Expand All @@ -172,7 +174,7 @@ public static DocWriteRequest<?>[] getDocWriteRequests(
break;
case DELETE:
docWriteRequest = buildDeleteRequest(
bulkRequestBodyEntry.getDelete(),
operationContainer.getDelete(),
index,
id,
routing,
Expand Down Expand Up @@ -211,7 +213,7 @@ public static DocWriteRequest<?>[] getDocWriteRequests(
* @return The constructed IndexRequest
*/
public static IndexRequest buildCreateRequest(
CreateOperation createOperation,
WriteOperation createOperation,
byte[] document,
String index,
String id,
Expand All @@ -223,17 +225,10 @@ public static IndexRequest buildCreateRequest(
long ifPrimaryTerm,
boolean requireAlias
) {
index = createOperation.hasIndex() ? createOperation.getIndex() : index;
id = createOperation.hasId() ? createOperation.getId() : id;
index = createOperation.hasUnderscoreIndex() ? createOperation.getUnderscoreIndex() : index;
id = createOperation.hasUnderscoreId() ? createOperation.getUnderscoreId() : id;
routing = createOperation.hasRouting() ? createOperation.getRouting() : routing;
version = createOperation.hasVersion() ? createOperation.getVersion() : version;
if (createOperation.hasVersionType()) {
versionType = VersionTypeProtoUtils.fromProto(createOperation.getVersionType());

}
pipeline = createOperation.hasPipeline() ? createOperation.getPipeline() : pipeline;
ifSeqNo = createOperation.hasIfSeqNo() ? createOperation.getIfSeqNo() : ifSeqNo;
ifPrimaryTerm = createOperation.hasIfPrimaryTerm() ? createOperation.getIfPrimaryTerm() : ifPrimaryTerm;
requireAlias = createOperation.hasRequireAlias() ? createOperation.getRequireAlias() : requireAlias;

IndexRequest indexRequest = new IndexRequest(index).id(id)
Expand Down Expand Up @@ -281,8 +276,8 @@ public static IndexRequest buildIndexRequest(
boolean requireAlias
) {
opType = indexOperation.hasOpType() ? indexOperation.getOpType() : opType;
index = indexOperation.hasIndex() ? indexOperation.getIndex() : index;
id = indexOperation.hasId() ? indexOperation.getId() : id;
index = indexOperation.hasUnderscoreIndex() ? indexOperation.getUnderscoreIndex() : index;
id = indexOperation.hasUnderscoreId() ? indexOperation.getUnderscoreId() : id;
routing = indexOperation.hasRouting() ? indexOperation.getRouting() : routing;
version = indexOperation.hasVersion() ? indexOperation.getVersion() : version;
if (indexOperation.hasVersionType()) {
Expand All @@ -309,7 +304,7 @@ public static IndexRequest buildIndexRequest(
.routing(routing)
.version(version)
.versionType(versionType)
.create(opType.equals(OpType.OP_TYPE_CREATE))
.create(opType.getOpTypeCase() == OpType.OpTypeCase.OP_TYPE_CREATE)
.setPipeline(pipeline)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
Expand Down Expand Up @@ -350,11 +345,11 @@ public static UpdateRequest buildUpdateRequest(
long ifPrimaryTerm,
boolean requireAlias
) {
index = updateOperation.hasIndex() ? updateOperation.getIndex() : index;
id = updateOperation.hasId() ? updateOperation.getId() : id;
index = updateOperation.hasUnderscoreIndex() ? updateOperation.getUnderscoreIndex() : index;
id = updateOperation.hasUnderscoreId() ? updateOperation.getUnderscoreId() : id;
routing = updateOperation.hasRouting() ? updateOperation.getRouting() : routing;
fetchSourceContext = bulkRequestBody.hasSource()
? FetchSourceContextProtoUtils.fromProto(bulkRequestBody.getSource())
fetchSourceContext = bulkRequestBody.hasUpdateAction() && bulkRequestBody.getUpdateAction().hasUnderscoreSource()
? FetchSourceContextProtoUtils.fromProto(bulkRequestBody.getUpdateAction().getUnderscoreSource())
: fetchSourceContext;
retryOnConflict = updateOperation.hasRetryOnConflict() ? updateOperation.getRetryOnConflict() : retryOnConflict;
ifSeqNo = updateOperation.hasIfSeqNo() ? updateOperation.getIfSeqNo() : ifSeqNo;
Expand Down Expand Up @@ -400,36 +395,36 @@ public static UpdateRequest fromProto(
BulkRequestBody bulkRequestBody,
UpdateOperation updateOperation
) {
if (bulkRequestBody.hasScript()) {
Script script = ScriptProtoUtils.parseFromProtoRequest(bulkRequestBody.getScript());
updateRequest.script(script);
}
if (bulkRequestBody.hasUpdateAction()) {
UpdateAction updateAction = bulkRequestBody.getUpdateAction();

if (bulkRequestBody.hasScriptedUpsert()) {
updateRequest.scriptedUpsert(bulkRequestBody.getScriptedUpsert());
}
if (updateAction.hasScript()) {
Script script = ScriptProtoUtils.parseFromProtoRequest(updateAction.getScript());
updateRequest.script(script);
}

if (bulkRequestBody.hasUpsert()) {
updateRequest.upsert(bulkRequestBody.getUpsert(), MediaTypeRegistry.JSON);
}
if (updateAction.hasScriptedUpsert()) {
updateRequest.scriptedUpsert(updateAction.getScriptedUpsert());
}

updateRequest.doc(document, MediaTypeRegistry.JSON);
if (updateAction.hasUpsert()) {
updateRequest.upsert(updateAction.getUpsert(), MediaTypeRegistry.JSON);
}

if (bulkRequestBody.hasDocAsUpsert()) {
updateRequest.docAsUpsert(bulkRequestBody.getDocAsUpsert());
}
if (updateAction.hasDocAsUpsert()) {
updateRequest.docAsUpsert(updateAction.getDocAsUpsert());
}

if (bulkRequestBody.hasDetectNoop()) {
updateRequest.detectNoop(bulkRequestBody.getDetectNoop());
}
if (updateAction.hasDetectNoop()) {
updateRequest.detectNoop(updateAction.getDetectNoop());
}

if (bulkRequestBody.hasDocAsUpsert()) {
updateRequest.docAsUpsert(bulkRequestBody.getDocAsUpsert());
if (updateAction.hasUnderscoreSource()) {
updateRequest.fetchSource(FetchSourceContextProtoUtils.fromProto(updateAction.getUnderscoreSource()));
}
}

if (bulkRequestBody.hasSource()) {
updateRequest.fetchSource(FetchSourceContextProtoUtils.fromProto(bulkRequestBody.getSource()));
}
updateRequest.doc(document, MediaTypeRegistry.JSON);

if (updateOperation.hasIfSeqNo()) {
updateRequest.setIfSeqNo(updateOperation.getIfSeqNo());
Expand Down Expand Up @@ -465,8 +460,8 @@ public static DeleteRequest buildDeleteRequest(
long ifSeqNo,
long ifPrimaryTerm
) {
index = deleteOperation.hasIndex() ? deleteOperation.getIndex() : index;
id = deleteOperation.hasId() ? deleteOperation.getId() : id;
index = deleteOperation.hasUnderscoreIndex() ? deleteOperation.getUnderscoreIndex() : index;
id = deleteOperation.hasUnderscoreId() ? deleteOperation.getUnderscoreId() : id;
routing = deleteOperation.hasRouting() ? deleteOperation.getRouting() : routing;
version = deleteOperation.hasVersion() ? deleteOperation.getVersion() : version;
if (deleteOperation.hasVersionType()) {
Expand Down
Loading
Loading