Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions sgl-router/src/routers/grpc/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
//! Centralized error response handling for all routers
//!
//! This module provides consistent error responses across OpenAI and gRPC routers,
//! ensuring all errors follow OpenAI's API error format.

use axum::{
http::StatusCode,
response::{IntoResponse, Response},
Json,
};
use serde_json::json;
use tracing::{error, warn};

/// Create a 500 Internal Server Error response
///
/// Use this for unexpected server-side errors, database failures, etc.
///
/// # Example
/// ```ignore
/// return Err(internal_error("Database connection failed"));
/// ```
pub fn internal_error(message: impl Into<String>) -> Response {
let msg = message.into();
error!("{}", msg);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({
"error": {
"message": msg,
"type": "internal_error",
"code": 500
}
})),
)
.into_response()
}

/// Create a 400 Bad Request response
///
/// Use this for invalid request parameters, malformed JSON, validation errors, etc.
///
/// # Example
/// ```ignore
/// return Err(bad_request("Invalid conversation ID format"));
/// ```
pub fn bad_request(message: impl Into<String>) -> Response {
let msg = message.into();
error!("{}", msg);
(
StatusCode::BAD_REQUEST,
Json(json!({
"error": {
"message": msg,
"type": "invalid_request_error",
"code": 400
}
})),
)
.into_response()
}

/// Create a 404 Not Found response
///
/// Use this for resources that don't exist (conversations, responses, etc.)
///
/// # Example
/// ```ignore
/// return Err(not_found(format!("Conversation '{}' not found", id)));
/// ```
pub fn not_found(message: impl Into<String>) -> Response {
let msg = message.into();
warn!("{}", msg);
(
StatusCode::NOT_FOUND,
Json(json!({
"error": {
"message": msg,
"type": "invalid_request_error",
"code": 404
}
})),
)
.into_response()
}

/// Create a 503 Service Unavailable response
///
/// Use this for temporary service issues like no workers available, rate limiting, etc.
///
/// # Example
/// ```ignore
/// return Err(service_unavailable("No workers available for this model"));
/// ```
pub fn service_unavailable(message: impl Into<String>) -> Response {
let msg = message.into();
warn!("{}", msg);
(
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({
"error": {
"message": msg,
"type": "service_unavailable",
"code": 503
}
})),
)
.into_response()
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_internal_error_string() {
let response = internal_error("Test error");
assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
}

#[test]
fn test_internal_error_format() {
let response = internal_error(format!("Error: {}", 42));
assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
}

#[test]
fn test_bad_request() {
let response = bad_request("Invalid input");
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}

#[test]
fn test_not_found() {
let response = not_found("Resource not found");
assert_eq!(response.status(), StatusCode::NOT_FOUND);
}

#[test]
fn test_service_unavailable() {
let response = service_unavailable("No workers");
assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
}
}
18 changes: 8 additions & 10 deletions sgl-router/src/routers/grpc/harmony/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
},
routers::grpc::{
context::{DispatchMetadata, ExecutionResult},
utils,
error, utils,
},
};

