Skip to content

[model-gateway] refactor WorkerManager with fan_out helper and thin handlers#15583

Merged
slin1237 merged 1 commit intomainfrom
refactor-n/3
Dec 21, 2025
Merged

[model-gateway] refactor WorkerManager with fan_out helper and thin handlers#15583
slin1237 merged 1 commit intomainfrom
refactor-n/3

Conversation

@slin1237
Copy link
Collaborator

  • Add generic fan_out() helper for parallel worker requests
  • Add EngineMetricsResult and IntoResponse impls for result types
  • Refactor flush_cache_all, get_all_worker_loads, get_engine_metrics
  • Remove duplicate fan_out_simple_request (replaced by fan_out)
  • Make server.rs handlers thin wrappers using into_response()
  • Remove redundant doc comments

Checklist

…andlers

- Add generic fan_out() helper for parallel worker requests
- Add EngineMetricsResult and IntoResponse impls for result types
- Refactor flush_cache_all, get_all_worker_loads, get_engine_metrics
- Remove duplicate fan_out_simple_request (replaced by fan_out)
- Make server.rs handlers thin wrappers using into_response()
- Remove redundant doc comments
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @slin1237, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request focuses on a significant refactoring of the WorkerManager module and related API handlers. The primary goal is to enhance code reusability, maintainability, and readability by introducing a generic fan-out mechanism for parallel worker requests and by centralizing HTTP response logic within dedicated result types. This approach leads to more concise and focused API handler functions, making the codebase easier to understand and extend.

Highlights

  • Generic Fan-Out Helper: Introduced a new generic fan_out asynchronous function to centralize and simplify parallel requests to multiple workers, replacing previous ad-hoc implementations.
  • IntoResponse Implementations: Added IntoResponse trait implementations for FlushCacheResult, WorkerLoadsResult, and a new EngineMetricsResult enum. This moves the HTTP response construction logic into the result types themselves, making API handlers cleaner.
  • Refactored WorkerManager Functions: The flush_cache_all, get_all_worker_loads, and get_engine_metrics functions within WorkerManager have been refactored to leverage the new fan_out helper and return types that implement IntoResponse, significantly reducing their complexity.
  • Thinner API Handlers: API handlers in server.rs for engine_metrics, flush_cache, and get_loads are now much 'thinner', acting primarily as wrappers that call the refactored WorkerManager functions and then .into_response() on their results.
  • Code Cleanup: Removed the redundant fan_out_simple_request function and numerous unnecessary doc comments across the worker_manager.rs file, improving code clarity and reducing boilerplate.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request is a solid refactoring of the WorkerManager. The introduction of the generic fan_out helper for parallel worker requests is a great improvement, simplifying flush_cache_all and get_engine_metrics. The addition of IntoResponse implementations for result types successfully thins down the handlers in server.rs, leading to cleaner and more maintainable code. The removal of duplicated code and redundant comments is also a welcome change. I have a few suggestions to enhance robustness and logging.

warn!("Flushing cache for ALL workers - this may impact performance temporarily");

) -> FlushCacheResult {
let workers = worker_registry.get_all();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The warning log Flushing cache for ALL workers - this may impact performance temporarily was removed in this refactoring. This is a helpful warning for operators, as flushing the cache can be a disruptive operation. It would be good to add it back at the beginning of the function.

Suggested change
let workers = worker_registry.get_all();
warn!("Flushing cache for ALL workers - this may impact performance temporarily");
let workers = worker_registry.get_all();

Comment on lines +263 to +268
Ok(json) if json.is_array() => json
.as_array()
.unwrap()
.iter()
.filter_map(|e| e.get("num_tokens").and_then(|v| v.as_i64()))
.sum::<i64>() as isize,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The use of .unwrap() here, while currently safe due to the if json.is_array() guard, can be brittle. Future code changes might inadvertently cause a panic. It's more idiomatic and robust in Rust to use pattern matching to handle Option and Result types, avoiding unwrap() entirely.

Suggested change
Ok(json) if json.is_array() => json
.as_array()
.unwrap()
.iter()
.filter_map(|e| e.get("num_tokens").and_then(|v| v.as_i64()))
.sum::<i64>() as isize,
Ok(Value::Array(array)) => array
.iter()
.filter_map(|e| e.get("num_tokens").and_then(|v| v.as_i64()))
.sum::<i64>() as isize,

Comment on lines +288 to +299
for resp in responses {
if let Ok(r) = resp.result {
if r.status().is_success() {
if let Ok(text) = r.text().await {
metric_packs.push(MetricPack {
labels: vec![("worker_addr".into(), resp.url)],
metrics_text: text,
});
}
}
})
.collect();

let responses: Vec<_> = stream::iter(futures)
.buffer_unordered(MAX_CONCURRENT)
.filter_map(|r| async { r })
.collect()
.await;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation silently ignores failed requests when fetching metrics from workers. This can make debugging difficult, as there would be no indication why metrics from a particular worker are missing. The previous implementation (fan_out_simple_request) logged warnings for such failures. It would be beneficial to reintroduce logging for failed connections, non-successful HTTP statuses, and body reading errors.

        for resp in responses {
            match resp.result {
                Ok(r) => {
                    if r.status().is_success() {
                        match r.text().await {
                            Ok(text) => {
                                metric_packs.push(MetricPack {
                                    labels: vec![("worker_addr".into(), resp.url)],
                                    metrics_text: text,
                                });
                            }
                            Err(e) => {
                                warn!("Failed to read metrics response body from {}: {}", resp.url, e);
                            }
                        }
                    } else {
                        warn!("Failed to get metrics from {}: HTTP {}", resp.url, r.status());
                    }
                }
                Err(e) => {
                    warn!("Failed to connect to {} for metrics: {}", resp.url, e);
                }
            }
        }

@slin1237 slin1237 merged commit b9d7860 into main Dec 21, 2025
64 of 69 checks passed
@slin1237 slin1237 deleted the refactor-n/3 branch December 21, 2025 23:29
jiaming1130 pushed a commit to zhuyijie88/sglang that referenced this pull request Dec 25, 2025
GuoYechang pushed a commit to GuoYechang/sglang that referenced this pull request Jan 13, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Comments