Skip to content

Commit bfcf01b

Browse files
CatherineSue1994
authored andcommitted
[grpc] Unify ResponsesContext and HarmonyResponsesContext (sgl-project#16549)
1 parent 7fdbec8 commit bfcf01b

File tree

16 files changed

+68
-227
lines changed

16 files changed

+68
-227
lines changed

sgl-model-gateway/src/routers/grpc/harmony/responses/context.rs renamed to sgl-model-gateway/src/routers/grpc/common/responses/context.rs

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
//! Context for Harmony Responses execution
1+
//! Shared context for /v1/responses endpoint handlers
2+
//!
3+
//! This context is used by both regular and harmony response implementations.
24
35
use std::sync::{Arc, RwLock as StdRwLock};
46

@@ -8,52 +10,52 @@ use crate::{
810
routers::grpc::{context::SharedComponents, pipeline::RequestPipeline},
911
};
1012

11-
/// Context for Harmony Responses execution with MCP tool support
13+
/// Context for /v1/responses endpoint
1214
///
13-
/// Contains all dependencies needed for multi-turn Responses API execution.
14-
/// Cheap to clone (all Arc references).
15+
/// Used by both regular and harmony implementations.
16+
/// All fields are Arc/shared references, so cloning this context is cheap.
1517
#[derive(Clone)]
16-
pub(crate) struct HarmonyResponsesContext {
17-
/// Pipeline for executing Harmony requests
18+
pub(crate) struct ResponsesContext {
19+
/// Chat pipeline for executing requests
1820
pub pipeline: Arc<RequestPipeline>,
1921

2022
/// Shared components (tokenizer, parsers)
2123
pub components: Arc<SharedComponents>,
2224

23-
/// MCP manager for tool execution
24-
pub mcp_manager: Arc<McpManager>,
25-
26-
/// Server keys for MCP tools requested in this context
27-
pub requested_servers: Arc<StdRwLock<Vec<String>>>,
28-
29-
/// Response storage for loading conversation history
25+
/// Response storage backend
3026
pub response_storage: Arc<dyn ResponseStorage>,
3127

32-
/// Conversation storage for persisting conversations
28+
/// Conversation storage backend
3329
pub conversation_storage: Arc<dyn ConversationStorage>,
3430

35-
/// Conversation item storage for persisting conversation items
31+
/// Conversation item storage backend
3632
pub conversation_item_storage: Arc<dyn ConversationItemStorage>,
33+
34+
/// MCP manager for tool support
35+
pub mcp_manager: Arc<McpManager>,
36+
37+
/// Server keys for MCP tools requested in this context
38+
pub requested_servers: Arc<StdRwLock<Vec<String>>>,
3739
}
3840

39-
impl HarmonyResponsesContext {
40-
/// Create a new Harmony Responses context
41+
impl ResponsesContext {
42+
/// Create a new responses context
4143
pub fn new(
4244
pipeline: Arc<RequestPipeline>,
4345
components: Arc<SharedComponents>,
44-
mcp_manager: Arc<McpManager>,
4546
response_storage: Arc<dyn ResponseStorage>,
4647
conversation_storage: Arc<dyn ConversationStorage>,
4748
conversation_item_storage: Arc<dyn ConversationItemStorage>,
49+
mcp_manager: Arc<McpManager>,
4850
) -> Self {
4951
Self {
5052
pipeline,
5153
components,
52-
mcp_manager,
53-
requested_servers: Arc::new(StdRwLock::new(Vec::new())),
5454
response_storage,
5555
conversation_storage,
5656
conversation_item_storage,
57+
mcp_manager,
58+
requested_servers: Arc::new(StdRwLock::new(Vec::new())),
5759
}
5860
}
5961
}

sgl-model-gateway/src/routers/grpc/common/responses/handlers.rs

Lines changed: 10 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,10 @@
22
//!
33
//! These handlers are used by both pipelines for retrieving and cancelling responses.
44
5-
use axum::{
6-
http::StatusCode,
7-
response::{IntoResponse, Response},
8-
};
9-
use serde_json::json;
10-
use tracing::{debug, error, warn};
5+
use axum::response::{IntoResponse, Response};
116

12-
use crate::{
13-
data_connector::ResponseId,
14-
routers::{error, grpc::regular::responses::ResponsesContext},
15-
};
7+
use super::ResponsesContext;
8+
use crate::{data_connector::ResponseId, routers::error};
169

1710
/// Implementation for GET /v1/responses/{response_id}
1811
///
@@ -37,96 +30,33 @@ pub(crate) async fn get_response_impl(ctx: &ResponsesContext, response_id: &str)
3730

3831
/// Implementation for POST /v1/responses/{response_id}/cancel
3932
///
40-
/// Cancels a background response if it's still in progress.
33+
/// Background mode is no longer supported, so this endpoint always returns
34+
/// an error indicating that cancellation is not available.
4135
pub(crate) async fn cancel_response_impl(ctx: &ResponsesContext, response_id: &str) -> Response {
4236
let resp_id = ResponseId::from(response_id);
4337

44-
// Retrieve response from storage to check if it exists and get current status
38+
// Check if response exists
4539
match ctx.response_storage.get_response(&resp_id).await {
4640
Ok(Some(stored_response)) => {
47-
// Check current status - only queued or in_progress responses can be cancelled
4841
let current_status = stored_response
4942
.raw_response
5043
.get("status")
5144
.and_then(|v| v.as_str())
5245
.unwrap_or("unknown");
5346

5447
match current_status {
55-
"queued" | "in_progress" => {
56-
// Attempt to abort the background task
57-
let mut tasks = ctx.background_tasks.write().await;
58-
if let Some(task_info) = tasks.remove(response_id) {
59-
// Abort the Rust task immediately
60-
task_info.handle.abort();
61-
62-
// Abort the Python/scheduler request via gRPC (if client is available)
63-
let client_opt = task_info.client.read().await;
64-
if let Some(ref client) = *client_opt {
65-
if let Err(e) = client
66-
.abort_request(
67-
task_info.grpc_request_id.clone(),
68-
"User cancelled via API".to_string(),
69-
)
70-
.await
71-
{
72-
warn!(
73-
"Failed to abort Python request {}: {}",
74-
task_info.grpc_request_id, e
75-
);
76-
} else {
77-
debug!(
78-
"Successfully aborted Python request: {}",
79-
task_info.grpc_request_id
80-
);
81-
}
82-
} else {
83-
debug!("Client not yet available for abort, request may not have started yet");
84-
}
85-
86-
// Task was found and aborted
87-
(
88-
StatusCode::OK,
89-
axum::Json(json!({
90-
"id": response_id,
91-
"status": "cancelled",
92-
"message": "Background task has been cancelled"
93-
})),
94-
)
95-
.into_response()
96-
} else {
97-
// Task handle not found but status is queued/in_progress
98-
// This can happen if: (1) task crashed, or (2) storage persistence failed
99-
error!(
100-
"Response {} has status '{}' but task handle is missing. Task may have crashed or storage update failed.",
101-
response_id, current_status
102-
);
103-
error::internal_error(
104-
"status_update_failed",
105-
"Internal error: background task completed but failed to update status in storage",
106-
)
107-
}
108-
}
10948
"completed" => error::bad_request(
11049
"response_already_completed",
11150
"Cannot cancel completed response",
11251
),
11352
"failed" => {
11453
error::bad_request("response_already_failed", "Cannot cancel failed response")
11554
}
116-
"cancelled" => (
117-
StatusCode::OK,
118-
axum::Json(json!({
119-
"id": response_id,
120-
"status": "cancelled",
121-
"message": "Response was already cancelled"
122-
})),
123-
)
124-
.into_response(),
12555
_ => {
126-
// Unknown status
127-
error::internal_error(
128-
"unknown_response_status",
129-
format!("Unknown response status: {}", current_status),
56+
// Background mode is no longer supported, so there's nothing to cancel
57+
error::bad_request(
58+
"cancellation_not_supported",
59+
"Background mode is not supported. Synchronous and streaming responses cannot be cancelled.",
13060
)
13161
}
13262
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
//! Shared response functionality used by both regular and harmony implementations
22
3+
pub(crate) mod context;
34
pub(crate) mod handlers;
45
pub(crate) mod streaming;
56
pub(crate) mod utils;
67

78
// Re-export commonly used items
9+
pub(crate) use context::ResponsesContext;
810
pub(crate) use streaming::build_sse_response;
911
pub(crate) use utils::{ensure_mcp_connection, persist_response_if_needed};

sgl-model-gateway/src/routers/grpc/harmony/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@ pub(crate) use builder::HarmonyBuilder;
4343
pub(crate) use detector::HarmonyDetector;
4444
pub(crate) use parser::HarmonyParserAdapter;
4545
pub(crate) use processor::{HarmonyResponseProcessor, ResponsesIterationResult};
46-
pub(crate) use responses::{
47-
serve_harmony_responses, serve_harmony_responses_stream, HarmonyResponsesContext,
48-
};
46+
pub(crate) use responses::{serve_harmony_responses, serve_harmony_responses_stream};
4947
pub(crate) use streaming::HarmonyStreamingProcessor;
5048
pub(crate) use types::HarmonyMessage;

sgl-model-gateway/src/routers/grpc/harmony/responses/common.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use serde_json::{from_value, json, to_string, Value};
55
use tracing::{debug, error, warn};
66
use uuid::Uuid;
77

8-
use super::{context::HarmonyResponsesContext, execution::ToolResult};
8+
use super::execution::ToolResult;
99
use crate::{
1010
data_connector::ResponseId,
1111
mcp,
@@ -17,7 +17,7 @@ use crate::{
1717
ResponsesRequest, ResponsesResponse, StringOrContentParts,
1818
},
1919
},
20-
routers::error,
20+
routers::{error, grpc::common::responses::ResponsesContext},
2121
};
2222

2323
/// Record of a single MCP tool call execution
@@ -271,7 +271,7 @@ pub(super) fn inject_mcp_metadata(
271271
/// If the request has `previous_response_id`, loads the response chain from storage
272272
/// and prepends the conversation history to the request input items.
273273
pub(super) async fn load_previous_messages(
274-
ctx: &HarmonyResponsesContext,
274+
ctx: &ResponsesContext,
275275
request: ResponsesRequest,
276276
) -> Result<ResponsesRequest, Response> {
277277
let Some(ref prev_id_str) = request.previous_response_id else {

sgl-model-gateway/src/routers/grpc/harmony/responses/mod.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,17 @@
1313
//!
1414
//! ## Module Structure
1515
//!
16-
//! - `context` - HarmonyResponsesContext
1716
//! - `non_streaming` - Non-streaming entry point and tool loop
1817
//! - `streaming` - Streaming entry point and tool loop
1918
//! - `execution` - MCP tool execution logic
2019
//! - `common` - Shared helpers and state tracking
2120
2221
pub(crate) mod common;
23-
pub(crate) mod context;
2422
pub(crate) mod execution;
2523
pub(crate) mod non_streaming;
2624
pub(crate) mod streaming;
2725

2826
// Re-export types accessed via harmony::responses::TypeName
29-
pub(crate) use context::HarmonyResponsesContext;
3027
pub(crate) use execution::ToolResult;
3128
pub(crate) use non_streaming::serve_harmony_responses;
3229
pub(crate) use streaming::serve_harmony_responses_stream;

sgl-model-gateway/src/routers/grpc/harmony/responses/non_streaming.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use super::{
1414
build_mcp_tool_names_set, build_next_request_with_tools, inject_mcp_metadata,
1515
load_previous_messages, McpCallTracking,
1616
},
17-
context::HarmonyResponsesContext,
1817
execution::{convert_mcp_tools_to_response_tools, execute_mcp_tools, ToolResult},
1918
};
2019
use crate::{
@@ -29,7 +28,9 @@ use crate::{
2928
routers::{
3029
error,
3130
grpc::{
32-
common::responses::{ensure_mcp_connection, persist_response_if_needed},
31+
common::responses::{
32+
ensure_mcp_connection, persist_response_if_needed, ResponsesContext,
33+
},
3334
harmony::processor::ResponsesIterationResult,
3435
},
3536
mcp_utils::{extract_server_label, DEFAULT_MAX_ITERATIONS},
@@ -47,7 +48,7 @@ use crate::{
4748
/// - Repeat from step 1 (full pipeline re-execution)
4849
/// 4. If no tool calls, return final response
4950
pub(crate) async fn serve_harmony_responses(
50-
ctx: &HarmonyResponsesContext,
51+
ctx: &ResponsesContext,
5152
request: ResponsesRequest,
5253
) -> Result<ResponsesResponse, Response> {
5354
// Clone request for persistence
@@ -90,7 +91,7 @@ pub(crate) async fn serve_harmony_responses(
9091
///
9192
/// Automatically executes MCP tools in a loop until no more tool calls or max iterations
9293
async fn execute_with_mcp_loop(
93-
ctx: &HarmonyResponsesContext,
94+
ctx: &ResponsesContext,
9495
mut current_request: ResponsesRequest,
9596
) -> Result<ResponsesResponse, Response> {
9697
let mut iteration_count = 0;
@@ -319,7 +320,7 @@ async fn execute_with_mcp_loop(
319320
///
320321
/// For function tools or no tools - executes pipeline once and returns
321322
async fn execute_without_mcp_loop(
322-
ctx: &HarmonyResponsesContext,
323+
ctx: &ResponsesContext,
323324
current_request: ResponsesRequest,
324325
) -> Result<ResponsesResponse, Response> {
325326
debug!("Executing Harmony Responses without MCP loop");

sgl-model-gateway/src/routers/grpc/harmony/responses/streaming.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use super::{
1414
build_mcp_tool_names_set, build_next_request_with_tools, load_previous_messages,
1515
McpCallTracking,
1616
},
17-
context::HarmonyResponsesContext,
1817
execution::{convert_mcp_tools_to_response_tools, execute_mcp_tools},
1918
};
2019
use crate::{
@@ -25,6 +24,7 @@ use crate::{
2524
common::responses::{
2625
build_sse_response, ensure_mcp_connection, persist_response_if_needed,
2726
streaming::{OutputItemType, ResponseStreamEventEmitter},
27+
ResponsesContext,
2828
},
2929
harmony::{processor::ResponsesIterationResult, streaming::HarmonyStreamingProcessor},
3030
},
@@ -37,7 +37,7 @@ use crate::{
3737
/// This is the streaming equivalent of `serve_harmony_responses()`.
3838
/// Emits SSE events for lifecycle, MCP list_tools, and per-iteration streaming.
3939
pub(crate) async fn serve_harmony_responses_stream(
40-
ctx: &HarmonyResponsesContext,
40+
ctx: &ResponsesContext,
4141
request: ResponsesRequest,
4242
) -> Response {
4343
// Load previous conversation history if previous_response_id is set
@@ -112,7 +112,7 @@ pub(crate) async fn serve_harmony_responses_stream(
112112
/// - Emits final response.completed event
113113
/// - Persists response internally
114114
async fn execute_mcp_tool_loop_streaming(
115-
ctx: &HarmonyResponsesContext,
115+
ctx: &ResponsesContext,
116116
mut current_request: ResponsesRequest,
117117
original_request: &ResponsesRequest,
118118
emitter: &mut ResponseStreamEventEmitter,
@@ -459,7 +459,7 @@ async fn execute_mcp_tool_loop_streaming(
459459
/// For function tools or no tools - executes pipeline once and emits completion.
460460
/// The streaming processor handles all output items (reasoning, message, function tool calls).
461461
async fn execute_without_mcp_streaming(
462-
ctx: &HarmonyResponsesContext,
462+
ctx: &ResponsesContext,
463463
current_request: &ResponsesRequest,
464464
original_request: &ResponsesRequest,
465465
emitter: &mut ResponseStreamEventEmitter,

sgl-model-gateway/src/routers/grpc/pipeline.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use tracing::{debug, error};
1111
// Import embedding-specific and classify-specific stages
1212
use super::regular::stages::classify::ClassifyResponseProcessingStage;
1313
use super::{
14-
common::stages::*,
14+
common::{responses::ResponsesContext, stages::*},
1515
context::*,
1616
harmony,
1717
regular::{
@@ -753,7 +753,7 @@ impl RequestPipeline {
753753
pub async fn execute_harmony_responses(
754754
&self,
755755
request: &crate::protocols::responses::ResponsesRequest,
756-
harmony_ctx: &harmony::responses::HarmonyResponsesContext,
756+
harmony_ctx: &ResponsesContext,
757757
) -> Result<harmony::ResponsesIterationResult, Response> {
758758
// Create RequestContext for this Responses request
759759
let mut ctx = RequestContext::for_responses(
@@ -816,7 +816,7 @@ impl RequestPipeline {
816816
pub async fn execute_harmony_responses_streaming(
817817
&self,
818818
request: &crate::protocols::responses::ResponsesRequest,
819-
harmony_ctx: &harmony::responses::HarmonyResponsesContext,
819+
harmony_ctx: &ResponsesContext,
820820
) -> Result<(ExecutionResult, Option<LoadGuards>), Response> {
821821
// Create RequestContext for this Responses request
822822
let mut ctx = RequestContext::for_responses(

0 commit comments

Comments
 (0)