Stream transport refactor #20359
📝 Walkthrough
Enhances Arrow Flight stream transport robustness by introducing asynchronous stream initialization with header prefetching, replacing Optional-based root handling with nullable fields, removing ThreadContext dependencies in favor of async flow patterns, improving thread naming in event loops, and refining error handling throughout the transport layer.
Sequence Diagram(s)
sequenceDiagram
participant Client
participant ClientChannel as FlightClientChannel
participant Executor
participant Stream as FlightStream
participant Header as Header
participant Handler as ResponseHandler
rect rgb(240, 248, 255)
Note over Client,Handler: NEW: Asynchronous Prefetch Flow
end
Client->>ClientChannel: openStreamAndInvokeHandler()
ClientChannel->>ClientChannel: Get executor (warn if SAME)
ClientChannel->>Stream: Open stream asynchronously
Stream->>Stream: Prefetch first header
rect rgb(255, 250, 240)
Note over Stream,Handler: Async header retrieval
Stream-->>Header: Header ready (CompletableFuture)
end
Header->>Executor: Schedule handler task
Executor->>Handler: Stash thread context
Handler->>Handler: Validate header
Handler->>Handler: Set headers on thread context
Handler->>Handler: Delegate to response handler
Handler-->>Client: Response with header metadata
rect rgb(240, 255, 240)
Note over Handler,Client: Exception path
Handler->>Handler: Clean up on error
Handler-->>Client: Propagate exception
end
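The prefetch flow in the diagram can be approximated with plain `CompletableFuture` stages. This is a minimal sketch, not the plugin's code: `AsyncHeaderPrefetchSketch`, `openStreamAndInvokeHandler` as written here, and the `String` header are hypothetical stand-ins, and thread-context stashing is left out. It only shows the shape: read the first header off the calling thread, hand it to the handler on a separate executor, and route any failure to a single error callback.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

final class AsyncHeaderPrefetchSketch {

    // Hypothetical helper: all parameter names are illustrative assumptions.
    static CompletableFuture<Void> openStreamAndInvokeHandler(
        Callable<String> firstHeaderReader, // blocking read of the first header
        Executor ioExecutor,                // where the blocking read runs
        Executor handlerExecutor,           // where the response handler runs
        Consumer<String> onHeader,
        Consumer<Throwable> onFailure
    ) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return firstHeaderReader.call();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, ioExecutor).thenAcceptAsync(header -> {
            // Validate the header before delegating to the handler.
            if (header == null) {
                throw new IllegalStateException("stream closed before a header arrived");
            }
            onHeader.accept(header);
        }, handlerExecutor).exceptionally(t -> {
            // Unwrap CompletionException so the callback sees the original cause.
            Throwable cause = (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
            onFailure.accept(cause);
            return null;
        });
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        ExecutorService handlers = Executors.newSingleThreadExecutor();
        try {
            openStreamAndInvokeHandler(
                () -> "header-v1",
                io,
                handlers,
                h -> System.out.println("handled " + h),
                t -> System.out.println("failed: " + t)
            ).join();
        } finally {
            io.shutdown();
            handlers.shutdown();
        }
    }
}
```

Running the handler stage on its own executor is what keeps a slow handler from blocking the thread that performs the header read, which is presumably the concern behind the SAME-executor warning in the diagram.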
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ❌ 3
❌ Failed checks (1 warning, 2 inconclusive)
08c1ef7 to 6cf97cc
Actionable comments posted: 2
🧹 Nitpick comments (2)
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightClientChannel.java (1)
258-260: Consider reducing log verbosity for SAME executor warning.
This warning logs on every stream using the SAME executor. If SAME is a common or intentional choice for certain handlers, this could generate significant log noise. Consider:
- Using debug level instead of warn
- Logging only once per handler type
- Adding a rate limiter
🔎 Suggested change
 if (ThreadPool.Names.SAME.equals(executor)) {
-    logger.warn("Stream transport handler using SAME executor, which may cause blocking behavior");
+    logger.debug("Stream transport handler using SAME executor, which may cause blocking behavior");
 }
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/VectorStreamOutput.java (1)
73-76: Empty flush() may cause confusion.
The flush() method is now a no-op, but callers inheriting from StreamOutput might expect calling flush() to commit buffered data. The actual buffer flush only happens in getRoot() or when writeBytes() is called. This could lead to unexpected behavior if a caller relies on explicit flushing.
Consider either:
- Having flush() call flushTempBuffer() to commit pending bytes
- Adding a comment explaining the no-op behavior
🔎 Suggested fix
 @Override
 public void flush() {
-
+    flushTempBuffer();
 }
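To make the flush() concern concrete, here is a minimal sketch of a byte-at-a-time buffer in front of a bulk sink. `BufferedSinkSketch` is hypothetical, and a `ByteArrayOutputStream` stands in for the Arrow vector; only the 8KB buffer size and the method names `flush`, `flushTempBuffer`, and `writeBytes` come from the review text. It shows why an explicit flush() that delegates to flushTempBuffer() makes pending bytes visible without waiting for the next bulk write.

```java
import java.io.ByteArrayOutputStream;

final class BufferedSinkSketch {
    private static final int TEMP_BUFFER_SIZE = 8 * 1024;

    private final byte[] tempBuffer = new byte[TEMP_BUFFER_SIZE];
    private int tempCount = 0;
    // Stand-in for the underlying vector; an assumption for illustration only.
    private final ByteArrayOutputStream sink = new ByteArrayOutputStream();

    /** Single bytes accumulate in the temp buffer and are committed in bulk. */
    void writeByte(byte b) {
        if (tempCount == tempBuffer.length) {
            flushTempBuffer();
        }
        tempBuffer[tempCount++] = b;
    }

    /** Bulk writes commit pending single bytes first so ordering is preserved. */
    void writeBytes(byte[] src, int offset, int length) {
        flushTempBuffer();
        sink.write(src, offset, length);
    }

    /** Explicit flush commits whatever is pending, as the review suggests. */
    void flush() {
        flushTempBuffer();
    }

    private void flushTempBuffer() {
        if (tempCount > 0) {
            sink.write(tempBuffer, 0, tempCount);
            tempCount = 0;
        }
    }

    int committedSize() {
        return sink.size();
    }

    byte[] toByteArray() {
        flushTempBuffer();
        return sink.toByteArray();
    }
}
```

With a no-op flush(), a caller that writes one byte and then flushes would still see zero committed bytes in this sketch; with the delegating version it sees one.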
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
- CHANGELOG.md
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/ServerConfig.java
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightClientChannel.java
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightServerChannel.java
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightStreamPlugin.java
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransport.java
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportResponse.java
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/VectorStreamOutput.java
- plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/ArrowStreamSerializationTests.java
- plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightClientChannelTests.java
- plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightTransportTestBase.java
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-12-12T18:40:08.452Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
Applied to files:
plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightClientChannelTests.java
📚 Learning: 2025-12-13T20:16:15.318Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java:101-123
Timestamp: 2025-12-13T20:16:15.318Z
Learning: In OpenSearch, only one HTTP transport implementation can be active and loaded at a time, so duplicate setting definitions (such as h3.max_stream_local_length, h3.max_stream_remote_length, and h3.max_streams) across different transport implementations like Netty4Http3ServerTransport and ReactorNetty4HttpServerTransport will not cause setting registration conflicts.
Applied to files:
plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightTransportTestBase.java
🧬 Code graph analysis (2)
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightServerChannel.java (1)
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightUtils.java (1)
FlightUtils(13-30)
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportResponse.java (3)
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/ClientHeaderMiddleware.java (1)
ClientHeaderMiddleware(35-127)
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightErrorMapper.java (1)
FlightErrorMapper(31-116)
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightUtils.java (1)
FlightUtils(13-30)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
- GitHub Check: gradle-check
- GitHub Check: Analyze (java)
- GitHub Check: assemble (21, ubuntu-24.04-arm)
- GitHub Check: assemble (25, ubuntu-24.04-arm)
- GitHub Check: assemble (25, ubuntu-latest)
- GitHub Check: assemble (25, windows-latest)
- GitHub Check: detect-breaking-change
- GitHub Check: precommit (25, macos-15-intel)
- GitHub Check: assemble (21, windows-latest)
- GitHub Check: precommit (21, windows-latest)
- GitHub Check: precommit (21, macos-15)
- GitHub Check: precommit (21, ubuntu-24.04-arm)
- GitHub Check: precommit (25, ubuntu-24.04-arm)
- GitHub Check: precommit (21, ubuntu-latest)
- GitHub Check: assemble (21, ubuntu-latest)
- GitHub Check: precommit (21, macos-15-intel)
- GitHub Check: precommit (25, macos-15)
- GitHub Check: precommit (21, windows-2025, true)
- GitHub Check: precommit (25, ubuntu-latest)
- GitHub Check: precommit (25, windows-latest)
🔇 Additional comments (15)
CHANGELOG.md (1)
29-29: LGTM!
The changelog entry is correctly formatted and appropriately placed in the Fixed section.
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/ServerConfig.java (1)
207-208: LGTM!
Exposing FLIGHT_THREAD_POOL_MIN_SIZE in the settings list allows external configuration of the minimum thread pool size. The setting is properly defined (lines 71-76) and consumed in the init() method (line 143).
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java (1)
218-218: LGTM!
The change ensures headers are propagated when completing a stream, even when no batches were sent. This aligns with the PR objective and maintains consistency with the batch sending path (line 160).
plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightClientChannelTests.java (1)
558-561: LGTM!
The enhanced error handling properly captures transport exceptions during framework-level stream creation and signals test completion. This ensures the test can reliably observe and verify framework-level errors (verified at line 577).
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightStreamPlugin.java (1)
367-368: LGTM!
Exposing SETTING_FLIGHT_PUBLISH_PORT allows external configuration of the Flight publish port, following the same pattern as the existing SETTING_FLIGHT_PUBLISH_HOST setting. The setting is properly defined in ServerComponents.java as an integer setting with key "arrow.flight.publish_port" and default value -1, and is correctly integrated with the transport publish port resolution logic.
plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/ArrowStreamSerializationTests.java (1)
53-53: LGTM!
The constructor argument change from Optional.empty() to null correctly aligns with the refactored VectorStreamOutput API that now accepts a nullable VectorSchemaRoot instead of Optional<VectorSchemaRoot>.
plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightTransportTestBase.java (1)
108-110: LGTM!
The TaskManager mock setup correctly stubs taskExecutionStarted(any()) to return a mock StoredContext, providing the necessary scaffolding for tests that exercise the new async thread context handling flow in FlightClientChannel.
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransport.java (2)
136-136: Verify the impact of reducing worker thread count.
The worker event loop group size was reduced from availableProcessors() * 2 to availableProcessors(). This halves the number of worker threads available for handling I/O operations.
While this may reduce resource consumption, it could potentially impact throughput under high concurrent load. Ensure this change has been benchmarked or is aligned with observed resource utilization patterns.
412-416: Good improvement for observability.
The custom ThreadFactory with meaningful thread names (os-grpc-boss-ELG-N, os-grpc-worker-ELG-N) improves debuggability and makes it easier to identify Flight transport threads in thread dumps and monitoring tools.
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightClientChannel.java (1)
115-119: Good approach for correlation ID uniqueness.
The scheme combining timestamp (upper bits) with a 20-bit channel counter ensures uniqueness across multiple channels and time. However, note that the 20-bit channel ID will wrap after ~1 million channels, though the timestamp component should still maintain uniqueness in practice.
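For readers unfamiliar with this kind of ID packing, the following is a minimal sketch of one way to combine a timestamp with a 20-bit counter. The exact bit layout in FlightClientChannel is not shown in this review, so the shift-and-mask arrangement, the class name `CorrelationIdSketch`, and its methods are assumptions. A 20-bit field holds 2^20 = 1,048,576 values, which is where the "~1 million" wrap point comes from.

```java
import java.util.concurrent.atomic.AtomicLong;

final class CorrelationIdSketch {
    static final int COUNTER_BITS = 20;
    static final long COUNTER_MASK = (1L << COUNTER_BITS) - 1; // 1_048_575

    private final AtomicLong counter;

    CorrelationIdSketch(long initialCounter) {
        this.counter = new AtomicLong(initialCounter);
    }

    /** Timestamp occupies the upper bits; the counter fills the low 20 bits. */
    long next(long timestampMillis) {
        long low = counter.getAndIncrement() & COUNTER_MASK; // wraps after 2^20 values
        return (timestampMillis << COUNTER_BITS) | low;
    }

    static long timestampOf(long id) {
        return id >>> COUNTER_BITS;
    }

    static long counterOf(long id) {
        return id & COUNTER_MASK;
    }

    public static void main(String[] args) {
        CorrelationIdSketch ids = new CorrelationIdSketch(0);
        long id = ids.next(System.currentTimeMillis());
        System.out.println(timestampOf(id) + " / " + counterOf(id));
    }
}
```

In this layout two IDs can only collide if the counter wraps all the way around within the same timestamp value, which matches the reviewer's point that the timestamp component keeps IDs distinct in practice.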
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightServerChannel.java (2)
129-141: Good fix for header propagation.
This change addresses the PR objective to "fix header propagation when no batches are sent." When root == null (no batches were sent), the header is now explicitly set via middleware.setHeader(header) before completing the stream. This ensures headers are always propagated to the client regardless of whether data batches were transmitted.
51-51: Appropriate simplification.
Replacing Optional<VectorSchemaRoot> with a nullable VectorSchemaRoot is appropriate here since Optional was being used for a mutable field rather than return values. The volatile modifier ensures visibility across threads.
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/VectorStreamOutput.java (1)
44-71: Good buffering optimization.
The buffering strategy effectively reduces vector operations for byte-at-a-time writes by accumulating them in an 8KB buffer before committing to the vector. The writeBytes path correctly flushes any pending buffered data before performing the direct write.
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportResponse.java (2)
89-109: Good use of virtual threads for async I/O.
Using Thread.ofVirtual() for the prefetch operation is appropriate as it moves potentially blocking gRPC I/O off the calling thread without the overhead of platform threads. This aligns with the PR objective to "make FlightClientChannel sendMessage asynchronous."
However, note the timing calculation on lines 93 and 97:
117-143: Well-structured batch iteration with version propagation.
The nextResponse() method correctly:
- Uses the prefetched first batch without calling next() again
- Sets the version from the initial header for proper deserialization (addressing PR objective "Set version from headers for incoming response deserialization")
- Includes slow-log tracking for performance monitoring
Note: The ternary expression on line 124 assumes single-threaded consumption, which aligns with typical stream response usage patterns.
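The "use the prefetched first batch without calling next() again" pattern can be illustrated with a small iterator wrapper. This is a sketch under assumptions, not the code in FlightTransportResponse: `PrefetchedIterator` is a hypothetical name, and a plain `Iterator` stands in for the Flight stream. Like the reviewed code, it assumes a single consuming thread.

```java
import java.util.Iterator;
import java.util.List;

final class PrefetchedIterator<T> implements Iterator<T> {
    private final Iterator<T> delegate;
    private T first;
    private boolean firstPending;

    PrefetchedIterator(Iterator<T> delegate) {
        this.delegate = delegate;
        // Eagerly pull the first element, mirroring the header/first-batch prefetch.
        if (delegate.hasNext()) {
            this.first = delegate.next();
            this.firstPending = true;
        }
    }

    @Override
    public boolean hasNext() {
        return firstPending || delegate.hasNext();
    }

    @Override
    public T next() {
        if (firstPending) {
            // Hand out the prefetched element exactly once, without advancing the delegate.
            firstPending = false;
            T value = first;
            first = null;
            return value;
        }
        return delegate.next();
    }

    public static void main(String[] args) {
        PrefetchedIterator<String> it = new PrefetchedIterator<>(List.of("batch-0", "batch-1").iterator());
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

The `firstPending` flag is not synchronized, so two threads calling next() concurrently could both observe the prefetched element; that is the same single-threaded-consumption assumption the note above calls out.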
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #20359 +/- ##
============================================
- Coverage 73.25% 73.23% -0.02%
+ Complexity 71979 71977 -2
============================================
Files 5796 5796
Lines 329287 329367 +80
Branches 47419 47445 +26
============================================
+ Hits 241203 241206 +3
- Misses 68759 68797 +38
- Partials 19325 19364 +39
6e37c07 to 33b0cd6
33b0cd6 to 4f4bf1b
❌ Gradle check result for 4f4bf1b: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
4f4bf1b to 5e9115b
❌ Gradle check result for 5e9115b: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
5e9115b to 6b39983
❌ Gradle check result for 6b39983: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
* make FlightClientChannel sendMessage async and use of virtual threads
* fix for headers when no batches are sent
* set version from headers for incoming response deserialization
Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
6b39983 to 35edeec
* Stream transport refactor
* make FlightClientChannel sendMessage async and use of virtual threads
* fix for headers when no batches are sent
* set version from headers for incoming response deserialization
* more debug logs and address PR comments
---------
Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
FlightClientChannel sendMessage async
Minor
Optional in FlightServerChannel.
Description
[Describe what this change achieves]
Related Issues
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.
Summary by CodeRabbit
New Features
Bug Fixes
Chores