Skip to content

Fix X-Opaque-Id header propagation (along with other response headers) for streaming Reactor Netty 4 transport#20371

Merged
reta merged 2 commits intoopensearch-project:mainfrom
reta:fix.opaque.propagation
Jan 8, 2026
Merged

Fix X-Opaque-Id header propagation (along with other response headers) for streaming Reactor Netty 4 transport#20371
reta merged 2 commits intoopensearch-project:mainfrom
reta:fix.opaque.propagation

Conversation

@reta
Copy link
Copy Markdown
Contributor

@reta reta commented Jan 5, 2026

Description

Fix X-Opaque-Id header propagation (along with other response headers) for streaming Reactor Netty 4 transport. As report in the opensearch-project/OpenSearch-Dashboards#11090, the following exception might be raised:

2026-01-04T13:42:15,764][ERROR][o.o.r.RestController     ] [bcd07463160c][opensearch[bcd07463160c][transport_worker][T#2]] failed to send failure response for uri [/_plugins/_ml/agents/hnOyJ5sBIIp0ezv9VhY-/_execute/stream]
java.lang.NullPointerException: Cannot invoke "reactor.core.publisher.FluxSink.error(java.lang.Throwable)" because "this.emitter" is null
	at org.opensearch.http.reactor.netty4.ReactorNetty4StreamingResponseProducer.send(ReactorNetty4StreamingResponseProducer.java:40) ~[?:?]
	at org.opensearch.http.reactor.netty4.ReactorNetty4StreamingHttpChannel.sendResponse(ReactorNetty4StreamingHttpChannel.java:80) ~[?:?]
	at org.opensearch.http.DefaultRestChannel.sendResponse(DefaultRestChannel.java:174) ~[opensearch-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
	at org.opensearch.rest.RestController.dispatchRequest(RestController.java:290) [opensearch-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
	at org.opensearch.http.AbstractHttpServerTransport.dispatchRequest(AbstractHttpServerTransport.java:384) [opensearch-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
	at org.opensearch.http.AbstractHttpServerTransport.handleIncomingRequest(AbstractHttpServerTransport.java:498) [opensearch-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
	at org.opensearch.http.AbstractHttpServerTransport.incomingStream(AbstractHttpServerTransport.java:344) [opensearch-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
	at org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport.incomingRequest(ReactorNetty4HttpServerTransport.java:394) [transport-reactor-netty4-client-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
	at org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport.lambda$bind$4(ReactorNetty4HttpServerTransport.java:258) [transport-reactor-netty4-client-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
	at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:1336) [reactor-netty-http-1.3.1.jar:1.3.1]
	at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:489) [reactor-netty-core-1.3.1.jar:1.3.1]
	at reactor.netty.http.server.HttpServerOperations.handleDefaultHttpRequest(HttpServerOperations.java:863) [reactor-netty-http-1.3.1.jar:1.3.1]
	at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:789) [reactor-netty-http-1.3.1.jar:1.3.1]
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115) [reactor-netty-core-1.3.1.jar:1.3.1]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:356) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:283) [reactor-netty-http-1.3.1.jar:1.3.1]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:354) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:107) [netty-codec-base-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:120) [netty-codec-base-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:354) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:293) [netty-handler-4.2.9.Final.jar:4.2.9.Final]
	at reactor.netty.http.IdleTimeoutHandler.channelRead(IdleTimeoutHandler.java:65) [reactor-netty-http-1.3.1.jar:1.3.1]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:354) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.handler.codec.http.HttpServerUpgradeHandler.decode(HttpServerUpgradeHandler.java:290) [netty-codec-http-4.2.9.Final.jar:4.2.9.Final]
	at reactor.netty.http.server.HttpServerConfig$ReactorNettyHttpServerUpgradeHandler.decode(HttpServerConfig.java:1759) [reactor-netty-http-1.3.1.jar:1.3.1]
	at reactor.netty.http.server.HttpServerConfig$ReactorNettyHttpServerUpgradeHandler.decode(HttpServerConfig.java:1716) [reactor-netty-http-1.3.1.jar:1.3.1]
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:91) [netty-codec-base-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:356) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:434) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:361) [netty-codec-base-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:348) [netty-codec-base-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:470) [netty-codec-base-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [netty-codec-base-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:249) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:354) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1429) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:172) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.handle(AbstractNioChannel.java:445) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.nio.NioIoHandler$DefaultNioRegistration.handle(NioIoHandler.java:388) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.nio.NioIoHandler.processSelectedKey(NioIoHandler.java:596) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.nio.NioIoHandler.processSelectedKeysPlain(NioIoHandler.java:541) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.nio.NioIoHandler.processSelectedKeys(NioIoHandler.java:514) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.nio.NioIoHandler.run(NioIoHandler.java:484) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.SingleThreadIoEventLoop.runIo(SingleThreadIoEventLoop.java:225) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.SingleThreadIoEventLoop.run(SingleThreadIoEventLoop.java:196) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:1195) [netty-common-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.2.9.Final.jar:4.2.9.Final]
	at java.base/java.lang.Thread.run(Thread.java:1474) [?:?]
	Suppressed: java.lang.IllegalArgumentException: value for key [X-Opaque-Id] already present
		at org.opensearch.common.util.concurrent.ThreadContext$ThreadContextStruct.putSingleHeader(ThreadContext.java:738) ~[opensearch-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
		at org.opensearch.common.util.concurrent.ThreadContext$ThreadContextStruct.putRequest(ThreadContext.java:732) ~[opensearch-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
		at org.opensearch.common.util.concurrent.ThreadContext.putHeader(ThreadContext.java:475) ~[opensearch-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
		at org.opensearch.rest.RestController.tryAllHandlers(RestController.java:433) ~[opensearch-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
		at org.opensearch.rest.RestController.dispatchRequest(RestController.java:287) [opensearch-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
		at org.opensearch.http.AbstractHttpServerTransport.dispatchRequest(AbstractHttpServerTransport.java:384) [opensearch-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
		at org.opensearch.http.AbstractHttpServerTransport.handleIncomingRequest(AbstractHttpServerTransport.java:498) [opensearch-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
		at org.opensearch.http.AbstractHttpServerTransport.incomingStream(AbstractHttpServerTransport.java:344) [opensearch-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
		at org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport.incomingRequest(ReactorNetty4HttpServerTransport.java:394) [transport-reactor-netty4-client-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
		at org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport.lambda$bind$4(ReactorNetty4HttpServerTransport.java:258) [transport-reactor-netty4-client-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
		at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:1336) [reactor-netty-http-1.3.1.jar:1.3.1]
		at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:489) [reactor-netty-core-1.3.1.jar:1.3.1]
		at reactor.netty.http.server.HttpServerOperations.handleDefaultHttpRequest(HttpServerOperations.java:863) [reactor-netty-http-1.3.1.jar:1.3.1]
		at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:789) [reactor-netty-http-1.3.1.jar:1.3.1]
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115) [reactor-netty-core-1.3.1.jar:1.3.1]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:356) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:283) [reactor-netty-http-1.3.1.jar:1.3.1]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:354) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:107) [netty-codec-base-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:120) [netty-codec-base-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:354) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:293) [netty-handler-4.2.9.Final.jar:4.2.9.Final]
		at reactor.netty.http.IdleTimeoutHandler.channelRead(IdleTimeoutHandler.java:65) [reactor-netty-http-1.3.1.jar:1.3.1]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:354) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.handler.codec.http.HttpServerUpgradeHandler.decode(HttpServerUpgradeHandler.java:290) [netty-codec-http-4.2.9.Final.jar:4.2.9.Final]
		at reactor.netty.http.server.HttpServerConfig$ReactorNettyHttpServerUpgradeHandler.decode(HttpServerConfig.java:1759) [reactor-netty-http-1.3.1.jar:1.3.1]
		at reactor.netty.http.server.HttpServerConfig$ReactorNettyHttpServerUpgradeHandler.decode(HttpServerConfig.java:1716) [reactor-netty-http-1.3.1.jar:1.3.1]
		at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:91) [netty-codec-base-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:356) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:434) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:361) [netty-codec-base-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:348) [netty-codec-base-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:470) [netty-codec-base-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [netty-codec-base-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:249) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:354) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1429) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:172) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.handle(AbstractNioChannel.java:445) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.channel.nio.NioIoHandler$DefaultNioRegistration.handle(NioIoHandler.java:388) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.channel.nio.NioIoHandler.processSelectedKey(NioIoHandler.java:596) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.channel.nio.NioIoHandler.processSelectedKeysPlain(NioIoHandler.java:541) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.channel.nio.NioIoHandler.processSelectedKeys(NioIoHandler.java:514) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.channel.nio.NioIoHandler.run(NioIoHandler.java:484) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.channel.SingleThreadIoEventLoop.runIo(SingleThreadIoEventLoop.java:225) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.channel.SingleThreadIoEventLoop.run(SingleThreadIoEventLoop.java:196) [netty-transport-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:1195) [netty-common-4.2.9.Final.jar:4.2.9.Final]
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.2.9.Final.jar:4.2.9.Final]
		at java.base/java.lang.Thread.run(Thread.java:1474) [?:?]

