Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fallback to netty client if AWS Crt client is not available on the target platform / architecture ([#20698](https://github.com/opensearch-project/OpenSearch/pull/20698))
- Fix ShardSearchFailure in transport-grpc ([#20641](https://github.com/opensearch-project/OpenSearch/pull/20641))
- Fix TLS cert hot-reload for Arrow Flight transport ([#20732](https://github.com/opensearch-project/OpenSearch/pull/20732))
- Fix shard indexing pressure request corruption on bulk indexing ([#20727](https://github.com/opensearch-project/OpenSearch/pull/20727))

### Dependencies
- Bump shadow-gradle-plugin from 8.3.9 to 9.3.1 ([#20569](https://github.com/opensearch-project/OpenSearch/pull/20569))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,15 +328,16 @@ public void testWritesRejectedForSingleCoordinatingShardDueToNodeLevelLimitBreac
assertEquals(0, coordinatingShardTracker.getReplicaOperationTracker().getStatsTracker().getCurrentBytes());
});

expectThrows(OpenSearchRejectedExecutionException.class, () -> {
if (randomBoolean()) {
client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
} else if (randomBoolean()) {
client(primaryName).bulk(bulkRequest).actionGet();
} else {
client(replicaName).bulk(bulkRequest).actionGet();
}
});
BulkResponse rejectedResponse;
if (randomBoolean()) {
rejectedResponse = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
} else if (randomBoolean()) {
rejectedResponse = client(primaryName).bulk(bulkRequest).actionGet();
} else {
rejectedResponse = client(replicaName).bulk(bulkRequest).actionGet();
}
assertTrue(rejectedResponse.hasFailures());
assertThat(rejectedResponse.getItems()[0].getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class));

replicaRelease.close();

Expand Down Expand Up @@ -444,10 +445,10 @@ public void testWritesRejectedFairnessWithMultipleCoordinatingShardsDueToNodeLev
});

// Large request on a shard with already large occupancy is rejected
expectThrows(
OpenSearchRejectedExecutionException.class,
() -> { client(coordinatingOnlyNode).bulk(largeBulkRequest).actionGet(); }
);
// Shard indexing pressure failures are returned as item-level failures.
BulkResponse rejectedResponse = client(coordinatingOnlyNode).bulk(largeBulkRequest).actionGet();
assertTrue(rejectedResponse.hasFailures());
assertThat(rejectedResponse.getItems()[0].getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class));

replicaRelease.close();
successFuture.actionGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.stream.Stream;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.instanceOf;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1)
@SuppressForbidden(reason = "Need to fix: https://github.com/opensearch-project/OpenSearch/issues/14331")
Expand Down Expand Up @@ -270,16 +271,19 @@ public void testShardIndexingPressureNodeLimitUpdateSetting() throws Exception {
.build()
);

// Any node receiving the request will end up rejecting request due to node level limit breached
expectThrows(OpenSearchRejectedExecutionException.class, () -> {
if (randomBoolean()) {
client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
} else if (randomBoolean()) {
client(primaryName).bulk(bulkRequest).actionGet();
} else {
client(replicaName).bulk(bulkRequest).actionGet();
}
});
// Any node receiving the request will end up rejecting it because the shard-level limit is breached.
// After the change to handle shard indexing pressure failures as item-level failures,
// the bulk request returns a BulkResponse with failures instead of throwing an exception.
BulkResponse rejectedResponse;
if (randomBoolean()) {
rejectedResponse = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
} else if (randomBoolean()) {
rejectedResponse = client(primaryName).bulk(bulkRequest).actionGet();
} else {
rejectedResponse = client(replicaName).bulk(bulkRequest).actionGet();
}
assertTrue(rejectedResponse.hasFailures());
assertThat(rejectedResponse.getItems()[0].getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class));
}

public void testShardIndexingPressureEnforcedEnabledDisabledSetting() throws Exception {
Expand Down Expand Up @@ -337,7 +341,11 @@ public void testShardIndexingPressureEnforcedEnabledDisabledSetting() throws Exc
Thread.sleep(25);

// This request breaches the threshold and hence will be rejected
expectThrows(OpenSearchRejectedExecutionException.class, () -> client(coordinatingOnlyNode).bulk(bulkRequest).actionGet());
// After the change to handle shard indexing pressure failures as item-level failures,
// the bulk request returns a BulkResponse with failures instead of throwing an exception.
BulkResponse rejectedResponse = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
assertTrue(rejectedResponse.hasFailures());
assertThat(rejectedResponse.getItems()[0].getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class));
assertEquals(1, coordinatingShardTracker.getCoordinatingOperationTracker().getRejectionTracker().getTotalRejections());
assertEquals(
1,
Expand Down Expand Up @@ -409,7 +417,11 @@ public void testShardIndexingPressureEnforcedEnabledDisabledSetting() throws Exc
waitForTwoOutstandingRequests(coordinatingShardTracker);

// This request breaches the threshold and hence will be rejected
expectThrows(OpenSearchRejectedExecutionException.class, () -> client(coordinatingOnlyNode).bulk(bulkRequest).actionGet());
// After the change to handle shard indexing pressure failures as item-level failures,
// the bulk request returns a BulkResponse with failures instead of throwing an exception.
rejectedResponse = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
assertTrue(rejectedResponse.hasFailures());
assertThat(rejectedResponse.getItems()[0].getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class));

// new rejection added to the actual rejection count
assertEquals(2, coordinatingShardTracker.getCoordinatingOperationTracker().getRejectionTracker().getTotalRejections());
Expand Down Expand Up @@ -636,13 +648,17 @@ public void testShardIndexingPressureLastSuccessfulSettingsUpdate() throws Excep
Thread.sleep(25);

// This request breaches the threshold and hence will be rejected
// After the change to handle shard indexing pressure failures as item-level failures,
// the bulk request returns a BulkResponse with failures instead of throwing an exception.
if (randomBoolean) {
ShardIndexingPressureTracker coordinatingShardTracker = internalCluster().getInstance(
IndexingPressureService.class,
coordinatingOnlyNode
).getShardIndexingPressure().getShardIndexingPressureTracker(shardId);
waitForTwoOutstandingRequests(coordinatingShardTracker);
expectThrows(OpenSearchRejectedExecutionException.class, () -> client(coordinatingOnlyNode).bulk(bulkRequest).actionGet());
BulkResponse rejectedResponse = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
assertTrue(rejectedResponse.hasFailures());
assertThat(rejectedResponse.getItems()[0].getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class));
assertEquals(1, coordinatingShardTracker.getCoordinatingOperationTracker().getRejectionTracker().getTotalRejections());
assertEquals(
1,
Expand All @@ -655,7 +671,9 @@ public void testShardIndexingPressureLastSuccessfulSettingsUpdate() throws Excep
.getShardIndexingPressure()
.getShardIndexingPressureTracker(shardId);
waitForTwoOutstandingRequests(primaryShardTracker);
expectThrows(OpenSearchRejectedExecutionException.class, () -> client(primaryName).bulk(bulkRequest).actionGet());
BulkResponse rejectedResponse = client(primaryName).bulk(bulkRequest).actionGet();
assertTrue(rejectedResponse.hasFailures());
assertThat(rejectedResponse.getItems()[0].getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class));
assertEquals(1, primaryShardTracker.getCoordinatingOperationTracker().getRejectionTracker().getTotalRejections());
assertEquals(
1,
Expand Down Expand Up @@ -805,11 +823,16 @@ public void testShardIndexingPressureRequestSizeWindowSettingUpdate() throws Exc
successFuture.actionGet();

// This request breaches the threshold and hence will be rejected
// After the change to handle shard indexing pressure failures as item-level failures,
// the bulk request returns a BulkResponse with failures instead of throwing an exception.
BulkResponse rejectedResponse;
if (randomBoolean) {
expectThrows(OpenSearchRejectedExecutionException.class, () -> client(coordinatingOnlyNode).bulk(bulkRequest).actionGet());
rejectedResponse = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
} else {
expectThrows(OpenSearchRejectedExecutionException.class, () -> client(primaryName).bulk(bulkRequest).actionGet());
rejectedResponse = client(primaryName).bulk(bulkRequest).actionGet();
}
assertTrue(rejectedResponse.hasFailures());
assertThat(rejectedResponse.getItems()[0].getFailure().getCause(), instanceOf(OpenSearchRejectedExecutionException.class));

// Update the outstanding threshold setting to see no rejections
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -723,6 +724,8 @@ protected void doRun() {
final DocStatusStats docStatusStats = new DocStatusStats();
String nodeId = clusterService.localNode().getId();

final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);

for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
final ShardId shardId = entry.getKey();
final List<BulkItemRequest> requests = entry.getValue();
Expand All @@ -737,20 +740,50 @@ protected void doRun() {
if (task != null) {
bulkShardRequest.setParentTask(nodeId, task.getId());
}
final long startTimeNanos = relativeTime();
final long shardStartTimeNanos = relativeTime();
final ShardRouting primary = routingTable.shardRoutingTable(shardId).primaryShard();
String targetNodeId = primary != null ? primary.currentNodeId() : null;
final String targetNodeId = primary != null ? primary.currentNodeId() : null;
IndexMetadata indexMetaData = clusterState.metadata().index(shardId.getIndexName());
boolean bulkAdaptiveShardSelectionEnabled = indexMetaData.isAppendOnlyIndex()
final boolean bulkAdaptiveShardSelectionEnabled = indexMetaData.isAppendOnlyIndex()
&& indexMetaData.bulkAdaptiveShardSelectionEnabled();

// Add the shard level accounting for coordinating and supply the listener
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(
shardId,
bulkShardRequest::ramBytesUsed,
isOnlySystem
);
// Define the failure handler that can be reused for both pressure check failures and execution failures
final Consumer<Exception> onShardFailure = (Exception e) -> {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
final DocWriteRequest<?> docWriteRequest = request.request();
final BulkItemResponse bulkItemResponse = new BulkItemResponse(
request.id(),
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
);

docStatusStats.inc(bulkItemResponse.status());
responses.set(request.id(), bulkItemResponse);
}

if (counter.decrementAndGet() == 0) {
indicesService.addDocStatusStats(docStatusStats);
listener.onResponse(
new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
);
Comment on lines +767 to +770
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid duplicating finishHim()?

}
};

// Try to acquire shard-level indexing pressure permit
final Releasable releasable;
try {
releasable = indexingPressureService.markCoordinatingOperationStarted(
shardId,
bulkShardRequest::ramBytesUsed,
isOnlySystem
);
} catch (Exception e) {
// If the pressure check fails, treat it as a shard-level failure.
onShardFailure.accept(e);
continue;
}

final Span span = tracer.startSpan(SpanBuilder.from("bulkShardAction", nodeId, bulkShardRequest));
boolean incrementedConnections = false;
Expand All @@ -777,7 +810,7 @@ public void onResponse(BulkShardResponse bulkShardResponse) {
nodeMetricsCollector.addNodeStatistics(
targetNodeId,
bulkShardResponse.getNodeQueueSize(),
relativeTime() - startTimeNanos,
relativeTime() - shardStartTimeNanos,
bulkShardResponse.getServiceTimeEWMAInNanos()
);
}
Expand All @@ -798,23 +831,7 @@ public void onResponse(BulkShardResponse bulkShardResponse) {

@Override
public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
final DocWriteRequest<?> docWriteRequest = request.request();
final BulkItemResponse bulkItemResponse = new BulkItemResponse(
request.id(),
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
);

docStatusStats.inc(bulkItemResponse.status());
responses.set(request.id(), bulkItemResponse);
}

if (counter.decrementAndGet() == 0) {
finishHim();
}
onShardFailure.accept(e);
}

private void finishHim() {
Expand Down Expand Up @@ -842,7 +859,8 @@ private void finishHim() {
}
span.setError(e);
span.endSpan();
throw e;
// Treat execution failures as shard-level failures rather than failing the entire bulk request
onShardFailure.accept(e);
Comment on lines -845 to +863
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we need a test for this, unless an existing test already covers this?

}
}
bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed
Expand Down
Loading
Loading