[model-gateway] refactor WorkerManager with fan_out helper and thin handlers#15583
[model-gateway] refactor WorkerManager with fan_out helper and thin handlers#15583
Conversation
…andlers - Add generic fan_out() helper for parallel worker requests - Add EngineMetricsResult and IntoResponse impls for result types - Refactor flush_cache_all, get_all_worker_loads, get_engine_metrics - Remove duplicate fan_out_simple_request (replaced by fan_out) - Make server.rs handlers thin wrappers using into_response() - Remove redundant doc comments
Summary of ChangesHello @slin1237, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request focuses on a significant refactoring of the Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request is a solid refactoring of the WorkerManager. The introduction of the generic fan_out helper for parallel worker requests is a great improvement, simplifying flush_cache_all and get_engine_metrics. The addition of IntoResponse implementations for result types successfully thins down the handlers in server.rs, leading to cleaner and more maintainable code. The removal of duplicated code and redundant comments is also a welcome change. I have a few suggestions to enhance robustness and logging.
| warn!("Flushing cache for ALL workers - this may impact performance temporarily"); | ||
|
|
||
| ) -> FlushCacheResult { | ||
| let workers = worker_registry.get_all(); |
There was a problem hiding this comment.
The warning log Flushing cache for ALL workers - this may impact performance temporarily was removed in this refactoring. This is a helpful warning for operators, as flushing the cache can be a disruptive operation. It would be good to add it back at the beginning of the function.
| let workers = worker_registry.get_all(); | |
| warn!("Flushing cache for ALL workers - this may impact performance temporarily"); | |
| let workers = worker_registry.get_all(); |
| Ok(json) if json.is_array() => json | ||
| .as_array() | ||
| .unwrap() | ||
| .iter() | ||
| .filter_map(|e| e.get("num_tokens").and_then(|v| v.as_i64())) | ||
| .sum::<i64>() as isize, |
There was a problem hiding this comment.
The use of .unwrap() here, while currently safe due to the if json.is_array() guard, can be brittle. Future code changes might inadvertently cause a panic. It's more idiomatic and robust in Rust to use pattern matching to handle Option and Result types, avoiding unwrap() entirely.
| Ok(json) if json.is_array() => json | |
| .as_array() | |
| .unwrap() | |
| .iter() | |
| .filter_map(|e| e.get("num_tokens").and_then(|v| v.as_i64())) | |
| .sum::<i64>() as isize, | |
| Ok(Value::Array(array)) => array | |
| .iter() | |
| .filter_map(|e| e.get("num_tokens").and_then(|v| v.as_i64())) | |
| .sum::<i64>() as isize, |
| for resp in responses { | ||
| if let Ok(r) = resp.result { | ||
| if r.status().is_success() { | ||
| if let Ok(text) = r.text().await { | ||
| metric_packs.push(MetricPack { | ||
| labels: vec![("worker_addr".into(), resp.url)], | ||
| metrics_text: text, | ||
| }); | ||
| } | ||
| } | ||
| }) | ||
| .collect(); | ||
|
|
||
| let responses: Vec<_> = stream::iter(futures) | ||
| .buffer_unordered(MAX_CONCURRENT) | ||
| .filter_map(|r| async { r }) | ||
| .collect() | ||
| .await; | ||
| } | ||
| } |
There was a problem hiding this comment.
The current implementation silently ignores failed requests when fetching metrics from workers. This can make debugging difficult, as there would be no indication why metrics from a particular worker are missing. The previous implementation (fan_out_simple_request) logged warnings for such failures. It would be beneficial to reintroduce logging for failed connections, non-successful HTTP statuses, and body reading errors.
for resp in responses {
match resp.result {
Ok(r) => {
if r.status().is_success() {
match r.text().await {
Ok(text) => {
metric_packs.push(MetricPack {
labels: vec![("worker_addr".into(), resp.url)],
metrics_text: text,
});
}
Err(e) => {
warn!("Failed to read metrics response body from {}: {}", resp.url, e);
}
}
} else {
warn!("Failed to get metrics from {}: HTTP {}", resp.url, r.status());
}
}
Err(e) => {
warn!("Failed to connect to {} for metrics: {}", resp.url, e);
}
}
}
Checklist