The issue it two fold:

  • the exception happens before the streaming emitter is subscribed to response stream
  • caused by apparent double thread header registration

Related Issues

Closes opensearch-project/OpenSearch-Dashboards#11090

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Summary by CodeRabbit

  • Bug Fixes

    • Propagated X-Opaque-Id and other response headers for streaming; prevented duplicate header emission and ensured headers from context are included.
    • Improved streaming delivery by buffering deferred content to avoid lost records when the emitter isn't ready.
  • New Features

    • Added overloads to set request options on streaming requests.
    • Added APIs to read headers from streaming responses.
  • Tests

    • Added tests covering X-Opaque-Id handling in streaming bulk requests.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Jan 5, 2026

📝 Walkthrough

Walkthrough

Propagates response headers (including X-Opaque-Id) through Reactor Netty 4 streaming, exposes headers on StreamingResponse, allows RequestOptions on StreamingRequest, defers streaming HttpContent until the emitter is ready via a bounded in-memory queue, prevents duplicate header emission, and adds integration tests for X-Opaque-Id.

Changes

Cohort / File(s) Summary
Changelog
CHANGELOG.md
Documents fix for propagating X-Opaque-Id and other response headers in Reactor Netty 4 streaming transport.
Client REST — Request/Response
client/rest/src/main/java/org/opensearch/client/StreamingRequest.java, client/rest/src/main/java/org/opensearch/client/StreamingResponse.java
StreamingRequest: adds setOptions(RequestOptions) and setOptions(RequestOptions.Builder). StreamingResponse: adds getHeaders() and getHeader(String) to expose HTTP response headers.
Reactor Netty 4 — Channel
plugins/transport-reactor-netty4/src/main/java/.../ReactorNetty4StreamingHttpChannel.java
Emits status/headers only when headers haven't been sent yet (guard against duplicate header emission).
Reactor Netty 4 — Response Producer
plugins/transport-reactor-netty4/src/main/java/.../ReactorNetty4StreamingResponseProducer.java
Adds bounded ConcurrentLinkedQueue buffering (BUFFERED_QUEUE_SIZE = 64), DelayedHttpContent holder, defers send() when emitter not ready (fails on overflow), and drains/replays queued items on subscribe.
Server — Streaming Rest Channel
server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java
Adds threadContext usage; enriches response headers with threadContext.getResponseHeaders() and preserves request X-Opaque-Id; adds sendResponse(RestResponse) override to delegate through prepared response.
Integration Tests
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
Adds testStreamingRequestOpaqueId and testStreamingRequestOpaqueIdTwice to validate X-Opaque-Id propagation and duplicate-header handling; updates bulk payload construction style.

Sequence Diagram

