diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index d2fab169b60a0..3d05f5e2558e9 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -141,6 +141,11 @@ public interface DocWriteRequest extends IndicesRequest, Accountable { */ boolean isRequireAlias(); + /** + * Finalize the request before executing or routing it. + */ + void process(); + /** * Pick the appropriate shard id to receive this request. */ diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index b16891db54918..851a45af20ec3 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteRequest.OpType; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.AutoCreateAction; @@ -523,26 +524,14 @@ protected void doRun() { throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams"); } - IndexRouting indexRouting = concreteIndices.routing(concreteIndex); - - switch (docWriteRequest.opType()) { - case CREATE: - case INDEX: - prohibitAppendWritesInBackingIndices(docWriteRequest, metadata); - prohibitCustomRoutingOnDataStream(docWriteRequest, metadata); - IndexRequest indexRequest = (IndexRequest) docWriteRequest; - indexRequest.resolveRouting(metadata); - indexRequest.process(); - break; - case UPDATE: - docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index())); - break; - case DELETE: - docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index())); - break; - default: - throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]"); + if (docWriteRequest.opType() == OpType.CREATE || docWriteRequest.opType() == OpType.INDEX) { + prohibitAppendWritesInBackingIndices(docWriteRequest, metadata); + prohibitCustomRoutingOnDataStream(docWriteRequest, metadata); } + docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index())); + docWriteRequest.process(); + + IndexRouting indexRouting = concreteIndices.routing(concreteIndex); int shardId = docWriteRequest.route(indexRouting); List shardRequests = requestsByShard.computeIfAbsent( new ShardId(concreteIndex, shardId), diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index b93fa14b19d5d..7a095b5281a33 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -265,6 +265,7 @@ static boolean executeBulkItemRequest( ) throws Exception { final DocWriteRequest.OpType opType = context.getCurrent().opType(); + // Translate update requests into index or delete requests which can be executed directly final UpdateHelper.Result updateResult; if (opType == DocWriteRequest.OpType.UPDATE) { final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent(); @@ -279,24 +280,14 @@ static boolean executeBulkItemRequest( context.markAsCompleted(context.getExecutionResult()); return true; } - // execute translated update request - switch (updateResult.getResponseResult()) { - case CREATED: - case UPDATED: - IndexRequest indexRequest = updateResult.action(); - indexRequest.process(); - context.setRequestToExecute(indexRequest); - break; - case DELETED: - context.setRequestToExecute(updateResult.action()); - break; - case NOOP: - context.markOperationAsNoOp(updateResult.action()); - context.markAsCompleted(context.getExecutionResult()); - return true; - default: - throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult()); + if (updateResult.getResponseResult() == DocWriteResponse.Result.NOOP) { + context.markOperationAsNoOp(updateResult.action()); + context.markAsCompleted(context.getExecutionResult()); + return true; } + DocWriteRequest translated = updateResult.action(); + translated.process(); + context.setRequestToExecute(translated); } else { context.setRequestToExecute(context.getCurrent()); updateResult = null; diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index 35d7e5c332d92..7a48eb90ab722 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -233,6 +233,11 @@ public boolean isRequireAlias() { return false; } + @Override + public void process() { + // Nothing to do + } + @Override public int route(IndexRouting indexRouting) { return indexRouting.deleteShard(id, routing); diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index f5dc23c1cafc7..b9ac0e9dec2a5 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -18,7 +18,6 @@ import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.client.Requests; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; @@ -588,6 +587,7 @@ public VersionType versionType() { return this.versionType; } + @Override public void process() { if ("".equals(id)) { throw new IllegalArgumentException("if _id is specified it must not be empty"); @@ -604,11 +604,6 @@ public void process() { } } - /* resolve the routing if needed */ - public void resolveRouting(Metadata metadata) { - routing(metadata.resolveWriteIndexRouting(routing, index)); - } - public void checkAutoIdWithOpTypeCreateSupportedByVersion(Version version) { if (id == null && opType == OpType.CREATE && version.before(Version.V_7_5_0)) { throw new IllegalArgumentException( diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index 80dde6124cac7..a0014e8e46d5a 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -831,6 +831,11 @@ public boolean isRequireAlias() { return requireAlias; } + @Override + public void process() { + // Nothing to do + } + @Override public int route(IndexRouting indexRouting) { return indexRouting.updateShard(id, routing);