diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index 87611099b2207..d2fab169b60a0 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.uid.Versions; @@ -140,6 +141,11 @@ public interface DocWriteRequest extends IndicesRequest, Accountable { */ boolean isRequireAlias(); + /** + * Pick the appropriate shard id to receive this request. + */ + int route(IndexRouting indexRouting); + /** * Requested operation type to perform on the document */ diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 1043107d4e7b4..a0121b0e2d85a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -525,7 +525,6 @@ protected void doRun() { IndexRouting indexRouting = concreteIndices.routing(concreteIndex); - int shardId; switch (docWriteRequest.opType()) { case CREATE: case INDEX: @@ -534,24 +533,17 @@ protected void doRun() { IndexRequest indexRequest = (IndexRequest) docWriteRequest; indexRequest.resolveRouting(metadata); indexRequest.process(); - shardId = indexRouting.indexShard( - docWriteRequest.id(), - docWriteRequest.routing(), - indexRequest.getContentType(), - indexRequest.source() - ); break; case UPDATE: docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index())); - shardId = indexRouting.updateShard(docWriteRequest.id(), docWriteRequest.routing()); break; case DELETE: docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index())); - shardId = indexRouting.deleteShard(docWriteRequest.id(), docWriteRequest.routing()); break; default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]"); } + int shardId = docWriteRequest.route(indexRouting); List shardRequests = requestsByShard.computeIfAbsent( new ShardId(concreteIndex, shardId), shard -> new ArrayList<>() diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index 9ca64ece1b5e5..35d7e5c332d92 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; +import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -232,6 +233,11 @@ public boolean isRequireAlias() { return false; } + @Override + public int route(IndexRouting indexRouting) { + return indexRouting.deleteShard(id, routing); + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 9605844ce32ec..f5dc23c1cafc7 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -722,6 +723,12 @@ public boolean isRequireAlias() { return requireAlias; } + @Override + public int route(IndexRouting indexRouting) { + assert id != null : "route must be called after process"; + return indexRouting.indexShard(id, routing, contentType, source); + } + public IndexRequest setRequireAlias(boolean requireAlias) { this.requireAlias = requireAlias; return this; diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index fa096b921574a..80dde6124cac7 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest; +import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; @@ -830,6 +831,11 @@ public boolean isRequireAlias() { return requireAlias; } + @Override + public int route(IndexRouting indexRouting) { + return indexRouting.updateShard(id, routing); + } + public UpdateRequest setRequireAlias(boolean requireAlias) { this.requireAlias = requireAlias; return this;