sequenceDiagram
    participant Client
    participant StreamingRequest
    participant DefaultStreamingRestChannel
    participant ReactorNetty4StreamingHttpChannel
    participant ReactorNetty4StreamingResponseProducer
    participant StreamingResponse

    Client->>StreamingRequest: attach RequestOptions (X-Opaque-Id)
    StreamingRequest->>DefaultStreamingRestChannel: send request

    Note over DefaultStreamingRestChannel: prepareResponse enriches headers\n(includes thread-context headers + preserves X-Opaque-Id)
    DefaultStreamingRestChannel->>ReactorNetty4StreamingHttpChannel: prepareResponse(with enriched headers)

    alt Headers not yet sent
        ReactorNetty4StreamingHttpChannel->>ReactorNetty4StreamingResponseProducer: initialize emitter
    end

    par Content delivery
        ReactorNetty4StreamingResponseProducer->>ReactorNetty4StreamingResponseProducer: if emitter not ready -> enqueue DelayedHttpContent (bounded)
        ReactorNetty4StreamingResponseProducer->>Client: if emitter ready -> emit HttpContent
    end

    ReactorNetty4StreamingResponseProducer->>ReactorNetty4StreamingResponseProducer: on subscribe -> drain queue and emit deferred items

    Client->>StreamingResponse: getHeaders() / getHeader(name)
    StreamingResponse->>Client: return headers including X-Opaque-Id
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Suggested reviewers

  • andrross
  • peternied

Poem

🐰 I hopped through headers, kept X-Opaque-Id in sight,
I queued little parcels until the emitter was right.
I drained them with care so no header was lost,
Streams now skip repeats and obey the buffer's cost.
Hooray — headers and bytes delivered, at no extra fright.

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 21.43% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Description check ❓ Inconclusive The description provides detailed context about the bug and root causes, but does not include all template sections: missing explicit checklist confirmations (checkboxes are unchecked) and incomplete Related Issues format. Check the testing and documentation checklist items, or explicitly confirm their status. Ensure all required template sections are addressed.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and specifically describes the main fix: propagation of X-Opaque-Id header and other response headers for streaming Reactor Netty 4 transport.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Comment @coderabbitai help to get the list of available commands and usage tips.

@reta reta force-pushed the fix.opaque.propagation branch from 7b2f644 to 774f24b Compare January 5, 2026 20:46
@reta reta changed the title Fix X-Opaque-Id header propagation (along with other response headers) Fix X-Opaque-Id header propagation (along with other response headers) for streaming Reactor Netty 4 transport Jan 5, 2026
@github-actions github-actions bot added >test-failure Test failure from CI, local build, etc. autocut bug Something isn't working labels Jan 5, 2026
@reta reta force-pushed the fix.opaque.propagation branch from 774f24b to ec1ae0f Compare January 5, 2026 20:57
@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Jan 5, 2026

❕ Gradle check result for ec1ae0f: UNSTABLE

Please review all flaky tests that succeeded after retry and create an issue if one does not already exist to track the flaky failure.

@codecov
Copy link
Copy Markdown

codecov bot commented Jan 5, 2026

Codecov Report

❌ Patch coverage is 6.52174% with 43 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.17%. Comparing base (d866be8) to head (a25343a).
⚠️ Report is 11 commits behind head on main.

Files with missing lines Patch % Lines
...g/opensearch/http/DefaultStreamingRestChannel.java 6.25% 15 Missing ⚠️
...netty4/ReactorNetty4StreamingResponseProducer.java 16.66% 8 Missing and 2 partials ⚠️
.../java/org/opensearch/client/StreamingResponse.java 0.00% 9 Missing ⚠️
...n/java/org/opensearch/client/StreamingRequest.java 0.00% 6 Missing ⚠️
...ctor/netty4/ReactorNetty4StreamingHttpChannel.java 0.00% 3 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #20371      +/-   ##
============================================
- Coverage     73.30%   73.17%   -0.13%     
+ Complexity    71777    71698      -79     
============================================
  Files          5784     5784              
  Lines        328141   328196      +55     
  Branches      47269    47276       +7     
