[router][grpc] Refine streaming processes#11277
Conversation
Summary of Changes
This pull request refines the streaming process within the router by adding a mechanism to detect when a language model is still in its reasoning stage. By exposing this state, the system can skip tool call parsing while the model is reasoning, streamlining streaming responses.
Code Review
This pull request introduces an optimization to skip tool call parsing during the model's reasoning phase in streaming mode. This is achieved by adding an is_in_reasoning method to the ReasoningParser trait and its implementations. The gRPC routers then use this state to conditionally bypass tool parsing logic.
The changes are logical and correctly implement the intended optimization. My review includes a couple of suggestions to improve maintainability and robustness:
- Refactoring duplicated code between the two gRPC router implementations.
- Improving error handling for mutex locks to prevent potential panics.
Overall, this is a good enhancement to the streaming process.
```diff
 if let Some(pooled_parser) = reasoning_parsers.get(&index) {
-    let parse_result = {
+    let (parse_result, in_reasoning) = {
         let mut parser = pooled_parser.lock().unwrap();
```
Using .unwrap() on a Mutex lock can lead to a panic if the lock is poisoned (i.e., if another thread panics while holding the lock). This could crash the entire router service. It's safer to handle the PoisonError gracefully to make the service more robust.
Consider recovering from the poisoned state by taking ownership of the mutex data. This same issue exists in sgl-router/src/routers/grpc/router.rs.
```diff
-let mut parser = pooled_parser.lock().unwrap();
+let mut parser = match pooled_parser.lock() {
+    Ok(guard) => guard,
+    Err(poisoned) => {
+        warn!("Reasoning parser lock was poisoned. Recovering by taking ownership of the data.");
+        poisoned.into_inner()
+    }
+};
```
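The recovery pattern suggested above can be exercised standalone. This is a minimal sketch (the `lock_or_recover` helper name is illustrative, not from the PR, and the `warn!` logging is omitted) showing that a poisoned `Mutex` still holds intact data that `into_inner` can reach:

```rust
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread;

// Lock a mutex, recovering from poisoning instead of panicking.
fn lock_or_recover<T: ?Sized>(m: &Mutex<T>) -> MutexGuard<'_, T> {
    match m.lock() {
        Ok(guard) => guard,
        // A poisoned lock means another thread panicked while holding it;
        // the data is still present, so take ownership of the guard anyway.
        Err(poisoned) => poisoned.into_inner(),
    }
}

fn main() {
    let data = Arc::new(Mutex::new(vec![1, 2, 3]));

    // Poison the mutex: a spawned thread panics while holding the lock.
    let poisoner = Arc::clone(&data);
    let _ = thread::spawn(move || {
        let _guard = poisoner.lock().unwrap();
        panic!("simulated panic while holding the lock");
    })
    .join();

    assert!(data.lock().is_err()); // a bare .unwrap() here would crash the service
    let guard = lock_or_recover(&data);
    assert_eq!(*guard, vec![1, 2, 3]); // the data survives the poisoning
    println!("recovered: {:?}", *guard);
}
```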
```diff
 if let Some(pooled_parser) = reasoning_parsers.get(&index) {
-    let parse_result = {
+    let (parse_result, in_reasoning) = {
         let mut parser = pooled_parser.lock().unwrap();
```
Calling .unwrap() on a Mutex lock is risky in a concurrent environment. If another thread panics while holding the lock, this unwrap() will also panic, potentially bringing down the service. To improve robustness, you should handle the PoisonError that lock() can return.
```diff
-let mut parser = pooled_parser.lock().unwrap();
+let mut parser = match pooled_parser.lock() {
+    Ok(guard) => guard,
+    Err(poisoned) => {
+        warn!("Reasoning parser lock was poisoned. Recovering by taking ownership of the data.");
+        poisoned.into_inner()
+    }
+};
```
```diff
     model: &str,
     created: u64,
-) -> (String, Option<ChatCompletionStreamResponse>) {
+) -> (String, Option<ChatCompletionStreamResponse>, bool) {
```
The implementation of process_reasoning_stream (lines 486-549) is identical to the one in sgl-router/src/routers/grpc/pd_router.rs (lines 1172-1232). To avoid code duplication and improve maintainability, consider moving this logic to a shared utility function, for example in sgl-router/src/routers/grpc/utils.rs.
You could define a free function that takes &ReasoningParserFactory as an argument and then call it from both GrpcRouter and GrpcPDRouter.
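As a rough sketch of that refactor, the shared free function could look like the following. All names and types here are stand-ins, not the PR's actual code: the real `ReasoningParserFactory`, `parse_reasoning_streaming_incremental`, and response types are simplified to a toy trait so the shape of the shared helper is visible.

```rust
use std::sync::{Arc, Mutex};

// Stand-in for the pooled reasoning parser used by both routers.
trait ReasoningParser {
    fn parse_incremental(&mut self, chunk: &str) -> String;
    fn is_in_reasoning(&self) -> bool;
}

// Free function that both GrpcRouter and GrpcPDRouter could call
// instead of duplicating process_reasoning_stream.
fn process_reasoning_stream(
    parser: &Arc<Mutex<dyn ReasoningParser + Send>>,
    chunk: &str,
) -> (String, bool) {
    let mut guard = match parser.lock() {
        Ok(g) => g,
        Err(poisoned) => poisoned.into_inner(), // recover from poisoning
    };
    let normal_text = guard.parse_incremental(chunk);
    (normal_text, guard.is_in_reasoning())
}

// Toy parser: a "</think>" chunk ends the reasoning stage.
struct ToyParser {
    in_reasoning: bool,
}

impl ReasoningParser for ToyParser {
    fn parse_incremental(&mut self, chunk: &str) -> String {
        if chunk.contains("</think>") {
            self.in_reasoning = false;
        }
        chunk.to_string()
    }
    fn is_in_reasoning(&self) -> bool {
        self.in_reasoning
    }
}

fn main() {
    let parser: Arc<Mutex<dyn ReasoningParser + Send>> =
        Arc::new(Mutex::new(ToyParser { in_reasoning: true }));
    let (_, in_reasoning) = process_reasoning_stream(&parser, "thinking...");
    assert!(in_reasoning); // still reasoning: callers skip tool call parsing
    let (_, in_reasoning) = process_reasoning_stream(&parser, "</think>");
    assert!(!in_reasoning); // reasoning ended: tool parsing may run
    println!("ok");
}
```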
Motivation
In streaming, if the model is still in the reasoning stage, we can skip tool call parsing.
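The gating condition described above can be sketched as follows (the function name is illustrative, not the PR's code): tool call parsing only runs once the model has left the reasoning stage and tools are actually configured.

```rust
// Tool call parsing is gated on both the reasoning state and
// whether tools/tool_choice are configured for the request.
fn should_parse_tool_calls(in_reasoning: bool, tools_enabled: bool) -> bool {
    // While the parser reports the model is still inside reasoning
    // output (e.g. a <think>...</think> span), tool parsing is skipped.
    !in_reasoning && tools_enabled
}

fn main() {
    assert!(!should_parse_tool_calls(true, true)); // reasoning: skip
    assert!(should_parse_tool_calls(false, true)); // done reasoning: parse
    assert!(!should_parse_tool_calls(false, false)); // no tools configured
    println!("ok");
}
```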
Modifications
- Add `is_in_reasoning` for reasoning parsers.
- Check `is_in_reasoning` after calling `reasoning_parser.parse_reasoning_streaming_incremental`.
- Check `in_reasoning` besides `tool_choice` and `tools` for tool call parsing.

Accuracy Tests
Benchmarking and Profiling
Checklist