Expand Down Expand Up @@ -66,7 +66,7 @@ impl HarmonyResponseProcessor {
// Collect all completed responses (one per choice)
let all_responses = Self::collect_responses(execution_result).await?;
if all_responses.is_empty() {
return Err(utils::internal_error_static("No responses from server"));
return Err(error::internal_error("No responses from server"));
}

// Build choices by parsing output with HarmonyParserAdapter
Expand All @@ -84,7 +84,7 @@ impl HarmonyResponseProcessor {

// Parse Harmony channels with HarmonyParserAdapter
let mut parser = HarmonyParserAdapter::new().map_err(|e| {
utils::internal_error_message(format!("Failed to create Harmony parser: {}", e))
error::internal_error(format!("Failed to create Harmony parser: {}", e))
})?;

// Parse Harmony channels with finish_reason and matched_stop
Expand All @@ -94,9 +94,7 @@ impl HarmonyResponseProcessor {
complete.finish_reason.clone(),
matched_stop.clone(),
)
.map_err(|e| {
utils::internal_error_message(format!("Harmony parsing failed: {}", e))
})?;
.map_err(|e| error::internal_error(format!("Harmony parsing failed: {}", e)))?;

// Build response message (assistant)
let message = ChatCompletionMessage {
Expand Down Expand Up @@ -195,17 +193,17 @@ impl HarmonyResponseProcessor {
// Collect all completed responses
let all_responses = Self::collect_responses(execution_result).await?;
if all_responses.is_empty() {
return Err(utils::internal_error_static("No responses from server"));
return Err(error::internal_error("No responses from server"));
}

// For Responses API, we only process the first response (n=1)
let complete = all_responses
.first()
.ok_or_else(|| utils::internal_error_static("No complete response"))?;
.ok_or_else(|| error::internal_error("No complete response"))?;

// Parse Harmony channels
let mut parser = HarmonyParserAdapter::new().map_err(|e| {
utils::internal_error_message(format!("Failed to create Harmony parser: {}", e))
error::internal_error(format!("Failed to create Harmony parser: {}", e))
})?;

// Convert matched_stop from proto to JSON
Expand All @@ -224,7 +222,7 @@ impl HarmonyResponseProcessor {
complete.finish_reason.clone(),
matched_stop,
)
.map_err(|e| utils::internal_error_message(format!("Harmony parsing failed: {}", e)))?;
.map_err(|e| error::internal_error(format!("Harmony parsing failed: {}", e)))?;

// VALIDATION: Check if model incorrectly generated Tool role messages
// This happens when the model copies the format of tool result messages
Expand Down
10 changes: 5 additions & 5 deletions sgl-router/src/routers/grpc/harmony/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ use crate::{
routers::{
grpc::{
context::SharedComponents,
error,
harmony::processor::ResponsesIterationResult,
pipeline::RequestPipeline,
responses::streaming::{OutputItemType, ResponseStreamEventEmitter},
utils,
},
openai::mcp::ensure_request_mcp_client,
},
Expand Down Expand Up @@ -285,7 +285,7 @@ pub async fn serve_harmony_responses(

// Safety check: prevent infinite loops
if iteration_count > MAX_TOOL_ITERATIONS {
return Err(utils::internal_error_message(format!(
return Err(error::internal_error(format!(
"Maximum tool iterations ({}) exceeded",
MAX_TOOL_ITERATIONS
)));
Expand Down Expand Up @@ -333,7 +333,7 @@ pub async fn serve_harmony_responses(
execute_mcp_tools(&ctx.mcp_manager, &tool_calls, tracking).await?
} else {
// Should never happen (we only get tool_calls when has_mcp_tools=true)
return Err(utils::internal_error_static(
return Err(error::internal_error(
"Tool calls found but MCP tracking not initialized",
));
};
Expand Down Expand Up @@ -734,7 +734,7 @@ async fn execute_mcp_tools(
// Parse tool arguments from JSON string
let args_str = tool_call.function.arguments.as_deref().unwrap_or("{}");
let args: JsonValue = serde_json::from_str(args_str).map_err(|e| {
utils::internal_error_message(format!(
error::internal_error(format!(
"Invalid tool arguments JSON for tool '{}': {}",
tool_call.function.name, e
))
Expand Down Expand Up @@ -1111,7 +1111,7 @@ async fn load_previous_messages(
.get_response_chain(&prev_id, None)
.await
.map_err(|e| {
utils::internal_error_message(format!(
error::internal_error(format!(
"Failed to load previous response chain for {}: {}",
prev_id_str, e
))
Expand Down
13 changes: 7 additions & 6 deletions sgl-router/src/routers/grpc/harmony/stages/preparation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
},
routers::grpc::{
context::{PreparationOutput, RequestContext, RequestType},
error,
stages::PipelineStage,
utils,
},
Expand Down Expand Up @@ -56,7 +57,7 @@ impl PipelineStage for HarmonyPreparationStage {
let request_arc = ctx.responses_request_arc();
self.prepare_responses(ctx, &request_arc).await?;
} else {
return Err(utils::bad_request_error(
return Err(error::bad_request(
"Only Chat and Responses requests supported in Harmony pipeline".to_string(),
));
}
Expand All @@ -78,7 +79,7 @@ impl HarmonyPreparationStage {
) -> Result<Option<Response>, Response> {
// Validate - reject logprobs
if request.logprobs {
return Err(utils::bad_request_error(
return Err(error::bad_request(
"logprobs are not supported for Harmony models".to_string(),
));
}
Expand All @@ -97,7 +98,7 @@ impl HarmonyPreparationStage {
let build_output = self
.builder
.build_from_chat(&body_ref)
.map_err(|e| utils::bad_request_error(format!("Harmony build failed: {}", e)))?;
.map_err(|e| error::bad_request(format!("Harmony build failed: {}", e)))?;

// Step 4: Store results
ctx.state.preparation = Some(PreparationOutput {
Expand Down Expand Up @@ -132,7 +133,7 @@ impl HarmonyPreparationStage {
let build_output = self
.builder
.build_from_responses(request)
.map_err(|e| utils::bad_request_error(format!("Harmony build failed: {}", e)))?;
.map_err(|e| error::bad_request(format!("Harmony build failed: {}", e)))?;

// Store results in preparation output
ctx.state.preparation = Some(PreparationOutput {
Expand Down Expand Up @@ -202,7 +203,7 @@ impl HarmonyPreparationStage {

// Validate specific function exists
if specific_function.is_some() && tools_to_use.is_empty() {
return Err(Box::new(utils::bad_request_error(format!(
return Err(Box::new(error::bad_request(format!(
"Tool '{}' not found in tools list",
specific_function.unwrap()
))));
Expand Down Expand Up @@ -236,7 +237,7 @@ impl HarmonyPreparationStage {
});

serde_json::to_string(&structural_tag).map_err(|e| {
Box::new(utils::internal_error_message(format!(
Box::new(error::internal_error(format!(
"Failed to serialize structural tag: {}",
e
)))
Expand Down
16 changes: 6 additions & 10 deletions sgl-router/src/routers/grpc/harmony/stages/request_building.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use crate::{
grpc_client::proto::{DisaggregatedParams, GenerateRequest},
routers::grpc::{
context::{ClientSelection, RequestContext, RequestType, WorkerSelection},
error,
stages::PipelineStage,
utils,
},
};

Expand Down Expand Up @@ -69,14 +69,14 @@ impl PipelineStage for HarmonyRequestBuildingStage {
.state
.preparation
.as_ref()
.ok_or_else(|| utils::internal_error_static("Preparation not completed"))?;
.ok_or_else(|| error::internal_error("Preparation not completed"))?;

// Get clients
let clients = ctx
.state
.clients
.as_ref()
.ok_or_else(|| utils::internal_error_static("Client acquisition not completed"))?;
.ok_or_else(|| error::internal_error("Client acquisition not completed"))?;
let builder_client = match clients {
ClientSelection::Single { client } => client,
ClientSelection::Dual { prefill, .. } => prefill,
Expand All @@ -87,7 +87,7 @@ impl PipelineStage for HarmonyRequestBuildingStage {
RequestType::Chat(_) => format!("chatcmpl-{}", Uuid::new_v4()),
RequestType::Responses(_) => format!("responses-{}", Uuid::new_v4()),
RequestType::Generate(_) => {
return Err(utils::bad_request_error(
return Err(error::bad_request(
"Generate requests are not supported with Harmony models".to_string(),
));
}
Expand All @@ -111,9 +111,7 @@ impl PipelineStage for HarmonyRequestBuildingStage {
None,
prep.tool_constraints.clone(),
)
.map_err(|e| {
utils::bad_request_error(format!("Invalid request parameters: {}", e))
})?
.map_err(|e| error::bad_request(format!("Invalid request parameters: {}", e)))?
}
RequestType::Responses(request) => builder_client
.build_generate_request_from_responses(
Expand All @@ -123,9 +121,7 @@ impl PipelineStage for HarmonyRequestBuildingStage {
prep.token_ids.clone(),
prep.harmony_stop_ids.clone(),
)
.map_err(|e| {
utils::bad_request_error(format!("Invalid request parameters: {}", e))
})?,
.map_err(|e| error::bad_request(format!("Invalid request parameters: {}", e)))?,
_ => unreachable!(),
};

Expand Down
Loading
Loading