Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,99 +36,50 @@
import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.common.Nullable;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.MediaTypeRegistry;

import java.io.IOException;
import java.util.Objects;

/**
* Transport request for a Single bulk item
*
* @opensearch.internal
*/
public class BulkItemRequest implements Writeable, Accountable {
public record BulkItemRequest(int id, DocWriteRequest<?> request, BulkItemResponse primaryResponse) implements Writeable, Accountable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkItemRequest.class);

private int id;
private DocWriteRequest<?> request;
private volatile BulkItemResponse primaryResponse;

/**
* @param shardId the shard id
*/
BulkItemRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
id = in.readVInt();
request = DocWriteRequest.readDocumentRequest(shardId, in);
this(in.readVInt(), DocWriteRequest.readDocumentRequest(shardId, in), readPrimaryResponse(shardId, in));
}

private static BulkItemResponse readPrimaryResponse(ShardId shardId, StreamInput in) throws IOException {
if (in.readBoolean()) {
if (shardId == null) {
primaryResponse = new BulkItemResponse(in);
return new BulkItemResponse(in);
} else {
primaryResponse = new BulkItemResponse(shardId, in);
return new BulkItemResponse(shardId, in);
}
}
return null;
}

// NOTE: public for testing only
public BulkItemRequest(int id, DocWriteRequest<?> request) {
this.id = id;
this.request = request;
}

public int id() {
return id;
}

public DocWriteRequest<?> request() {
return request;
this(id, request, null);
}

public String index() {
assert request.indices().length == 1;
return request.indices()[0];
}

BulkItemResponse getPrimaryResponse() {
return primaryResponse;
}

void setPrimaryResponse(BulkItemResponse primaryResponse) {
this.primaryResponse = primaryResponse;
}