============================================
- Hits         240531   240167     -364     
- Misses        68329    68801     +472     
+ Partials      19281    19228      -53     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@reta reta force-pushed the fix.opaque.propagation branch 2 times, most recently from 74204d4 to 9da2234 Compare January 6, 2026 02:35
@reta reta marked this pull request as ready for review January 6, 2026 02:39
@reta reta requested a review from a team as a code owner January 6, 2026 02:39
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Fix all issues with AI Agents 🤖
In @CHANGELOG.md:
- Line 30: The changelog entry references the wrong GitHub repository path in
the PR URL; update the URL in the CHANGELOG.md entry that reads "Fix X-Opaque-Id
header propagation ...
([#20371](https://github.com/opensearch-project/security/pull/20371))" so the
link points to the OpenSearch repo by replacing "opensearch-project/security"
with "opensearch-project/OpenSearch" (preserve the PR number and surrounding
text).

In
@plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java:
- Around line 154-156: The StepVerifier chain created with
StepVerifier.create(Flux.from(streamingResponse.getBody()).map(...)).expectNextMatches(...).expectComplete()
never invokes .verify(), so the reactive assertions are not executed; update the
chain used in ReactorNetty4StreamingIT (the StepVerifier.create(...) call that
ends with .expectComplete()) to append a terminal .verify() call to actually run
the verification.

In
@plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java:
- Around line 27-31: The buffering queue uses a non-thread-safe ArrayDeque
(queue) and can be concurrently accessed from send() and subscribe(); replace
the ArrayDeque with a thread-safe implementation (e.g., change the declaration
of queue in ReactorNetty4StreamingResponseProducer to use
ConcurrentLinkedQueue<DelayedHttpContent>) and add the corresponding import so
send() and subscribe() can safely offer/poll without additional synchronization;
ensure all uses of queue (including any iterating) are safe for
ConcurrentLinkedQueue semantics.

In @server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java:
- Around line 98-117: prepareResponse currently overwrites multi-valued headers
when iterating threadContext.getResponseHeaders() because it repeatedly does
enriched.put(headerEntry.getKey(), List.of(headerValue)); instead, merge values
into the existing list in enriched: for each headerEntry from
threadContext.getResponseHeaders() use
enriched.computeIfAbsent(headerEntry.getKey(), k -> new ArrayList<>()) and
addAll(headerEntry.getValue()) (or append each headerValue) so existing entries
(including X_OPAQUE_ID and original headers) are preserved; then call
streamingHttpChannel.prepareResponse(status.getStatus(), enriched) as before.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java (1)

42-60: Ensure HttpContent resources are released on error paths.

When content is deferred in the queue, the HttpContent objects hold references to native buffers. If an error occurs before the queue is drained (e.g., connection closed, subscription cancelled), these buffers may leak.

Recommended approach

Add error handling and cleanup logic:

  1. Implement a cleanup method to drain and release queued content on error or cancellation
  2. Add a doOnCancel or doOnError handler in the Flux.create that releases queued content
  3. Ensure ActionListener.onFailure is called for queued items on cleanup

Example pattern:

ReactorNetty4StreamingResponseProducer() {
    this.sender = Flux.create(emitter -> {
        register(emitter);
        emitter.onDispose(() -> {
            // Release any queued content
            DelayedHttpContent content;
            while ((content = queue.poll()) != null) {
                content.content().release();
                content.listener().onFailure(new Exception("Cancelled"));
            }
        });
    });
}
🧹 Nitpick comments (1)
client/rest/src/main/java/org/opensearch/client/StreamingRequest.java (1)

80-102: LGTM! Well-designed API addition.

The two setOptions overloads provide a clean and convenient way to configure request options, with proper null validation and clear documentation. The builder overload is a nice ergonomic touch.

One consideration: the options field is mutable and not thread-safe. If StreamingRequest instances are shared across threads or if options are modified after the request is submitted to the reactive pipeline, visibility issues could arise. However, this is likely acceptable if the intended pattern is to fully configure the request before use.

Optional: Consider documenting thread-safety expectations

If concurrent access isn't intended, consider adding a note to the class or method javadoc stating that StreamingRequest should be configured before being passed to the client and should not be modified concurrently.

Alternatively, if thread safety is required, consider making options volatile or using synchronization:

-    private RequestOptions options = RequestOptions.DEFAULT;
+    private volatile RequestOptions options = RequestOptions.DEFAULT;
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 66d2482 and 9da2234.

📒 Files selected for processing (7)
  • CHANGELOG.md
  • client/rest/src/main/java/org/opensearch/client/StreamingRequest.java
  • client/rest/src/main/java/org/opensearch/client/StreamingResponse.java
  • plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java
  • server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
📚 Learning: 2025-12-12T18:40:08.452Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.

Applied to files:

  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
  • plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java
  • CHANGELOG.md
📚 Learning: 2025-12-13T20:16:15.318Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java:101-123
Timestamp: 2025-12-13T20:16:15.318Z
Learning: In OpenSearch, only one HTTP transport implementation can be active and loaded at a time, so duplicate setting definitions (such as h3.max_stream_local_length, h3.max_stream_remote_length, and h3.max_streams) across different transport implementations like Netty4Http3ServerTransport and ReactorNetty4HttpServerTransport will not cause setting registration conflicts.

Applied to files:

  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
📚 Learning: 2025-12-12T13:31:51.234Z
Learnt from: andriyredko
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:642-646
Timestamp: 2025-12-12T13:31:51.234Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), the createBuilderWithPort() helper intentionally uses randomBoolean() for the HTTP/3 enabled setting to ensure all tests validate correct behavior with both HTTP/3 enabled and disabled configurations.

Applied to files:

  • plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
🧬 Code graph analysis (1)
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java (1)
libs/core/src/main/java/org/opensearch/core/xcontent/ObjectPath.java (1)
  • ObjectPath (42-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (19)
  • GitHub Check: gradle-check
  • GitHub Check: assemble (25, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, windows-latest)
  • GitHub Check: assemble (21, ubuntu-latest)
  • GitHub Check: assemble (25, windows-latest)
  • GitHub Check: assemble (25, ubuntu-latest)
  • GitHub Check: precommit (25, ubuntu-24.04-arm)
  • GitHub Check: precommit (21, windows-2025, true)
  • GitHub Check: precommit (25, macos-15)
  • GitHub Check: precommit (21, ubuntu-latest)
  • GitHub Check: precommit (21, windows-latest)
  • GitHub Check: precommit (25, macos-15-intel)
  • GitHub Check: precommit (21, macos-15)
  • GitHub Check: precommit (25, ubuntu-latest)
  • GitHub Check: precommit (21, ubuntu-24.04-arm)
  • GitHub Check: precommit (25, windows-latest)
  • GitHub Check: precommit (21, macos-15-intel)
  • GitHub Check: Analyze (java)
🔇 Additional comments (8)
client/rest/src/main/java/org/opensearch/client/StreamingResponse.java (2)

98-106: LGTM! Header extraction follows established pattern.

The implementation correctly mirrors the existing getStatusLine() and getWarnings() methods with consistent error handling.


108-121: LGTM! Single header extraction correctly implemented.

The method appropriately handles null cases through mapNotNull and follows the same blocking pattern as other getters in this class.

server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java (2)

44-44: LGTM! ThreadContext properly captured for header enrichment.

The field is correctly initialized and enables access to custom response headers for streaming responses.

Also applies to: 59-59


119-123: LGTM! sendResponse override ensures headers are prepared.

The override correctly prepares headers via the streaming channel before delegating to the superclass, ensuring proper header propagation for streaming responses.

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java (1)

63-70: Queue drainage logic looks correct but relies on thread-safety fix.

The deferred content is properly replayed after subscription. However, this assumes the queue access is thread-safe (see previous comment).

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java (1)

84-89: The hasSentHeaders guard prevents the IllegalArgumentException, but verify that prepareResponse is always invoked from the same EventLoop thread.

The guard effectively prevents duplicate header emission by checking hasSentHeaders() before setting status and headers. However, this check alone is not atomic with the subsequent operations. The thread-safety guarantee depends on Reactor Netty's HttpServerResponse being accessed exclusively from a single EventLoop thread per connection. If prepareResponse could be invoked from different threads (e.g., different EventLoops), a race condition is theoretically possible between the check and header operations. The current pattern in RestBulkStreamingAction and RestController suggests prepareResponse is called early before streaming, mitigating this risk, but consider documenting this expectation or adding explicit synchronization if cross-thread calls become possible in the future.

plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java (2)

12-12: LGTM: Import necessary for header configuration.

The RequestOptions import is required for the new tests that configure X-Opaque-Id headers.


90-133: LGTM: Comprehensive test for X-Opaque-Id header propagation.

The test correctly validates that the X-Opaque-Id header is propagated from the request to the response in streaming scenarios. The test structure is consistent with existing tests and includes appropriate assertions.

@reta reta force-pushed the fix.opaque.propagation branch from 9da2234 to 743bbcb Compare January 6, 2026 03:09
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java (1)

29-31: Optional: Remove unnecessary semicolon after record declaration.

The semicolon after the record closing brace is superfluous. While syntactically valid, it's not idiomatic Java.

🔎 Proposed fix
 // Holds the {@code HttpContent} for deferred delivery
-private record DelayedHttpContent(HttpContent content, ActionListener<Void> listener, boolean isLast) {
-};
+private record DelayedHttpContent(HttpContent content, ActionListener<Void> listener, boolean isLast) {
+}
client/rest/src/main/java/org/opensearch/client/StreamingResponse.java (2)

98-106: LGTM: Consistent implementation following established patterns.

The getHeaders() method correctly follows the same reactive pattern and error handling approach as getStatusLine() and getWarnings(). The implementation properly extracts headers from the cached publisher and handles exceptions gracefully.

Optional: Enhance Javadoc for API clarity

Consider adding a @return tag to document the return value:

 /**
  * Returns a list of all headers returned in the response.
+ * @return array of headers, or empty array if no headers present
  */
 public Header[] getHeaders() {

108-121: LGTM: Excellent null safety with double mapNotNull.

The getHeader(String name) method is well-implemented with proper null handling. The double mapNotNull() chain correctly handles both the case where no matching header exists and where a header exists but has a null value. The error handling is consistent with other methods in the class.

Optional: Complete Javadoc with standard tags

Consider adding @param and @return tags for API documentation completeness:

 /**
  * Returns the value of the first header with a specified name of this message.
  * If there is more than one matching header in the message the first element is returned.
  * If there is no matching header in the message <code>null</code> is returned.
  *
  * @param name header name
+ * @return the value of the first matching header, or null if no matching header exists
  */
 public String getHeader(String name) {
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9da2234 and 743bbcb.

📒 Files selected for processing (7)
  • CHANGELOG.md
  • client/rest/src/main/java/org/opensearch/client/StreamingRequest.java
  • client/rest/src/main/java/org/opensearch/client/StreamingResponse.java
  • plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java
  • server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java
🚧 Files skipped from review as they are similar to previous changes (1)
  • CHANGELOG.md
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java:101-123
Timestamp: 2025-12-13T20:16:15.318Z
Learning: In OpenSearch, only one HTTP transport implementation can be active and loaded at a time, so duplicate setting definitions (such as h3.max_stream_local_length, h3.max_stream_remote_length, and h3.max_streams) across different transport implementations like Netty4Http3ServerTransport and ReactorNetty4HttpServerTransport will not cause setting registration conflicts.
📚 Learning: 2025-12-12T18:40:08.452Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.

Applied to files:

  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
  • plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
📚 Learning: 2025-12-13T20:16:15.318Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java:101-123
Timestamp: 2025-12-13T20:16:15.318Z
Learning: In OpenSearch, only one HTTP transport implementation can be active and loaded at a time, so duplicate setting definitions (such as h3.max_stream_local_length, h3.max_stream_remote_length, and h3.max_streams) across different transport implementations like Netty4Http3ServerTransport and ReactorNetty4HttpServerTransport will not cause setting registration conflicts.

Applied to files:

  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
📚 Learning: 2025-12-12T13:31:51.234Z
Learnt from: andriyredko
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:642-646
Timestamp: 2025-12-12T13:31:51.234Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), the createBuilderWithPort() helper intentionally uses randomBoolean() for the HTTP/3 enabled setting to ensure all tests validate correct behavior with both HTTP/3 enabled and disabled configurations.

Applied to files:

  • plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
🧬 Code graph analysis (1)
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java (1)
libs/core/src/main/java/org/opensearch/core/xcontent/ObjectPath.java (1)
  • ObjectPath (42-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
  • GitHub Check: gradle-check
  • GitHub Check: precommit (25, macos-15)
  • GitHub Check: precommit (25, windows-latest)
  • GitHub Check: precommit (21, windows-2025, true)
  • GitHub Check: precommit (25, ubuntu-24.04-arm)
  • GitHub Check: precommit (25, macos-15-intel)
  • GitHub Check: precommit (25, ubuntu-latest)
  • GitHub Check: precommit (21, macos-15)
  • GitHub Check: precommit (21, macos-15-intel)
  • GitHub Check: precommit (21, windows-latest)
  • GitHub Check: precommit (21, ubuntu-latest)
  • GitHub Check: precommit (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, ubuntu-latest)
  • GitHub Check: assemble (25, windows-latest)
  • GitHub Check: assemble (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (25, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, windows-latest)
  • GitHub Check: assemble (25, ubuntu-latest)
  • GitHub Check: Analyze (java)
  • GitHub Check: detect-breaking-change
🔇 Additional comments (9)
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java (2)

90-133: LGTM! Thorough test for X-Opaque-Id propagation.

This test correctly verifies that the X-Opaque-Id header is propagated through the streaming request/response lifecycle, validates per-record responses, and confirms the final document count. The test aligns well with the PR's objective of fixing header propagation for streaming responses.


135-160: LGTM! Correctly validates duplicate header rejection.

This test properly validates that duplicate X-Opaque-Id headers result in a 400 error response, which is the expected behavior for single-valued headers. The pattern of using expectError followed by status code assertions is consistent with other error handling tests in this file.

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java (1)

84-89: LGTM! Proper guard against duplicate header emission.

The hasSentHeaders() check correctly prevents duplicate status and header emission when prepareResponse is called multiple times during the streaming response lifecycle. This aligns with the PR's goal of fixing header propagation issues in streaming scenarios.

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java (2)

42-60: LGTM! Deferred delivery correctly handles early send() calls.

The implementation properly handles the race condition where send() is called before the emitter is subscribed. By using a thread-safe ConcurrentLinkedQueue to buffer content when isReady() returns false, and then draining this queue on subscription, the code ensures no content is lost when exceptions trigger responses before streaming begins.


63-70: LGTM! Queue draining correctly processes deferred content.

The subscription logic properly drains all queued content by polling the thread-safe queue and re-invoking send() for each buffered item. Since the emitter is now registered (via the register() callback), the recursive send() calls will process immediately. The ordering is preserved, and the thread-safe queue ensures no concurrent modification issues.

server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java (2)

98-117: LGTM! Header enrichment correctly propagates X-Opaque-Id and custom headers.

The implementation properly enriches response headers by:

  1. Preserving existing headers from the parameter
  2. Adding the X-Opaque-Id from the request (overwriting any existing value to ensure request takes precedence)
  3. Merging custom response headers from threadContext using computeIfAbsent + add() to preserve multi-valued headers

This aligns with the PR's objective to fix header propagation for streaming responses. The logic correctly handles both single-valued (X-Opaque-Id) and multi-valued headers, with downstream validation catching any duplicate single-valued headers.


119-123: LGTM! Override correctly prepares response headers before delegating.

The sendResponse override ensures that response status and headers from the RestResponse are prepared on the streaming channel before invoking the parent's implementation. This is essential for streaming scenarios where headers need to be set up early in the response lifecycle. The duplicate prepareResponse calls are safe due to the hasSentHeaders() guard in ReactorNetty4StreamingHttpChannel.

client/rest/src/main/java/org/opensearch/client/StreamingRequest.java (1)

80-102: LGTM! Clean API design for setting request options.

Both setOptions overloads follow standard Java patterns with proper null validation. The Builder overload is a nice convenience that avoids forcing callers to invoke .build() explicitly. The implementation correctly assigns the options field, enabling header propagation (such as X-Opaque-Id) for streaming requests as demonstrated in the integration tests.

client/rest/src/main/java/org/opensearch/client/StreamingResponse.java (1)

11-11: LGTM: Import necessary for new API.

The Header import from Apache HttpCore5 is required for the return type of the new getHeaders() method and is consistent with existing dependencies.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Jan 6, 2026

❌ Gradle check result for 743bbcb: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Jan 6, 2026

❕ Gradle check result for 743bbcb: UNSTABLE

Please review all flaky tests that succeeded after retry and create an issue if one does not already exist to track the flaky failure.

@reta reta force-pushed the fix.opaque.propagation branch from 743bbcb to 04a89ce Compare January 6, 2026 13:53
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI Agents
In @server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java:
- Around line 98-117: In DefaultStreamingRestChannel.prepareResponse the code
uses List.of(opaque) for X_OPAQUE_ID which is immutable and will cause
UnsupportedOperationException when threadContext.getResponseHeaders() also
contains X_OPAQUE_ID and later code calls .add() on that list; change the
insertion to put a mutable list (e.g., new ArrayList<>(List.of(opaque)) or
Collections.singletonList wrapped in new ArrayList) into the enriched map so
computeIfAbsent can safely add values, leaving
threadContext.getResponseHeaders() merging logic unchanged.
🧹 Nitpick comments (1)
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java (1)

25-31: Unbounded buffering is acceptable for the expected single-item case.

The comment indicates only a single deferred response is expected in practice. The unbounded ConcurrentLinkedQueue is thread-safe and suitable for this use case.

Optional: Document memory implications if assumptions change

If the assumption of "single final response" doesn't hold in all scenarios, consider adding a comment about potential memory implications or monitoring for unexpected queue growth.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 743bbcb and 04a89ce.

📒 Files selected for processing (7)
  • CHANGELOG.md
  • client/rest/src/main/java/org/opensearch/client/StreamingRequest.java
  • client/rest/src/main/java/org/opensearch/client/StreamingResponse.java
  • plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java
  • server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java
🚧 Files skipped from review as they are similar to previous changes (2)
  • CHANGELOG.md
  • client/rest/src/main/java/org/opensearch/client/StreamingRequest.java
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java:101-123
Timestamp: 2025-12-13T20:16:15.318Z
Learning: In OpenSearch, only one HTTP transport implementation can be active and loaded at a time, so duplicate setting definitions (such as h3.max_stream_local_length, h3.max_stream_remote_length, and h3.max_streams) across different transport implementations like Netty4Http3ServerTransport and ReactorNetty4HttpServerTransport will not cause setting registration conflicts.
📚 Learning: 2025-12-12T18:40:08.452Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.

Applied to files:

  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
  • plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
📚 Learning: 2025-12-13T20:16:15.318Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java:101-123
Timestamp: 2025-12-13T20:16:15.318Z
Learning: In OpenSearch, only one HTTP transport implementation can be active and loaded at a time, so duplicate setting definitions (such as h3.max_stream_local_length, h3.max_stream_remote_length, and h3.max_streams) across different transport implementations like Netty4Http3ServerTransport and ReactorNetty4HttpServerTransport will not cause setting registration conflicts.

Applied to files:

  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
📚 Learning: 2025-12-12T13:31:51.234Z
Learnt from: andriyredko
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:642-646
Timestamp: 2025-12-12T13:31:51.234Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), the createBuilderWithPort() helper intentionally uses randomBoolean() for the HTTP/3 enabled setting to ensure all tests validate correct behavior with both HTTP/3 enabled and disabled configurations.

Applied to files:

  • plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
🧬 Code graph analysis (1)
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java (1)
libs/core/src/main/java/org/opensearch/core/xcontent/ObjectPath.java (1)
  • ObjectPath (42-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
  • GitHub Check: gradle-check
  • GitHub Check: assemble (25, ubuntu-latest)
  • GitHub Check: precommit (21, windows-latest)
  • GitHub Check: precommit (25, macos-15-intel)
  • GitHub Check: precommit (21, ubuntu-latest)
  • GitHub Check: precommit (25, windows-latest)
  • GitHub Check: precommit (25, ubuntu-latest)
  • GitHub Check: precommit (25, macos-15)
  • GitHub Check: precommit (21, macos-15-intel)
  • GitHub Check: precommit (25, ubuntu-24.04-arm)
  • GitHub Check: precommit (21, macos-15)
  • GitHub Check: precommit (21, ubuntu-24.04-arm)
  • GitHub Check: precommit (21, windows-2025, true)
  • GitHub Check: assemble (21, windows-latest)
  • GitHub Check: detect-breaking-change
  • GitHub Check: assemble (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (25, windows-latest)
  • GitHub Check: assemble (21, ubuntu-latest)
  • GitHub Check: assemble (25, ubuntu-24.04-arm)
  • GitHub Check: Analyze (java)
🔇 Additional comments (9)
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java (1)

85-88: LGTM!

The guard to check hasSentHeaders() before setting status and headers correctly prevents duplicate header emission, addressing the IllegalArgumentException mentioned in the PR description.

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java (2)

13-14: LGTM!

Imports correctly support the thread-safe buffering mechanism.


43-69: LGTM!

The deferral and replay logic correctly handles content sent before the emitter is ready. The queue draining in subscribe() ensures buffered content is delivered after the stream is established.

server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java (2)

44-44: LGTM!

The threadContext field is properly declared and initialized to support header enrichment.

Also applies to: 59-59


119-123: LGTM!

The sendResponse override correctly prepares response headers before delegating to the superclass implementation.

client/rest/src/main/java/org/opensearch/client/StreamingResponse.java (2)

11-11: LGTM!

The getHeaders() method follows the established pattern for accessing response metadata, with proper error handling.

Also applies to: 101-106


115-121: LGTM!

The getHeader(String name) method correctly retrieves the first matching header value with proper null handling.

plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java (2)

90-133: LGTM!

The test correctly validates X-Opaque-Id header propagation through streaming requests, verifying both per-record responses and the final response header.


135-160: LGTM!

The test correctly validates that duplicate X-Opaque-Id headers are rejected with a 400 status code, ensuring proper header validation.

@reta reta force-pushed the fix.opaque.propagation branch from 04a89ce to 66617c3 Compare January 6, 2026 14:03
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java (1)

25-27: Consider a bounded queue to prevent memory exhaustion.

The comment notes that the queue is unbounded and "realistically we should only see a single final (last content) response being deferred." However, if an unexpected bug causes multiple items to be enqueued before subscription, an unbounded queue could lead to memory exhaustion.

Consider using a bounded ConcurrentLinkedQueue alternative (such as ArrayBlockingQueue with a reasonable capacity) or adding a size check with a fallback error path.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 04a89ce and 66617c3.

📒 Files selected for processing (7)
  • CHANGELOG.md
  • client/rest/src/main/java/org/opensearch/client/StreamingRequest.java
  • client/rest/src/main/java/org/opensearch/client/StreamingResponse.java
  • plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java
  • server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java
🚧 Files skipped from review as they are similar to previous changes (3)
  • CHANGELOG.md
  • client/rest/src/main/java/org/opensearch/client/StreamingRequest.java
  • plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java:101-123
Timestamp: 2025-12-13T20:16:15.318Z
Learning: In OpenSearch, only one HTTP transport implementation can be active and loaded at a time, so duplicate setting definitions (such as h3.max_stream_local_length, h3.max_stream_remote_length, and h3.max_streams) across different transport implementations like Netty4Http3ServerTransport and ReactorNetty4HttpServerTransport will not cause setting registration conflicts.
📚 Learning: 2025-12-12T18:40:08.452Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.

Applied to files:

  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
  • plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: gradle-check
🔇 Additional comments (5)
plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java (1)

85-88: LGTM! Header emission guard prevents duplicate headers.

The hasSentHeaders() check correctly ensures that status and headers are emitted only once, preventing the duplicate header issue mentioned in the PR description.

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java (1)

13-14: Good: ConcurrentLinkedQueue addresses the previous thread-safety concern.

The switch from ArrayDeque to ConcurrentLinkedQueue correctly handles concurrent access from multiple threads, resolving the data corruption risk flagged in the earlier review.

Also applies to: 25-31

server/src/main/java/org/opensearch/http/DefaultStreamingRestChannel.java (2)

98-117: Header enrichment logic is correct and avoids the previous immutable-list exception.

The reordered logic now:

  1. Merges custom headers from threadContext (lines 102-109), using computeIfAbsent with mutable ArrayList
  2. Sets X_OPAQUE_ID from the request (lines 111-114) via put, which replaces any threadContext value

This ordering prevents the UnsupportedOperationException that would occur if an immutable List.of() were inserted first and then computeIfAbsent tried to append to it. The request's X_OPAQUE_ID correctly takes precedence for response correlation.


119-123: LGTM! sendResponse enriches headers before delegating.

The override ensures prepareResponse is invoked to merge thread-context and request headers before the superclass sendResponse logic runs.

client/rest/src/main/java/org/opensearch/client/StreamingResponse.java (1)

98-121: LGTM! Header accessor methods are well-structured and consistent.

Both getHeaders() and getHeader(String name) follow the same error-handling and blocking patterns as the existing getStatusLine() and getWarnings() methods. The null-safety in getHeader (using mapNotNull) correctly handles missing headers.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Jan 6, 2026

❌ Gradle check result for 66617c3: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Jan 6, 2026

❌ Gradle check result for 66617c3: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Jan 6, 2026

❌ Gradle check result for 66617c3: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Jan 6, 2026

✅ Gradle check result for 66617c3: SUCCESS

@reta reta force-pushed the fix.opaque.propagation branch from 66617c3 to 95b4679 Compare January 6, 2026 22:30
@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Jan 6, 2026

❌ Gradle check result for 95b4679: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@reta reta removed the autocut label Jan 7, 2026
Signed-off-by: Andriy Redko <drreta@gmail.com>
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java (1)

139-166: Consider verifying the specific error message.

The test correctly validates that duplicate X-Opaque-Id headers result in a 400 error (preventing the NPE mentioned in the PR description). However, you could strengthen the assertion by verifying the error message mentions the duplicate header issue, similar to how other tests check specific error content.

♻️ Optional: Match the error checking pattern from testStreamingBadRequest

For consistency with testStreamingBadRequest() (line 352-355), consider using .expectErrorMatches():

 StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
-    .expectError(ResponseException.class)
+    .expectErrorMatches(
+        ex -> ex instanceof ResponseException && ((ResponseException) ex).getResponse().getStatusLine().getStatusCode() == 400
+    )
     .verify();

Alternatively, to verify the specific error message about duplicate headers:

 StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
-    .expectError(ResponseException.class)
+    .expectErrorMatches(
+        ex -> ex instanceof ResponseException 
+            && ((ResponseException) ex).getResponse().getStatusLine().getStatusCode() == 400
+            && ex.getMessage().contains("X-Opaque-Id")
+    )
     .verify();
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a47e989 and a25343a.

📒 Files selected for processing (1)
  • plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
📚 Learning: 2025-12-12T18:40:08.452Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.

Applied to files:

  • plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
📚 Learning: 2025-12-12T13:31:51.234Z
Learnt from: andriyredko
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:642-646
Timestamp: 2025-12-12T13:31:51.234Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), the createBuilderWithPort() helper intentionally uses randomBoolean() for the HTTP/3 enabled setting to ensure all tests validate correct behavior with both HTTP/3 enabled and disabled configurations.

Applied to files:

  • plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
🧬 Code graph analysis (1)
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java (1)
libs/core/src/main/java/org/opensearch/core/xcontent/ObjectPath.java (1)
  • ObjectPath (42-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
  • GitHub Check: gradle-check
  • GitHub Check: precommit (25, ubuntu-latest)
  • GitHub Check: precommit (21, macos-15-intel)
  • GitHub Check: precommit (25, macos-15-intel)
  • GitHub Check: precommit (25, windows-latest)
  • GitHub Check: assemble (21, ubuntu-24.04-arm)
  • GitHub Check: precommit (21, windows-latest)
  • GitHub Check: assemble (21, ubuntu-latest)
  • GitHub Check: precommit (21, windows-2025, true)
  • GitHub Check: assemble (25, ubuntu-latest)
  • GitHub Check: assemble (25, ubuntu-24.04-arm)
  • GitHub Check: precommit (25, ubuntu-24.04-arm)
  • GitHub Check: precommit (25, macos-15)
  • GitHub Check: assemble (25, windows-latest)
  • GitHub Check: precommit (21, macos-15)
  • GitHub Check: precommit (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, windows-latest)
  • GitHub Check: precommit (21, ubuntu-latest)
  • GitHub Check: Analyze (java)
  • GitHub Check: detect-breaking-change
🔇 Additional comments (3)
plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java (3)

12-12: LGTM!

The RequestOptions import is necessary for setting custom headers in the new X-Opaque-Id tests.


52-55: LGTM!

The conversion to text blocks significantly improves readability. The use of String.format with Locale.ENGLISH (or Locale.getDefault() where appropriate) ensures consistent formatting across platforms.

Also applies to: 169-172, 212-215, 257-260, 302-305, 338-341, 363-368, 393-396, 419-422


92-137: LGTM!

This test comprehensively validates X-Opaque-Id header propagation for streaming responses. The test correctly:

  • Sets a single X-Opaque-Id header via RequestOptions
  • Verifies streaming response content
  • Asserts the X-Opaque-Id header is returned in the response (line 128)
  • Confirms successful indexing of all documents

The test follows the established pattern and uses VirtualTimeScheduler for deterministic timing control.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Jan 7, 2026

❌ Gradle check result for a25343a: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Jan 7, 2026

❌ Gradle check result for a25343a: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Jan 7, 2026

❌ Gradle check result for a25343a: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Jan 8, 2026

✅ Gradle check result for a25343a: SUCCESS

@Hailong-am
Copy link
Copy Markdown
Contributor

Hailong-am commented Jan 8, 2026

Thanks @reta this will resolve NPE and i test the fix my locally that will not fix for my case. I just attached a debug screenshot of what's the situation for my case, i believe that's thread context pollution and not been correctly restore.

My code https://github.com/opensearch-project/ml-commons/blob/feature/3.4-release-prep/plugin/src/main/java/org/opensearch/ml/rest/RestMLExecuteStreamAction.java

image

@reta reta merged commit 733931b into opensearch-project:main Jan 8, 2026
38 of 47 checks passed
@reta
Copy link
Copy Markdown
Contributor Author

reta commented Jan 8, 2026

Thanks @reta this will resolve NPE and i test the fix my locally that will not fix for my case. I just attached a debug screenshot of what's the situation for my case, i believe that's thread context pollution and not been correctly restore.

Thanks @Hailong-am , moving to this one (thread context), thanks for checking out. Do you happen to have a reproducer for it? So far I cannot reproduce it will ml-commons (built from main)

@Hailong-am
Copy link
Copy Markdown
Contributor

Thanks @reta this will resolve NPE and i test the fix my locally that will not fix for my case. I just attached a debug screenshot of what's the situation for my case, i believe that's thread context pollution and not been correctly restore.

Thanks @Hailong-am , moving to this one (thread context), thanks for checking out. Do you happen to have a reproducer for it? So far I cannot reproduce it will ml-commons (built from main)

You might need to use branch feature/3.4-release-prep and it might a little complicated to setup agents etc.I have a local env can reproduce it and will try to make a simpler version for reproduce.

tanyabti pushed a commit to tanyabti/OpenSearch that referenced this pull request Feb 24, 2026
…) for streaming Reactor Netty 4 transport (opensearch-project#20371)

* Fix X-Opaque-Id header propagation (along with other response headers) for streaming Reactor Netty 4 transport

Signed-off-by: Andriy Redko <drreta@gmail.com>

* Address code review comments

Signed-off-by: Andriy Redko <drreta@gmail.com>

---------

Signed-off-by: Andriy Redko <drreta@gmail.com>
tanyabti pushed a commit to tanyabti/OpenSearch that referenced this pull request Feb 24, 2026
…) for streaming Reactor Netty 4 transport (opensearch-project#20371)

* Fix X-Opaque-Id header propagation (along with other response headers) for streaming Reactor Netty 4 transport

Signed-off-by: Andriy Redko <drreta@gmail.com>

* Address code review comments

Signed-off-by: Andriy Redko <drreta@gmail.com>

---------

Signed-off-by: Andriy Redko <drreta@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

autocut bug Something isn't working >test-failure Test failure from CI, local build, etc.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BUG] Chatbot Activation Error: X-Opaque-Id Already Present

4 participants