diff --git a/CHANGELOG.md b/CHANGELOG.md index b3aa88360e2e8..7aeac4f28c8a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fallback to netty client if AWS Crt client is not available on the target platform / architecture ([#20698](https://github.com/opensearch-project/OpenSearch/pull/20698)) - Fix ShardSearchFailure in transport-grpc ([#20641](https://github.com/opensearch-project/OpenSearch/pull/20641)) - Fix TLS cert hot-reload for Arrow Flight transport ([#20732](https://github.com/opensearch-project/OpenSearch/pull/20732)) +- Fix shard indexing pressure request corruption on bulk indexing ([#20727](https://github.com/opensearch-project/OpenSearch/pull/20727)) ### Dependencies - Bump shadow-gradle-plugin from 8.3.9 to 9.3.1 ([#20569](https://github.com/opensearch-project/OpenSearch/pull/20569)) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/ShardIndexingPressureIT.java b/server/src/internalClusterTest/java/org/opensearch/index/ShardIndexingPressureIT.java index 3e9844b062177..131ecd0fd1c68 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/ShardIndexingPressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/ShardIndexingPressureIT.java @@ -328,15 +328,16 @@ public void testWritesRejectedForSingleCoordinatingShardDueToNodeLevelLimitBreac assertEquals(0, coordinatingShardTracker.getReplicaOperationTracker().getStatsTracker().getCurrentBytes()); }); - expectThrows(OpenSearchRejectedExecutionException.class, () -> { - if (randomBoolean()) { - client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); - } else if (randomBoolean()) { - client(primaryName).bulk(bulkRequest).actionGet(); - } else { - client(replicaName).bulk(bulkRequest).actionGet(); - } - }); + BulkResponse rejectedResponse; + if (randomBoolean()) { + rejectedResponse = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); + } else if 
(randomBoolean()) { + rejectedResponse = client(primaryName).bulk(bulkRequest).actionGet(); + } else { + rejectedResponse = client(replicaName).bulk(bulkRequest).actionGet(); + } + assertTrue(rejectedResponse.hasFailures()); + assertThat(rejectedResponse.getItems()[0].getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class)); replicaRelease.close(); @@ -444,10 +445,10 @@ public void testWritesRejectedFairnessWithMultipleCoordinatingShardsDueToNodeLev }); // Large request on a shard with already large occupancy is rejected - expectThrows( - OpenSearchRejectedExecutionException.class, - () -> { client(coordinatingOnlyNode).bulk(largeBulkRequest).actionGet(); } - ); + // Shard indexing pressure failures are returned as item-level failures. + BulkResponse rejectedResponse = client(coordinatingOnlyNode).bulk(largeBulkRequest).actionGet(); + assertTrue(rejectedResponse.hasFailures()); + assertThat(rejectedResponse.getItems()[0].getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class)); replicaRelease.close(); successFuture.actionGet(); diff --git a/server/src/internalClusterTest/java/org/opensearch/index/ShardIndexingPressureSettingsIT.java b/server/src/internalClusterTest/java/org/opensearch/index/ShardIndexingPressureSettingsIT.java index a4ac256b67b9d..369d0a5269ed5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/ShardIndexingPressureSettingsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/ShardIndexingPressureSettingsIT.java @@ -44,6 +44,7 @@ import java.util.stream.Stream; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.instanceOf; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1) @SuppressForbidden(reason = "Need to fix: https://github.com/opensearch-project/OpenSearch/issues/14331") @@ -270,16 +271,19 @@ public void 
testShardIndexingPressureNodeLimitUpdateSetting() throws Exception { .build() ); - // Any node receiving the request will end up rejecting request due to node level limit breached - expectThrows(OpenSearchRejectedExecutionException.class, () -> { - if (randomBoolean()) { - client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); - } else if (randomBoolean()) { - client(primaryName).bulk(bulkRequest).actionGet(); - } else { - client(replicaName).bulk(bulkRequest).actionGet(); - } - }); + // Any node receiving the request will end up rejecting request due to shard-level limit breached. + // After the change to handle shard indexing pressure failures as item-level failures, + // the bulk request returns a BulkResponse with failures instead of throwing an exception. + BulkResponse rejectedResponse; + if (randomBoolean()) { + rejectedResponse = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); + } else if (randomBoolean()) { + rejectedResponse = client(primaryName).bulk(bulkRequest).actionGet(); + } else { + rejectedResponse = client(replicaName).bulk(bulkRequest).actionGet(); + } + assertTrue(rejectedResponse.hasFailures()); + assertThat(rejectedResponse.getItems()[0].getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class)); } public void testShardIndexingPressureEnforcedEnabledDisabledSetting() throws Exception { @@ -337,7 +341,11 @@ public void testShardIndexingPressureEnforcedEnabledDisabledSetting() throws Exc Thread.sleep(25); // This request breaches the threshold and hence will be rejected - expectThrows(OpenSearchRejectedExecutionException.class, () -> client(coordinatingOnlyNode).bulk(bulkRequest).actionGet()); + // After the change to handle shard indexing pressure failures as item-level failures, + // the bulk request returns a BulkResponse with failures instead of throwing an exception. 
+ BulkResponse rejectedResponse = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); + assertTrue(rejectedResponse.hasFailures()); + assertThat(rejectedResponse.getItems()[0].getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class)); assertEquals(1, coordinatingShardTracker.getCoordinatingOperationTracker().getRejectionTracker().getTotalRejections()); assertEquals( 1, @@ -409,7 +417,11 @@ public void testShardIndexingPressureEnforcedEnabledDisabledSetting() throws Exc waitForTwoOutstandingRequests(coordinatingShardTracker); // This request breaches the threshold and hence will be rejected - expectThrows(OpenSearchRejectedExecutionException.class, () -> client(coordinatingOnlyNode).bulk(bulkRequest).actionGet()); + // After the change to handle shard indexing pressure failures as item-level failures, + // the bulk request returns a BulkResponse with failures instead of throwing an exception. + rejectedResponse = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); + assertTrue(rejectedResponse.hasFailures()); + assertThat(rejectedResponse.getItems()[0].getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class)); // new rejection added to the actual rejection count assertEquals(2, coordinatingShardTracker.getCoordinatingOperationTracker().getRejectionTracker().getTotalRejections()); @@ -636,13 +648,17 @@ public void testShardIndexingPressureLastSuccessfulSettingsUpdate() throws Excep Thread.sleep(25); // This request breaches the threshold and hence will be rejected + // After the change to handle shard indexing pressure failures as item-level failures, + // the bulk request returns a BulkResponse with failures instead of throwing an exception. 
if (randomBoolean) { ShardIndexingPressureTracker coordinatingShardTracker = internalCluster().getInstance( IndexingPressureService.class, coordinatingOnlyNode ).getShardIndexingPressure().getShardIndexingPressureTracker(shardId); waitForTwoOutstandingRequests(coordinatingShardTracker); - expectThrows(OpenSearchRejectedExecutionException.class, () -> client(coordinatingOnlyNode).bulk(bulkRequest).actionGet()); + BulkResponse rejectedResponse = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); + assertTrue(rejectedResponse.hasFailures()); + assertThat(rejectedResponse.getItems()[0].getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class)); assertEquals(1, coordinatingShardTracker.getCoordinatingOperationTracker().getRejectionTracker().getTotalRejections()); assertEquals( 1, @@ -655,7 +671,9 @@ public void testShardIndexingPressureLastSuccessfulSettingsUpdate() throws Excep .getShardIndexingPressure() .getShardIndexingPressureTracker(shardId); waitForTwoOutstandingRequests(primaryShardTracker); - expectThrows(OpenSearchRejectedExecutionException.class, () -> client(primaryName).bulk(bulkRequest).actionGet()); + BulkResponse rejectedResponse = client(primaryName).bulk(bulkRequest).actionGet(); + assertTrue(rejectedResponse.hasFailures()); + assertThat(rejectedResponse.getItems()[0].getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class)); assertEquals(1, primaryShardTracker.getCoordinatingOperationTracker().getRejectionTracker().getTotalRejections()); assertEquals( 1, @@ -805,11 +823,16 @@ public void testShardIndexingPressureRequestSizeWindowSettingUpdate() throws Exc successFuture.actionGet(); // This request breaches the threshold and hence will be rejected + // After the change to handle shard indexing pressure failures as item-level failures, + // the bulk request returns a BulkResponse with failures instead of throwing an exception. 
+ BulkResponse rejectedResponse; if (randomBoolean) { - expectThrows(OpenSearchRejectedExecutionException.class, () -> client(coordinatingOnlyNode).bulk(bulkRequest).actionGet()); + rejectedResponse = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); } else { - expectThrows(OpenSearchRejectedExecutionException.class, () -> client(primaryName).bulk(bulkRequest).actionGet()); + rejectedResponse = client(primaryName).bulk(bulkRequest).actionGet(); } + assertTrue(rejectedResponse.hasFailures()); + assertThat(rejectedResponse.getItems()[0].getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class)); // Update the outstanding threshold setting to see no rejections ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index 0068b9d3da172..e6bbf9ae6d552 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -120,6 +120,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; +import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -723,6 +724,8 @@ protected void doRun() { final DocStatusStats docStatusStats = new DocStatusStats(); String nodeId = clusterService.localNode().getId(); + final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices); + for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) { final ShardId shardId = entry.getKey(); final List<BulkItemRequest> requests = entry.getValue(); @@ -737,20 +740,50 @@ protected void doRun() { if (task != null) { bulkShardRequest.setParentTask(nodeId, task.getId()); }
- final long startTimeNanos = relativeTime(); + final long shardStartTimeNanos = relativeTime(); final ShardRouting primary = routingTable.shardRoutingTable(shardId).primaryShard(); - String targetNodeId = primary != null ? primary.currentNodeId() : null; + final String targetNodeId = primary != null ? primary.currentNodeId() : null; IndexMetadata indexMetaData = clusterState.metadata().index(shardId.getIndexName()); - boolean bulkAdaptiveShardSelectionEnabled = indexMetaData.isAppendOnlyIndex() + final boolean bulkAdaptiveShardSelectionEnabled = indexMetaData.isAppendOnlyIndex() && indexMetaData.bulkAdaptiveShardSelectionEnabled(); - // Add the shard level accounting for coordinating and supply the listener - final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices); - final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted( - shardId, - bulkShardRequest::ramBytesUsed, - isOnlySystem - ); + // Define the failure handler that can be reused for both pressure check failures and execution failures + final Consumer<Exception> onShardFailure = (Exception e) -> { + // create failures for all relevant requests + for (BulkItemRequest request : requests) { + final String indexName = concreteIndices.getConcreteIndex(request.index()).getName(); + final DocWriteRequest docWriteRequest = request.request(); + final BulkItemResponse bulkItemResponse = new BulkItemResponse( + request.id(), + docWriteRequest.opType(), + new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e) + ); + + docStatusStats.inc(bulkItemResponse.status()); + responses.set(request.id(), bulkItemResponse); + } + + if (counter.decrementAndGet() == 0) { + indicesService.addDocStatusStats(docStatusStats); + listener.onResponse( + new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)) + ); + } + }; + + // Try to acquire shard-level indexing pressure permit + final
Releasable releasable; + try { + releasable = indexingPressureService.markCoordinatingOperationStarted( + shardId, + bulkShardRequest::ramBytesUsed, + isOnlySystem + ); + } catch (Exception e) { + // If pressure check fails, treat it as a shard-level failure. + onShardFailure.accept(e); + continue; + } final Span span = tracer.startSpan(SpanBuilder.from("bulkShardAction", nodeId, bulkShardRequest)); boolean incrementedConnections = false; @@ -777,7 +810,7 @@ public void onResponse(BulkShardResponse bulkShardResponse) { nodeMetricsCollector.addNodeStatistics( targetNodeId, bulkShardResponse.getNodeQueueSize(), - relativeTime() - startTimeNanos, + relativeTime() - shardStartTimeNanos, bulkShardResponse.getServiceTimeEWMAInNanos() ); } @@ -798,23 +831,7 @@ public void onResponse(BulkShardResponse bulkShardResponse) { @Override public void onFailure(Exception e) { - // create failures for all relevant requests - for (BulkItemRequest request : requests) { - final String indexName = concreteIndices.getConcreteIndex(request.index()).getName(); - final DocWriteRequest docWriteRequest = request.request(); - final BulkItemResponse bulkItemResponse = new BulkItemResponse( - request.id(), - docWriteRequest.opType(), - new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e) - ); - - docStatusStats.inc(bulkItemResponse.status()); - responses.set(request.id(), bulkItemResponse); - } - - if (counter.decrementAndGet() == 0) { - finishHim(); - } + onShardFailure.accept(e); } private void finishHim() { @@ -842,7 +859,8 @@ private void finishHim() { } span.setError(e); span.endSpan(); - throw e; + // Treat execution failures as shard-level failures rather than failing the entire bulk request + onShardFailure.accept(e); } } bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java 
b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java index 1575de055e25d..b0bd2ab131ba6 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java @@ -53,6 +53,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.TestShardRouting; import org.opensearch.cluster.service.ClusterService; @@ -88,6 +89,7 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; @@ -98,6 +100,7 @@ import static org.opensearch.cluster.metadata.MetadataCreateDataStreamServiceTests.createDataStream; import static org.opensearch.ingest.IngestServiceTests.createIngestServiceWithProcessors; import static org.opensearch.test.ClusterServiceUtils.createClusterService; +import static org.opensearch.test.ClusterServiceUtils.setState; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.mock; @@ -471,4 +474,120 @@ private IndexRoutingTable createIndexRoutingTable(int nodeCount, int shardCount) } return indexRoutingTable.build(); }
+
+    /**
+     * Verifies that when the shard-level indexing pressure check fails in {@code doRun()}, the failure is
+     * handled gracefully: the bulk request completes with a response whose items for that shard are failed.
+     * <p>
+     * The action under test is wired to an {@code IndexingPressureService} whose shard-level
+     * {@code markCoordinatingOperationStarted} always throws, and two index requests are sent to an index
+     * with a single shard, so both items are expected to carry the rejection message.
+     *
+     * @throws Exception if the test fixture cannot be set up or the bulk action fails unexpectedly
+     */
+    public void testDoRunHandlesShardIndexingPressureFailure() throws Exception {
+        // Create a new transport service to avoid handler conflicts
+        CapturingTransport newCapturingTransport = new CapturingTransport();
+        TransportService newTransportService = newCapturingTransport.createTransportService(
+            clusterService.getSettings(),
+            threadPool,
+            TransportService.NOOP_TRANSPORT_INTERCEPTOR,
+            boundAddress -> clusterService.localNode(),
+            null,
+            Collections.emptySet(),
+            NoopTracer.INSTANCE
+        );
+        newTransportService.start();
+        newTransportService.acceptIncomingRequests();
+
+        // Close the dedicated transport service even if an assertion below fails, so it is never leaked.
+        try {
+            // Create IndexingPressureService that always throws on shard-level coordinating check
+            final String expectedErrorMessage = "Shard indexing pressure limit exceeded";
+            IndexingPressureService throwingPressureService = new IndexingPressureService(Settings.EMPTY, clusterService) {
+                @Override
+                public org.opensearch.common.lease.Releasable markCoordinatingOperationStarted(
+                    ShardId shardId,
+                    LongSupplier bytes,
+                    boolean forceExecution
+                ) {
+                    throw new OpenSearchRejectedExecutionException(expectedErrorMessage);
+                }
+            };
+
+            // Create TransportBulkAction with the throwing pressure service
+            TransportBulkAction testAction = new TransportBulkAction(
+                threadPool,
+                newTransportService,
+                clusterService,
+                createIngestServiceWithProcessors(Collections.emptyMap()),
+                null, // shardBulkAction - will throw NPE if we get past pressure check, but we won't
+                null,
+                new ActionFilters(Collections.emptySet()),
+                new Resolver(),
+                new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver(), new SystemIndices(emptyMap())),
+                throwingPressureService,
+                mock(IndicesService.class),
+                new SystemIndices(emptyMap()),
+                NoopTracer.INSTANCE
+            ) {
+                @Override
+                protected boolean needToCheck() {
+                    return false; // Skip auto-create index logic
+                }
+            };
+
+            // Set up cluster state with an index so requests resolve to shards
+            String indexName = "test-index";
+            org.opensearch.core.index.Index index = new org.opensearch.core.index.Index(indexName, "test-uuid");
+            IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
+                .settings(
+                    Settings.builder()
+                        .put("index.version.created", Version.CURRENT)
+                        .put("index.number_of_shards", 1)
+                        .put("index.number_of_replicas", 0)
+                )
+                .build();
+
+            IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
+            indexRoutingTableBuilder.addShard(
+                TestShardRouting.newShardRouting(new ShardId(index, 0), "node", true, ShardRoutingState.STARTED)
+            );
+
+            Metadata metadata = Metadata.builder().put(indexMetadata, false).build();
+            RoutingTable routingTable = RoutingTable.builder().add(indexRoutingTableBuilder.build()).build();
+
+            ClusterState newClusterState = ClusterState.builder(clusterService.state())
+                .metadata(metadata)
+                .routingTable(routingTable)
+                .build();
+
+            // Update cluster state using the utility method
+            setState(clusterService.getClusterApplierService(), newClusterState);
+
+            // Create bulk request
+            BulkRequest bulkRequest = new BulkRequest();
+            bulkRequest.add(new IndexRequest(indexName).id("1").source(emptyMap()));
+            bulkRequest.add(new IndexRequest(indexName).id("2").source(emptyMap()));
+
+            // Execute the bulk action
+            PlainActionFuture<BulkResponse> future = PlainActionFuture.newFuture();
+            ActionTestUtils.execute(testAction, null, bulkRequest, future);
+
+            // Get the response
+            BulkResponse response = future.actionGet();
+            assertNotNull(response);
+
+            // Verify all items failed with the expected error
+            BulkItemResponse[] items = response.getItems();
+            assertEquals(2, items.length);
+            for (BulkItemResponse item : items) {
+                assertTrue("Item should be failed due to pressure rejection", item.isFailed());
+                assertThat(item.getFailure().getCause().getMessage(), equalTo(expectedErrorMessage));
+            }
+        } finally {
+            newTransportService.close();
+        }
+    }
+}