Fix X-Opaque-Id header propagation (along with other response headers) for streaming Reactor Netty 4 transport#20371
Conversation
📝 WalkthroughWalkthroughPropagates response headers (including X-Opaque-Id) through Reactor Netty 4 streaming, exposes headers on StreamingResponse, allows RequestOptions on StreamingRequest, defers streaming HttpContent until the emitter is ready via a bounded in-memory queue, prevents duplicate header emission, and adds integration tests for X-Opaque-Id. Changes
Sequence DiagramsequenceDiagram
participant Client
participant StreamingRequest
participant DefaultStreamingRestChannel
participant ReactorNetty4StreamingHttpChannel
participant ReactorNetty4StreamingResponseProducer
participant StreamingResponse
Client->>StreamingRequest: attach RequestOptions (X-Opaque-Id)
StreamingRequest->>DefaultStreamingRestChannel: send request
Note over DefaultStreamingRestChannel: prepareResponse enriches headers\n(includes thread-context headers + preserves X-Opaque-Id)
DefaultStreamingRestChannel->>ReactorNetty4StreamingHttpChannel: prepareResponse(with enriched headers)
alt Headers not yet sent
ReactorNetty4StreamingHttpChannel->>ReactorNetty4StreamingResponseProducer: initialize emitter
end
par Content delivery
ReactorNetty4StreamingResponseProducer->>ReactorNetty4StreamingResponseProducer: if emitter not ready -> enqueue DelayedHttpContent (bounded)
ReactorNetty4StreamingResponseProducer->>Client: if emitter ready -> emit HttpContent
end
ReactorNetty4StreamingResponseProducer->>ReactorNetty4StreamingResponseProducer: on subscribe -> drain queue and emit deferred items
Client->>StreamingResponse: getHeaders() / getHeader(name)
StreamingResponse->>Client: return headers including X-Opaque-Id
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Comment |
7b2f644 to
774f24b
Compare
774f24b to
ec1ae0f
Compare
|
❕ Gradle check result for ec1ae0f: UNSTABLE Please review all flaky tests that succeeded after retry and create an issue if one does not already exist to track the flaky failure. |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #20371 +/- ##
============================================
- Coverage 73.30% 73.17% -0.13%
+ Complexity 71777 71698 -79
============================================
Files 5784 5784
Lines 328141 328196 +55
Branches 47269 47276 +7
============================================
- Hits 240531 240167 -364
- Misses 68329 68801 +472
+ Partials 19281 19228 -53 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
74204d4 to
9da2234
Compare
There was a problem hiding this comment.
Actionable comments posted: 4
Fix all issues with AI Agents 🤖
In @CHANGELOG.md:
- Line 30: The changelog entry references the wrong GitHub repository path in
the PR URL; update the URL in the CHANGELOG.md entry that reads "Fix X-Opaque-Id
header propagation ...
([#20371](https://github.com/opensearch-project/security/pull/20371))" so the
link points to the OpenSearch repo by replacing "opensearch-project/security"
with "opensearch-project/OpenSearch" (preserve the PR number and surrounding
text).
In
@plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java:
- Around line 154-156: The StepVerifier chain created with
StepVerifier.create(Flux.from(streamingResponse.getBody()).map(...)).expectNextMatches(...).expectComplete()
never invokes .verify(), so the reactive assertions are not executed; update the
chain used in ReactorNetty4StreamingIT (the StepVerifier.create(...) call that
ends with .expectComplete()) to append a terminal .verify() call to actually run
the verification.
In
@plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java:
- Around line 27-31: The buffering queue uses a non-thread-safe ArrayDeque
(queue) and can be concurrently accessed from send() and subscribe(); replace
the ArrayDeque with a thread-safe implementation (e.g., change the declaration
of queue in ReactorNetty4StreamingResponseProducer to use
ConcurrentLinkedQueue<DelayedHttpContent>) and add the corresponding import so
send() and subscribe() can safely offer/poll without additional synchronization;
ensure all uses of queue (including any iterating) are safe for
ConcurrentLinkedQueue semantics.
In @server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java:
- Around line 98-117: prepareResponse currently overwrites multi-valued headers
when iterating threadContext.getResponseHeaders() because it repeatedly does
enriched.put(headerEntry.getKey(), List.of(headerValue)); instead, merge values
into the existing list in enriched: for each headerEntry from
threadContext.getResponseHeaders() use
enriched.computeIfAbsent(headerEntry.getKey(), k -> new ArrayList<>()) and
addAll(headerEntry.getValue()) (or append each headerValue) so existing entries
(including X_OPAQUE_ID and original headers) are preserved; then call
streamingHttpChannel.prepareResponse(status.getStatus(), enriched) as before.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java (1)
42-60: Ensure HttpContent resources are released on error paths.When content is deferred in the queue, the
HttpContentobjects hold references to native buffers. If an error occurs before the queue is drained (e.g., connection closed, subscription cancelled), these buffers may leak.Recommended approach
Add error handling and cleanup logic:
- Implement a cleanup method to drain and release queued content on error or cancellation
- Add a
doOnCancelordoOnErrorhandler in the Flux.create that releases queued content- Ensure ActionListener.onFailure is called for queued items on cleanup
Example pattern:
ReactorNetty4StreamingResponseProducer() { this.sender = Flux.create(emitter -> { register(emitter); emitter.onDispose(() -> { // Release any queued content DelayedHttpContent content; while ((content = queue.poll()) != null) { content.content().release(); content.listener().onFailure(new Exception("Cancelled")); } }); }); }
🧹 Nitpick comments (1)
client/rest/src/main/java/org/opensearch/client/StreamingRequest.java (1)
80-102: LGTM! Well-designed API addition.The two
setOptionsoverloads provide a clean and convenient way to configure request options, with proper null validation and clear documentation. The builder overload is a nice ergonomic touch.One consideration: the
optionsfield is mutable and not thread-safe. IfStreamingRequestinstances are shared across threads or if options are modified after the request is submitted to the reactive pipeline, visibility issues could arise. However, this is likely acceptable if the intended pattern is to fully configure the request before use.Optional: Consider documenting thread-safety expectations
If concurrent access isn't intended, consider adding a note to the class or method javadoc stating that
StreamingRequestshould be configured before being passed to the client and should not be modified concurrently.Alternatively, if thread safety is required, consider making
optionsvolatile or using synchronization:- private RequestOptions options = RequestOptions.DEFAULT; + private volatile RequestOptions options = RequestOptions.DEFAULT;
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
CHANGELOG.mdclient/rest/src/main/java/org/opensearch/client/StreamingRequest.javaclient/rest/src/main/java/org/opensearch/client/StreamingResponse.javaplugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.javaplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.javaplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.javaserver/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
📚 Learning: 2025-12-12T18:40:08.452Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
Applied to files:
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.javaplugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.javaplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.javaCHANGELOG.md
📚 Learning: 2025-12-13T20:16:15.318Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java:101-123
Timestamp: 2025-12-13T20:16:15.318Z
Learning: In OpenSearch, only one HTTP transport implementation can be active and loaded at a time, so duplicate setting definitions (such as h3.max_stream_local_length, h3.max_stream_remote_length, and h3.max_streams) across different transport implementations like Netty4Http3ServerTransport and ReactorNetty4HttpServerTransport will not cause setting registration conflicts.
Applied to files:
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
📚 Learning: 2025-12-12T13:31:51.234Z
Learnt from: andriyredko
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:642-646
Timestamp: 2025-12-12T13:31:51.234Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), the createBuilderWithPort() helper intentionally uses randomBoolean() for the HTTP/3 enabled setting to ensure all tests validate correct behavior with both HTTP/3 enabled and disabled configurations.
Applied to files:
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
🧬 Code graph analysis (1)
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java (1)
libs/core/src/main/java/org/opensearch/core/xcontent/ObjectPath.java (1)
ObjectPath(42-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (19)
- GitHub Check: gradle-check
- GitHub Check: assemble (25, ubuntu-24.04-arm)
- GitHub Check: assemble (21, ubuntu-24.04-arm)
- GitHub Check: assemble (21, windows-latest)
- GitHub Check: assemble (21, ubuntu-latest)
- GitHub Check: assemble (25, windows-latest)
- GitHub Check: assemble (25, ubuntu-latest)
- GitHub Check: precommit (25, ubuntu-24.04-arm)
- GitHub Check: precommit (21, windows-2025, true)
- GitHub Check: precommit (25, macos-15)
- GitHub Check: precommit (21, ubuntu-latest)
- GitHub Check: precommit (21, windows-latest)
- GitHub Check: precommit (25, macos-15-intel)
- GitHub Check: precommit (21, macos-15)
- GitHub Check: precommit (25, ubuntu-latest)
- GitHub Check: precommit (21, ubuntu-24.04-arm)
- GitHub Check: precommit (25, windows-latest)
- GitHub Check: precommit (21, macos-15-intel)
- GitHub Check: Analyze (java)
🔇 Additional comments (8)
client/rest/src/main/java/org/opensearch/client/StreamingResponse.java (2)
98-106: LGTM! Header extraction follows established pattern.The implementation correctly mirrors the existing
getStatusLine()andgetWarnings()methods with consistent error handling.
108-121: LGTM! Single header extraction correctly implemented.The method appropriately handles null cases through
mapNotNulland follows the same blocking pattern as other getters in this class.server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java (2)
44-44: LGTM! ThreadContext properly captured for header enrichment.The field is correctly initialized and enables access to custom response headers for streaming responses.
Also applies to: 59-59
119-123: LGTM! sendResponse override ensures headers are prepared.The override correctly prepares headers via the streaming channel before delegating to the superclass, ensuring proper header propagation for streaming responses.
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java (1)
63-70: Queue drainage logic looks correct but relies on thread-safety fix.The deferred content is properly replayed after subscription. However, this assumes the queue access is thread-safe (see previous comment).
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java (1)
84-89: The hasSentHeaders guard prevents the IllegalArgumentException, but verify that prepareResponse is always invoked from the same EventLoop thread.The guard effectively prevents duplicate header emission by checking
hasSentHeaders()before setting status and headers. However, this check alone is not atomic with the subsequent operations. The thread-safety guarantee depends on Reactor Netty's HttpServerResponse being accessed exclusively from a single EventLoop thread per connection. If prepareResponse could be invoked from different threads (e.g., different EventLoops), a race condition is theoretically possible between the check and header operations. The current pattern in RestBulkStreamingAction and RestController suggests prepareResponse is called early before streaming, mitigating this risk, but consider documenting this expectation or adding explicit synchronization if cross-thread calls become possible in the future.plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java (2)
12-12: LGTM: Import necessary for header configuration.The RequestOptions import is required for the new tests that configure X-Opaque-Id headers.
90-133: LGTM: Comprehensive test for X-Opaque-Id header propagation.The test correctly validates that the X-Opaque-Id header is propagated from the request to the response in streaming scenarios. The test structure is consistent with existing tests and includes appropriate assertions.
...sport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
Outdated
Show resolved
Hide resolved
...src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java
Outdated
Show resolved
Hide resolved
9da2234 to
743bbcb
Compare
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (3)
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java (1)
29-31: Optional: Remove unnecessary semicolon after record declaration.The semicolon after the record closing brace is superfluous. While syntactically valid, it's not idiomatic Java.
🔎 Proposed fix
// Holds the {@code HttpContent} for deferred delivery -private record DelayedHttpContent(HttpContent content, ActionListener<Void> listener, boolean isLast) { -}; +private record DelayedHttpContent(HttpContent content, ActionListener<Void> listener, boolean isLast) { +}client/rest/src/main/java/org/opensearch/client/StreamingResponse.java (2)
98-106: LGTM: Consistent implementation following established patterns.The
getHeaders()method correctly follows the same reactive pattern and error handling approach asgetStatusLine()andgetWarnings(). The implementation properly extracts headers from the cached publisher and handles exceptions gracefully.Optional: Enhance Javadoc for API clarity
Consider adding a
@returntag to document the return value:/** * Returns a list of all headers returned in the response. + * @return array of headers, or empty array if no headers present */ public Header[] getHeaders() {
108-121: LGTM: Excellent null safety with double mapNotNull.The
getHeader(String name)method is well-implemented with proper null handling. The doublemapNotNull()chain correctly handles both the case where no matching header exists and where a header exists but has a null value. The error handling is consistent with other methods in the class.Optional: Complete Javadoc with standard tags
Consider adding
@paramand@returntags for API documentation completeness:/** * Returns the value of the first header with a specified name of this message. * If there is more than one matching header in the message the first element is returned. * If there is no matching header in the message <code>null</code> is returned. * * @param name header name + * @return the value of the first matching header, or null if no matching header exists */ public String getHeader(String name) {
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
CHANGELOG.mdclient/rest/src/main/java/org/opensearch/client/StreamingRequest.javaclient/rest/src/main/java/org/opensearch/client/StreamingResponse.javaplugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.javaplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.javaplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.javaserver/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java
🚧 Files skipped from review as they are similar to previous changes (1)
- CHANGELOG.md
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java:101-123
Timestamp: 2025-12-13T20:16:15.318Z
Learning: In OpenSearch, only one HTTP transport implementation can be active and loaded at a time, so duplicate setting definitions (such as h3.max_stream_local_length, h3.max_stream_remote_length, and h3.max_streams) across different transport implementations like Netty4Http3ServerTransport and ReactorNetty4HttpServerTransport will not cause setting registration conflicts.
📚 Learning: 2025-12-12T18:40:08.452Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
Applied to files:
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.javaplugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
📚 Learning: 2025-12-13T20:16:15.318Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java:101-123
Timestamp: 2025-12-13T20:16:15.318Z
Learning: In OpenSearch, only one HTTP transport implementation can be active and loaded at a time, so duplicate setting definitions (such as h3.max_stream_local_length, h3.max_stream_remote_length, and h3.max_streams) across different transport implementations like Netty4Http3ServerTransport and ReactorNetty4HttpServerTransport will not cause setting registration conflicts.
Applied to files:
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
📚 Learning: 2025-12-12T13:31:51.234Z
Learnt from: andriyredko
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:642-646
Timestamp: 2025-12-12T13:31:51.234Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), the createBuilderWithPort() helper intentionally uses randomBoolean() for the HTTP/3 enabled setting to ensure all tests validate correct behavior with both HTTP/3 enabled and disabled configurations.
Applied to files:
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
🧬 Code graph analysis (1)
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java (1)
libs/core/src/main/java/org/opensearch/core/xcontent/ObjectPath.java (1)
ObjectPath(42-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
- GitHub Check: gradle-check
- GitHub Check: precommit (25, macos-15)
- GitHub Check: precommit (25, windows-latest)
- GitHub Check: precommit (21, windows-2025, true)
- GitHub Check: precommit (25, ubuntu-24.04-arm)
- GitHub Check: precommit (25, macos-15-intel)
- GitHub Check: precommit (25, ubuntu-latest)
- GitHub Check: precommit (21, macos-15)
- GitHub Check: precommit (21, macos-15-intel)
- GitHub Check: precommit (21, windows-latest)
- GitHub Check: precommit (21, ubuntu-latest)
- GitHub Check: precommit (21, ubuntu-24.04-arm)
- GitHub Check: assemble (21, ubuntu-latest)
- GitHub Check: assemble (25, windows-latest)
- GitHub Check: assemble (21, ubuntu-24.04-arm)
- GitHub Check: assemble (25, ubuntu-24.04-arm)
- GitHub Check: assemble (21, windows-latest)
- GitHub Check: assemble (25, ubuntu-latest)
- GitHub Check: Analyze (java)
- GitHub Check: detect-breaking-change
🔇 Additional comments (9)
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java (2)
90-133: LGTM! Thorough test for X-Opaque-Id propagation.This test correctly verifies that the X-Opaque-Id header is propagated through the streaming request/response lifecycle, validates per-record responses, and confirms the final document count. The test aligns well with the PR's objective of fixing header propagation for streaming responses.
135-160: LGTM! Correctly validates duplicate header rejection.This test properly validates that duplicate X-Opaque-Id headers result in a 400 error response, which is the expected behavior for single-valued headers. The pattern of using
expectErrorfollowed by status code assertions is consistent with other error handling tests in this file.plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java (1)
84-89: LGTM! Proper guard against duplicate header emission.The
hasSentHeaders()check correctly prevents duplicate status and header emission whenprepareResponseis called multiple times during the streaming response lifecycle. This aligns with the PR's goal of fixing header propagation issues in streaming scenarios.plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java (2)
42-60: LGTM! Deferred delivery correctly handles early send() calls.The implementation properly handles the race condition where
send()is called before the emitter is subscribed. By using a thread-safeConcurrentLinkedQueueto buffer content whenisReady()returns false, and then draining this queue on subscription, the code ensures no content is lost when exceptions trigger responses before streaming begins.
63-70: LGTM! Queue draining correctly processes deferred content.The subscription logic properly drains all queued content by polling the thread-safe queue and re-invoking
send()for each buffered item. Since the emitter is now registered (via theregister()callback), the recursivesend()calls will process immediately. The ordering is preserved, and the thread-safe queue ensures no concurrent modification issues.server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java (2)
98-117: LGTM! Header enrichment correctly propagates X-Opaque-Id and custom headers.The implementation properly enriches response headers by:
- Preserving existing headers from the parameter
- Adding the X-Opaque-Id from the request (overwriting any existing value to ensure request takes precedence)
- Merging custom response headers from threadContext using
computeIfAbsent+add()to preserve multi-valued headersThis aligns with the PR's objective to fix header propagation for streaming responses. The logic correctly handles both single-valued (X-Opaque-Id) and multi-valued headers, with downstream validation catching any duplicate single-valued headers.
119-123: LGTM! Override correctly prepares response headers before delegating.The
sendResponseoverride ensures that response status and headers from theRestResponseare prepared on the streaming channel before invoking the parent's implementation. This is essential for streaming scenarios where headers need to be set up early in the response lifecycle. The duplicateprepareResponsecalls are safe due to thehasSentHeaders()guard inReactorNetty4StreamingHttpChannel.client/rest/src/main/java/org/opensearch/client/StreamingRequest.java (1)
80-102: LGTM! Clean API design for setting request options.Both
setOptionsoverloads follow standard Java patterns with proper null validation. TheBuilderoverload is a nice convenience that avoids forcing callers to invoke.build()explicitly. The implementation correctly assigns the options field, enabling header propagation (such as X-Opaque-Id) for streaming requests as demonstrated in the integration tests.client/rest/src/main/java/org/opensearch/client/StreamingResponse.java (1)
11-11: LGTM: Import necessary for new API.The
Headerimport from Apache HttpCore5 is required for the return type of the newgetHeaders()method and is consistent with existing dependencies.
|
❌ Gradle check result for 743bbcb: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
❕ Gradle check result for 743bbcb: UNSTABLE Please review all flaky tests that succeeded after retry and create an issue if one does not already exist to track the flaky failure. |
743bbcb to
04a89ce
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI Agents
In @server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java:
- Around line 98-117: In DefaultStreamingRestChannel.prepareResponse the code
uses List.of(opaque) for X_OPAQUE_ID which is immutable and will cause
UnsupportedOperationException when threadContext.getResponseHeaders() also
contains X_OPAQUE_ID and later code calls .add() on that list; change the
insertion to put a mutable list (e.g., new ArrayList<>(List.of(opaque)) or
Collections.singletonList wrapped in new ArrayList) into the enriched map so
computeIfAbsent can safely add values, leaving
threadContext.getResponseHeaders() merging logic unchanged.
🧹 Nitpick comments (1)
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java (1)
25-31: Unbounded buffering is acceptable for the expected single-item case.The comment indicates only a single deferred response is expected in practice. The unbounded
ConcurrentLinkedQueueis thread-safe and suitable for this use case.Optional: Document memory implications if assumptions change
If the assumption of "single final response" doesn't hold in all scenarios, consider adding a comment about potential memory implications or monitoring for unexpected queue growth.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
CHANGELOG.mdclient/rest/src/main/java/org/opensearch/client/StreamingRequest.javaclient/rest/src/main/java/org/opensearch/client/StreamingResponse.javaplugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.javaplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.javaplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.javaserver/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java
🚧 Files skipped from review as they are similar to previous changes (2)
- CHANGELOG.md
- client/rest/src/main/java/org/opensearch/client/StreamingRequest.java
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java:101-123
Timestamp: 2025-12-13T20:16:15.318Z
Learning: In OpenSearch, only one HTTP transport implementation can be active and loaded at a time, so duplicate setting definitions (such as h3.max_stream_local_length, h3.max_stream_remote_length, and h3.max_streams) across different transport implementations like Netty4Http3ServerTransport and ReactorNetty4HttpServerTransport will not cause setting registration conflicts.
📚 Learning: 2025-12-12T18:40:08.452Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
Applied to files:
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.javaplugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
📚 Learning: 2025-12-13T20:16:15.318Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java:101-123
Timestamp: 2025-12-13T20:16:15.318Z
Learning: In OpenSearch, only one HTTP transport implementation can be active and loaded at a time, so duplicate setting definitions (such as h3.max_stream_local_length, h3.max_stream_remote_length, and h3.max_streams) across different transport implementations like Netty4Http3ServerTransport and ReactorNetty4HttpServerTransport will not cause setting registration conflicts.
Applied to files:
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
📚 Learning: 2025-12-12T13:31:51.234Z
Learnt from: andriyredko
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:642-646
Timestamp: 2025-12-12T13:31:51.234Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), the createBuilderWithPort() helper intentionally uses randomBoolean() for the HTTP/3 enabled setting to ensure all tests validate correct behavior with both HTTP/3 enabled and disabled configurations.
Applied to files:
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
🧬 Code graph analysis (1)
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java (1)
libs/core/src/main/java/org/opensearch/core/xcontent/ObjectPath.java (1)
ObjectPath(42-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
- GitHub Check: gradle-check
- GitHub Check: assemble (25, ubuntu-latest)
- GitHub Check: precommit (21, windows-latest)
- GitHub Check: precommit (25, macos-15-intel)
- GitHub Check: precommit (21, ubuntu-latest)
- GitHub Check: precommit (25, windows-latest)
- GitHub Check: precommit (25, ubuntu-latest)
- GitHub Check: precommit (25, macos-15)
- GitHub Check: precommit (21, macos-15-intel)
- GitHub Check: precommit (25, ubuntu-24.04-arm)
- GitHub Check: precommit (21, macos-15)
- GitHub Check: precommit (21, ubuntu-24.04-arm)
- GitHub Check: precommit (21, windows-2025, true)
- GitHub Check: assemble (21, windows-latest)
- GitHub Check: detect-breaking-change
- GitHub Check: assemble (21, ubuntu-24.04-arm)
- GitHub Check: assemble (25, windows-latest)
- GitHub Check: assemble (21, ubuntu-latest)
- GitHub Check: assemble (25, ubuntu-24.04-arm)
- GitHub Check: Analyze (java)
🔇 Additional comments (9)
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java (1)
85-88: LGTM!The guard to check
hasSentHeaders()before setting status and headers correctly prevents duplicate header emission, addressing the IllegalArgumentException mentioned in the PR description.plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java (2)
13-14: LGTM!Imports correctly support the thread-safe buffering mechanism.
43-69: LGTM!The deferral and replay logic correctly handles content sent before the emitter is ready. The queue draining in
subscribe()ensures buffered content is delivered after the stream is established.server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java (2)
44-44: LGTM!The
threadContextfield is properly declared and initialized to support header enrichment.Also applies to: 59-59
119-123: LGTM!The
sendResponseoverride correctly prepares response headers before delegating to the superclass implementation.client/rest/src/main/java/org/opensearch/client/StreamingResponse.java (2)
11-11: LGTM!The
getHeaders()method follows the established pattern for accessing response metadata, with proper error handling.Also applies to: 101-106
115-121: LGTM!The
getHeader(String name)method correctly retrieves the first matching header value with proper null handling.plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java (2)
90-133: LGTM!The test correctly validates X-Opaque-Id header propagation through streaming requests, verifying both per-record responses and the final response header.
135-160: LGTM!The test correctly validates that duplicate X-Opaque-Id headers are rejected with a 400 status code, ensuring proper header validation.
server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java
Show resolved
Hide resolved
04a89ce to
66617c3
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java (1)
25-27: Consider a bounded queue to prevent memory exhaustion.The comment notes that the queue is unbounded and "realistically we should only see a single final (last content) response being deferred." However, if an unexpected bug causes multiple items to be enqueued before subscription, an unbounded queue could lead to memory exhaustion.
Consider using a bounded
ConcurrentLinkedQueuealternative (such asArrayBlockingQueuewith a reasonable capacity) or adding a size check with a fallback error path.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
CHANGELOG.mdclient/rest/src/main/java/org/opensearch/client/StreamingRequest.javaclient/rest/src/main/java/org/opensearch/client/StreamingResponse.javaplugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.javaplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.javaplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.javaserver/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java
🚧 Files skipped from review as they are similar to previous changes (3)
- CHANGELOG.md
- client/rest/src/main/java/org/opensearch/client/StreamingRequest.java
- plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java:101-123
Timestamp: 2025-12-13T20:16:15.318Z
Learning: In OpenSearch, only one HTTP transport implementation can be active and loaded at a time, so duplicate setting definitions (such as h3.max_stream_local_length, h3.max_stream_remote_length, and h3.max_streams) across different transport implementations like Netty4Http3ServerTransport and ReactorNetty4HttpServerTransport will not cause setting registration conflicts.
📚 Learning: 2025-12-12T18:40:08.452Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
Applied to files:
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.javaplugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: gradle-check
🔇 Additional comments (5)
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java (1)
85-88: LGTM! Header emission guard prevents duplicate headers.The
hasSentHeaders()check correctly ensures that status and headers are emitted only once, preventing the duplicate header issue mentioned in the PR description.plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java (1)
13-14: Good: ConcurrentLinkedQueue addresses the previous thread-safety concern.The switch from
ArrayDequetoConcurrentLinkedQueuecorrectly handles concurrent access from multiple threads, resolving the data corruption risk flagged in the earlier review.Also applies to: 25-31
server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java (2)
98-117: Header enrichment logic is correct and avoids the previous immutable-list exception.The reordered logic now:
- Merges custom headers from
threadContext(lines 102-109), usingcomputeIfAbsentwith mutableArrayList- Sets
X_OPAQUE_IDfrom the request (lines 111-114) viaput, which replaces any threadContext valueThis ordering prevents the
UnsupportedOperationExceptionthat would occur if an immutableList.of()were inserted first and thencomputeIfAbsenttried to append to it. The request'sX_OPAQUE_IDcorrectly takes precedence for response correlation.
119-123: LGTM! sendResponse enriches headers before delegating.The override ensures
prepareResponseis invoked to merge thread-context and request headers before the superclasssendResponselogic runs.client/rest/src/main/java/org/opensearch/client/StreamingResponse.java (1)
98-121: LGTM! Header accessor methods are well-structured and consistent.Both
getHeaders()andgetHeader(String name)follow the same error-handling and blocking patterns as the existinggetStatusLine()andgetWarnings()methods. The null-safety ingetHeader(usingmapNotNull) correctly handles missing headers.
...src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java
Show resolved
Hide resolved
|
❌ Gradle check result for 66617c3: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
❌ Gradle check result for 66617c3: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
❌ Gradle check result for 66617c3: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
66617c3 to
95b4679
Compare
|
❌ Gradle check result for 95b4679: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Signed-off-by: Andriy Redko <drreta@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (1)
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java (1)
139-166: Consider verifying the specific error message.The test correctly validates that duplicate X-Opaque-Id headers result in a 400 error (preventing the NPE mentioned in the PR description). However, you could strengthen the assertion by verifying the error message mentions the duplicate header issue, similar to how other tests check specific error content.
♻️ Optional: Match the error checking pattern from testStreamingBadRequest
For consistency with
testStreamingBadRequest()(line 352-355), consider using.expectErrorMatches():StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8))) - .expectError(ResponseException.class) + .expectErrorMatches( + ex -> ex instanceof ResponseException && ((ResponseException) ex).getResponse().getStatusLine().getStatusCode() == 400 + ) .verify();Alternatively, to verify the specific error message about duplicate headers:
StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8))) - .expectError(ResponseException.class) + .expectErrorMatches( + ex -> ex instanceof ResponseException + && ((ResponseException) ex).getResponse().getStatusLine().getStatusCode() == 400 + && ex.getMessage().contains("X-Opaque-Id") + ) .verify();
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
📚 Learning: 2025-12-12T18:40:08.452Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
Applied to files:
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
📚 Learning: 2025-12-12T13:31:51.234Z
Learnt from: andriyredko
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:642-646
Timestamp: 2025-12-12T13:31:51.234Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), the createBuilderWithPort() helper intentionally uses randomBoolean() for the HTTP/3 enabled setting to ensure all tests validate correct behavior with both HTTP/3 enabled and disabled configurations.
Applied to files:
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
🧬 Code graph analysis (1)
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java (1)
libs/core/src/main/java/org/opensearch/core/xcontent/ObjectPath.java (1)
ObjectPath(42-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
- GitHub Check: gradle-check
- GitHub Check: precommit (25, ubuntu-latest)
- GitHub Check: precommit (21, macos-15-intel)
- GitHub Check: precommit (25, macos-15-intel)
- GitHub Check: precommit (25, windows-latest)
- GitHub Check: assemble (21, ubuntu-24.04-arm)
- GitHub Check: precommit (21, windows-latest)
- GitHub Check: assemble (21, ubuntu-latest)
- GitHub Check: precommit (21, windows-2025, true)
- GitHub Check: assemble (25, ubuntu-latest)
- GitHub Check: assemble (25, ubuntu-24.04-arm)
- GitHub Check: precommit (25, ubuntu-24.04-arm)
- GitHub Check: precommit (25, macos-15)
- GitHub Check: assemble (25, windows-latest)
- GitHub Check: precommit (21, macos-15)
- GitHub Check: precommit (21, ubuntu-24.04-arm)
- GitHub Check: assemble (21, windows-latest)
- GitHub Check: precommit (21, ubuntu-latest)
- GitHub Check: Analyze (java)
- GitHub Check: detect-breaking-change
🔇 Additional comments (3)
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java (3)
12-12: LGTM!The
RequestOptionsimport is necessary for setting custom headers in the new X-Opaque-Id tests.
52-55: LGTM!The conversion to text blocks significantly improves readability. The use of
String.formatwithLocale.ENGLISH(orLocale.getDefault()where appropriate) ensures consistent formatting across platforms.Also applies to: 169-172, 212-215, 257-260, 302-305, 338-341, 363-368, 393-396, 419-422
92-137: LGTM!This test comprehensively validates X-Opaque-Id header propagation for streaming responses. The test correctly:
- Sets a single X-Opaque-Id header via
RequestOptions- Verifies streaming response content
- Asserts the X-Opaque-Id header is returned in the response (line 128)
- Confirms successful indexing of all documents
The test follows the established pattern and uses
VirtualTimeSchedulerfor deterministic timing control.
|
❌ Gradle check result for a25343a: null Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
❌ Gradle check result for a25343a: null Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
❌ Gradle check result for a25343a: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
Thanks @reta this will resolve NPE and i test the fix my locally that will not fix for my case. I just attached a debug screenshot of what's the situation for my case, i believe that's thread context pollution and not been correctly restore.
|
Thanks @Hailong-am , moving to this one (thread context), thanks for checking out. Do you happen to have a reproducer for it? So far I cannot reproduce it will ml-commons (built from |
You might need to use branch |
…) for streaming Reactor Netty 4 transport (opensearch-project#20371) * Fix X-Opaque-Id header propagation (along with other response headers) for streaming Reactor Netty 4 transport Signed-off-by: Andriy Redko <drreta@gmail.com> * Address code review comments Signed-off-by: Andriy Redko <drreta@gmail.com> --------- Signed-off-by: Andriy Redko <drreta@gmail.com>
…) for streaming Reactor Netty 4 transport (opensearch-project#20371) * Fix X-Opaque-Id header propagation (along with other response headers) for streaming Reactor Netty 4 transport Signed-off-by: Andriy Redko <drreta@gmail.com> * Address code review comments Signed-off-by: Andriy Redko <drreta@gmail.com> --------- Signed-off-by: Andriy Redko <drreta@gmail.com>

Description
Fix X-Opaque-Id header propagation (along with other response headers) for streaming Reactor Netty 4 transport. As report in the opensearch-project/OpenSearch-Dashboards#11090, the following exception might be raised:
The issue it two fold:
Related Issues
Closes opensearch-project/OpenSearch-Dashboards#11090
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.
Summary by CodeRabbit
Bug Fixes
New Features
Tests
✏️ Tip: You can customize this high-level summary in your review settings.