diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkItemRequest.java b/server/src/main/java/org/opensearch/action/bulk/BulkItemRequest.java index b0ba44c308b95..8b25f3a5b33e5 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkItemRequest.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkItemRequest.java @@ -36,56 +36,43 @@ import org.apache.lucene.util.RamUsageEstimator; import org.opensearch.action.DocWriteRequest; import org.opensearch.common.Nullable; -import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.index.shard.ShardId; -import org.opensearch.core.xcontent.MediaTypeRegistry; import java.io.IOException; -import java.util.Objects; /** * Transport request for a Single bulk item * * @opensearch.internal */ -public class BulkItemRequest implements Writeable, Accountable { +public record BulkItemRequest(int id, DocWriteRequest request, BulkItemResponse primaryResponse) implements Writeable, Accountable { private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkItemRequest.class); - private int id; - private DocWriteRequest request; - private volatile BulkItemResponse primaryResponse; - /** * @param shardId the shard id */ BulkItemRequest(@Nullable ShardId shardId, StreamInput in) throws IOException { - id = in.readVInt(); - request = DocWriteRequest.readDocumentRequest(shardId, in); + this(in.readVInt(), DocWriteRequest.readDocumentRequest(shardId, in), readPrimaryResponse(shardId, in)); + } + + private static BulkItemResponse readPrimaryResponse(ShardId shardId, StreamInput in) throws IOException { if (in.readBoolean()) { if (shardId == null) { - primaryResponse = new BulkItemResponse(in); + return new BulkItemResponse(in); } else { - primaryResponse = new BulkItemResponse(shardId, in); + return new BulkItemResponse(shardId, in); } } + return null; } // NOTE: public for testing only public BulkItemRequest(int id, DocWriteRequest request) { - this.id = id; - this.request = request; - } - - public int id() { - return id; - } - - public DocWriteRequest request() { - return request; + this(id, request, null); } public String index() { @@ -93,42 +80,6 @@ public String index() { return request.indices()[0]; } - BulkItemResponse getPrimaryResponse() { - return primaryResponse; - } - - void setPrimaryResponse(BulkItemResponse primaryResponse) { - this.primaryResponse = primaryResponse; - } - - /** - * Abort this request, and store a {@link org.opensearch.action.bulk.BulkItemResponse.Failure} response. - * - * @param index The concrete index that was resolved for this request - * @param cause The cause of the rejection (may not be null) - * @throws IllegalStateException If a response already exists for this request - */ - public void abort(String index, Exception cause) { - if (primaryResponse == null) { - final BulkItemResponse.Failure failure = new BulkItemResponse.Failure(index, request.id(), Objects.requireNonNull(cause), true); - setPrimaryResponse(new BulkItemResponse(id, request.opType(), failure)); - } else { - assert primaryResponse.isFailed() && primaryResponse.getFailure().isAborted() : "response [" - + Strings.toString(MediaTypeRegistry.JSON, primaryResponse) - + "]; cause [" - + cause - + "]"; - if (primaryResponse.isFailed() && primaryResponse.getFailure().isAborted()) { - primaryResponse.getFailure().getCause().addSuppressed(cause); - } else { - throw new IllegalStateException( - "aborting item that with response [" + primaryResponse + "] that was previously processed", - cause - ); - } - } - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(id); diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkItemResponse.java b/server/src/main/java/org/opensearch/action/bulk/BulkItemResponse.java index e43fb2854be91..1334afbabf0be 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkItemResponse.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkItemResponse.java @@ -421,7 +421,6 @@ public long getTerm() { /** * Whether this failure is the result of an abort. * If {@code true}, the request to which this failure relates should never be retried, regardless of the {@link #getCause() cause}. - * @see BulkItemRequest#abort(String, Exception) */ public boolean isAborted() { return aborted; diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java b/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java index fcdf0bc8a4bb1..860f1f97cc62c 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java @@ -45,9 +45,6 @@ import java.util.Arrays; -import static org.opensearch.action.bulk.BulkShardResponse.DEFAULT_QUEUE_SIZE; -import static org.opensearch.action.bulk.BulkShardResponse.DEFAULT_SERVICE_TIME_IN_NANOS; - /** * This is a utility class that holds the per request state needed to perform bulk operations on the primary. * More specifically, it maintains an index to the current executing bulk item, which allows execution @@ -91,19 +88,21 @@ enum ItemProcessingState { private int currentIndex = -1; private ItemProcessingState currentItemState; - private DocWriteRequest requestToExecute; + private DocWriteRequest requestToExecute; private BulkItemResponse executionResult; private int retryCounter; + private final BulkItemResponse[] primaryResponses; BulkPrimaryExecutionContext(BulkShardRequest request, IndexShard primary) { this.request = request; this.primary = primary; + this.primaryResponses = new BulkItemResponse[request.items().length]; advance(); } private int findNextNonAborted(int startIndex) { final int length = request.items().length; - while (startIndex < length && isAborted(request.items()[startIndex].getPrimaryResponse())) { + while (startIndex < length && isAborted(request.items()[startIndex].primaryResponse())) { startIndex++; } return startIndex; @@ -131,7 +130,19 @@ public DocWriteRequest getCurrent() { } public BulkShardRequest getBulkShardRequest() { - return request; + BulkItemRequest[] newRequests = new BulkItemRequest[request.items().length]; + for (int i = 0; i < newRequests.length; i++) { + BulkItemRequest oldRequest = request.items()[i]; + newRequests[i] = new BulkItemRequest(oldRequest.id(), oldRequest.request(), primaryResponses[i]); + } + BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), request.getRefreshPolicy(), newRequests); + // Clone other properties from primary shard request + // See TransportBulkAction.BulkOperation#doRun() for construction of the primary shard request. + bulkShardRequest.waitForActiveShards(request.waitForActiveShards()); + bulkShardRequest.timeout(request.timeout()); + bulkShardRequest.routedBasedOnClusterVersion(request.routedBasedOnClusterVersion()); + bulkShardRequest.setParentTask(request.getParentTask()); + return bulkShardRequest; } /** returns the result of the request that has been executed on the shard */ @@ -145,21 +156,11 @@ public int getRetryCounter() { return retryCounter; } - /** returns true if the current request has been executed on the primary */ - public boolean isOperationExecuted() { - return currentItemState == ItemProcessingState.EXECUTED; - } - /** returns true if the request needs to wait for a mapping update to arrive from the cluster-manager */ public boolean requiresWaitingForMappingUpdate() { return currentItemState == ItemProcessingState.WAIT_FOR_MAPPING_UPDATE; } - /** returns true if the current request should be retried without waiting for an external event */ - public boolean requiresImmediateRetry() { - return currentItemState == ItemProcessingState.IMMEDIATE_RETRY; - } - /** * returns true if the current request has been completed and it's result translated to a user * facing response @@ -206,10 +207,10 @@ public IndexShard getPrimary() { } /** - * sets the request that should actually be executed on the primary. This can be different then the request + * sets the request that should actually be executed on the primary. This can be different from the request * received from the user (specifically, an update request is translated to an indexing or delete request). */ - public void setRequestToExecute(DocWriteRequest writeRequest) { + public void setRequestToExecute(DocWriteRequest writeRequest) { assert assertInvariants(ItemProcessingState.INITIAL); requestToExecute = writeRequest; currentItemState = ItemProcessingState.TRANSLATED; @@ -217,6 +218,7 @@ public void setRequestToExecute(DocWriteRequest writeRequest) { } /** returns the request that should be executed on the shard. */ + @SuppressWarnings("unchecked") public > T getRequestToExecute() { assert assertInvariants(ItemProcessingState.TRANSLATED); return (T) requestToExecute; @@ -252,7 +254,7 @@ public void markOperationAsNoOp(DocWriteResponse response) { public void failOnMappingUpdate(Exception cause) { assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE); currentItemState = ItemProcessingState.EXECUTED; - final DocWriteRequest docWriteRequest = getCurrentItem().request(); + final DocWriteRequest docWriteRequest = getCurrentItem().request(); executionResult = new BulkItemResponse( getCurrentItem().id(), docWriteRequest.opType(), @@ -267,7 +269,7 @@ public void failOnMappingUpdate(Exception cause) { public void markOperationAsExecuted(Engine.Result result) { assertInvariants(ItemProcessingState.TRANSLATED); final BulkItemRequest current = getCurrentItem(); - DocWriteRequest docWriteRequest = getRequestToExecute(); + DocWriteRequest docWriteRequest = getRequestToExecute(); switch (result.getResultType()) { case SUCCESS: final DocWriteResponse response; @@ -347,7 +349,7 @@ public void markAsCompleted(BulkItemResponse translatedResponse) { if (translatedResponse.isFailed() == false && requestToExecute != null && requestToExecute != getCurrent()) { request.items()[currentIndex] = new BulkItemRequest(request.items()[currentIndex].id(), requestToExecute); } - getCurrentItem().setPrimaryResponse(translatedResponse); + primaryResponses[currentIndex] = translatedResponse; currentItemState = ItemProcessingState.COMPLETED; advance(); } @@ -355,16 +357,7 @@ public void markAsCompleted(BulkItemResponse translatedResponse) { /** builds the bulk shard response to return to the user */ public BulkShardResponse buildShardResponse(long serviceTimeEWMAInNanos, int nodeQueueSize) { assert hasMoreOperationsToExecute() == false; - return new BulkShardResponse( - request.shardId(), - Arrays.stream(request.items()).map(BulkItemRequest::getPrimaryResponse).toArray(BulkItemResponse[]::new), - serviceTimeEWMAInNanos, - nodeQueueSize - ); - } - - public BulkShardResponse buildShardResponse() { - return buildShardResponse(DEFAULT_SERVICE_TIME_IN_NANOS, DEFAULT_QUEUE_SIZE); + return new BulkShardResponse(request.shardId(), primaryResponses, serviceTimeEWMAInNanos, nodeQueueSize); } private boolean assertInvariants(ItemProcessingState... expectedCurrentState) { @@ -379,7 +372,7 @@ private boolean assertInvariants(ItemProcessingState... expectedCurrentState) { assert requestToExecute == null : requestToExecute; assert executionResult == null : executionResult; break; - case TRANSLATED: + case TRANSLATED, IMMEDIATE_RETRY: assert requestToExecute != null; assert executionResult == null : executionResult; break; @@ -387,10 +380,6 @@ private boolean assertInvariants(ItemProcessingState... expectedCurrentState) { assert requestToExecute == null; assert executionResult == null : executionResult; break; - case IMMEDIATE_RETRY: - assert requestToExecute != null; - assert executionResult == null : executionResult; - break; case EXECUTED: // requestToExecute can be null if the update ended up as NOOP assert executionResult != null; @@ -398,7 +387,7 @@ private boolean assertInvariants(ItemProcessingState... expectedCurrentState) { case COMPLETED: assert requestToExecute != null; assert executionResult != null; - assert getCurrentItem().getPrimaryResponse() != null; + assert getCurrentItem().primaryResponse() != null; break; } return true; diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 3f06354de9b5d..cf8a6801c4627 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -860,9 +860,9 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index Translog.Location location = null; for (int i = 0; i < request.items().length; i++) { final BulkItemRequest item = request.items()[i]; - final BulkItemResponse response = item.getPrimaryResponse(); + final BulkItemResponse response = item.primaryResponse(); final Engine.Result operationResult; - if (item.getPrimaryResponse().isFailed()) { + if (item.primaryResponse().isFailed()) { if (response.getFailure().getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) { continue; // ignore replication as we didn't generate a sequence number for this request. } diff --git a/server/src/test/java/org/opensearch/action/bulk/BulkPrimaryExecutionContextTests.java b/server/src/test/java/org/opensearch/action/bulk/BulkPrimaryExecutionContextTests.java index 9745203e91586..19020f21d38bb 100644 --- a/server/src/test/java/org/opensearch/action/bulk/BulkPrimaryExecutionContextTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/BulkPrimaryExecutionContextTests.java @@ -40,75 +40,18 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.update.UpdateRequest; -import org.opensearch.core.index.AppendOnlyIndexOperationRetryException; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.engine.Engine; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.translog.Translog; import org.opensearch.test.OpenSearchTestCase; -import java.util.ArrayList; - import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class BulkPrimaryExecutionContextTests extends OpenSearchTestCase { - public void testAbortedSkipped() { - BulkShardRequest shardRequest = generateRandomRequest(); - - ArrayList> nonAbortedRequests = new ArrayList<>(); - for (BulkItemRequest request : shardRequest.items()) { - if (randomBoolean()) { - request.abort("index", new OpenSearchException("bla")); - } else { - nonAbortedRequests.add(request.request()); - } - } - - ArrayList> visitedRequests = new ArrayList<>(); - for (BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, null); context - .hasMoreOperationsToExecute();) { - visitedRequests.add(context.getCurrent()); - context.setRequestToExecute(context.getCurrent()); - // using failures prevents caring about types - context.markOperationAsExecuted(new Engine.IndexResult(new OpenSearchException("bla"), 1)); - context.markAsCompleted(context.getExecutionResult()); - } - - assertThat(visitedRequests, equalTo(nonAbortedRequests)); - } - - public void testAppendOnlyIndexOperationRetryException() { - BulkShardRequest shardRequest = generateRandomRequest(); - - final IndexShard primary = mock(IndexShard.class); - when(primary.shardId()).thenReturn(shardRequest.shardId()); - ArrayList> nonAbortedRequests = new ArrayList<>(); - for (BulkItemRequest request : shardRequest.items()) { - if (randomBoolean()) { - request.abort("index", new AppendOnlyIndexOperationRetryException("Indexing operation retried for append only indices")); - } else { - nonAbortedRequests.add(request.request()); - } - } - - ArrayList> visitedRequests = new ArrayList<>(); - for (BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, primary); context - .hasMoreOperationsToExecute();) { - visitedRequests.add(context.getCurrent()); - context.setRequestToExecute(context.getCurrent()); - // using failures prevents caring about types - context.markOperationAsExecuted( - new Engine.IndexResult(new AppendOnlyIndexOperationRetryException("Indexing operation retried for append only indices"), 1) - ); - context.markAsCompleted(context.getExecutionResult()); - } - - assertThat(visitedRequests, equalTo(nonAbortedRequests)); - } - private BulkShardRequest generateRandomRequest() { BulkItemRequest[] items = new BulkItemRequest[randomInt(20)]; for (int i = 0; i < items.length; i++) { diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index 00bd99dd4b349..966910c8687bd 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -33,7 +33,6 @@ package org.opensearch.action.bulk; import org.opensearch.OpenSearchException; -import org.opensearch.OpenSearchStatusException; import org.opensearch.Version; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.DocWriteResponse; @@ -112,8 +111,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.anyInt; @@ -159,7 +156,7 @@ public void testExecuteBulkIndexRequest() throws Exception { items[0] = primaryRequest; BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - randomlySetIgnoredPrimaryResponse(primaryRequest); + items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -170,12 +167,13 @@ public void testExecuteBulkIndexRequest() throws Exception { listener -> {}, ASSERTING_DONE_LISTENER ); + BulkShardRequest completedRequest = context.getBulkShardRequest(); assertFalse(context.hasMoreOperationsToExecute()); // Translog should change, since there were no problems assertNotNull(context.getLocationToSync()); - BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); @@ -190,7 +188,7 @@ public void testExecuteBulkIndexRequest() throws Exception { items[0] = primaryRequest; bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - randomlySetIgnoredPrimaryResponse(primaryRequest); + items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext secondContext = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -201,13 +199,14 @@ public void testExecuteBulkIndexRequest() throws Exception { listener -> {}, ASSERTING_DONE_LISTENER ); + completedRequest = secondContext.getBulkShardRequest(); assertFalse(context.hasMoreOperationsToExecute()); assertNull(secondContext.getLocationToSync()); - BulkItemRequest replicaRequest = bulkShardRequest.items()[0]; + BulkItemRequest replicaRequest = completedRequest.items()[0]; - primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + primaryResponse = replicaRequest.primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); @@ -222,77 +221,15 @@ public void testExecuteBulkIndexRequest() throws Exception { assertThat(failure.getCause().getMessage(), containsString("version conflict, document already exists (current version [1])")); assertThat(failure.getStatus(), equalTo(RestStatus.CONFLICT)); - assertThat(replicaRequest, equalTo(primaryRequest)); + assertEquals(primaryRequest.request(), replicaRequest.request()); + assertEquals(primaryRequest.index(), replicaRequest.index()); + assertEquals(primaryRequest.id(), replicaRequest.id()); // Assert that the document count is still 1 assertDocCount(shard, 1); closeShards(shard); } - public void testSkipBulkIndexRequestIfAborted() throws Exception { - IndexShard shard = newStartedShard(true); - - BulkItemRequest[] items = new BulkItemRequest[randomIntBetween(2, 5)]; - for (int i = 0; i < items.length; i++) { - DocWriteRequest writeRequest = new IndexRequest("index").id("id_" + i) - .source(Requests.INDEX_CONTENT_TYPE) - .opType(DocWriteRequest.OpType.INDEX); - items[i] = new BulkItemRequest(i, writeRequest); - } - BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - - // Preemptively abort one of the bulk items, but allow the others to proceed - BulkItemRequest rejectItem = randomFrom(items); - RestStatus rejectionStatus = randomFrom(RestStatus.BAD_REQUEST, RestStatus.CONFLICT, RestStatus.FORBIDDEN, RestStatus.LOCKED); - final OpenSearchStatusException rejectionCause = new OpenSearchStatusException("testing rejection", rejectionStatus); - rejectItem.abort("index", rejectionCause); - - final CountDownLatch latch = new CountDownLatch(1); - TransportShardBulkAction.performOnPrimary( - bulkShardRequest, - shard, - null, - threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), - listener -> {}, - ActionListener.runAfter(ActionTestUtils.assertNoFailureListener(result -> { - // since at least 1 item passed, the tran log location should exist, - assertThat(((WritePrimaryResult) result).location, notNullValue()); - // and the response should exist and match the item count - assertThat(result.finalResponseIfSuccessful, notNullValue()); - assertThat(result.finalResponseIfSuccessful.getResponses(), arrayWithSize(items.length)); - - // check each response matches the input item, including the rejection - for (int i = 0; i < items.length; i++) { - BulkItemResponse response = result.finalResponseIfSuccessful.getResponses()[i]; - assertThat(response.getItemId(), equalTo(i)); - assertThat(response.getIndex(), equalTo("index")); - assertThat(response.getId(), equalTo("id_" + i)); - assertThat(response.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); - if (response.getItemId() == rejectItem.id()) { - assertTrue(response.isFailed()); - assertThat(response.getFailure().getCause(), equalTo(rejectionCause)); - assertThat(response.status(), equalTo(rejectionStatus)); - } else { - assertFalse(response.isFailed()); - } - } - - // Check that the non-rejected updates made it to the shard - try { - assertDocCount(shard, items.length - 1); - closeShards(shard); - } catch (IOException e) { - throw new AssertionError(e); - } - }), latch::countDown), - threadPool, - Names.WRITE - ); - - latch.await(); - } - public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { BulkItemRequest[] items = new BulkItemRequest[1]; @@ -313,7 +250,7 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { ); when(shard.mapperService()).thenReturn(mock(MapperService.class)); - randomlySetIgnoredPrimaryResponse(items[0]); + items[0] = randomlySetIgnoredPrimaryResponse(items[0]); // Pretend the mappings haven't made it to the node yet BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); @@ -344,12 +281,13 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { listener -> {}, ASSERTING_DONE_LISTENER ); + BulkShardRequest completedRequest = context.getBulkShardRequest(); // Verify that the shard "executed" the operation only once (1 for previous invocations plus // 1 for this execution) verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()); - BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); @@ -372,7 +310,7 @@ public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Ex boolean errorOnWait = randomBoolean(); - randomlySetIgnoredPrimaryResponse(items[0]); + items[0] = randomlySetIgnoredPrimaryResponse(items[0]); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); final CountDownLatch latch = new CountDownLatch(1); @@ -394,11 +332,12 @@ public void onFailure(final Exception e) { ); latch.await(); assertFalse(context.hasMoreOperationsToExecute()); + BulkShardRequest completedRequest = context.getBulkShardRequest(); // Translog shouldn't be synced, as there were conflicting mappings assertThat(context.getLocationToSync(), nullValue()); - BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse(); // Since this was not a conflict failure, the primary response // should be filled out with the failure information @@ -425,7 +364,7 @@ public void testExecuteBulkDeleteRequest() throws Exception { Translog.Location location = new Translog.Location(0, 0, 0); - randomlySetIgnoredPrimaryResponse(items[0]); + items[0] = randomlySetIgnoredPrimaryResponse(items[0]); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -437,13 +376,14 @@ public void testExecuteBulkDeleteRequest() throws Exception { ASSERTING_DONE_LISTENER ); assertFalse(context.hasMoreOperationsToExecute()); + BulkShardRequest completedRequest = context.getBulkShardRequest(); // Translog changes, even though the document didn't exist assertThat(context.getLocationToSync(), not(location)); - BulkItemRequest replicaRequest = bulkShardRequest.items()[0]; + BulkItemRequest replicaRequest = completedRequest.items()[0]; DocWriteRequest replicaDeleteRequest = replicaRequest.request(); - BulkItemResponse primaryResponse = replicaRequest.getPrimaryResponse(); + BulkItemResponse primaryResponse = replicaRequest.primaryResponse(); DeleteResponse response = primaryResponse.getResponse(); // Any version can be matched on replica @@ -472,7 +412,7 @@ public void testExecuteBulkDeleteRequest() throws Exception { location = context.getLocationToSync(); - randomlySetIgnoredPrimaryResponse(items[0]); + items[0] = randomlySetIgnoredPrimaryResponse(items[0]); context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -484,13 +424,14 @@ public void testExecuteBulkDeleteRequest() throws Exception { ASSERTING_DONE_LISTENER ); assertFalse(context.hasMoreOperationsToExecute()); + completedRequest = context.getBulkShardRequest(); // Translog changes, because the document was deleted assertThat(context.getLocationToSync(), not(location)); - replicaRequest = bulkShardRequest.items()[0]; + replicaRequest = completedRequest.items()[0]; replicaDeleteRequest = replicaRequest.request(); - primaryResponse = replicaRequest.getPrimaryResponse(); + primaryResponse = replicaRequest.primaryResponse(); response = primaryResponse.getResponse(); // Any version can be matched on replica @@ -535,7 +476,7 @@ public void testNoopUpdateRequest() throws Exception { BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest }; BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - randomlySetIgnoredPrimaryResponse(primaryRequest); + items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -546,19 +487,19 @@ public void testNoopUpdateRequest() throws Exception { listener -> {}, ASSERTING_DONE_LISTENER ); + BulkShardRequest completedRequest = context.getBulkShardRequest(); assertFalse(context.hasMoreOperationsToExecute()); // Basically nothing changes in the request since it's a noop assertThat(context.getLocationToSync(), nullValue()); - BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); assertThat(primaryResponse.getResponse(), equalTo(noopUpdateResponse)); assertThat(primaryResponse.getResponse().getResult(), equalTo(DocWriteResponse.Result.NOOP)); - assertThat(bulkShardRequest.items().length, equalTo(1)); - assertEquals(primaryRequest, bulkShardRequest.items()[0]); // check that bulk item was not mutated + assertThat(completedRequest.items().length, equalTo(1)); assertThat(primaryResponse.getResponse().getSeqNo(), equalTo(0L)); } @@ -590,7 +531,7 @@ public void testUpdateRequestWithFailure() throws Exception { BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest }; BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - randomlySetIgnoredPrimaryResponse(primaryRequest); + items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -606,7 +547,8 @@ public void testUpdateRequestWithFailure() throws Exception { // Since this was not a conflict failure, the primary response // should be filled out with the failure information assertNull(context.getLocationToSync()); - BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + BulkShardRequest completedRequest = context.getBulkShardRequest(); + BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); @@ -647,7 +589,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception { BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest }; BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - randomlySetIgnoredPrimaryResponse(primaryRequest); + items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -658,10 +600,11 @@ public void testUpdateRequestWithConflictFailure() throws Exception { listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER ); + BulkShardRequest completedRequest = context.getBulkShardRequest(); assertFalse(context.hasMoreOperationsToExecute()); assertNull(context.getLocationToSync()); - BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); @@ -704,7 +647,7 @@ public void testUpdateRequestWithSuccess() throws Exception { BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest }; BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - randomlySetIgnoredPrimaryResponse(primaryRequest); + items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -715,14 +658,15 @@ public void testUpdateRequestWithSuccess() throws Exception { listener -> {}, ASSERTING_DONE_LISTENER ); + BulkShardRequest completedRequest = context.getBulkShardRequest(); assertFalse(context.hasMoreOperationsToExecute()); // Check that the translog is successfully advanced assertThat(context.getLocationToSync(), equalTo(resultLocation)); - assertThat(bulkShardRequest.items()[0].request(), equalTo(updateResponse)); + assertThat(completedRequest.items()[0].request(), equalTo(updateResponse)); // Since this was not a conflict failure, the primary response // should be filled out with the failure information - BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); @@ -760,7 +704,7 @@ public void testUpdateWithDelete() throws Exception { BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest }; BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - randomlySetIgnoredPrimaryResponse(primaryRequest); + items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -771,12 +715,13 @@ public void testUpdateWithDelete() throws Exception { listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER ); + BulkShardRequest completedRequest = context.getBulkShardRequest(); assertFalse(context.hasMoreOperationsToExecute()); // Check that the translog is successfully advanced assertThat(context.getLocationToSync(), equalTo(resultLocation)); - assertThat(bulkShardRequest.items()[0].request(), equalTo(updateResponse)); - BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + assertThat(completedRequest.items()[0].request(), equalTo(updateResponse)); + BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); @@ -797,7 +742,7 @@ public void testFailureDuringUpdateProcessing() throws Exception { BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest }; BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - randomlySetIgnoredPrimaryResponse(primaryRequest); + items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -808,10 +753,11 @@ public void testFailureDuringUpdateProcessing() throws Exception { listener -> {}, ASSERTING_DONE_LISTENER ); + BulkShardRequest completedRequest = context.getBulkShardRequest(); assertFalse(context.hasMoreOperationsToExecute()); assertNull(context.getLocationToSync()); - BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); @@ -848,7 +794,7 @@ public void testFailedUpdatePreparationDoesNotTriggerRefresh() throws Exception BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest }; BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.IMMEDIATE, items); - randomlySetIgnoredPrimaryResponse(primaryRequest); + items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest); // Execute the bulk operation through performOnPrimary CountDownLatch latch = new CountDownLatch(1); @@ -1131,10 +1077,11 @@ public void testTranslogPositionToSync() throws Exception { public void testNoOpReplicationOnPrimaryDocumentFailure() throws Exception { final IndexShard shard = spy(newStartedShard(false)); - BulkItemRequest itemRequest = new BulkItemRequest(0, new IndexRequest("index").source(Requests.INDEX_CONTENT_TYPE)); final String failureMessage = "simulated primary failure"; final IOException exception = new IOException(failureMessage); - itemRequest.setPrimaryResponse( + BulkItemRequest itemRequest = new BulkItemRequest( + 0, + new IndexRequest("index").source(Requests.INDEX_CONTENT_TYPE), new BulkItemResponse( 0, randomFrom(DocWriteRequest.OpType.CREATE, DocWriteRequest.OpType.DELETE, DocWriteRequest.OpType.INDEX), @@ -1204,7 +1151,7 @@ public void testRetries() throws Exception { listener -> listener.onResponse(null), new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> { assertThat(((WritePrimaryResult) result).location, equalTo(resultLocation)); - BulkItemResponse primaryResponse = result.replicaRequest().items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = result.replicaRequest().items()[0].primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); @@ -1256,6 +1203,7 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items.toArray(BulkItemRequest[]::new)); + BulkShardRequest[] completedRequest = new BulkShardRequest[1]; final CountDownLatch latch = new CountDownLatch(1); Runnable runnable = () -> TransportShardBulkAction.performOnPrimary( bulkShardRequest, @@ -1267,8 +1215,9 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> { assertEquals(nItems, result.replicaRequest().items().length); for (BulkItemRequest item : result.replicaRequest().items()) { - assertEquals(VersionConflictEngineException.class, item.getPrimaryResponse().getFailure().getCause().getClass()); + assertEquals(VersionConflictEngineException.class, item.primaryResponse().getFailure().getCause().getClass()); } + completedRequest[0] = result.replicaRequest(); }), latch), threadPool, Names.WRITE @@ -1280,8 +1229,8 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep // timeout the request in 10 seconds if there is an infinite loop assertTrue(latch.await(10, TimeUnit.SECONDS)); - items.forEach(item -> { - assertEquals(item.getPrimaryResponse().getFailure().getCause().getClass(), VersionConflictEngineException.class); + for (BulkItemRequest item : completedRequest[0].items()) { + assertEquals(item.primaryResponse().getFailure().getCause().getClass(), VersionConflictEngineException.class); // this assertion is based on the assumption that all bulk item requests are updates and are hence calling // UpdateRequest::prepareRequest @@ -1291,7 +1240,7 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep any(IndexShard.class), any(LongSupplier.class) ); - }); + } } public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { @@ -1344,10 +1293,11 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { when(shard.getFailedIndexResult(any(OpenSearchRejectedExecutionException.class), anyLong())).thenCallRealMethod(); when(shard.mapperService()).thenReturn(mock(MapperService.class)); - randomlySetIgnoredPrimaryResponse(items[0]); + items[0] = randomlySetIgnoredPrimaryResponse(items[0]); AtomicInteger updateCalled = new AtomicInteger(); + BulkShardRequest[] completedRequest = new BulkShardRequest[1]; final CountDownLatch latch = new CountDownLatch(1); TransportShardBulkAction.performOnPrimary( bulkShardRequest, @@ -1368,9 +1318,11 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { } }, listener -> listener.onResponse(null), - new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> - // Assert that we still need to fsync the location that was successfully written - assertThat(((WritePrimaryResult) result).location, equalTo(resultLocation1))), latch), + new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> { + // Assert that we still need to fsync the location that was successfully written + assertThat(((WritePrimaryResult) result).location, equalTo(resultLocation1)); + completedRequest[0] = result.replicaRequest(); + }), latch), rejectingThreadPool, Names.WRITE ); @@ -1380,7 +1332,7 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()); - BulkItemResponse primaryResponse1 = bulkShardRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse1 = completedRequest[0].items()[0].primaryResponse(); assertThat(primaryResponse1.getItemId(), equalTo(0)); assertThat(primaryResponse1.getId(), equalTo("id")); assertThat(primaryResponse1.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); @@ -1388,13 +1340,13 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { assertThat(primaryResponse1.getResponse().status(), equalTo(RestStatus.CREATED)); assertThat(primaryResponse1.getResponse().getSeqNo(), equalTo(10L)); - BulkItemResponse primaryResponse2 = bulkShardRequest.items()[1].getPrimaryResponse(); + BulkItemResponse primaryResponse2 = completedRequest[0].items()[1].primaryResponse(); assertThat(primaryResponse2.getItemId(), equalTo(1)); assertThat(primaryResponse2.getId(), equalTo("id")); assertThat(primaryResponse2.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); assertTrue(primaryResponse2.isFailed()); assertNull(primaryResponse2.getResponse()); - assertEquals(primaryResponse2.status(), RestStatus.TOO_MANY_REQUESTS); + assertEquals(RestStatus.TOO_MANY_REQUESTS, primaryResponse2.status()); assertThat(primaryResponse2.getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class)); closeShards(shard); @@ -1570,10 +1522,12 @@ private TransportChannel createTransportChannel(final PlainActionFuture