Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion sgl-router/src/routers/grpc/common/responses/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use axum::{
response::{IntoResponse, Response},
};
use serde_json::{json, to_value};
use tracing::{debug, warn};
use tracing::{debug, error, warn};

use crate::{
core::WorkerRegistry,
Expand Down Expand Up @@ -44,6 +44,10 @@ pub async fn ensure_mcp_connection(
.await
.is_none()
{
error!(
function = "ensure_mcp_connection",
"Failed to connect to MCP server"
);
return Err(error::failed_dependency(
"Failed to connect to MCP server. Check server_url and authorization.",
));
Expand Down
13 changes: 8 additions & 5 deletions sgl-router/src/routers/grpc/common/stages/client_acquisition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use async_trait::async_trait;
use axum::response::Response;
use tracing::error;

use super::PipelineStage;
use crate::routers::grpc::{
Expand All @@ -15,11 +16,13 @@ pub struct ClientAcquisitionStage;
#[async_trait]
impl PipelineStage for ClientAcquisitionStage {
async fn execute(&self, ctx: &mut RequestContext) -> Result<Option<Response>, Response> {
let workers = ctx
.state
.workers
.as_ref()
.ok_or_else(|| error::internal_error("Worker selection not completed"))?;
let workers = ctx.state.workers.as_ref().ok_or_else(|| {
error!(
function = "ClientAcquisitionStage::execute",
"Worker selection stage not completed"
);
error::internal_error("Worker selection not completed")
})?;

let clients = match workers {
WorkerSelection::Single { worker } => {
Expand Down
13 changes: 8 additions & 5 deletions sgl-router/src/routers/grpc/common/stages/dispatch_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::{SystemTime, UNIX_EPOCH};

use async_trait::async_trait;
use axum::response::Response;
use tracing::error;

use super::PipelineStage;
use crate::routers::grpc::{
Expand All @@ -17,11 +18,13 @@ pub struct DispatchMetadataStage;
#[async_trait]
impl PipelineStage for DispatchMetadataStage {
async fn execute(&self, ctx: &mut RequestContext) -> Result<Option<Response>, Response> {
let proto_request = ctx
.state
.proto_request
.as_ref()
.ok_or_else(|| error::internal_error("Proto request not built"))?;
let proto_request = ctx.state.proto_request.as_ref().ok_or_else(|| {
error!(
function = "DispatchMetadataStage::execute",
"Proto request not built"
);
error::internal_error("Proto request not built")
})?;

let request_id = proto_request.request_id.clone();
let model = match &ctx.input.request_type {
Expand Down
71 changes: 49 additions & 22 deletions sgl-router/src/routers/grpc/common/stages/request_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use async_trait::async_trait;
use axum::response::Response;
use tracing::error;

use super::PipelineStage;
use crate::{
Expand Down Expand Up @@ -35,17 +36,21 @@ impl RequestExecutionStage {
#[async_trait]
impl PipelineStage for RequestExecutionStage {
async fn execute(&self, ctx: &mut RequestContext) -> Result<Option<Response>, Response> {
let proto_request = ctx
.state
.proto_request
.take()
.ok_or_else(|| error::internal_error("Proto request not built"))?;

let clients = ctx
.state
.clients
.as_mut()
.ok_or_else(|| error::internal_error("Client acquisition not completed"))?;
let proto_request = ctx.state.proto_request.take().ok_or_else(|| {
error!(
function = "RequestExecutionStage::execute",
"Proto request not built"
);
error::internal_error("Proto request not built")
})?;

let clients = ctx.state.clients.as_mut().ok_or_else(|| {
error!(
function = "RequestExecutionStage::execute",
"Client acquisition not completed"
);
error::internal_error("Client acquisition not completed")
})?;

let result = match self.mode {
ExecutionMode::Single => self.execute_single(proto_request, clients).await?,
Expand All @@ -70,14 +75,22 @@ impl RequestExecutionStage {
proto_request: proto::GenerateRequest,
clients: &mut ClientSelection,
) -> Result<ExecutionResult, Response> {
let client = clients
.single_mut()
.ok_or_else(|| error::internal_error("Expected single client but got dual"))?;

let stream = client
.generate(proto_request)
.await
.map_err(|e| error::internal_error(format!("Failed to start generation: {}", e)))?;
let client = clients.single_mut().ok_or_else(|| {
error!(
function = "execute_single",
"Expected single client but got dual"
);
error::internal_error("Expected single client but got dual")
})?;

let stream = client.generate(proto_request).await.map_err(|e| {
error!(
function = "execute_single",
error = %e,
"Failed to start generation"
);
error::internal_error(format!("Failed to start generation: {}", e))
})?;

Ok(ExecutionResult::Single { stream })
}
Expand All @@ -87,9 +100,13 @@ impl RequestExecutionStage {
proto_request: proto::GenerateRequest,
clients: &mut ClientSelection,
) -> Result<ExecutionResult, Response> {
let (prefill_client, decode_client) = clients
.dual_mut()
.ok_or_else(|| error::internal_error("Expected dual clients but got single"))?;
let (prefill_client, decode_client) = clients.dual_mut().ok_or_else(|| {
error!(
function = "execute_dual_dispatch",
"Expected dual clients but got single"
);
error::internal_error("Expected dual clients but got single")
})?;

let prefill_request = proto_request.clone();
let decode_request = proto_request;
Expand All @@ -103,6 +120,11 @@ impl RequestExecutionStage {
let prefill_stream = match prefill_result {
Ok(s) => s,
Err(e) => {
error!(
function = "execute_dual_dispatch",
error = %e,
"Prefill worker failed to start"
);
return Err(error::internal_error(format!(
"Prefill worker failed to start: {}",
e
Expand All @@ -114,6 +136,11 @@ impl RequestExecutionStage {
let decode_stream = match decode_result {
Ok(s) => s,
Err(e) => {
error!(
function = "execute_dual_dispatch",
error = %e,
"Decode worker failed to start"
);
return Err(error::internal_error(format!(
"Decode worker failed to start: {}",
e
Expand Down
26 changes: 20 additions & 6 deletions sgl-router/src/routers/grpc/common/stages/worker_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use axum::response::Response;
use tracing::warn;
use tracing::{error, warn};

use super::PipelineStage;
use crate::{
Expand Down Expand Up @@ -47,11 +47,13 @@ impl WorkerSelectionStage {
#[async_trait]
impl PipelineStage for WorkerSelectionStage {
async fn execute(&self, ctx: &mut RequestContext) -> Result<Option<Response>, Response> {
let prep = ctx
.state
.preparation
.as_ref()
.ok_or_else(|| error::internal_error("Preparation stage not completed"))?;
let prep = ctx.state.preparation.as_ref().ok_or_else(|| {
error!(
function = "WorkerSelectionStage::execute",
"Preparation stage not completed"
);
error::internal_error("Preparation stage not completed")
})?;

// For Harmony, use selection_text produced during Harmony encoding
// Otherwise, use original_text from regular preparation
Expand All @@ -66,6 +68,12 @@ impl PipelineStage for WorkerSelectionStage {
match self.select_single_worker(ctx.input.model_id.as_deref(), text) {
Some(w) => WorkerSelection::Single { worker: w },
None => {
error!(
function = "WorkerSelectionStage::execute",
mode = "Regular",
model_id = ?ctx.input.model_id,
"No available workers for model"
);
return Err(error::service_unavailable(format!(
"No available workers for model: {:?}",
ctx.input.model_id
Expand All @@ -77,6 +85,12 @@ impl PipelineStage for WorkerSelectionStage {
match self.select_pd_pair(ctx.input.model_id.as_deref(), text) {
Some((prefill, decode)) => WorkerSelection::Dual { prefill, decode },
None => {
error!(
function = "WorkerSelectionStage::execute",
mode = "PrefillDecode",
model_id = ?ctx.input.model_id,
"No available PD worker pairs for model"
);
return Err(error::service_unavailable(format!(
"No available PD worker pairs for model: {:?}",
ctx.input.model_id
Expand Down
6 changes: 0 additions & 6 deletions sgl-router/src/routers/grpc/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use axum::{
Json,
};
use serde_json::json;
use tracing::{error, warn};

/// Create a 500 Internal Server Error response
///
Expand All @@ -21,7 +20,6 @@ use tracing::{error, warn};
/// ```
pub fn internal_error(message: impl Into<String>) -> Response {
let msg = message.into();
error!("{}", msg);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({
Expand All @@ -45,7 +43,6 @@ pub fn internal_error(message: impl Into<String>) -> Response {
/// ```
pub fn bad_request(message: impl Into<String>) -> Response {
let msg = message.into();
error!("{}", msg);
(
StatusCode::BAD_REQUEST,
Json(json!({
Expand All @@ -69,7 +66,6 @@ pub fn bad_request(message: impl Into<String>) -> Response {
/// ```
pub fn not_found(message: impl Into<String>) -> Response {
let msg = message.into();
warn!("{}", msg);
(
StatusCode::NOT_FOUND,
Json(json!({
Expand All @@ -93,7 +89,6 @@ pub fn not_found(message: impl Into<String>) -> Response {
/// ```
pub fn service_unavailable(message: impl Into<String>) -> Response {
let msg = message.into();
warn!("{}", msg);
(
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({
Expand All @@ -117,7 +112,6 @@ pub fn service_unavailable(message: impl Into<String>) -> Response {
/// ```
pub fn failed_dependency(message: impl Into<String>) -> Response {
let msg = message.into();
warn!("{}", msg);
(
StatusCode::FAILED_DEPENDENCY,
Json(json!({
Expand Down
29 changes: 27 additions & 2 deletions sgl-router/src/routers/grpc/harmony/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;

use axum::response::Response;
use proto::generate_complete::MatchedStop::{MatchedStopStr, MatchedTokenId};
use tracing::error;

use super::HarmonyParserAdapter;
use crate::{
Expand Down Expand Up @@ -63,6 +64,11 @@ impl HarmonyResponseProcessor {

// Parse Harmony channels with HarmonyParserAdapter
let mut parser = HarmonyParserAdapter::new().map_err(|e| {
error!(
function = "process_non_streaming_chat_response",
error = %e,
"Failed to create Harmony parser"
);
error::internal_error(format!("Failed to create Harmony parser: {}", e))
})?;

Expand All @@ -73,7 +79,14 @@ impl HarmonyResponseProcessor {
complete.finish_reason.clone(),
matched_stop.clone(),
)
.map_err(|e| error::internal_error(format!("Harmony parsing failed: {}", e)))?;
.map_err(|e| {
error!(
function = "process_non_streaming_chat_response",
error = %e,
"Harmony parsing failed on complete response"
);
error::internal_error(format!("Harmony parsing failed: {}", e))
})?;

// Build response message (assistant)
let message = ChatCompletionMessage {
Expand Down Expand Up @@ -171,6 +184,11 @@ impl HarmonyResponseProcessor {

// Parse Harmony channels
let mut parser = HarmonyParserAdapter::new().map_err(|e| {
error!(
function = "process_responses_iteration",
error = %e,
"Failed to create Harmony parser"
);
error::internal_error(format!("Failed to create Harmony parser: {}", e))
})?;

Expand All @@ -190,7 +208,14 @@ impl HarmonyResponseProcessor {
complete.finish_reason.clone(),
matched_stop,
)
.map_err(|e| error::internal_error(format!("Harmony parsing failed: {}", e)))?;
.map_err(|e| {
error!(
function = "process_responses_iteration",
error = %e,
"Harmony parsing failed on complete response"
);
error::internal_error(format!("Harmony parsing failed: {}", e))
})?;

// VALIDATION: Check if model incorrectly generated Tool role messages
// This happens when the model copies the format of tool result messages
Expand Down
Loading
Loading