/**
* Abort this request, and store a {@link org.opensearch.action.bulk.BulkItemResponse.Failure} response.
*
* @param index The concrete index that was resolved for this request
* @param cause The cause of the rejection (may not be null)
* @throws IllegalStateException If a response already exists for this request
*/
public void abort(String index, Exception cause) {
if (primaryResponse == null) {
final BulkItemResponse.Failure failure = new BulkItemResponse.Failure(index, request.id(), Objects.requireNonNull(cause), true);
setPrimaryResponse(new BulkItemResponse(id, request.opType(), failure));
} else {
assert primaryResponse.isFailed() && primaryResponse.getFailure().isAborted() : "response ["
+ Strings.toString(MediaTypeRegistry.JSON, primaryResponse)
+ "]; cause ["
+ cause
+ "]";
if (primaryResponse.isFailed() && primaryResponse.getFailure().isAborted()) {
primaryResponse.getFailure().getCause().addSuppressed(cause);
} else {
throw new IllegalStateException(
"aborting item that with response [" + primaryResponse + "] that was previously processed",
cause
);
}
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,6 @@ public long getTerm() {
/**
* Whether this failure is the result of an <em>abort</em>.
* If {@code true}, the request to which this failure relates should never be retried, regardless of the {@link #getCause() cause}.
* @see BulkItemRequest#abort(String, Exception)
*/
public boolean isAborted() {
return aborted;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@

import java.util.Arrays;

import static org.opensearch.action.bulk.BulkShardResponse.DEFAULT_QUEUE_SIZE;
import static org.opensearch.action.bulk.BulkShardResponse.DEFAULT_SERVICE_TIME_IN_NANOS;

/**
* This is a utility class that holds the per request state needed to perform bulk operations on the primary.
* More specifically, it maintains an index to the current executing bulk item, which allows execution
Expand Down Expand Up @@ -91,19 +88,21 @@ enum ItemProcessingState {
private int currentIndex = -1;

private ItemProcessingState currentItemState;
private DocWriteRequest requestToExecute;
private DocWriteRequest<?> requestToExecute;
private BulkItemResponse executionResult;
private int retryCounter;
private final BulkItemResponse[] primaryResponses;

BulkPrimaryExecutionContext(BulkShardRequest request, IndexShard primary) {
this.request = request;
this.primary = primary;
this.primaryResponses = new BulkItemResponse[request.items().length];
advance();
}

private int findNextNonAborted(int startIndex) {
final int length = request.items().length;
while (startIndex < length && isAborted(request.items()[startIndex].getPrimaryResponse())) {
while (startIndex < length && isAborted(request.items()[startIndex].primaryResponse())) {
startIndex++;
}
return startIndex;
Expand Down Expand Up @@ -131,7 +130,19 @@ public DocWriteRequest<?> getCurrent() {
}

public BulkShardRequest getBulkShardRequest() {
return request;
BulkItemRequest[] newRequests = new BulkItemRequest[request.items().length];
for (int i = 0; i < newRequests.length; i++) {
BulkItemRequest oldRequest = request.items()[i];
newRequests[i] = new BulkItemRequest(oldRequest.id(), oldRequest.request(), primaryResponses[i]);
}
BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), request.getRefreshPolicy(), newRequests);
// Clone other properties from primary shard request
// See TransportBulkAction.BulkOperation#doRun() for construction of the primary shard request.
bulkShardRequest.waitForActiveShards(request.waitForActiveShards());
bulkShardRequest.timeout(request.timeout());
bulkShardRequest.routedBasedOnClusterVersion(request.routedBasedOnClusterVersion());
bulkShardRequest.setParentTask(request.getParentTask());
return bulkShardRequest;
}

/** returns the result of the request that has been executed on the shard */
Expand All @@ -145,21 +156,11 @@ public int getRetryCounter() {
return retryCounter;
}

/** returns true if the current request has been executed on the primary */
public boolean isOperationExecuted() {
return currentItemState == ItemProcessingState.EXECUTED;
}

/** returns true if the request needs to wait for a mapping update to arrive from the cluster-manager */
public boolean requiresWaitingForMappingUpdate() {
return currentItemState == ItemProcessingState.WAIT_FOR_MAPPING_UPDATE;
}

/** returns true if the current request should be retried without waiting for an external event */
public boolean requiresImmediateRetry() {
return currentItemState == ItemProcessingState.IMMEDIATE_RETRY;
}

/**
* returns true if the current request has been completed and it's result translated to a user
* facing response
Expand Down Expand Up @@ -206,17 +207,18 @@ public IndexShard getPrimary() {
}

/**
* sets the request that should actually be executed on the primary. This can be different then the request
* sets the request that should actually be executed on the primary. This can be different from the request
* received from the user (specifically, an update request is translated to an indexing or delete request).
*/
public void setRequestToExecute(DocWriteRequest writeRequest) {
public void setRequestToExecute(DocWriteRequest<?> writeRequest) {
assert assertInvariants(ItemProcessingState.INITIAL);
requestToExecute = writeRequest;
currentItemState = ItemProcessingState.TRANSLATED;
assert assertInvariants(ItemProcessingState.TRANSLATED);
}

/** returns the request that should be executed on the shard. */
@SuppressWarnings("unchecked")
public <T extends DocWriteRequest<T>> T getRequestToExecute() {
assert assertInvariants(ItemProcessingState.TRANSLATED);
return (T) requestToExecute;
Expand Down Expand Up @@ -252,7 +254,7 @@ public void markOperationAsNoOp(DocWriteResponse response) {
public void failOnMappingUpdate(Exception cause) {
assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE);
currentItemState = ItemProcessingState.EXECUTED;
final DocWriteRequest docWriteRequest = getCurrentItem().request();
final DocWriteRequest<?> docWriteRequest = getCurrentItem().request();
executionResult = new BulkItemResponse(
getCurrentItem().id(),
docWriteRequest.opType(),
Expand All @@ -267,7 +269,7 @@ public void failOnMappingUpdate(Exception cause) {
public void markOperationAsExecuted(Engine.Result result) {
assertInvariants(ItemProcessingState.TRANSLATED);
final BulkItemRequest current = getCurrentItem();
DocWriteRequest docWriteRequest = getRequestToExecute();
DocWriteRequest<?> docWriteRequest = getRequestToExecute();
switch (result.getResultType()) {
case SUCCESS:
final DocWriteResponse response;
Expand Down Expand Up @@ -347,24 +349,15 @@ public void markAsCompleted(BulkItemResponse translatedResponse) {
if (translatedResponse.isFailed() == false && requestToExecute != null && requestToExecute != getCurrent()) {
request.items()[currentIndex] = new BulkItemRequest(request.items()[currentIndex].id(), requestToExecute);
}
getCurrentItem().setPrimaryResponse(translatedResponse);
primaryResponses[currentIndex] = translatedResponse;
currentItemState = ItemProcessingState.COMPLETED;
advance();
}

/** builds the bulk shard response to return to the user */
public BulkShardResponse buildShardResponse(long serviceTimeEWMAInNanos, int nodeQueueSize) {
assert hasMoreOperationsToExecute() == false;
return new BulkShardResponse(
request.shardId(),
Arrays.stream(request.items()).map(BulkItemRequest::getPrimaryResponse).toArray(BulkItemResponse[]::new),
serviceTimeEWMAInNanos,
nodeQueueSize
);
}

public BulkShardResponse buildShardResponse() {
return buildShardResponse(DEFAULT_SERVICE_TIME_IN_NANOS, DEFAULT_QUEUE_SIZE);
return new BulkShardResponse(request.shardId(), primaryResponses, serviceTimeEWMAInNanos, nodeQueueSize);
}

private boolean assertInvariants(ItemProcessingState... expectedCurrentState) {
Expand All @@ -379,26 +372,22 @@ private boolean assertInvariants(ItemProcessingState... expectedCurrentState) {
assert requestToExecute == null : requestToExecute;
assert executionResult == null : executionResult;
break;
case TRANSLATED:
case TRANSLATED, IMMEDIATE_RETRY:
assert requestToExecute != null;
assert executionResult == null : executionResult;
break;
case WAIT_FOR_MAPPING_UPDATE:
assert requestToExecute == null;
assert executionResult == null : executionResult;
break;
case IMMEDIATE_RETRY:
assert requestToExecute != null;
assert executionResult == null : executionResult;
break;
case EXECUTED:
// requestToExecute can be null if the update ended up as NOOP
assert executionResult != null;
break;
case COMPLETED:
assert requestToExecute != null;
assert executionResult != null;
assert getCurrentItem().getPrimaryResponse() != null;
assert getCurrentItem().primaryResponse() != null;
break;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -860,9 +860,9 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
final BulkItemRequest item = request.items()[i];
final BulkItemResponse response = item.getPrimaryResponse();
final BulkItemResponse response = item.primaryResponse();
final Engine.Result operationResult;
if (item.getPrimaryResponse().isFailed()) {
if (item.primaryResponse().isFailed()) {
if (response.getFailure().getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
continue; // ignore replication as we didn't generate a sequence number for this request.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,75 +40,18 @@
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.core.index.AppendOnlyIndexOperationRetryException;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.Translog;
import org.opensearch.test.OpenSearchTestCase;

import java.util.ArrayList;

import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class BulkPrimaryExecutionContextTests extends OpenSearchTestCase {

public void testAbortedSkipped() {
BulkShardRequest shardRequest = generateRandomRequest();

ArrayList<DocWriteRequest<?>> nonAbortedRequests = new ArrayList<>();
for (BulkItemRequest request : shardRequest.items()) {
if (randomBoolean()) {
request.abort("index", new OpenSearchException("bla"));
} else {
nonAbortedRequests.add(request.request());
}
}

ArrayList<DocWriteRequest<?>> visitedRequests = new ArrayList<>();
for (BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, null); context
.hasMoreOperationsToExecute();) {
visitedRequests.add(context.getCurrent());
context.setRequestToExecute(context.getCurrent());
// using failures prevents caring about types
context.markOperationAsExecuted(new Engine.IndexResult(new OpenSearchException("bla"), 1));
context.markAsCompleted(context.getExecutionResult());
}

assertThat(visitedRequests, equalTo(nonAbortedRequests));
}

public void testAppendOnlyIndexOperationRetryException() {
BulkShardRequest shardRequest = generateRandomRequest();

final IndexShard primary = mock(IndexShard.class);
when(primary.shardId()).thenReturn(shardRequest.shardId());
ArrayList<DocWriteRequest<?>> nonAbortedRequests = new ArrayList<>();
for (BulkItemRequest request : shardRequest.items()) {
if (randomBoolean()) {
request.abort("index", new AppendOnlyIndexOperationRetryException("Indexing operation retried for append only indices"));
} else {
nonAbortedRequests.add(request.request());
}
}

ArrayList<DocWriteRequest<?>> visitedRequests = new ArrayList<>();
for (BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, primary); context
.hasMoreOperationsToExecute();) {
visitedRequests.add(context.getCurrent());
context.setRequestToExecute(context.getCurrent());
// using failures prevents caring about types
context.markOperationAsExecuted(
new Engine.IndexResult(new AppendOnlyIndexOperationRetryException("Indexing operation retried for append only indices"), 1)
);
context.markAsCompleted(context.getExecutionResult());
}

assertThat(visitedRequests, equalTo(nonAbortedRequests));
}

private BulkShardRequest generateRandomRequest() {
BulkItemRequest[] items = new BulkItemRequest[randomInt(20)];
for (int i = 0; i < items.length; i++) {
Expand Down
Loading
Loading