
Prevent request corruption by avoiding failing entire bulk request on single shard rejection from shard backpressure#20727

Open
varunbharadwaj wants to merge 2 commits into opensearch-project:main from varunbharadwaj:vb/shardindexpressure

Conversation

@varunbharadwaj
Contributor

Description

Shard backpressure rejections currently fail the entire bulk request and release a buffer that is still in use, which can cause request corruption. More details can be found in #20724. This PR updates the logic to treat shard-level pressure check failures as regular shard failures, without failing the entire bulk request. This way, we continue to process the other shards and return shard-level errors in the bulk response.

This change also solves the request corruption problem when shard level pressure check is enabled.
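To make the intended behavior concrete, here is a minimal, self-contained Java sketch (all names are hypothetical, not the OpenSearch API) of treating a per-shard rejection as an item-level failure while the remaining shards still complete:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: instead of aborting the whole bulk request when one
// shard's pressure check rejects, record a shard-level failure for that shard
// and keep processing the remaining shards.
public class BulkSketch {
    record ShardResult(int shardId, boolean failed) {}

    // 'rejected' stands in for a pressure check throwing on an overloaded
    // shard; all names here are illustrative only.
    static List<ShardResult> dispatch(List<Integer> shardIds, Predicate<Integer> rejected) {
        List<ShardResult> results = new ArrayList<>();
        for (int shardId : shardIds) {
            if (rejected.test(shardId)) {
                // Shard-level failure: recorded in the response, loop continues.
                results.add(new ShardResult(shardId, true));
                continue;
            }
            results.add(new ShardResult(shardId, false));
        }
        return results;
    }

    public static void main(String[] args) {
        // Shard 1 is rejected by backpressure; shards 0 and 2 still complete.
        System.out.println(dispatch(List.of(0, 1, 2), id -> id == 1));
    }
}
```

The bulk response then carries one failure entry per rejected shard's items, rather than a top-level error.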

Related Issues

Resolves #20724

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions github-actions bot added bug Something isn't working Indexing Indexing, Bulk Indexing and anything related to indexing labels Feb 25, 2026
@github-actions
Contributor

github-actions bot commented Feb 25, 2026

PR Reviewer Guide 🔍

(Review updated until commit 52b3c0e)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Resource Leak Risk

The releasable acquired from markCoordinatingOperationStarted is not released when onShardFailure.accept(e) is called at line 784. This could lead to resource leaks if the pressure check fails, as the releasable should be closed before continuing to the next iteration.

final Releasable releasable;
try {
    releasable = indexingPressureService.markCoordinatingOperationStarted(
        shardId,
        bulkShardRequest::ramBytesUsed,
        isOnlySystem
    );
} catch (Exception e) {
    // If pressure check fails, treat it as a shard-level failure.
    onShardFailure.accept(e);
    continue;
}
Timing Inconsistency

The variable shardStartTimeNanos is defined at line 743 but is used in the success callback at line 813. However, if the pressure check fails and onShardFailure is called, the timing metric won't be recorded. Consider whether timing should be tracked differently for failed pressure checks.

final long shardStartTimeNanos = relativeTime();
final ShardRouting primary = routingTable.shardRoutingTable(shardId).primaryShard();
final String targetNodeId = primary != null ? primary.currentNodeId() : null;
IndexMetadata indexMetaData = clusterState.metadata().index(shardId.getIndexName());
final boolean bulkAdaptiveShardSelectionEnabled = indexMetaData.isAppendOnlyIndex()
    && indexMetaData.bulkAdaptiveShardSelectionEnabled();

// Define the failure handler that can be reused for both pressure check failures and execution failures
final Consumer<Exception> onShardFailure = (Exception e) -> {
    // create failures for all relevant requests
    for (BulkItemRequest request : requests) {
        final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
        final DocWriteRequest<?> docWriteRequest = request.request();
        final BulkItemResponse bulkItemResponse = new BulkItemResponse(
            request.id(),
            docWriteRequest.opType(),
            new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
        );

        docStatusStats.inc(bulkItemResponse.status());
        responses.set(request.id(), bulkItemResponse);
    }

    if (counter.decrementAndGet() == 0) {
        indicesService.addDocStatusStats(docStatusStats);
        listener.onResponse(
            new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
        );
    }
};

// Try to acquire shard-level indexing pressure permit
final Releasable releasable;
try {
    releasable = indexingPressureService.markCoordinatingOperationStarted(
        shardId,
        bulkShardRequest::ramBytesUsed,
        isOnlySystem
    );
} catch (Exception e) {
    // If pressure check fails, treat it as a shard-level failure.
    onShardFailure.accept(e);
    continue;
}

final Span span = tracer.startSpan(SpanBuilder.from("bulkShardAction", nodeId, bulkShardRequest));
boolean incrementedConnections = false;
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
    if (targetNodeId != null) {
        clientConnections.compute(targetNodeId, (id, conns) -> conns == null ? 1 : conns + 1);
        incrementedConnections = true;
    }
    shardBulkAction.execute(
        bulkShardRequest,
        TraceableActionListener.create(
            ActionListener.runAfter(ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
                @Override
                public void onResponse(BulkShardResponse bulkShardResponse) {
                    if (targetNodeId != null && bulkAdaptiveShardSelectionEnabled) {
                        if (bulkShardResponse.getNodeQueueSize() < 0 || bulkShardResponse.getServiceTimeEWMAInNanos() < 0) {
                            throw new IllegalStateException(
                                "node queue size and service time ewma must be non-negative, got "
                                    + bulkShardResponse.getNodeQueueSize()
                                    + " and "
                                    + bulkShardResponse.getServiceTimeEWMAInNanos()
                            );
                        }
                        nodeMetricsCollector.addNodeStatistics(
                            targetNodeId,
                            bulkShardResponse.getNodeQueueSize(),
                            relativeTime() - shardStartTimeNanos,
Exception Handling Gap

The catch block at line 855 now calls onShardFailure.accept(e) instead of throwing the exception. However, the releasable.close() is called before this. Verify that this ordering is correct and that the releasable should be closed before processing the failure, as the failure handler might need the resource.

} catch (Exception e) {
    releasable.close();
    if (incrementedConnections && targetNodeId != null) {
        clientConnections.computeIfPresent(targetNodeId, (id, conns) -> conns == 1 ? null : conns - 1);
    }
    span.setError(e);
    span.endSpan();
    // Treat execution failures as shard-level failures rather than failing the entire bulk request
    onShardFailure.accept(e);

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Feb 25, 2026

PR Code Suggestions ✨

Latest suggestions up to 52b3c0e

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Ensure releasable cleanup on execution failure

The onShardFailure consumer is invoked both when pressure check fails (before
acquiring the releasable) and when execution fails (after acquiring it). However,
when called from the pressure check failure path, no Releasable has been acquired
yet, so there's no cleanup needed. When called from the execution failure path via
onFailure(), the releasable should be closed to prevent resource leaks. Consider
ensuring proper cleanup of the releasable in the execution failure path.

server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java [751-772]

 final Consumer<Exception> onShardFailure = (Exception e) -> {
     // create failures for all relevant requests
     for (BulkItemRequest request : requests) {
         final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
         final DocWriteRequest<?> docWriteRequest = request.request();
         final BulkItemResponse bulkItemResponse = new BulkItemResponse(
             request.id(),
             docWriteRequest.opType(),
             new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
         );
 
         docStatusStats.inc(bulkItemResponse.status());
         responses.set(request.id(), bulkItemResponse);
     }
 
     if (counter.decrementAndGet() == 0) {
         indicesService.addDocStatusStats(docStatusStats);
         listener.onResponse(
             new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
         );
     }
 };
 
+// Wrapper that ensures releasable cleanup when called from execution path
+final Consumer<Exception> onExecutionFailure = (Exception e) -> {
+    releasable.close();
+    onShardFailure.accept(e);
+};
+
Suggestion importance[1-10]: 10


Why: This is a critical resource leak issue. The releasable acquired from markCoordinatingOperationStarted must be closed in the onFailure() callback, but currently it's only closed in the catch block. The suggestion correctly identifies that onShardFailure is used in both paths (pressure check failure where no releasable exists, and execution failure where it does), creating an inconsistency that leads to resource leaks.

High
Inconsistent releasable cleanup in failure paths

The catch block closes the releasable and then calls onShardFailure.accept(e).
However, onShardFailure is also used in the pressure check failure path where no
releasable exists. This creates inconsistency - the releasable is closed here but
not in the onFailure() callback. The releasable should be closed consistently in
both execution failure paths (catch block and onFailure callback).

server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java [855-864]

 } catch (Exception e) {
-    releasable.close();
     if (incrementedConnections && targetNodeId != null) {
         clientConnections.computeIfPresent(targetNodeId, (id, conns) -> conns == 1 ? null : conns - 1);
     }
     span.setError(e);
     span.endSpan();
     // Treat execution failures as shard-level failures rather than failing the entire bulk request
-    onShardFailure.accept(e);
+    onExecutionFailure.accept(e);
 }
Suggestion importance[1-10]: 8


Why: This suggestion correctly identifies the inconsistency in releasable cleanup. The catch block closes the releasable before calling onShardFailure, but the onFailure() callback at line 834 doesn't close it. However, this is essentially the same issue as suggestion 1, just viewed from a different code location. The improved code should use a wrapper that ensures cleanup.

Medium
General
Verify system flag per shard

The isOnlySystem flag is computed once before the loop but used inside the loop for
each shard. If the bulk request contains mixed system and non-system indices, this
could lead to incorrect pressure accounting. Consider computing isOnlySystem per
shard or verifying that all shards in the request have the same system/non-system
classification.

server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java [727-786]

-final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
-
 for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
+    final ShardId shardId = entry.getKey();
+    final List<BulkItemRequest> requests = entry.getValue();
+    
+    // Compute isOnlySystem per shard to handle mixed system/non-system indices correctly
+    final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
+    
+    BulkShardRequest bulkShardRequest = new BulkShardRequest(
+        ...
+    );
     ...
     try {
         releasable = indexingPressureService.markCoordinatingOperationStarted(
             shardId,
             bulkShardRequest::ramBytesUsed,
             isOnlySystem
         );
     } catch (Exception e) {
-        // If pressure check fails, treat it as a shard-level failure.
         onShardFailure.accept(e);
         continue;
     }
Suggestion importance[1-10]: 3


Why: While the concern about mixed system/non-system indices is theoretically valid, the isOnlySystem method already checks the entire bulkRequest against all indices. Computing it per shard would be redundant since the method signature doesn't take a ShardId parameter. The current implementation appears correct for the intended use case, making this a low-priority optimization suggestion.

Low

Previous suggestions

Suggestions up to commit 1c2ad51
Category | Suggestion | Impact
Possible issue
Ensure correct variable capture

The onShardFailure consumer captures requests from the loop iteration, but when
called from this catch block, it references the current iteration's requests list.
Ensure that the lambda correctly captures the intended requests variable for each
shard, as the variable is reassigned in each loop iteration.

server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java [855-864]

+final List<BulkItemRequest> capturedRequests = requests;
+// ... later in catch block ...
 } catch (Exception e) {
     releasable.close();
     if (incrementedConnections && targetNodeId != null) {
         clientConnections.computeIfPresent(targetNodeId, (id, conns) -> conns == 1 ? null : conns - 1);
     }
     span.setError(e);
     span.endSpan();
     // Treat execution failures as shard-level failures rather than failing the entire bulk request
     onShardFailure.accept(e);
 }
Suggestion importance[1-10]: 8


Why: This is a critical issue. The onShardFailure consumer is defined inside the loop and captures the requests variable. Since requests is reassigned in each iteration, the lambda may not capture the correct list when called from the catch block. This could lead to incorrect failure handling where the wrong requests are marked as failed.

Medium
General
Verify counter decrement logic

When the pressure check fails and onShardFailure.accept(e) is called, the Releasable
is never acquired, but the code continues to the next iteration. However, if this is
the last shard and counter.decrementAndGet() reaches 0 inside onShardFailure, the
response is sent. Verify that the counter logic correctly handles this scenario
where some shards fail during pressure check while others may still be processing.

server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java [782-786]

 } catch (Exception e) {
     // If pressure check fails, treat it as a shard-level failure.
+    // The counter is decremented inside onShardFailure, ensuring proper completion tracking
     onShardFailure.accept(e);
     continue;
 }
Suggestion importance[1-10]: 5


Why: The suggestion correctly identifies a potential concern about counter management when pressure checks fail. However, the code appears to handle this correctly since onShardFailure decrements the counter and sends the response when it reaches zero. The suggestion mainly asks for verification rather than identifying a concrete bug.

Low
Suggestions up to commit 7e90117
Category | Suggestion | Impact
Possible issue
Ensure resource cleanup on success

When an exception occurs after acquiring the releasable but before the action
completes, the releasable is closed in the catch block. However, if the action
succeeds, the releasable should be closed in the success callback. Currently,
there's no guarantee that releasable.close() is called on success, which could lead
to resource leaks and incorrect pressure tracking.

server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java [855-863]

 } catch (Exception e) {
     releasable.close();
     if (incrementedConnections && targetNodeId != null) {
         clientConnections.computeIfPresent(targetNodeId, (id, conns) -> conns == 1 ? null : conns - 1);
     }
     span.setError(e);
     span.endSpan();
     // Treat execution failures as shard-level failures rather than failing the entire bulk request
     onShardFailure.accept(e);
 }
+// Note: Ensure releasable.close() is also called in the success path of the action listener
Suggestion importance[1-10]: 9


Why: This is a critical resource leak issue. The releasable acquired from indexingPressureService.markCoordinatingOperationStarted() must be closed in both success and failure paths. The PR only closes it in the catch block (line 856), but there's no corresponding releasable.close() in the success callback (onResponse). This will cause the indexing pressure permits to never be released on successful operations, leading to resource exhaustion and incorrect pressure tracking.

High
Add thread-safety for concurrent access

The onShardFailure consumer may be invoked concurrently from multiple threads (e.g.,
when multiple shards fail simultaneously). The docStatusStats object and responses
array are shared across all shards but are not thread-safe. This can lead to race
conditions where statistics are lost or responses are corrupted. Consider using
thread-safe operations or synchronization when updating shared state.

server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java [751-772]

 final Consumer<Exception> onShardFailure = (Exception e) -> {
     // create failures for all relevant requests
     for (BulkItemRequest request : requests) {
         final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
         final DocWriteRequest<?> docWriteRequest = request.request();
         final BulkItemResponse bulkItemResponse = new BulkItemResponse(
             request.id(),
             docWriteRequest.opType(),
             new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
         );
 
-        docStatusStats.inc(bulkItemResponse.status());
+        synchronized (docStatusStats) {
+            docStatusStats.inc(bulkItemResponse.status());
+        }
         responses.set(request.id(), bulkItemResponse);
     }
 
     if (counter.decrementAndGet() == 0) {
         indicesService.addDocStatusStats(docStatusStats);
         listener.onResponse(
             new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
         );
     }
 };
Suggestion importance[1-10]: 8


Why: The suggestion correctly identifies a potential race condition where docStatusStats and responses are accessed concurrently from multiple shard processing threads. The onShardFailure consumer can indeed be invoked from multiple threads when multiple shards fail, and without proper synchronization, this could lead to data corruption or lost statistics. However, the suggested fix only synchronizes docStatusStats updates but doesn't address potential issues with responses.set() or the counter.decrementAndGet() check.

Medium
Suggestions up to commit 09b6c8b
Category | Suggestion | Impact
Possible issue
Prevent potential NullPointerException in failure handler

The onShardFailure consumer accesses concreteIndices which may be null after the
line bulkRequest = null; executes. If a shard failure occurs after bulkRequest is
nulled but before all shards complete, this will cause a NullPointerException. Store
a reference to concreteIndices before the loop or ensure it's not cleared
prematurely.

server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java [751-772]

+final ConcreteIndices concreteIndicesRef = concreteIndices;
 final Consumer<Exception> onShardFailure = (Exception e) -> {
     // create failures for all relevant requests
     for (BulkItemRequest request : requests) {
-        final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
+        final String indexName = concreteIndicesRef.getConcreteIndex(request.index()).getName();
         final DocWriteRequest<?> docWriteRequest = request.request();
         final BulkItemResponse bulkItemResponse = new BulkItemResponse(
             request.id(),
             docWriteRequest.opType(),
             new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
         );
 
         docStatusStats.inc(bulkItemResponse.status());
         responses.set(request.id(), bulkItemResponse);
     }
 
     if (counter.decrementAndGet() == 0) {
         indicesService.addDocStatusStats(docStatusStats);
         listener.onResponse(
             new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
         );
     }
 };
Suggestion importance[1-10]: 9


Why: This is a critical bug. The onShardFailure consumer captures concreteIndices which can be nulled at line 866 (bulkRequest = null;). If a shard failure occurs after this point but before all shards complete, accessing concreteIndices will cause a NullPointerException. Storing a local reference prevents this race condition.

High
General
Ensure proper resource cleanup ordering

When an exception occurs in the catch block, releasable.close() is called but the
onShardFailure handler also needs to release resources. However, the releasable is
already closed here, which could lead to resource leaks if onShardFailure expects to
manage it. Consider passing the releasable state or ensuring proper cleanup
coordination.

server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java [855-864]

 } catch (Exception e) {
-    releasable.close();
     if (incrementedConnections && targetNodeId != null) {
         clientConnections.computeIfPresent(targetNodeId, (id, conns) -> conns == 1 ? null : conns - 1);
     }
     span.setError(e);
     span.endSpan();
+    releasable.close();
     // Treat execution failures as shard-level failures rather than failing the entire bulk request
     onShardFailure.accept(e);
 }
Suggestion importance[1-10]: 3


Why: The suggestion to reorder releasable.close() after onShardFailure.accept(e) is questionable. The current order ensures the releasable is closed before handling the failure, which is appropriate since the failure handler doesn't need the releasable. The suggested change doesn't provide clear benefits and may introduce issues if the failure handler throws an exception.

Low

@github-actions
Contributor

Persistent review updated to latest commit 7e90117

@github-actions
Contributor

❌ Gradle check result for 7e90117: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

❌ Gradle check result for 7e90117: null


@varunbharadwaj varunbharadwaj changed the title [WIP] Avoid failing entire bulk request on single shard rejection from shard backpressure Avoid failing entire bulk request on single shard rejection from shard backpressure Feb 25, 2026
@github-actions
Contributor

❌ Gradle check result for 7e90117: FAILURE


@github-actions
Contributor

❌ Gradle check result for 7e90117: FAILURE


@github-actions
Contributor

Persistent review updated to latest commit acabbad

@github-actions
Contributor

❌ Gradle check result for acabbad: FAILURE


@github-actions
Contributor

Persistent review updated to latest commit 1c2ad51

@github-actions
Contributor

❌ Gradle check result for 1c2ad51: FAILURE


@github-actions
Contributor

❌ Gradle check result for 1c2ad51: FAILURE


@github-actions
Contributor

✅ Gradle check result for 1c2ad51: SUCCESS

@codecov

codecov bot commented Feb 26, 2026

Codecov Report

❌ Patch coverage is 89.65517% with 3 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.26%. Comparing base (c12aa29) to head (52b3c0e).
⚠️ Report is 67 commits behind head on main.

Files with missing lines Patch % Lines
...rg/opensearch/action/bulk/TransportBulkAction.java 89.65% 2 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #20727      +/-   ##
============================================
- Coverage     73.28%   73.26%   -0.03%     
+ Complexity    72066    72047      -19     
============================================
  Files          5786     5786              
  Lines        329620   329683      +63     
  Branches      47568    47572       +4     
============================================
- Hits         241574   241533      -41     
- Misses        68700    68766      +66     
- Partials      19346    19384      +38     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.


…rejection

Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
@varunbharadwaj varunbharadwaj marked this pull request as ready for review February 26, 2026 06:05
@github-actions
Contributor

Persistent review updated to latest commit 52b3c0e

@github-actions
Contributor

❌ Gradle check result for 52b3c0e: FAILURE


@github-actions
Contributor

✅ Gradle check result for 52b3c0e: SUCCESS

@varunbharadwaj varunbharadwaj changed the title Avoid failing entire bulk request on single shard rejection from shard backpressure Avoid failing entire bulk request on single shard rejection from shard backpressure to avoid request corruption Feb 26, 2026
@varunbharadwaj varunbharadwaj changed the title Avoid failing entire bulk request on single shard rejection from shard backpressure to avoid request corruption Prevent requst corruption by avoiding failing entire bulk request on single shard rejection from shard backpressure Feb 26, 2026
@varunbharadwaj varunbharadwaj changed the title Prevent requst corruption by avoiding failing entire bulk request on single shard rejection from shard backpressure Prevent request corruption by avoiding failing entire bulk request on single shard rejection from shard backpressure Feb 27, 2026
@andrross
Member

For batch style APIs in general, the best practice is that the overall request only fails with a non-200 response for things like failing authentication or invalid request syntax. Shard backpressure rejections are clearly shard-level failures and as such only apply to part of the batch _bulk request, so it seems that it is clearly a bug to fail the entire request as opposed to sending a 200 with individual document-level failures for the documents that were rejected due to backpressure. I mean, obviously request corruption is a bug but the behavior is wrong even if corruption wasn't possible. Do I understand that correctly?

However, tuning clients to handle backpressure properly is often a delicate balance so I do worry about the impact if today the service sends 429s and after this fix it starts sending 200s with the backpressure details in the response. @msfroh @Bukhtawar what do you think?

@varunbharadwaj
Contributor Author

I mean, obviously request corruption is a bug but the behavior is wrong even if corruption wasn't possible. Do I understand that correctly?

Yeah, that was the idea behind this approach.

However, tuning clients to handle backpressure properly is often a delicate balance so I do worry about the impact if today the service sends 429s and after this fix it starts sending 200s with the backpressure details in the response.

This is a good point, agree. I was thinking if someone is relying on the 429s from shard level back-pressure mechanism, they are very likely impacted by the request corruption issue. Another alternative tested was to first apply the shard level checks on all shards (phase 1) and later send the shard requests (phase 2). So we fail the bulk request if any shard is overloaded. This will maintain better compatibility with current behavior. Open to suggestions on which approach might be preferred here.
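
A rough, self-contained sketch of that two-phase alternative, with hypothetical names (this is not the actual TransportBulkAction code): run the pressure check for every shard first, and only dispatch when all shards accept, so the existing 429-on-rejection contract is preserved:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the two-phase alternative: first run the pressure
// check for every shard (phase 1), and only send the per-shard requests if
// all shards accepted (phase 2); otherwise release everything and reject
// the whole bulk request up front. Names are illustrative, not the real API.
public class TwoPhaseSketch {
    static String process(List<Integer> shardIds, Predicate<Integer> overloaded) {
        List<Integer> acquired = new ArrayList<>();
        // Phase 1: check every shard before sending anything.
        for (int shardId : shardIds) {
            if (overloaded.test(shardId)) {
                acquired.clear(); // release phase-1 permits; nothing was dispatched
                return "rejected"; // fail the whole bulk request, e.g. with a 429
            }
            acquired.add(shardId);
        }
        // Phase 2: all checks passed; dispatch the per-shard requests.
        return "dispatched " + acquired.size() + " shards";
    }

    public static void main(String[] args) {
        System.out.println(process(List.of(0, 1, 2), id -> false));
        System.out.println(process(List.of(0, 1, 2), id -> id == 1));
    }
}
```

Because nothing is dispatched until every shard passes the check, no buffer is handed off before a possible rejection, which also avoids the corruption path.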

@andrross
Member

Another alternative tested was to first apply the shard level checks on all shards (phase 1) and later send the shard requests (phase 2). So we fail the bulk request if any shard is overloaded.

Honestly this seems like a less risky approach, but I'd like to get some opinions from others familiar with the indexing path, e.g. @Bukhtawar @mgodwan @gbbafna

It seems like there might be other aspects to this behavior change as well. With this change, in case of a single slow shard then indexing traffic will proceed a full throughput to the other shards. This seems good but could lead to data skew across shards if the slowness persists. That could be a big change to a system relying on the current behavior.

Comment on lines +767 to +770
indicesService.addDocStatusStats(docStatusStats);
listener.onResponse(
new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
);
Contributor


Can we avoid duplicating finishHim()?

Comment on lines -845 to +863
throw e;
// Treat execution failures as shard-level failures rather than failing the entire bulk request
onShardFailure.accept(e);
Contributor


Looks like we need a test for this, unless an existing test already covers this?

@Bukhtawar
Contributor

Another alternative tested was to first apply the shard level checks on all shards (phase 1) and later send the shard requests (phase 2). So we fail the bulk request if any shard is overloaded.

Honestly this seems like a less risky approach, but I'd like to get some opinions from others familiar with the indexing path, e.g. @Bukhtawar @mgodwan @gbbafna

It seems like there might be other aspects to this behavior change as well. With this change, in case of a single slow shard then indexing traffic will proceed a full throughput to the other shards. This seems good but could lead to data skew across shards if the slowness persists. That could be a big change to a system relying on the current behavior.

Do you think this can be enabled behind a bulk query param while still keeping the existing behavior as default?


Labels

bug Something isn't working Indexing Indexing, Bulk Indexing and anything related to indexing

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BUG] Shard indexing pressure results in request corruption

3 participants