Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 25 additions & 50 deletions sgl-router/src/routers/grpc/pd_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,7 @@ impl GrpcPDRouter {
let (original_text, token_ids) = match self.resolve_generate_input(body) {
Ok(res) => res,
Err(msg) => {
error!("Invalid generate request: {}", msg);
return (StatusCode::BAD_REQUEST, msg).into_response();
return utils::bad_request_error(msg);
}
};

Expand All @@ -208,8 +207,7 @@ impl GrpcPDRouter {
{
Ok(pair) => pair,
Err(e) => {
warn!("Failed to select PD worker pair: {}", e);
return (StatusCode::SERVICE_UNAVAILABLE, e).into_response();
return utils::service_unavailable_error(e);
}
};

Expand Down Expand Up @@ -244,15 +242,13 @@ impl GrpcPDRouter {
) {
Ok(req) => req,
Err(e) => {
error!("Failed to build generate request: {}", e);
return (StatusCode::BAD_REQUEST, e).into_response();
return utils::bad_request_error(e);
}
};

// Step 5: Inject bootstrap metadata
if let Err(e) = Self::inject_bootstrap_metadata(&mut request, &*prefill_worker) {
error!("Failed to inject bootstrap metadata: {}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, e).into_response();
return utils::internal_error_message(e);
}

// Step 6: Get weight version for response metadata
Expand Down Expand Up @@ -334,21 +330,15 @@ impl GrpcPDRouter {
let processed_messages = match utils::process_chat_messages(&body_ref, &*self.tokenizer) {
Ok(msgs) => msgs,
Err(e) => {
error!("Failed to process chat messages: {}", e);
return (StatusCode::BAD_REQUEST, e.to_string()).into_response();
return utils::bad_request_error(e.to_string());
}
};

// Step 3: Tokenize the processed text
let encoding = match self.tokenizer.encode(&processed_messages.text) {
Ok(encoding) => encoding,
Err(e) => {
error!("Tokenization failed: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Tokenization failed: {}", e),
)
.into_response();
return utils::internal_error_message(format!("Tokenization failed: {}", e));
}
};

Expand All @@ -368,8 +358,7 @@ impl GrpcPDRouter {
{
Ok(pair) => pair,
Err(e) => {
warn!("Failed to select PD worker pair: {}", e);
return (StatusCode::SERVICE_UNAVAILABLE, e).into_response();
return utils::service_unavailable_error(e);
}
};

Expand Down Expand Up @@ -402,19 +391,13 @@ impl GrpcPDRouter {
) {
Ok(request) => request,
Err(e) => {
error!("Failed to build gRPC request: {}", e);
return (
StatusCode::BAD_REQUEST,
format!("Invalid request parameters: {}", e),
)
.into_response();
return utils::bad_request_error(format!("Invalid request parameters: {}", e));
}
};

// Step 8: Inject bootstrap metadata into the request
if let Err(e) = Self::inject_bootstrap_metadata(&mut request, &*prefill_worker) {
error!("Failed to inject bootstrap metadata: {}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, e).into_response();
return utils::internal_error_message(e);
}

// Step 9: Handle streaming vs non-streaming
Expand Down Expand Up @@ -486,25 +469,21 @@ impl GrpcPDRouter {
let prefill_stream = match prefill_result {
Ok(s) => s,
Err(e) => {
error!("Failed to start prefill generation: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Prefill worker failed to start: {}", e),
)
.into_response();
return utils::internal_error_message(format!(
"Prefill worker failed to start: {}",
e
));
}
};

// Get decode stream - this is what we'll process for output
let decode_stream = match decode_result {
Ok(s) => s,
Err(e) => {
error!("Failed to start decode generation: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Decode worker failed to start: {}", e),
)
.into_response();
return utils::internal_error_message(format!(
"Decode worker failed to start: {}",
e
));
}
};

Expand Down Expand Up @@ -592,25 +571,21 @@ impl GrpcPDRouter {
let prefill_stream = match prefill_result {
Ok(s) => s,
Err(e) => {
error!("Failed to start prefill generation: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Prefill worker failed to start: {}", e),
)
.into_response();
return utils::internal_error_message(format!(
"Prefill worker failed to start: {}",
e
));
}
};

// Get decode stream - this is what we'll process for output
let decode_stream = match decode_result {
Ok(s) => s,
Err(e) => {
error!("Failed to start decode generation: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Decode worker failed to start: {}", e),
)
.into_response();
return utils::internal_error_message(format!(
"Decode worker failed to start: {}",
e
));
}
};

Expand Down
69 changes: 24 additions & 45 deletions sgl-router/src/routers/grpc/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,15 @@ impl GrpcRouter {
let processed_messages = match utils::process_chat_messages(&body_ref, &*self.tokenizer) {
Ok(msgs) => msgs,
Err(e) => {
error!("Failed to process chat messages: {}", e);
return (StatusCode::BAD_REQUEST, e.to_string()).into_response();
return utils::bad_request_error(e.to_string());
}
};

// Step 3: Tokenize the processed text
let encoding = match self.tokenizer.encode(&processed_messages.text) {
Ok(encoding) => encoding,
Err(e) => {
error!("Tokenization failed: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Tokenization failed: {}", e),
)
.into_response();
return utils::internal_error_message(format!("Tokenization failed: {}", e));
}
};

Expand All @@ -145,8 +139,10 @@ impl GrpcRouter {
{
Some(w) => w,
None => {
warn!("No available workers for model: {:?}", model_id);
return (StatusCode::SERVICE_UNAVAILABLE, "No available workers").into_response();
return utils::service_unavailable_error(format!(
"No available workers for model: {:?}",
model_id
));
}
};

Expand All @@ -170,12 +166,7 @@ impl GrpcRouter {
) {
Ok(request) => request,
Err(e) => {
error!("Failed to build gRPC request: {}", e);
return (
StatusCode::BAD_REQUEST,
format!("Invalid request parameters: {}", e),
)
.into_response();
return utils::bad_request_error(format!("Invalid request parameters: {}", e));
}
};

Expand All @@ -200,8 +191,7 @@ impl GrpcRouter {
let (original_text, token_ids) = match self.resolve_generate_input(body) {
Ok(res) => res,
Err(msg) => {
error!("Invalid generate request: {}", msg);
return (StatusCode::BAD_REQUEST, msg).into_response();
return utils::bad_request_error(msg);
}
};

Expand All @@ -211,8 +201,10 @@ impl GrpcRouter {
let worker = match self.select_worker_for_request(model_id, original_text.as_deref()) {
Some(w) => w,
None => {
warn!("No available workers for model: {:?}", model_id);
return (StatusCode::SERVICE_UNAVAILABLE, "No available workers").into_response();
return utils::service_unavailable_error(format!(
"No available workers for model: {:?}",
model_id
));
}
};

Expand All @@ -238,8 +230,7 @@ impl GrpcRouter {
) {
Ok(req) => req,
Err(e) => {
error!("Failed to build generate request: {}", e);
return (StatusCode::BAD_REQUEST, e).into_response();
return utils::bad_request_error(e);
}
};

Expand Down Expand Up @@ -405,16 +396,6 @@ impl GrpcRouter {
Ok((text.to_string(), encoding.token_ids().to_vec()))
}

/// Builds a `500 Internal Server Error` response from a fixed message.
///
/// The message is first emitted through the `error!` macro and then used
/// verbatim as the response body. Only `'static` string slices are accepted;
/// see `internal_error_message` for the owned-`String` variant.
fn internal_error_static(msg: &'static str) -> Response {
    let status = StatusCode::INTERNAL_SERVER_ERROR;
    error!("{}", msg);
    (status, msg).into_response()
}

/// Builds a `500 Internal Server Error` response from an owned message.
///
/// The message is emitted through the `error!` macro before being moved
/// into the response body, so callers can pass a freshly `format!`-ed string.
fn internal_error_message(message: String) -> Response {
    let status = StatusCode::INTERNAL_SERVER_ERROR;
    // Log by reference first; `message` is consumed by the response below.
    error!("{}", message);
    (status, message).into_response()
}

/// Count the number of tool calls in the request message history
/// This is used for KimiK2 format which needs globally unique indices
fn get_history_tool_calls_count(request: &ChatCompletionRequest) -> usize {
Expand Down Expand Up @@ -740,12 +721,7 @@ impl GrpcRouter {
let mut grpc_stream = match client.generate(request).await {
Ok(stream) => stream,
Err(e) => {
error!("Failed to start generation: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Generation failed: {}", e),
)
.into_response();
return utils::internal_error_message(format!("Generation failed: {}", e));
}
};

Expand Down Expand Up @@ -1183,7 +1159,7 @@ impl GrpcRouter {
let stream = match client.generate(request).await {
Ok(s) => s,
Err(e) => {
return Self::internal_error_message(format!("Failed to start generation: {}", e))
return utils::internal_error_message(format!("Failed to start generation: {}", e))
}
};

Expand All @@ -1193,7 +1169,7 @@ impl GrpcRouter {
};

if all_responses.is_empty() {
return Self::internal_error_static("No responses from server");
return utils::internal_error_static("No responses from server");
}

// Process each response into a ChatChoice
Expand All @@ -1212,7 +1188,7 @@ impl GrpcRouter {
{
Ok(choice) => choices.push(choice),
Err(e) => {
return Self::internal_error_message(format!(
return utils::internal_error_message(format!(
"Failed to process choice {}: {}",
index, e
));
Expand Down Expand Up @@ -1265,7 +1241,7 @@ impl GrpcRouter {
let stream = match client.generate(request).await {
Ok(stream) => stream,
Err(e) => {
return Self::internal_error_message(format!("Failed to start generation: {}", e))
return utils::internal_error_message(format!("Failed to start generation: {}", e))
}
};

Expand All @@ -1276,7 +1252,7 @@ impl GrpcRouter {
};

if responses.is_empty() {
return Self::internal_error_static("No completion received from scheduler");
return utils::internal_error_static("No completion received from scheduler");
}

// Create stop decoder from sampling params
Expand All @@ -1298,7 +1274,10 @@ impl GrpcRouter {
let outputs = match stop_decoder.process_tokens(&complete.output_ids) {
Ok(outputs) => outputs,
Err(e) => {
return Self::internal_error_message(format!("Failed to process tokens: {}", e))
return utils::internal_error_message(format!(
"Failed to process tokens: {}",
e
))
}
};

Expand Down Expand Up @@ -1377,7 +1356,7 @@ impl GrpcRouter {
let stream = match client.generate(request).await {
Ok(stream) => stream,
Err(e) => {
return Self::internal_error_message(format!("Failed to start generation: {}", e))
return utils::internal_error_message(format!("Failed to start generation: {}", e))
}
};

Expand Down
Loading
Loading