diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkItemRequest.java b/server/src/main/java/org/opensearch/action/bulk/BulkItemRequest.java
index b0ba44c308b95..8b25f3a5b33e5 100644
--- a/server/src/main/java/org/opensearch/action/bulk/BulkItemRequest.java
+++ b/server/src/main/java/org/opensearch/action/bulk/BulkItemRequest.java
@@ -36,56 +36,43 @@
import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.common.Nullable;
-import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.index.shard.ShardId;
-import org.opensearch.core.xcontent.MediaTypeRegistry;
import java.io.IOException;
-import java.util.Objects;
/**
* Transport request for a Single bulk item
*
* @opensearch.internal
*/
-public class BulkItemRequest implements Writeable, Accountable {
+public record BulkItemRequest(int id, DocWriteRequest<?> request, BulkItemResponse primaryResponse) implements Writeable, Accountable {
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkItemRequest.class);
- private int id;
- private DocWriteRequest<?> request;
- private volatile BulkItemResponse primaryResponse;
-
/**
* @param shardId the shard id
*/
BulkItemRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
- id = in.readVInt();
- request = DocWriteRequest.readDocumentRequest(shardId, in);
+ this(in.readVInt(), DocWriteRequest.readDocumentRequest(shardId, in), readPrimaryResponse(shardId, in));
+ }
+
+ private static BulkItemResponse readPrimaryResponse(ShardId shardId, StreamInput in) throws IOException {
if (in.readBoolean()) {
if (shardId == null) {
- primaryResponse = new BulkItemResponse(in);
+ return new BulkItemResponse(in);
} else {
- primaryResponse = new BulkItemResponse(shardId, in);
+ return new BulkItemResponse(shardId, in);
}
}
+ return null;
}
// NOTE: public for testing only
public BulkItemRequest(int id, DocWriteRequest<?> request) {
- this.id = id;
- this.request = request;
- }
-
- public int id() {
- return id;
- }
-
- public DocWriteRequest<?> request() {
- return request;
+ this(id, request, null);
}
public String index() {
@@ -93,42 +80,6 @@ public String index() {
return request.indices()[0];
}
- BulkItemResponse getPrimaryResponse() {
- return primaryResponse;
- }
-
- void setPrimaryResponse(BulkItemResponse primaryResponse) {
- this.primaryResponse = primaryResponse;
- }
-
- /**
- * Abort this request, and store a {@link org.opensearch.action.bulk.BulkItemResponse.Failure} response.
- *
- * @param index The concrete index that was resolved for this request
- * @param cause The cause of the rejection (may not be null)
- * @throws IllegalStateException If a response already exists for this request
- */
- public void abort(String index, Exception cause) {
- if (primaryResponse == null) {
- final BulkItemResponse.Failure failure = new BulkItemResponse.Failure(index, request.id(), Objects.requireNonNull(cause), true);
- setPrimaryResponse(new BulkItemResponse(id, request.opType(), failure));
- } else {
- assert primaryResponse.isFailed() && primaryResponse.getFailure().isAborted() : "response ["
- + Strings.toString(MediaTypeRegistry.JSON, primaryResponse)
- + "]; cause ["
- + cause
- + "]";
- if (primaryResponse.isFailed() && primaryResponse.getFailure().isAborted()) {
- primaryResponse.getFailure().getCause().addSuppressed(cause);
- } else {
- throw new IllegalStateException(
- "aborting item that with response [" + primaryResponse + "] that was previously processed",
- cause
- );
- }
- }
- }
-
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkItemResponse.java b/server/src/main/java/org/opensearch/action/bulk/BulkItemResponse.java
index e43fb2854be91..1334afbabf0be 100644
--- a/server/src/main/java/org/opensearch/action/bulk/BulkItemResponse.java
+++ b/server/src/main/java/org/opensearch/action/bulk/BulkItemResponse.java
@@ -421,7 +421,6 @@ public long getTerm() {
/**
* Whether this failure is the result of an abort.
* If {@code true}, the request to which this failure relates should never be retried, regardless of the {@link #getCause() cause}.
- * @see BulkItemRequest#abort(String, Exception)
*/
public boolean isAborted() {
return aborted;
diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java b/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java
index fcdf0bc8a4bb1..860f1f97cc62c 100644
--- a/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java
+++ b/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java
@@ -45,9 +45,6 @@
import java.util.Arrays;
-import static org.opensearch.action.bulk.BulkShardResponse.DEFAULT_QUEUE_SIZE;
-import static org.opensearch.action.bulk.BulkShardResponse.DEFAULT_SERVICE_TIME_IN_NANOS;
-
/**
* This is a utility class that holds the per request state needed to perform bulk operations on the primary.
* More specifically, it maintains an index to the current executing bulk item, which allows execution
@@ -91,19 +88,21 @@ enum ItemProcessingState {
private int currentIndex = -1;
private ItemProcessingState currentItemState;
- private DocWriteRequest requestToExecute;
+ private DocWriteRequest<?> requestToExecute;
private BulkItemResponse executionResult;
private int retryCounter;
+ private final BulkItemResponse[] primaryResponses;
BulkPrimaryExecutionContext(BulkShardRequest request, IndexShard primary) {
this.request = request;
this.primary = primary;
+ this.primaryResponses = new BulkItemResponse[request.items().length];
advance();
}
private int findNextNonAborted(int startIndex) {
final int length = request.items().length;
- while (startIndex < length && isAborted(request.items()[startIndex].getPrimaryResponse())) {
+ while (startIndex < length && isAborted(request.items()[startIndex].primaryResponse())) {
startIndex++;
}
return startIndex;
@@ -131,7 +130,19 @@ public DocWriteRequest<?> getCurrent() {
}
public BulkShardRequest getBulkShardRequest() {
- return request;
+ BulkItemRequest[] newRequests = new BulkItemRequest[request.items().length];
+ for (int i = 0; i < newRequests.length; i++) {
+ BulkItemRequest oldRequest = request.items()[i];
+ newRequests[i] = new BulkItemRequest(oldRequest.id(), oldRequest.request(), primaryResponses[i]);
+ }
+ BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), request.getRefreshPolicy(), newRequests);
+ // Clone other properties from primary shard request
+ // See TransportBulkAction.BulkOperation#doRun() for construction of the primary shard request.
+ bulkShardRequest.waitForActiveShards(request.waitForActiveShards());
+ bulkShardRequest.timeout(request.timeout());
+ bulkShardRequest.routedBasedOnClusterVersion(request.routedBasedOnClusterVersion());
+ bulkShardRequest.setParentTask(request.getParentTask());
+ return bulkShardRequest;
}
/** returns the result of the request that has been executed on the shard */
@@ -145,21 +156,11 @@ public int getRetryCounter() {
return retryCounter;
}
- /** returns true if the current request has been executed on the primary */
- public boolean isOperationExecuted() {
- return currentItemState == ItemProcessingState.EXECUTED;
- }
-
/** returns true if the request needs to wait for a mapping update to arrive from the cluster-manager */
public boolean requiresWaitingForMappingUpdate() {
return currentItemState == ItemProcessingState.WAIT_FOR_MAPPING_UPDATE;
}
- /** returns true if the current request should be retried without waiting for an external event */
- public boolean requiresImmediateRetry() {
- return currentItemState == ItemProcessingState.IMMEDIATE_RETRY;
- }
-
/**
* returns true if the current request has been completed and it's result translated to a user
* facing response
@@ -206,10 +207,10 @@ public IndexShard getPrimary() {
}
/**
- * sets the request that should actually be executed on the primary. This can be different then the request
+ * sets the request that should actually be executed on the primary. This can be different from the request
* received from the user (specifically, an update request is translated to an indexing or delete request).
*/
- public void setRequestToExecute(DocWriteRequest writeRequest) {
+ public void setRequestToExecute(DocWriteRequest<?> writeRequest) {
assert assertInvariants(ItemProcessingState.INITIAL);
requestToExecute = writeRequest;
currentItemState = ItemProcessingState.TRANSLATED;
@@ -217,6 +218,7 @@ public void setRequestToExecute(DocWriteRequest writeRequest) {
}
/** returns the request that should be executed on the shard. */
+ @SuppressWarnings("unchecked")
public <T extends DocWriteRequest<?>> T getRequestToExecute() {
assert assertInvariants(ItemProcessingState.TRANSLATED);
return (T) requestToExecute;
@@ -252,7 +254,7 @@ public void markOperationAsNoOp(DocWriteResponse response) {
public void failOnMappingUpdate(Exception cause) {
assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE);
currentItemState = ItemProcessingState.EXECUTED;
- final DocWriteRequest docWriteRequest = getCurrentItem().request();
+ final DocWriteRequest<?> docWriteRequest = getCurrentItem().request();
executionResult = new BulkItemResponse(
getCurrentItem().id(),
docWriteRequest.opType(),
@@ -267,7 +269,7 @@ public void failOnMappingUpdate(Exception cause) {
public void markOperationAsExecuted(Engine.Result result) {
assertInvariants(ItemProcessingState.TRANSLATED);
final BulkItemRequest current = getCurrentItem();
- DocWriteRequest docWriteRequest = getRequestToExecute();
+ DocWriteRequest<?> docWriteRequest = getRequestToExecute();
switch (result.getResultType()) {
case SUCCESS:
final DocWriteResponse response;
@@ -347,7 +349,7 @@ public void markAsCompleted(BulkItemResponse translatedResponse) {
if (translatedResponse.isFailed() == false && requestToExecute != null && requestToExecute != getCurrent()) {
request.items()[currentIndex] = new BulkItemRequest(request.items()[currentIndex].id(), requestToExecute);
}
- getCurrentItem().setPrimaryResponse(translatedResponse);
+ primaryResponses[currentIndex] = translatedResponse;
currentItemState = ItemProcessingState.COMPLETED;
advance();
}
@@ -355,16 +357,7 @@ public void markAsCompleted(BulkItemResponse translatedResponse) {
/** builds the bulk shard response to return to the user */
public BulkShardResponse buildShardResponse(long serviceTimeEWMAInNanos, int nodeQueueSize) {
assert hasMoreOperationsToExecute() == false;
- return new BulkShardResponse(
- request.shardId(),
- Arrays.stream(request.items()).map(BulkItemRequest::getPrimaryResponse).toArray(BulkItemResponse[]::new),
- serviceTimeEWMAInNanos,
- nodeQueueSize
- );
- }
-
- public BulkShardResponse buildShardResponse() {
- return buildShardResponse(DEFAULT_SERVICE_TIME_IN_NANOS, DEFAULT_QUEUE_SIZE);
+ return new BulkShardResponse(request.shardId(), primaryResponses, serviceTimeEWMAInNanos, nodeQueueSize);
}
private boolean assertInvariants(ItemProcessingState... expectedCurrentState) {
@@ -379,7 +372,7 @@ private boolean assertInvariants(ItemProcessingState... expectedCurrentState) {
assert requestToExecute == null : requestToExecute;
assert executionResult == null : executionResult;
break;
- case TRANSLATED:
+ case TRANSLATED, IMMEDIATE_RETRY:
assert requestToExecute != null;
assert executionResult == null : executionResult;
break;
@@ -387,10 +380,6 @@ private boolean assertInvariants(ItemProcessingState... expectedCurrentState) {
assert requestToExecute == null;
assert executionResult == null : executionResult;
break;
- case IMMEDIATE_RETRY:
- assert requestToExecute != null;
- assert executionResult == null : executionResult;
- break;
case EXECUTED:
// requestToExecute can be null if the update ended up as NOOP
assert executionResult != null;
@@ -398,7 +387,7 @@ private boolean assertInvariants(ItemProcessingState... expectedCurrentState) {
case COMPLETED:
assert requestToExecute != null;
assert executionResult != null;
- assert getCurrentItem().getPrimaryResponse() != null;
+ assert getCurrentItem().primaryResponse() != null;
break;
}
return true;
diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java
index 3f06354de9b5d..cf8a6801c4627 100644
--- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java
+++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java
@@ -860,9 +860,9 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
final BulkItemRequest item = request.items()[i];
- final BulkItemResponse response = item.getPrimaryResponse();
+ final BulkItemResponse response = item.primaryResponse();
final Engine.Result operationResult;
- if (item.getPrimaryResponse().isFailed()) {
+ if (item.primaryResponse().isFailed()) {
if (response.getFailure().getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
continue; // ignore replication as we didn't generate a sequence number for this request.
}
diff --git a/server/src/test/java/org/opensearch/action/bulk/BulkPrimaryExecutionContextTests.java b/server/src/test/java/org/opensearch/action/bulk/BulkPrimaryExecutionContextTests.java
index 9745203e91586..19020f21d38bb 100644
--- a/server/src/test/java/org/opensearch/action/bulk/BulkPrimaryExecutionContextTests.java
+++ b/server/src/test/java/org/opensearch/action/bulk/BulkPrimaryExecutionContextTests.java
@@ -40,75 +40,18 @@
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
-import org.opensearch.core.index.AppendOnlyIndexOperationRetryException;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.Translog;
import org.opensearch.test.OpenSearchTestCase;
-import java.util.ArrayList;
-
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class BulkPrimaryExecutionContextTests extends OpenSearchTestCase {
- public void testAbortedSkipped() {
- BulkShardRequest shardRequest = generateRandomRequest();
-
- ArrayList<DocWriteRequest<?>> nonAbortedRequests = new ArrayList<>();
- for (BulkItemRequest request : shardRequest.items()) {
- if (randomBoolean()) {
- request.abort("index", new OpenSearchException("bla"));
- } else {
- nonAbortedRequests.add(request.request());
- }
- }
-
- ArrayList<DocWriteRequest<?>> visitedRequests = new ArrayList<>();
- for (BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, null); context
- .hasMoreOperationsToExecute();) {
- visitedRequests.add(context.getCurrent());
- context.setRequestToExecute(context.getCurrent());
- // using failures prevents caring about types
- context.markOperationAsExecuted(new Engine.IndexResult(new OpenSearchException("bla"), 1));
- context.markAsCompleted(context.getExecutionResult());
- }
-
- assertThat(visitedRequests, equalTo(nonAbortedRequests));
- }
-
- public void testAppendOnlyIndexOperationRetryException() {
- BulkShardRequest shardRequest = generateRandomRequest();
-
- final IndexShard primary = mock(IndexShard.class);
- when(primary.shardId()).thenReturn(shardRequest.shardId());
- ArrayList<DocWriteRequest<?>> nonAbortedRequests = new ArrayList<>();
- for (BulkItemRequest request : shardRequest.items()) {
- if (randomBoolean()) {
- request.abort("index", new AppendOnlyIndexOperationRetryException("Indexing operation retried for append only indices"));
- } else {
- nonAbortedRequests.add(request.request());
- }
- }
-
- ArrayList<DocWriteRequest<?>> visitedRequests = new ArrayList<>();
- for (BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, primary); context
- .hasMoreOperationsToExecute();) {
- visitedRequests.add(context.getCurrent());
- context.setRequestToExecute(context.getCurrent());
- // using failures prevents caring about types
- context.markOperationAsExecuted(
- new Engine.IndexResult(new AppendOnlyIndexOperationRetryException("Indexing operation retried for append only indices"), 1)
- );
- context.markAsCompleted(context.getExecutionResult());
- }
-
- assertThat(visitedRequests, equalTo(nonAbortedRequests));
- }
-
private BulkShardRequest generateRandomRequest() {
BulkItemRequest[] items = new BulkItemRequest[randomInt(20)];
for (int i = 0; i < items.length; i++) {
diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java
index 00bd99dd4b349..966910c8687bd 100644
--- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java
+++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java
@@ -33,7 +33,6 @@
package org.opensearch.action.bulk;
import org.opensearch.OpenSearchException;
-import org.opensearch.OpenSearchStatusException;
import org.opensearch.Version;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.DocWriteResponse;
@@ -112,8 +111,6 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -159,7 +156,7 @@ public void testExecuteBulkIndexRequest() throws Exception {
items[0] = primaryRequest;
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
- randomlySetIgnoredPrimaryResponse(primaryRequest);
+ items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest);
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(
@@ -170,12 +167,13 @@ public void testExecuteBulkIndexRequest() throws Exception {
listener -> {},
ASSERTING_DONE_LISTENER
);
+ BulkShardRequest completedRequest = context.getBulkShardRequest();
assertFalse(context.hasMoreOperationsToExecute());
// Translog should change, since there were no problems
assertNotNull(context.getLocationToSync());
- BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();
+ BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse();
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
@@ -190,7 +188,7 @@ public void testExecuteBulkIndexRequest() throws Exception {
items[0] = primaryRequest;
bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
- randomlySetIgnoredPrimaryResponse(primaryRequest);
+ items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest);
BulkPrimaryExecutionContext secondContext = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(
@@ -201,13 +199,14 @@ public void testExecuteBulkIndexRequest() throws Exception {
listener -> {},
ASSERTING_DONE_LISTENER
);
+ completedRequest = secondContext.getBulkShardRequest();
assertFalse(context.hasMoreOperationsToExecute());
assertNull(secondContext.getLocationToSync());
- BulkItemRequest replicaRequest = bulkShardRequest.items()[0];
+ BulkItemRequest replicaRequest = completedRequest.items()[0];
- primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();
+ primaryResponse = replicaRequest.primaryResponse();
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
@@ -222,77 +221,15 @@ public void testExecuteBulkIndexRequest() throws Exception {
assertThat(failure.getCause().getMessage(), containsString("version conflict, document already exists (current version [1])"));
assertThat(failure.getStatus(), equalTo(RestStatus.CONFLICT));
- assertThat(replicaRequest, equalTo(primaryRequest));
+ assertEquals(primaryRequest.request(), replicaRequest.request());
+ assertEquals(primaryRequest.index(), replicaRequest.index());
+ assertEquals(primaryRequest.id(), replicaRequest.id());
// Assert that the document count is still 1
assertDocCount(shard, 1);
closeShards(shard);
}
- public void testSkipBulkIndexRequestIfAborted() throws Exception {
- IndexShard shard = newStartedShard(true);
-
- BulkItemRequest[] items = new BulkItemRequest[randomIntBetween(2, 5)];
- for (int i = 0; i < items.length; i++) {
- DocWriteRequest writeRequest = new IndexRequest("index").id("id_" + i)
- .source(Requests.INDEX_CONTENT_TYPE)
- .opType(DocWriteRequest.OpType.INDEX);
- items[i] = new BulkItemRequest(i, writeRequest);
- }
- BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
-
- // Preemptively abort one of the bulk items, but allow the others to proceed
- BulkItemRequest rejectItem = randomFrom(items);
- RestStatus rejectionStatus = randomFrom(RestStatus.BAD_REQUEST, RestStatus.CONFLICT, RestStatus.FORBIDDEN, RestStatus.LOCKED);
- final OpenSearchStatusException rejectionCause = new OpenSearchStatusException("testing rejection", rejectionStatus);
- rejectItem.abort("index", rejectionCause);
-
- final CountDownLatch latch = new CountDownLatch(1);
- TransportShardBulkAction.performOnPrimary(
- bulkShardRequest,
- shard,
- null,
- threadPool::absoluteTimeInMillis,
- new NoopMappingUpdatePerformer(),
- listener -> {},
- ActionListener.runAfter(ActionTestUtils.assertNoFailureListener(result -> {
- // since at least 1 item passed, the tran log location should exist,
- assertThat(((WritePrimaryResult) result).location, notNullValue());
- // and the response should exist and match the item count
- assertThat(result.finalResponseIfSuccessful, notNullValue());
- assertThat(result.finalResponseIfSuccessful.getResponses(), arrayWithSize(items.length));
-
- // check each response matches the input item, including the rejection
- for (int i = 0; i < items.length; i++) {
- BulkItemResponse response = result.finalResponseIfSuccessful.getResponses()[i];
- assertThat(response.getItemId(), equalTo(i));
- assertThat(response.getIndex(), equalTo("index"));
- assertThat(response.getId(), equalTo("id_" + i));
- assertThat(response.getOpType(), equalTo(DocWriteRequest.OpType.INDEX));
- if (response.getItemId() == rejectItem.id()) {
- assertTrue(response.isFailed());
- assertThat(response.getFailure().getCause(), equalTo(rejectionCause));
- assertThat(response.status(), equalTo(rejectionStatus));
- } else {
- assertFalse(response.isFailed());
- }
- }
-
- // Check that the non-rejected updates made it to the shard
- try {
- assertDocCount(shard, items.length - 1);
- closeShards(shard);
- } catch (IOException e) {
- throw new AssertionError(e);
- }
- }), latch::countDown),
- threadPool,
- Names.WRITE
- );
-
- latch.await();
- }
-
public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
BulkItemRequest[] items = new BulkItemRequest[1];
@@ -313,7 +250,7 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
);
when(shard.mapperService()).thenReturn(mock(MapperService.class));
- randomlySetIgnoredPrimaryResponse(items[0]);
+ items[0] = randomlySetIgnoredPrimaryResponse(items[0]);
// Pretend the mappings haven't made it to the node yet
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
@@ -344,12 +281,13 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
listener -> {},
ASSERTING_DONE_LISTENER
);
+ BulkShardRequest completedRequest = context.getBulkShardRequest();
// Verify that the shard "executed" the operation only once (1 for previous invocations plus
// 1 for this execution)
verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());
- BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();
+ BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse();
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
@@ -372,7 +310,7 @@ public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Ex
boolean errorOnWait = randomBoolean();
- randomlySetIgnoredPrimaryResponse(items[0]);
+ items[0] = randomlySetIgnoredPrimaryResponse(items[0]);
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
final CountDownLatch latch = new CountDownLatch(1);
@@ -394,11 +332,12 @@ public void onFailure(final Exception e) {
);
latch.await();
assertFalse(context.hasMoreOperationsToExecute());
+ BulkShardRequest completedRequest = context.getBulkShardRequest();
// Translog shouldn't be synced, as there were conflicting mappings
assertThat(context.getLocationToSync(), nullValue());
- BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();
+ BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse();
// Since this was not a conflict failure, the primary response
// should be filled out with the failure information
@@ -425,7 +364,7 @@ public void testExecuteBulkDeleteRequest() throws Exception {
Translog.Location location = new Translog.Location(0, 0, 0);
- randomlySetIgnoredPrimaryResponse(items[0]);
+ items[0] = randomlySetIgnoredPrimaryResponse(items[0]);
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(
@@ -437,13 +376,14 @@ public void testExecuteBulkDeleteRequest() throws Exception {
ASSERTING_DONE_LISTENER
);
assertFalse(context.hasMoreOperationsToExecute());
+ BulkShardRequest completedRequest = context.getBulkShardRequest();
// Translog changes, even though the document didn't exist
assertThat(context.getLocationToSync(), not(location));
- BulkItemRequest replicaRequest = bulkShardRequest.items()[0];
+ BulkItemRequest replicaRequest = completedRequest.items()[0];
DocWriteRequest<?> replicaDeleteRequest = replicaRequest.request();
- BulkItemResponse primaryResponse = replicaRequest.getPrimaryResponse();
+ BulkItemResponse primaryResponse = replicaRequest.primaryResponse();
DeleteResponse response = primaryResponse.getResponse();
// Any version can be matched on replica
@@ -472,7 +412,7 @@ public void testExecuteBulkDeleteRequest() throws Exception {
location = context.getLocationToSync();
- randomlySetIgnoredPrimaryResponse(items[0]);
+ items[0] = randomlySetIgnoredPrimaryResponse(items[0]);
context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(
@@ -484,13 +424,14 @@ public void testExecuteBulkDeleteRequest() throws Exception {
ASSERTING_DONE_LISTENER
);
assertFalse(context.hasMoreOperationsToExecute());
+ completedRequest = context.getBulkShardRequest();
// Translog changes, because the document was deleted
assertThat(context.getLocationToSync(), not(location));
- replicaRequest = bulkShardRequest.items()[0];
+ replicaRequest = completedRequest.items()[0];
replicaDeleteRequest = replicaRequest.request();
- primaryResponse = replicaRequest.getPrimaryResponse();
+ primaryResponse = replicaRequest.primaryResponse();
response = primaryResponse.getResponse();
// Any version can be matched on replica
@@ -535,7 +476,7 @@ public void testNoopUpdateRequest() throws Exception {
BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest };
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
- randomlySetIgnoredPrimaryResponse(primaryRequest);
+ items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest);
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(
@@ -546,19 +487,19 @@ public void testNoopUpdateRequest() throws Exception {
listener -> {},
ASSERTING_DONE_LISTENER
);
+ BulkShardRequest completedRequest = context.getBulkShardRequest();
assertFalse(context.hasMoreOperationsToExecute());
// Basically nothing changes in the request since it's a noop
assertThat(context.getLocationToSync(), nullValue());
- BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();
+ BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse();
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE));
assertThat(primaryResponse.getResponse(), equalTo(noopUpdateResponse));
assertThat(primaryResponse.getResponse().getResult(), equalTo(DocWriteResponse.Result.NOOP));
- assertThat(bulkShardRequest.items().length, equalTo(1));
- assertEquals(primaryRequest, bulkShardRequest.items()[0]); // check that bulk item was not mutated
+ assertThat(completedRequest.items().length, equalTo(1));
assertThat(primaryResponse.getResponse().getSeqNo(), equalTo(0L));
}
@@ -590,7 +531,7 @@ public void testUpdateRequestWithFailure() throws Exception {
BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest };
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
- randomlySetIgnoredPrimaryResponse(primaryRequest);
+ items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest);
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(
@@ -606,7 +547,8 @@ public void testUpdateRequestWithFailure() throws Exception {
// Since this was not a conflict failure, the primary response
// should be filled out with the failure information
assertNull(context.getLocationToSync());
- BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();
+ BulkShardRequest completedRequest = context.getBulkShardRequest();
+ BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse();
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE));
@@ -647,7 +589,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception {
BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest };
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
- randomlySetIgnoredPrimaryResponse(primaryRequest);
+ items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest);
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(
@@ -658,10 +600,11 @@ public void testUpdateRequestWithConflictFailure() throws Exception {
listener -> listener.onResponse(null),
ASSERTING_DONE_LISTENER
);
+ BulkShardRequest completedRequest = context.getBulkShardRequest();
assertFalse(context.hasMoreOperationsToExecute());
assertNull(context.getLocationToSync());
- BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();
+ BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse();
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE));
@@ -704,7 +647,7 @@ public void testUpdateRequestWithSuccess() throws Exception {
BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest };
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
- randomlySetIgnoredPrimaryResponse(primaryRequest);
+ items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest);
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(
@@ -715,14 +658,15 @@ public void testUpdateRequestWithSuccess() throws Exception {
listener -> {},
ASSERTING_DONE_LISTENER
);
+ BulkShardRequest completedRequest = context.getBulkShardRequest();
assertFalse(context.hasMoreOperationsToExecute());
// Check that the translog is successfully advanced
assertThat(context.getLocationToSync(), equalTo(resultLocation));
- assertThat(bulkShardRequest.items()[0].request(), equalTo(updateResponse));
+ assertThat(completedRequest.items()[0].request(), equalTo(updateResponse));
// Since this was not a conflict failure, the primary response
// should be filled out with the failure information
- BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();
+ BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse();
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE));
@@ -760,7 +704,7 @@ public void testUpdateWithDelete() throws Exception {
BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest };
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
- randomlySetIgnoredPrimaryResponse(primaryRequest);
+ items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest);
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(
@@ -771,12 +715,13 @@ public void testUpdateWithDelete() throws Exception {
listener -> listener.onResponse(null),
ASSERTING_DONE_LISTENER
);
+ BulkShardRequest completedRequest = context.getBulkShardRequest();
assertFalse(context.hasMoreOperationsToExecute());
// Check that the translog is successfully advanced
assertThat(context.getLocationToSync(), equalTo(resultLocation));
- assertThat(bulkShardRequest.items()[0].request(), equalTo(updateResponse));
- BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();
+ assertThat(completedRequest.items()[0].request(), equalTo(updateResponse));
+ BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse();
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE));
@@ -797,7 +742,7 @@ public void testFailureDuringUpdateProcessing() throws Exception {
BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest };
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
- randomlySetIgnoredPrimaryResponse(primaryRequest);
+ items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest);
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(
@@ -808,10 +753,11 @@ public void testFailureDuringUpdateProcessing() throws Exception {
listener -> {},
ASSERTING_DONE_LISTENER
);
+ BulkShardRequest completedRequest = context.getBulkShardRequest();
assertFalse(context.hasMoreOperationsToExecute());
assertNull(context.getLocationToSync());
- BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();
+ BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse();
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE));
@@ -848,7 +794,7 @@ public void testFailedUpdatePreparationDoesNotTriggerRefresh() throws Exception
BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest };
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.IMMEDIATE, items);
- randomlySetIgnoredPrimaryResponse(primaryRequest);
+ items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest);
// Execute the bulk operation through performOnPrimary
CountDownLatch latch = new CountDownLatch(1);
@@ -1131,10 +1077,11 @@ public void testTranslogPositionToSync() throws Exception {
public void testNoOpReplicationOnPrimaryDocumentFailure() throws Exception {
final IndexShard shard = spy(newStartedShard(false));
- BulkItemRequest itemRequest = new BulkItemRequest(0, new IndexRequest("index").source(Requests.INDEX_CONTENT_TYPE));
final String failureMessage = "simulated primary failure";
final IOException exception = new IOException(failureMessage);
- itemRequest.setPrimaryResponse(
+ BulkItemRequest itemRequest = new BulkItemRequest(
+ 0,
+ new IndexRequest("index").source(Requests.INDEX_CONTENT_TYPE),
new BulkItemResponse(
0,
randomFrom(DocWriteRequest.OpType.CREATE, DocWriteRequest.OpType.DELETE, DocWriteRequest.OpType.INDEX),
@@ -1204,7 +1151,7 @@ public void testRetries() throws Exception {
listener -> listener.onResponse(null),
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> {
assertThat(((WritePrimaryResult<BulkShardRequest, BulkShardResponse>) result).location, equalTo(resultLocation));
- BulkItemResponse primaryResponse = result.replicaRequest().items()[0].getPrimaryResponse();
+ BulkItemResponse primaryResponse = result.replicaRequest().items()[0].primaryResponse();
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE));
@@ -1256,6 +1203,7 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items.toArray(BulkItemRequest[]::new));
+ BulkShardRequest[] completedRequest = new BulkShardRequest[1];
final CountDownLatch latch = new CountDownLatch(1);
Runnable runnable = () -> TransportShardBulkAction.performOnPrimary(
bulkShardRequest,
@@ -1267,8 +1215,9 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> {
assertEquals(nItems, result.replicaRequest().items().length);
for (BulkItemRequest item : result.replicaRequest().items()) {
- assertEquals(VersionConflictEngineException.class, item.getPrimaryResponse().getFailure().getCause().getClass());
+ assertEquals(VersionConflictEngineException.class, item.primaryResponse().getFailure().getCause().getClass());
}
+ completedRequest[0] = result.replicaRequest();
}), latch),
threadPool,
Names.WRITE
@@ -1280,8 +1229,8 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep
// timeout the request in 10 seconds if there is an infinite loop
assertTrue(latch.await(10, TimeUnit.SECONDS));
- items.forEach(item -> {
- assertEquals(item.getPrimaryResponse().getFailure().getCause().getClass(), VersionConflictEngineException.class);
+ for (BulkItemRequest item : completedRequest[0].items()) {
+ assertEquals(item.primaryResponse().getFailure().getCause().getClass(), VersionConflictEngineException.class);
// this assertion is based on the assumption that all bulk item requests are updates and are hence calling
// UpdateRequest::prepareRequest
@@ -1291,7 +1240,7 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep
any(IndexShard.class),
any(LongSupplier.class)
);
- });
+ }
}
public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {
@@ -1344,10 +1293,11 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {
when(shard.getFailedIndexResult(any(OpenSearchRejectedExecutionException.class), anyLong())).thenCallRealMethod();
when(shard.mapperService()).thenReturn(mock(MapperService.class));
- randomlySetIgnoredPrimaryResponse(items[0]);
+ items[0] = randomlySetIgnoredPrimaryResponse(items[0]);
AtomicInteger updateCalled = new AtomicInteger();
+ BulkShardRequest[] completedRequest = new BulkShardRequest[1];
final CountDownLatch latch = new CountDownLatch(1);
TransportShardBulkAction.performOnPrimary(
bulkShardRequest,
@@ -1368,9 +1318,11 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {
}
},
listener -> listener.onResponse(null),
- new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result ->
- // Assert that we still need to fsync the location that was successfully written
- assertThat(((WritePrimaryResult<BulkShardRequest, BulkShardResponse>) result).location, equalTo(resultLocation1))), latch),
+ new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> {
+ // Assert that we still need to fsync the location that was successfully written
+ assertThat(((WritePrimaryResult<BulkShardRequest, BulkShardResponse>) result).location, equalTo(resultLocation1));
+ completedRequest[0] = result.replicaRequest();
+ }), latch),
rejectingThreadPool,
Names.WRITE
);
@@ -1380,7 +1332,7 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {
verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());
- BulkItemResponse primaryResponse1 = bulkShardRequest.items()[0].getPrimaryResponse();
+ BulkItemResponse primaryResponse1 = completedRequest[0].items()[0].primaryResponse();
assertThat(primaryResponse1.getItemId(), equalTo(0));
assertThat(primaryResponse1.getId(), equalTo("id"));
assertThat(primaryResponse1.getOpType(), equalTo(DocWriteRequest.OpType.INDEX));
@@ -1388,13 +1340,13 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {
assertThat(primaryResponse1.getResponse().status(), equalTo(RestStatus.CREATED));
assertThat(primaryResponse1.getResponse().getSeqNo(), equalTo(10L));
- BulkItemResponse primaryResponse2 = bulkShardRequest.items()[1].getPrimaryResponse();
+ BulkItemResponse primaryResponse2 = completedRequest[0].items()[1].primaryResponse();
assertThat(primaryResponse2.getItemId(), equalTo(1));
assertThat(primaryResponse2.getId(), equalTo("id"));
assertThat(primaryResponse2.getOpType(), equalTo(DocWriteRequest.OpType.INDEX));
assertTrue(primaryResponse2.isFailed());
assertNull(primaryResponse2.getResponse());
- assertEquals(primaryResponse2.status(), RestStatus.TOO_MANY_REQUESTS);
+ assertEquals(RestStatus.TOO_MANY_REQUESTS, primaryResponse2.status());
assertThat(primaryResponse2.getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class));
closeShards(shard);
@@ -1570,10 +1522,12 @@ private TransportChannel createTransportChannel(final PlainActionFuture