From 3b38a8a42f7771b94cf176c5b00015de223170be Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Tue, 10 Mar 2026 16:04:46 -0700 Subject: [PATCH 1/3] Make BulkItemRequest immutable I was talking with @itschrispeck about some JIT optimization issues in BulkItemRequest's serialization. While looking at the code, the `volatile` keyword on the `primaryResponse` field made me cringe. Why is a `BulkItemRequest` mutable at all? It turns out that we modify the existing `BulkItemRequest` instances on the primary shard. These modified requests are send to the replicas. This change makes `BulkItemRequest` immutable. The primary execution context collects all of the primary responses, then produces a new `BulkShardRequest` that gets forwarded to replicas. Signed-off-by: Michael Froh --- .../action/bulk/BulkItemRequest.java | 48 ++------ .../action/bulk/BulkItemResponse.java | 1 - .../bulk/BulkPrimaryExecutionContext.java | 52 +++------ .../BulkPrimaryExecutionContextTests.java | 57 ---------- .../bulk/TransportShardBulkActionTests.java | 107 ++++-------------- 5 files changed, 49 insertions(+), 216 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkItemRequest.java b/server/src/main/java/org/opensearch/action/bulk/BulkItemRequest.java index b0ba44c308b95..8c7b6bde503af 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkItemRequest.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkItemRequest.java @@ -36,15 +36,12 @@ import org.apache.lucene.util.RamUsageEstimator; import org.opensearch.action.DocWriteRequest; import org.opensearch.common.Nullable; -import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.index.shard.ShardId; -import org.opensearch.core.xcontent.MediaTypeRegistry; import java.io.IOException; -import java.util.Objects; /** * Transport request for a Single bulk item @@ -55,9 +52,9 @@ public class BulkItemRequest implements Writeable, Accountable { private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkItemRequest.class); - private int id; - private DocWriteRequest request; - private volatile BulkItemResponse primaryResponse; + private final int id; + private final DocWriteRequest request; + private final BulkItemResponse primaryResponse; /** * @param shardId the shard id @@ -71,13 +68,20 @@ public class BulkItemRequest implements Writeable, Accountable { } else { primaryResponse = new BulkItemResponse(shardId, in); } + } else { + primaryResponse = null; } } // NOTE: public for testing only public BulkItemRequest(int id, DocWriteRequest request) { + this(id, request, null); + } + + BulkItemRequest(int id, DocWriteRequest request, BulkItemResponse primaryResponse) { this.id = id; this.request = request; + this.primaryResponse = primaryResponse; } public int id() { @@ -97,38 +101,6 @@ BulkItemResponse getPrimaryResponse() { return primaryResponse; } - void setPrimaryResponse(BulkItemResponse primaryResponse) { - this.primaryResponse = primaryResponse; - } - - /** - * Abort this request, and store a {@link org.opensearch.action.bulk.BulkItemResponse.Failure} response. - * - * @param index The concrete index that was resolved for this request - * @param cause The cause of the rejection (may not be null) - * @throws IllegalStateException If a response already exists for this request - */ - public void abort(String index, Exception cause) { - if (primaryResponse == null) { - final BulkItemResponse.Failure failure = new BulkItemResponse.Failure(index, request.id(), Objects.requireNonNull(cause), true); - setPrimaryResponse(new BulkItemResponse(id, request.opType(), failure)); - } else { - assert primaryResponse.isFailed() && primaryResponse.getFailure().isAborted() : "response [" - + Strings.toString(MediaTypeRegistry.JSON, primaryResponse) - + "]; cause [" - + cause - + "]"; - if (primaryResponse.isFailed() && primaryResponse.getFailure().isAborted()) { - primaryResponse.getFailure().getCause().addSuppressed(cause); - } else { - throw new IllegalStateException( - "aborting item that with response [" + primaryResponse + "] that was previously processed", - cause - ); - } - } - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(id); diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkItemResponse.java b/server/src/main/java/org/opensearch/action/bulk/BulkItemResponse.java index e43fb2854be91..1334afbabf0be 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkItemResponse.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkItemResponse.java @@ -421,7 +421,6 @@ public long getTerm() { /** * Whether this failure is the result of an abort. * If {@code true}, the request to which this failure relates should never be retried, regardless of the {@link #getCause() cause}. - * @see BulkItemRequest#abort(String, Exception) */ public boolean isAborted() { return aborted; diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java b/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java index fcdf0bc8a4bb1..392d5ec465b37 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java @@ -45,9 +45,6 @@ import java.util.Arrays; -import static org.opensearch.action.bulk.BulkShardResponse.DEFAULT_QUEUE_SIZE; -import static org.opensearch.action.bulk.BulkShardResponse.DEFAULT_SERVICE_TIME_IN_NANOS; - /** * This is a utility class that holds the per request state needed to perform bulk operations on the primary. * More specifically, it maintains an index to the current executing bulk item, which allows execution @@ -91,13 +88,15 @@ enum ItemProcessingState { private int currentIndex = -1; private ItemProcessingState currentItemState; - private DocWriteRequest requestToExecute; + private DocWriteRequest requestToExecute; private BulkItemResponse executionResult; private int retryCounter; + private final BulkItemResponse[] primaryResponses; BulkPrimaryExecutionContext(BulkShardRequest request, IndexShard primary) { this.request = request; this.primary = primary; + this.primaryResponses = new BulkItemResponse[request.items().length]; advance(); } @@ -131,7 +130,12 @@ public DocWriteRequest getCurrent() { } public BulkShardRequest getBulkShardRequest() { - return request; + BulkItemRequest[] newRequests = new BulkItemRequest[request.items().length]; + for (int i = 0; i < newRequests.length; i++) { + BulkItemRequest oldRequest = request.items()[i]; + newRequests[i] = new BulkItemRequest(oldRequest.id(), oldRequest.request(), primaryResponses[i]); + } + return new BulkShardRequest(request.shardId(), request.getRefreshPolicy(), newRequests); } /** returns the result of the request that has been executed on the shard */ @@ -145,21 +149,11 @@ public int getRetryCounter() { return retryCounter; } - /** returns true if the current request has been executed on the primary */ - public boolean isOperationExecuted() { - return currentItemState == ItemProcessingState.EXECUTED; - } - /** returns true if the request needs to wait for a mapping update to arrive from the cluster-manager */ public boolean requiresWaitingForMappingUpdate() { return currentItemState == ItemProcessingState.WAIT_FOR_MAPPING_UPDATE; } - /** returns true if the current request should be retried without waiting for an external event */ - public boolean requiresImmediateRetry() { - return currentItemState == ItemProcessingState.IMMEDIATE_RETRY; - } - /** * returns true if the current request has been completed and it's result translated to a user * facing response @@ -206,10 +200,10 @@ public IndexShard getPrimary() { } /** - * sets the request that should actually be executed on the primary. This can be different then the request + * sets the request that should actually be executed on the primary. This can be different from the request * received from the user (specifically, an update request is translated to an indexing or delete request). */ - public void setRequestToExecute(DocWriteRequest writeRequest) { + public void setRequestToExecute(DocWriteRequest writeRequest) { assert assertInvariants(ItemProcessingState.INITIAL); requestToExecute = writeRequest; currentItemState = ItemProcessingState.TRANSLATED; @@ -217,6 +211,7 @@ public void setRequestToExecute(DocWriteRequest writeRequest) { } /** returns the request that should be executed on the shard. */ + @SuppressWarnings("unchecked") public > T getRequestToExecute() { assert assertInvariants(ItemProcessingState.TRANSLATED); return (T) requestToExecute; @@ -252,7 +247,7 @@ public void markOperationAsNoOp(DocWriteResponse response) { public void failOnMappingUpdate(Exception cause) { assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE); currentItemState = ItemProcessingState.EXECUTED; - final DocWriteRequest docWriteRequest = getCurrentItem().request(); + final DocWriteRequest docWriteRequest = getCurrentItem().request(); executionResult = new BulkItemResponse( getCurrentItem().id(), docWriteRequest.opType(), @@ -267,7 +262,7 @@ public void failOnMappingUpdate(Exception cause) { public void markOperationAsExecuted(Engine.Result result) { assertInvariants(ItemProcessingState.TRANSLATED); final BulkItemRequest current = getCurrentItem(); - DocWriteRequest docWriteRequest = getRequestToExecute(); + DocWriteRequest docWriteRequest = getRequestToExecute(); switch (result.getResultType()) { case SUCCESS: final DocWriteResponse response; @@ -347,7 +342,7 @@ public void markAsCompleted(BulkItemResponse translatedResponse) { if (translatedResponse.isFailed() == false && requestToExecute != null && requestToExecute != getCurrent()) { request.items()[currentIndex] = new BulkItemRequest(request.items()[currentIndex].id(), requestToExecute); } - getCurrentItem().setPrimaryResponse(translatedResponse); + primaryResponses[currentIndex] = translatedResponse; currentItemState = ItemProcessingState.COMPLETED; advance(); } @@ -355,16 +350,7 @@ public void markAsCompleted(BulkItemResponse translatedResponse) { /** builds the bulk shard response to return to the user */ public BulkShardResponse buildShardResponse(long serviceTimeEWMAInNanos, int nodeQueueSize) { assert hasMoreOperationsToExecute() == false; - return new BulkShardResponse( - request.shardId(), - Arrays.stream(request.items()).map(BulkItemRequest::getPrimaryResponse).toArray(BulkItemResponse[]::new), - serviceTimeEWMAInNanos, - nodeQueueSize - ); - } - - public BulkShardResponse buildShardResponse() { - return buildShardResponse(DEFAULT_SERVICE_TIME_IN_NANOS, DEFAULT_QUEUE_SIZE); + return new BulkShardResponse(request.shardId(), primaryResponses, serviceTimeEWMAInNanos, nodeQueueSize); } private boolean assertInvariants(ItemProcessingState... expectedCurrentState) { @@ -379,7 +365,7 @@ private boolean assertInvariants(ItemProcessingState... expectedCurrentState) { assert requestToExecute == null : requestToExecute; assert executionResult == null : executionResult; break; - case TRANSLATED: + case TRANSLATED, IMMEDIATE_RETRY: assert requestToExecute != null; assert executionResult == null : executionResult; break; @@ -387,10 +373,6 @@ private boolean assertInvariants(ItemProcessingState... expectedCurrentState) { assert requestToExecute == null; assert executionResult == null : executionResult; break; - case IMMEDIATE_RETRY: - assert requestToExecute != null; - assert executionResult == null : executionResult; - break; case EXECUTED: // requestToExecute can be null if the update ended up as NOOP assert executionResult != null; diff --git a/server/src/test/java/org/opensearch/action/bulk/BulkPrimaryExecutionContextTests.java b/server/src/test/java/org/opensearch/action/bulk/BulkPrimaryExecutionContextTests.java index 9745203e91586..19020f21d38bb 100644 --- a/server/src/test/java/org/opensearch/action/bulk/BulkPrimaryExecutionContextTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/BulkPrimaryExecutionContextTests.java @@ -40,75 +40,18 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.update.UpdateRequest; -import org.opensearch.core.index.AppendOnlyIndexOperationRetryException; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.engine.Engine; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.translog.Translog; import org.opensearch.test.OpenSearchTestCase; -import java.util.ArrayList; - import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class BulkPrimaryExecutionContextTests extends OpenSearchTestCase { - public void testAbortedSkipped() { - BulkShardRequest shardRequest = generateRandomRequest(); - - ArrayList> nonAbortedRequests = new ArrayList<>(); - for (BulkItemRequest request : shardRequest.items()) { - if (randomBoolean()) { - request.abort("index", new OpenSearchException("bla")); - } else { - nonAbortedRequests.add(request.request()); - } - } - - ArrayList> visitedRequests = new ArrayList<>(); - for (BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, null); context - .hasMoreOperationsToExecute();) { - visitedRequests.add(context.getCurrent()); - context.setRequestToExecute(context.getCurrent()); - // using failures prevents caring about types - context.markOperationAsExecuted(new Engine.IndexResult(new OpenSearchException("bla"), 1)); - context.markAsCompleted(context.getExecutionResult()); - } - - assertThat(visitedRequests, equalTo(nonAbortedRequests)); - } - - public void testAppendOnlyIndexOperationRetryException() { - BulkShardRequest shardRequest = generateRandomRequest(); - - final IndexShard primary = mock(IndexShard.class); - when(primary.shardId()).thenReturn(shardRequest.shardId()); - ArrayList> nonAbortedRequests = new ArrayList<>(); - for (BulkItemRequest request : shardRequest.items()) { - if (randomBoolean()) { - request.abort("index", new AppendOnlyIndexOperationRetryException("Indexing operation retried for append only indices")); - } else { - nonAbortedRequests.add(request.request()); - } - } - - ArrayList> visitedRequests = new ArrayList<>(); - for (BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, primary); context - .hasMoreOperationsToExecute();) { - visitedRequests.add(context.getCurrent()); - context.setRequestToExecute(context.getCurrent()); - // using failures prevents caring about types - context.markOperationAsExecuted( - new Engine.IndexResult(new AppendOnlyIndexOperationRetryException("Indexing operation retried for append only indices"), 1) - ); - context.markAsCompleted(context.getExecutionResult()); - } - - assertThat(visitedRequests, equalTo(nonAbortedRequests)); - } - private BulkShardRequest generateRandomRequest() { BulkItemRequest[] items = new BulkItemRequest[randomInt(20)]; for (int i = 0; i < items.length; i++) { diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index 00bd99dd4b349..7fa13ee815eec 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -33,7 +33,6 @@ package org.opensearch.action.bulk; import org.opensearch.OpenSearchException; -import org.opensearch.OpenSearchStatusException; import org.opensearch.Version; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.DocWriteResponse; @@ -112,8 +111,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.anyInt; @@ -159,7 +156,7 @@ public void testExecuteBulkIndexRequest() throws Exception { items[0] = primaryRequest; BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - randomlySetIgnoredPrimaryResponse(primaryRequest); + items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -190,7 +187,7 @@ public void testExecuteBulkIndexRequest() throws Exception { items[0] = primaryRequest; bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - randomlySetIgnoredPrimaryResponse(primaryRequest); + items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext secondContext = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -229,70 +226,6 @@ public void testExecuteBulkIndexRequest() throws Exception { closeShards(shard); } - public void testSkipBulkIndexRequestIfAborted() throws Exception { - IndexShard shard = newStartedShard(true); - - BulkItemRequest[] items = new BulkItemRequest[randomIntBetween(2, 5)]; - for (int i = 0; i < items.length; i++) { - DocWriteRequest writeRequest = new IndexRequest("index").id("id_" + i) - .source(Requests.INDEX_CONTENT_TYPE) - .opType(DocWriteRequest.OpType.INDEX); - items[i] = new BulkItemRequest(i, writeRequest); - } - BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - - // Preemptively abort one of the bulk items, but allow the others to proceed - BulkItemRequest rejectItem = randomFrom(items); - RestStatus rejectionStatus = randomFrom(RestStatus.BAD_REQUEST, RestStatus.CONFLICT, RestStatus.FORBIDDEN, RestStatus.LOCKED); - final OpenSearchStatusException rejectionCause = new OpenSearchStatusException("testing rejection", rejectionStatus); - rejectItem.abort("index", rejectionCause); - - final CountDownLatch latch = new CountDownLatch(1); - TransportShardBulkAction.performOnPrimary( - bulkShardRequest, - shard, - null, - threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), - listener -> {}, - ActionListener.runAfter(ActionTestUtils.assertNoFailureListener(result -> { - // since at least 1 item passed, the tran log location should exist, - assertThat(((WritePrimaryResult) result).location, notNullValue()); - // and the response should exist and match the item count - assertThat(result.finalResponseIfSuccessful, notNullValue()); - assertThat(result.finalResponseIfSuccessful.getResponses(), arrayWithSize(items.length)); - - // check each response matches the input item, including the rejection - for (int i = 0; i < items.length; i++) { - BulkItemResponse response = result.finalResponseIfSuccessful.getResponses()[i]; - assertThat(response.getItemId(), equalTo(i)); - assertThat(response.getIndex(), equalTo("index")); - assertThat(response.getId(), equalTo("id_" + i)); - assertThat(response.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); - if (response.getItemId() == rejectItem.id()) { - assertTrue(response.isFailed()); - assertThat(response.getFailure().getCause(), equalTo(rejectionCause)); - assertThat(response.status(), equalTo(rejectionStatus)); - } else { - assertFalse(response.isFailed()); - } - } - - // Check that the non-rejected updates made it to the shard - try { - assertDocCount(shard, items.length - 1); - closeShards(shard); - } catch (IOException e) { - throw new AssertionError(e); - } - }), latch::countDown), - threadPool, - Names.WRITE - ); - - latch.await(); - } - public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { BulkItemRequest[] items = new BulkItemRequest[1]; @@ -313,7 +246,7 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { ); when(shard.mapperService()).thenReturn(mock(MapperService.class)); - randomlySetIgnoredPrimaryResponse(items[0]); + items[0] = randomlySetIgnoredPrimaryResponse(items[0]); // Pretend the mappings haven't made it to the node yet BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); @@ -372,7 +305,7 @@ public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Ex boolean errorOnWait = randomBoolean(); - randomlySetIgnoredPrimaryResponse(items[0]); + items[0] = randomlySetIgnoredPrimaryResponse(items[0]); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); final CountDownLatch latch = new CountDownLatch(1); @@ -425,7 +358,7 @@ public void testExecuteBulkDeleteRequest() throws Exception { Translog.Location location = new Translog.Location(0, 0, 0); - randomlySetIgnoredPrimaryResponse(items[0]); + items[0] = randomlySetIgnoredPrimaryResponse(items[0]); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -472,7 +405,7 @@ public void testExecuteBulkDeleteRequest() throws Exception { location = context.getLocationToSync(); - randomlySetIgnoredPrimaryResponse(items[0]); + items[0] = randomlySetIgnoredPrimaryResponse(items[0]); context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -535,7 +468,7 @@ public void testNoopUpdateRequest() throws Exception { BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest }; BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - randomlySetIgnoredPrimaryResponse(primaryRequest); + items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -590,7 +523,7 @@ public void testUpdateRequestWithFailure() throws Exception { BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest }; BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - randomlySetIgnoredPrimaryResponse(primaryRequest); + items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -647,7 +580,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception { BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest }; BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - randomlySetIgnoredPrimaryResponse(primaryRequest); + items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -704,7 +637,7 @@ public void testUpdateRequestWithSuccess() throws Exception { BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest }; BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - randomlySetIgnoredPrimaryResponse(primaryRequest); + items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -760,7 +693,7 @@ public void testUpdateWithDelete() throws Exception { BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest }; BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - randomlySetIgnoredPrimaryResponse(primaryRequest); + items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -797,7 +730,7 @@ public void testFailureDuringUpdateProcessing() throws Exception { BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest }; BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - randomlySetIgnoredPrimaryResponse(primaryRequest); + items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest( @@ -848,7 +781,7 @@ public void testFailedUpdatePreparationDoesNotTriggerRefresh() throws Exception BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest }; BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.IMMEDIATE, items); - randomlySetIgnoredPrimaryResponse(primaryRequest); + items[0] = randomlySetIgnoredPrimaryResponse(primaryRequest); // Execute the bulk operation through performOnPrimary CountDownLatch latch = new CountDownLatch(1); @@ -1131,10 +1064,11 @@ public void testTranslogPositionToSync() throws Exception { public void testNoOpReplicationOnPrimaryDocumentFailure() throws Exception { final IndexShard shard = spy(newStartedShard(false)); - BulkItemRequest itemRequest = new BulkItemRequest(0, new IndexRequest("index").source(Requests.INDEX_CONTENT_TYPE)); final String failureMessage = "simulated primary failure"; final IOException exception = new IOException(failureMessage); - itemRequest.setPrimaryResponse( + BulkItemRequest itemRequest = new BulkItemRequest( + 0, + new IndexRequest("index").source(Requests.INDEX_CONTENT_TYPE), new BulkItemResponse( 0, randomFrom(DocWriteRequest.OpType.CREATE, DocWriteRequest.OpType.DELETE, DocWriteRequest.OpType.INDEX), @@ -1344,7 +1278,7 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { when(shard.getFailedIndexResult(any(OpenSearchRejectedExecutionException.class), anyLong())).thenCallRealMethod(); when(shard.mapperService()).thenReturn(mock(MapperService.class)); - randomlySetIgnoredPrimaryResponse(items[0]); + items[0] = randomlySetIgnoredPrimaryResponse(items[0]); AtomicInteger updateCalled = new AtomicInteger(); @@ -1570,10 +1504,12 @@ private TransportChannel createTransportChannel(final PlainActionFuture Date: Fri, 20 Mar 2026 11:56:16 -0700 Subject: [PATCH 2/3] Fix TransportShardBulkActionTests These tests relied on the assumption that the BulkShardRequest would be mutated on the primary. In particular, there were some ridiculous tests that were verifying that the output was unchanged from the input, when the output was the same object as the input, which had been changed in place. Signed-off-by: Michael Froh --- .../bulk/TransportShardBulkActionTests.java | 68 ++++++++++++------- 1 file changed, 43 insertions(+), 25 deletions(-) diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index 7fa13ee815eec..649853ea635de 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -167,12 +167,13 @@ public void testExecuteBulkIndexRequest() throws Exception { listener -> {}, ASSERTING_DONE_LISTENER ); + BulkShardRequest completedRequest = context.getBulkShardRequest(); assertFalse(context.hasMoreOperationsToExecute()); // Translog should change, since there were no problems assertNotNull(context.getLocationToSync()); - BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); @@ -198,13 +199,14 @@ public void testExecuteBulkIndexRequest() throws Exception { listener -> {}, ASSERTING_DONE_LISTENER ); + completedRequest = secondContext.getBulkShardRequest(); assertFalse(context.hasMoreOperationsToExecute()); assertNull(secondContext.getLocationToSync()); - BulkItemRequest replicaRequest = bulkShardRequest.items()[0]; + BulkItemRequest replicaRequest = completedRequest.items()[0]; - primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + primaryResponse = replicaRequest.getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); @@ -219,7 +221,9 @@ public void testExecuteBulkIndexRequest() throws Exception { assertThat(failure.getCause().getMessage(), containsString("version conflict, document already exists (current version [1])")); assertThat(failure.getStatus(), equalTo(RestStatus.CONFLICT)); - assertThat(replicaRequest, equalTo(primaryRequest)); + assertEquals(primaryRequest.request(), replicaRequest.request()); + assertEquals(primaryRequest.index(), replicaRequest.index()); + assertEquals(primaryRequest.id(), replicaRequest.id()); // Assert that the document count is still 1 assertDocCount(shard, 1); @@ -277,12 +281,13 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { listener -> {}, ASSERTING_DONE_LISTENER ); + BulkShardRequest completedRequest = context.getBulkShardRequest(); // Verify that the shard "executed" the operation only once (1 for previous invocations plus // 1 for this execution) verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()); - BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); @@ -327,11 +332,12 @@ public void onFailure(final Exception e) { ); latch.await(); assertFalse(context.hasMoreOperationsToExecute()); + BulkShardRequest completedRequest = context.getBulkShardRequest(); // Translog shouldn't be synced, as there were conflicting mappings assertThat(context.getLocationToSync(), nullValue()); - BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].getPrimaryResponse(); // Since this was not a conflict failure, the primary response // should be filled out with the failure information @@ -370,11 +376,12 @@ public void testExecuteBulkDeleteRequest() throws Exception { ASSERTING_DONE_LISTENER ); assertFalse(context.hasMoreOperationsToExecute()); + BulkShardRequest completedRequest = context.getBulkShardRequest(); // Translog changes, even though the document didn't exist assertThat(context.getLocationToSync(), not(location)); - BulkItemRequest replicaRequest = bulkShardRequest.items()[0]; + BulkItemRequest replicaRequest = completedRequest.items()[0]; DocWriteRequest replicaDeleteRequest = replicaRequest.request(); BulkItemResponse primaryResponse = replicaRequest.getPrimaryResponse(); DeleteResponse response = primaryResponse.getResponse(); @@ -417,11 +424,12 @@ public void testExecuteBulkDeleteRequest() throws Exception { ASSERTING_DONE_LISTENER ); assertFalse(context.hasMoreOperationsToExecute()); + completedRequest = context.getBulkShardRequest(); // Translog changes, because the document was deleted assertThat(context.getLocationToSync(), not(location)); - replicaRequest = bulkShardRequest.items()[0]; + replicaRequest = completedRequest.items()[0]; replicaDeleteRequest = replicaRequest.request(); primaryResponse = replicaRequest.getPrimaryResponse(); response = primaryResponse.getResponse(); @@ -479,19 +487,19 @@ public void testNoopUpdateRequest() throws Exception { listener -> {}, ASSERTING_DONE_LISTENER ); + BulkShardRequest completedRequest = context.getBulkShardRequest(); assertFalse(context.hasMoreOperationsToExecute()); // Basically nothing changes in the request since it's a noop assertThat(context.getLocationToSync(), nullValue()); - BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); assertThat(primaryResponse.getResponse(), equalTo(noopUpdateResponse)); assertThat(primaryResponse.getResponse().getResult(), equalTo(DocWriteResponse.Result.NOOP)); - assertThat(bulkShardRequest.items().length, equalTo(1)); - assertEquals(primaryRequest, bulkShardRequest.items()[0]); // check that bulk item was not mutated + assertThat(completedRequest.items().length, equalTo(1)); assertThat(primaryResponse.getResponse().getSeqNo(), equalTo(0L)); } @@ -539,7 +547,8 @@ public void testUpdateRequestWithFailure() throws Exception { // Since this was not a conflict failure, the primary response // should be filled out with the failure information assertNull(context.getLocationToSync()); - BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + BulkShardRequest completedRequest = context.getBulkShardRequest(); + BulkItemResponse primaryResponse = completedRequest.items()[0].getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); @@ -591,10 +600,11 @@ public void testUpdateRequestWithConflictFailure() throws Exception { listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER ); + BulkShardRequest completedRequest = context.getBulkShardRequest(); assertFalse(context.hasMoreOperationsToExecute()); assertNull(context.getLocationToSync()); - BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); @@ -648,14 +658,15 @@ public void testUpdateRequestWithSuccess() throws Exception { listener -> {}, ASSERTING_DONE_LISTENER ); + BulkShardRequest completedRequest = context.getBulkShardRequest(); assertFalse(context.hasMoreOperationsToExecute()); // Check that the translog is successfully advanced assertThat(context.getLocationToSync(), equalTo(resultLocation)); - assertThat(bulkShardRequest.items()[0].request(), equalTo(updateResponse)); + assertThat(completedRequest.items()[0].request(), equalTo(updateResponse)); // Since this was not a conflict failure, the primary response // should be filled out with the failure information - BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); @@ -704,12 +715,13 @@ public void testUpdateWithDelete() throws Exception { listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER ); + BulkShardRequest completedRequest = context.getBulkShardRequest(); assertFalse(context.hasMoreOperationsToExecute()); // Check that the translog is successfully advanced assertThat(context.getLocationToSync(), equalTo(resultLocation)); - assertThat(bulkShardRequest.items()[0].request(), equalTo(updateResponse)); - BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + assertThat(completedRequest.items()[0].request(), equalTo(updateResponse)); + BulkItemResponse primaryResponse = completedRequest.items()[0].getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); @@ -741,10 +753,11 @@ public void testFailureDuringUpdateProcessing() throws Exception { listener -> {}, ASSERTING_DONE_LISTENER ); + BulkShardRequest completedRequest = context.getBulkShardRequest(); assertFalse(context.hasMoreOperationsToExecute()); assertNull(context.getLocationToSync()); - BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); @@ -1190,6 +1203,7 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items.toArray(BulkItemRequest[]::new)); + BulkShardRequest[] completedRequest = new BulkShardRequest[1]; final CountDownLatch latch = new CountDownLatch(1); Runnable runnable = () -> TransportShardBulkAction.performOnPrimary( bulkShardRequest, @@ -1203,6 +1217,7 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep for (BulkItemRequest item : result.replicaRequest().items()) { assertEquals(VersionConflictEngineException.class, item.getPrimaryResponse().getFailure().getCause().getClass()); } + completedRequest[0] = result.replicaRequest(); }), latch), threadPool, Names.WRITE @@ -1214,7 +1229,7 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep // timeout the request in 10 seconds if there is an infinite loop assertTrue(latch.await(10, TimeUnit.SECONDS)); - items.forEach(item -> { + for (BulkItemRequest item : completedRequest[0].items()) { assertEquals(item.getPrimaryResponse().getFailure().getCause().getClass(), VersionConflictEngineException.class); // this assertion is based on the assumption that all bulk item requests are updates and are hence calling @@ -1225,7 +1240,7 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep any(IndexShard.class), any(LongSupplier.class) ); - }); + } } public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { @@ -1282,6 +1297,7 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { AtomicInteger updateCalled = new AtomicInteger(); + BulkShardRequest[] completedRequest = new BulkShardRequest[1]; final CountDownLatch latch = new CountDownLatch(1); TransportShardBulkAction.performOnPrimary( bulkShardRequest, @@ -1302,9 +1318,11 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { } }, listener -> listener.onResponse(null), - new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> - // Assert that we still need to fsync the location that was successfully written - assertThat(((WritePrimaryResult) result).location, equalTo(resultLocation1))), latch), + new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> { + // Assert that we still need to fsync the location that was successfully written + assertThat(((WritePrimaryResult) result).location, equalTo(resultLocation1)); + completedRequest[0] = result.replicaRequest(); + }), latch), rejectingThreadPool, Names.WRITE ); @@ -1314,7 +1332,7 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()); - BulkItemResponse primaryResponse1 = bulkShardRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse1 = completedRequest[0].items()[0].getPrimaryResponse(); assertThat(primaryResponse1.getItemId(), equalTo(0)); assertThat(primaryResponse1.getId(), equalTo("id")); assertThat(primaryResponse1.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); @@ -1322,7 +1340,7 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { assertThat(primaryResponse1.getResponse().status(), equalTo(RestStatus.CREATED)); assertThat(primaryResponse1.getResponse().getSeqNo(), equalTo(10L)); - BulkItemResponse primaryResponse2 = bulkShardRequest.items()[1].getPrimaryResponse(); + BulkItemResponse primaryResponse2 = completedRequest[0].items()[1].getPrimaryResponse(); assertThat(primaryResponse2.getItemId(), equalTo(1)); assertThat(primaryResponse2.getId(), equalTo("id")); assertThat(primaryResponse2.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); From 3ca0f4a02f5078903b04eeee3fff2c8bbcedc38b Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Tue, 24 Mar 2026 11:21:27 -0700 Subject: [PATCH 3/3] Make BulkItemRequest a record and fix BulkShardRequest clone Follow @andrross's suggestion of converting BulkItemRequest to a record, since it's immutable now. Also, we were seeing test failures because the cloned BulkShardRequest (that is propagated to replicas) did not copy the primary request's parent TaskId. Along with that, I made sure it copied all other properties from the primary shard request. Signed-off-by: Michael Froh --- .../action/bulk/BulkItemRequest.java | 37 ++++--------------- .../bulk/BulkPrimaryExecutionContext.java | 13 +++++-- .../action/bulk/TransportShardBulkAction.java | 4 +- .../bulk/TransportShardBulkActionTests.java | 36 +++++++++--------- 4 files changed, 38 insertions(+), 52 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkItemRequest.java b/server/src/main/java/org/opensearch/action/bulk/BulkItemRequest.java index 8c7b6bde503af..8b25f3a5b33e5 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkItemRequest.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkItemRequest.java @@ -48,29 +48,26 @@ * * @opensearch.internal */ -public class BulkItemRequest implements Writeable, Accountable { +public record BulkItemRequest(int id, DocWriteRequest request, BulkItemResponse primaryResponse) implements Writeable, Accountable { private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkItemRequest.class); - private final int id; - private final DocWriteRequest request; - private final BulkItemResponse primaryResponse; - /** * @param shardId the shard id */ BulkItemRequest(@Nullable ShardId shardId, StreamInput in) throws IOException { - id = in.readVInt(); - request = DocWriteRequest.readDocumentRequest(shardId, in); + this(in.readVInt(), DocWriteRequest.readDocumentRequest(shardId, in), readPrimaryResponse(shardId, in)); + } + + private static BulkItemResponse readPrimaryResponse(ShardId shardId, StreamInput in) throws IOException { if (in.readBoolean()) { if (shardId == null) { - primaryResponse = new BulkItemResponse(in); + return new BulkItemResponse(in); } else { - primaryResponse = new BulkItemResponse(shardId, in); + return new BulkItemResponse(shardId, in); } - } else { - primaryResponse = null; } + return null; } // NOTE: public for testing only @@ -78,29 +75,11 @@ public BulkItemRequest(int id, DocWriteRequest request) { this(id, request, null); } - BulkItemRequest(int id, DocWriteRequest request, BulkItemResponse primaryResponse) { - this.id = id; - this.request = request; - this.primaryResponse = primaryResponse; - } - - public int id() { - return id; - } - - public DocWriteRequest request() { - return request; - } - public String index() { assert request.indices().length == 1; return request.indices()[0]; } - BulkItemResponse getPrimaryResponse() { - return primaryResponse; - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(id); diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java b/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java index 392d5ec465b37..860f1f97cc62c 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java @@ -102,7 +102,7 @@ enum ItemProcessingState { private int findNextNonAborted(int startIndex) { final int length = request.items().length; - while (startIndex < length && isAborted(request.items()[startIndex].getPrimaryResponse())) { + while (startIndex < length && isAborted(request.items()[startIndex].primaryResponse())) { startIndex++; } return startIndex; @@ -135,7 +135,14 @@ public BulkShardRequest getBulkShardRequest() { BulkItemRequest oldRequest = request.items()[i]; newRequests[i] = new BulkItemRequest(oldRequest.id(), oldRequest.request(), primaryResponses[i]); } - return new BulkShardRequest(request.shardId(), request.getRefreshPolicy(), newRequests); + BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), request.getRefreshPolicy(), newRequests); + // Clone other properties from primary shard request + // See TransportBulkAction.BulkOperation#doRun() for construction of the primary shard request. + bulkShardRequest.waitForActiveShards(request.waitForActiveShards()); + bulkShardRequest.timeout(request.timeout()); + bulkShardRequest.routedBasedOnClusterVersion(request.routedBasedOnClusterVersion()); + bulkShardRequest.setParentTask(request.getParentTask()); + return bulkShardRequest; } /** returns the result of the request that has been executed on the shard */ @@ -380,7 +387,7 @@ private boolean assertInvariants(ItemProcessingState... expectedCurrentState) { case COMPLETED: assert requestToExecute != null; assert executionResult != null; - assert getCurrentItem().getPrimaryResponse() != null; + assert getCurrentItem().primaryResponse() != null; break; } return true; diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 3f06354de9b5d..cf8a6801c4627 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -860,9 +860,9 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index Translog.Location location = null; for (int i = 0; i < request.items().length; i++) { final BulkItemRequest item = request.items()[i]; - final BulkItemResponse response = item.getPrimaryResponse(); + final BulkItemResponse response = item.primaryResponse(); final Engine.Result operationResult; - if (item.getPrimaryResponse().isFailed()) { + if (item.primaryResponse().isFailed()) { if (response.getFailure().getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) { continue; // ignore replication as we didn't generate a sequence number for this request. } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index 649853ea635de..966910c8687bd 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -173,7 +173,7 @@ public void testExecuteBulkIndexRequest() throws Exception { // Translog should change, since there were no problems assertNotNull(context.getLocationToSync()); - BulkItemResponse primaryResponse = completedRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); @@ -206,7 +206,7 @@ public void testExecuteBulkIndexRequest() throws Exception { BulkItemRequest replicaRequest = completedRequest.items()[0]; - primaryResponse = replicaRequest.getPrimaryResponse(); + primaryResponse = replicaRequest.primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); @@ -287,7 +287,7 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { // 1 for this execution) verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()); - BulkItemResponse primaryResponse = completedRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); @@ -337,7 +337,7 @@ public void onFailure(final Exception e) { // Translog shouldn't be synced, as there were conflicting mappings assertThat(context.getLocationToSync(), nullValue()); - BulkItemResponse primaryResponse = completedRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse(); // Since this was not a conflict failure, the primary response // should be filled out with the failure information @@ -383,7 +383,7 @@ public void testExecuteBulkDeleteRequest() throws Exception { BulkItemRequest replicaRequest = completedRequest.items()[0]; DocWriteRequest replicaDeleteRequest = replicaRequest.request(); - BulkItemResponse primaryResponse = replicaRequest.getPrimaryResponse(); + BulkItemResponse primaryResponse = replicaRequest.primaryResponse(); DeleteResponse response = primaryResponse.getResponse(); // Any version can be matched on replica @@ -431,7 +431,7 @@ public void testExecuteBulkDeleteRequest() throws Exception { replicaRequest = completedRequest.items()[0]; replicaDeleteRequest = replicaRequest.request(); - primaryResponse = replicaRequest.getPrimaryResponse(); + primaryResponse = replicaRequest.primaryResponse(); response = primaryResponse.getResponse(); // Any version can be matched on replica @@ -493,7 +493,7 @@ public void testNoopUpdateRequest() throws Exception { // Basically nothing changes in the request since it's a noop assertThat(context.getLocationToSync(), nullValue()); - BulkItemResponse primaryResponse = completedRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); @@ -548,7 +548,7 @@ public void testUpdateRequestWithFailure() throws Exception { // should be filled out with the failure information assertNull(context.getLocationToSync()); BulkShardRequest completedRequest = context.getBulkShardRequest(); - BulkItemResponse primaryResponse = completedRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); @@ -604,7 +604,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception { assertFalse(context.hasMoreOperationsToExecute()); assertNull(context.getLocationToSync()); - BulkItemResponse primaryResponse = completedRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); @@ -666,7 +666,7 @@ public void testUpdateRequestWithSuccess() throws Exception { assertThat(completedRequest.items()[0].request(), equalTo(updateResponse)); // Since this was not a conflict failure, the primary response // should be filled out with the failure information - BulkItemResponse primaryResponse = completedRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); @@ -721,7 +721,7 @@ public void testUpdateWithDelete() throws Exception { // Check that the translog is successfully advanced assertThat(context.getLocationToSync(), equalTo(resultLocation)); assertThat(completedRequest.items()[0].request(), equalTo(updateResponse)); - BulkItemResponse primaryResponse = completedRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); @@ -757,7 +757,7 @@ public void testFailureDuringUpdateProcessing() throws Exception { assertFalse(context.hasMoreOperationsToExecute()); assertNull(context.getLocationToSync()); - BulkItemResponse primaryResponse = completedRequest.items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = completedRequest.items()[0].primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); @@ -1151,7 +1151,7 @@ public void testRetries() throws Exception { listener -> listener.onResponse(null), new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> { assertThat(((WritePrimaryResult) result).location, equalTo(resultLocation)); - BulkItemResponse primaryResponse = result.replicaRequest().items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse = result.replicaRequest().items()[0].primaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); @@ -1215,7 +1215,7 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> { assertEquals(nItems, result.replicaRequest().items().length); for (BulkItemRequest item : result.replicaRequest().items()) { - assertEquals(VersionConflictEngineException.class, item.getPrimaryResponse().getFailure().getCause().getClass()); + assertEquals(VersionConflictEngineException.class, item.primaryResponse().getFailure().getCause().getClass()); } completedRequest[0] = result.replicaRequest(); }), latch), @@ -1230,7 +1230,7 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep assertTrue(latch.await(10, TimeUnit.SECONDS)); for (BulkItemRequest item : completedRequest[0].items()) { - assertEquals(item.getPrimaryResponse().getFailure().getCause().getClass(), VersionConflictEngineException.class); + assertEquals(item.primaryResponse().getFailure().getCause().getClass(), VersionConflictEngineException.class); // this assertion is based on the assumption that all bulk item requests are updates and are hence calling // UpdateRequest::prepareRequest @@ -1332,7 +1332,7 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()); - BulkItemResponse primaryResponse1 = completedRequest[0].items()[0].getPrimaryResponse(); + BulkItemResponse primaryResponse1 = completedRequest[0].items()[0].primaryResponse(); assertThat(primaryResponse1.getItemId(), equalTo(0)); assertThat(primaryResponse1.getId(), equalTo("id")); assertThat(primaryResponse1.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); @@ -1340,13 +1340,13 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { assertThat(primaryResponse1.getResponse().status(), equalTo(RestStatus.CREATED)); assertThat(primaryResponse1.getResponse().getSeqNo(), equalTo(10L)); - BulkItemResponse primaryResponse2 = completedRequest[0].items()[1].getPrimaryResponse(); + BulkItemResponse primaryResponse2 = completedRequest[0].items()[1].primaryResponse(); assertThat(primaryResponse2.getItemId(), equalTo(1)); assertThat(primaryResponse2.getId(), equalTo("id")); assertThat(primaryResponse2.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); assertTrue(primaryResponse2.isFailed()); assertNull(primaryResponse2.getResponse()); - assertEquals(primaryResponse2.status(), RestStatus.TOO_MANY_REQUESTS); + assertEquals(RestStatus.TOO_MANY_REQUESTS, primaryResponse2.status()); assertThat(primaryResponse2.getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class)); closeShards(shard);