Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
Expand Down Expand Up @@ -140,6 +141,11 @@ public interface DocWriteRequest<T> extends IndicesRequest, Accountable {
*/
boolean isRequireAlias();

/**
* Pick the appropriate shard id to receive this request.
*/
int route(IndexRouting indexRouting);

/**
* Requested operation type to perform on the document
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,6 @@ protected void doRun() {

IndexRouting indexRouting = concreteIndices.routing(concreteIndex);

int shardId;
switch (docWriteRequest.opType()) {
case CREATE:
case INDEX:
Expand All @@ -534,24 +533,17 @@ protected void doRun() {
IndexRequest indexRequest = (IndexRequest) docWriteRequest;
indexRequest.resolveRouting(metadata);
indexRequest.process();
shardId = indexRouting.indexShard(
docWriteRequest.id(),
docWriteRequest.routing(),
indexRequest.getContentType(),
indexRequest.source()
);
break;
case UPDATE:
docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
shardId = indexRouting.updateShard(docWriteRequest.id(), docWriteRequest.routing());
break;
case DELETE:
docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
shardId = indexRouting.deleteShard(docWriteRequest.id(), docWriteRequest.routing());
break;
default:
throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
}
int shardId = docWriteRequest.route(indexRouting);
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(
new ShardId(concreteIndex, shardId),
shard -> new ArrayList<>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -232,6 +233,11 @@ public boolean isRequireAlias() {
return false;
}

@Override
public int route(IndexRouting indexRouting) {
return indexRouting.deleteShard(id, routing);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -722,6 +723,12 @@ public boolean isRequireAlias() {
return requireAlias;
}

@Override
public int route(IndexRouting indexRouting) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add comment here that resolveRouting and process must be called prior to this.

Also I think we can cement parts of that by asserting that id != null?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

I'd sort of been hoping to fold those two methods together. But that is a thing for another time.

assert id != null : "route must be called after process";
return indexRouting.indexShard(id, routing, contentType, source);
}

public IndexRequest setRequireAlias(boolean requireAlias) {
this.requireAlias = requireAlias;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -830,6 +831,11 @@ public boolean isRequireAlias() {
return requireAlias;
}

@Override
public int route(IndexRouting indexRouting) {
return indexRouting.updateShard(id, routing);
}

public UpdateRequest setRequireAlias(boolean requireAlias) {
this.requireAlias = requireAlias;
return this;
Expand Down