Skip to content

Commit 034b4e3

Browse files
slin1237key4ng
authored andcommitted
[model-gateway] move all responses api event from oai to proto (sgl-project#14446)
Co-authored-by: key4ng <rukeyang@gmail.com>
1 parent aa12573 commit 034b4e3

File tree

6 files changed

+250
-128
lines changed

6 files changed

+250
-128
lines changed
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
use std::fmt;
2+
3+
/// Response lifecycle events
4+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
5+
pub enum ResponseEvent {
6+
Created,
7+
InProgress,
8+
Completed,
9+
}
10+
11+
impl ResponseEvent {
12+
pub const CREATED: &'static str = "response.created";
13+
pub const IN_PROGRESS: &'static str = "response.in_progress";
14+
pub const COMPLETED: &'static str = "response.completed";
15+
16+
pub const fn as_str(&self) -> &'static str {
17+
match self {
18+
Self::Created => Self::CREATED,
19+
Self::InProgress => Self::IN_PROGRESS,
20+
Self::Completed => Self::COMPLETED,
21+
}
22+
}
23+
}
24+
25+
impl fmt::Display for ResponseEvent {
26+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27+
f.write_str(self.as_str())
28+
}
29+
}
30+
31+
/// Output item events for streaming
32+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
33+
pub enum OutputItemEvent {
34+
Added,
35+
Done,
36+
Delta,
37+
}
38+
39+
impl OutputItemEvent {
40+
pub const ADDED: &'static str = "response.output_item.added";
41+
pub const DONE: &'static str = "response.output_item.done";
42+
pub const DELTA: &'static str = "response.output_item.delta";
43+
44+
pub const fn as_str(&self) -> &'static str {
45+
match self {
46+
Self::Added => Self::ADDED,
47+
Self::Done => Self::DONE,
48+
Self::Delta => Self::DELTA,
49+
}
50+
}
51+
}
52+
53+
impl fmt::Display for OutputItemEvent {
54+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55+
f.write_str(self.as_str())
56+
}
57+
}
58+
59+
/// Function call argument streaming events
60+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
61+
pub enum FunctionCallEvent {
62+
ArgumentsDelta,
63+
ArgumentsDone,
64+
}
65+
66+
impl FunctionCallEvent {
67+
pub const ARGUMENTS_DELTA: &'static str = "response.function_call_arguments.delta";
68+
pub const ARGUMENTS_DONE: &'static str = "response.function_call_arguments.done";
69+
70+
pub const fn as_str(&self) -> &'static str {
71+
match self {
72+
Self::ArgumentsDelta => Self::ARGUMENTS_DELTA,
73+
Self::ArgumentsDone => Self::ARGUMENTS_DONE,
74+
}
75+
}
76+
}
77+
78+
impl fmt::Display for FunctionCallEvent {
79+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80+
f.write_str(self.as_str())
81+
}
82+
}
83+
84+
// ============================================================================
85+
// MCP Events
86+
// ============================================================================
87+
88+
/// MCP (Model Context Protocol) call events
89+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
90+
pub enum McpEvent {
91+
CallArgumentsDelta,
92+
CallArgumentsDone,
93+
CallInProgress,
94+
CallCompleted,
95+
ListToolsInProgress,
96+
ListToolsCompleted,
97+
}
98+
99+
impl McpEvent {
100+
pub const CALL_ARGUMENTS_DELTA: &'static str = "response.mcp_call_arguments.delta";
101+
pub const CALL_ARGUMENTS_DONE: &'static str = "response.mcp_call_arguments.done";
102+
pub const CALL_IN_PROGRESS: &'static str = "response.mcp_call.in_progress";
103+
pub const CALL_COMPLETED: &'static str = "response.mcp_call.completed";
104+
pub const LIST_TOOLS_IN_PROGRESS: &'static str = "response.mcp_list_tools.in_progress";
105+
pub const LIST_TOOLS_COMPLETED: &'static str = "response.mcp_list_tools.completed";
106+
107+
pub const fn as_str(&self) -> &'static str {
108+
match self {
109+
Self::CallArgumentsDelta => Self::CALL_ARGUMENTS_DELTA,
110+
Self::CallArgumentsDone => Self::CALL_ARGUMENTS_DONE,
111+
Self::CallInProgress => Self::CALL_IN_PROGRESS,
112+
Self::CallCompleted => Self::CALL_COMPLETED,
113+
Self::ListToolsInProgress => Self::LIST_TOOLS_IN_PROGRESS,
114+
Self::ListToolsCompleted => Self::LIST_TOOLS_COMPLETED,
115+
}
116+
}
117+
}
118+
119+
impl fmt::Display for McpEvent {
120+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121+
f.write_str(self.as_str())
122+
}
123+
}
124+
125+
/// Item type discriminators used in output items
126+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
127+
pub enum ItemType {
128+
FunctionCall,
129+
FunctionToolCall,
130+
McpCall,
131+
Function,
132+
McpListTools,
133+
}
134+
135+
impl ItemType {
136+
pub const FUNCTION_CALL: &'static str = "function_call";
137+
pub const FUNCTION_TOOL_CALL: &'static str = "function_tool_call";
138+
pub const MCP_CALL: &'static str = "mcp_call";
139+
pub const FUNCTION: &'static str = "function";
140+
pub const MCP_LIST_TOOLS: &'static str = "mcp_list_tools";
141+
142+
pub const fn as_str(&self) -> &'static str {
143+
match self {
144+
Self::FunctionCall => Self::FUNCTION_CALL,
145+
Self::FunctionToolCall => Self::FUNCTION_TOOL_CALL,
146+
Self::McpCall => Self::MCP_CALL,
147+
Self::Function => Self::FUNCTION,
148+
Self::McpListTools => Self::MCP_LIST_TOOLS,
149+
}
150+
}
151+
152+
/// Check if this is a function call variant (FunctionCall or FunctionToolCall)
153+
pub const fn is_function_call(&self) -> bool {
154+
matches!(self, Self::FunctionCall | Self::FunctionToolCall)
155+
}
156+
}
157+
158+
impl fmt::Display for ItemType {
159+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160+
f.write_str(self.as_str())
161+
}
162+
}
163+
164+
/// Check if an event type string matches any response lifecycle event
165+
pub fn is_response_event(event_type: &str) -> bool {
166+
matches!(
167+
event_type,
168+
ResponseEvent::CREATED | ResponseEvent::IN_PROGRESS | ResponseEvent::COMPLETED
169+
)
170+
}
171+
172+
/// Check if an item type string is a function call variant
173+
pub fn is_function_call_type(item_type: &str) -> bool {
174+
item_type == ItemType::FUNCTION_CALL || item_type == ItemType::FUNCTION_TOOL_CALL
175+
}

sgl-router/src/protocols/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub mod classify;
77
pub mod common;
88
pub mod completion;
99
pub mod embedding;
10+
pub mod event_types;
1011
pub mod generate;
1112
pub mod rerank;
1213
pub mod responses;

sgl-router/src/routers/openai/mcp.rs

Lines changed: 24 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ use serde_json::{json, to_value, Value};
1616
use tokio::sync::mpsc;
1717
use tracing::{debug, info, warn};
1818

19-
use super::utils::event_types;
2019
use crate::{
2120
mcp,
22-
protocols::responses::{
23-
generate_id, ResponseInput, ResponseTool, ResponseToolType, ResponsesRequest,
21+
protocols::{
22+
event_types::{is_function_call_type, ItemType, McpEvent, OutputItemEvent},
23+
responses::{generate_id, ResponseInput, ResponseTool, ResponseToolType, ResponsesRequest},
2424
},
2525
routers::header_utils::apply_request_headers,
2626
};
@@ -76,7 +76,7 @@ impl ToolLoopState {
7676
) {
7777
// Add function_call item to history
7878
let func_item = json!({
79-
"type": event_types::ITEM_TYPE_FUNCTION_CALL,
79+
"type": ItemType::FUNCTION_CALL,
8080
"call_id": call_id,
8181
"name": tool_name,
8282
"arguments": args_json_str
@@ -285,7 +285,7 @@ pub(super) fn prepare_mcp_payload_for_streaming(
285285
arr.retain(|item| {
286286
item.get("type")
287287
.and_then(|v| v.as_str())
288-
.map(|s| s == event_types::ITEM_TYPE_FUNCTION)
288+
.map(|s| s == ItemType::FUNCTION)
289289
.unwrap_or(false)
290290
});
291291
}
@@ -297,7 +297,7 @@ pub(super) fn prepare_mcp_payload_for_streaming(
297297
for t in tools {
298298
let parameters = Value::Object((*t.input_schema).clone());
299299
let tool = serde_json::json!({
300-
"type": event_types::ITEM_TYPE_FUNCTION,
300+
"type": ItemType::FUNCTION,
301301
"name": t.name,
302302
"description": t.description,
303303
"parameters": parameters
@@ -399,15 +399,15 @@ pub(super) fn send_mcp_list_tools_events(
399399

400400
// Event 1: response.output_item.added with empty tools
401401
let event1_payload = json!({
402-
"type": event_types::OUTPUT_ITEM_ADDED,
402+
"type": OutputItemEvent::ADDED,
403403
"sequence_number": *sequence_number,
404404
"output_index": output_index,
405405
"item": tools_item_empty
406406
});
407407
*sequence_number += 1;
408408
let event1 = format!(
409409
"event: {}\ndata: {}\n\n",
410-
event_types::OUTPUT_ITEM_ADDED,
410+
OutputItemEvent::ADDED,
411411
event1_payload
412412
);
413413
if tx.send(Ok(Bytes::from(event1))).is_err() {
@@ -416,15 +416,15 @@ pub(super) fn send_mcp_list_tools_events(
416416

417417
// Event 2: response.mcp_list_tools.in_progress
418418
let event2_payload = json!({
419-
"type": event_types::MCP_LIST_TOOLS_IN_PROGRESS,
419+
"type": McpEvent::LIST_TOOLS_IN_PROGRESS,
420420
"sequence_number": *sequence_number,
421421
"output_index": output_index,
422422
"item_id": item_id
423423
});
424424
*sequence_number += 1;
425425
let event2 = format!(
426426
"event: {}\ndata: {}\n\n",
427-
event_types::MCP_LIST_TOOLS_IN_PROGRESS,
427+
McpEvent::LIST_TOOLS_IN_PROGRESS,
428428
event2_payload
429429
);
430430
if tx.send(Ok(Bytes::from(event2))).is_err() {
@@ -433,15 +433,15 @@ pub(super) fn send_mcp_list_tools_events(
433433

434434
// Event 3: response.mcp_list_tools.completed
435435
let event3_payload = json!({
436-
"type": event_types::MCP_LIST_TOOLS_COMPLETED,
436+
"type": McpEvent::LIST_TOOLS_COMPLETED,
437437
"sequence_number": *sequence_number,
438438
"output_index": output_index,
439439
"item_id": item_id
440440
});
441441
*sequence_number += 1;
442442
let event3 = format!(
443443
"event: {}\ndata: {}\n\n",
444-
event_types::MCP_LIST_TOOLS_COMPLETED,
444+
McpEvent::LIST_TOOLS_COMPLETED,
445445
event3_payload
446446
);
447447
if tx.send(Ok(Bytes::from(event3))).is_err() {
@@ -450,15 +450,15 @@ pub(super) fn send_mcp_list_tools_events(
450450

451451
// Event 4: response.output_item.done with full tools list
452452
let event4_payload = json!({
453-
"type": event_types::OUTPUT_ITEM_DONE,
453+
"type": OutputItemEvent::DONE,
454454
"sequence_number": *sequence_number,
455455
"output_index": output_index,
456456
"item": tools_item_full
457457
});
458458
*sequence_number += 1;
459459
let event4 = format!(
460460
"event: {}\ndata: {}\n\n",
461-
event_types::OUTPUT_ITEM_DONE,
461+
OutputItemEvent::DONE,
462462
event4_payload
463463
);
464464
tx.send(Ok(Bytes::from(event4))).is_ok()
@@ -495,7 +495,7 @@ pub(super) fn send_mcp_call_completion_events_with_error(
495495

496496
// Event 1: response.mcp_call.completed
497497
let completed_payload = json!({
498-
"type": event_types::MCP_CALL_COMPLETED,
498+
"type": McpEvent::CALL_COMPLETED,
499499
"sequence_number": *sequence_number,
500500
"output_index": effective_output_index,
501501
"item_id": item_id
@@ -504,7 +504,7 @@ pub(super) fn send_mcp_call_completion_events_with_error(
504504

505505
let completed_event = format!(
506506
"event: {}\ndata: {}\n\n",
507-
event_types::MCP_CALL_COMPLETED,
507+
McpEvent::CALL_COMPLETED,
508508
completed_payload
509509
);
510510
if tx.send(Ok(Bytes::from(completed_event))).is_err() {
@@ -513,7 +513,7 @@ pub(super) fn send_mcp_call_completion_events_with_error(
513513

514514
// Event 2: response.output_item.done (with completed mcp_call)
515515
let done_payload = json!({
516-
"type": event_types::OUTPUT_ITEM_DONE,
516+
"type": OutputItemEvent::DONE,
517517
"sequence_number": *sequence_number,
518518
"output_index": effective_output_index,
519519
"item": mcp_call_item
@@ -522,7 +522,7 @@ pub(super) fn send_mcp_call_completion_events_with_error(
522522

523523
let done_event = format!(
524524
"event: {}\ndata: {}\n\n",
525-
event_types::OUTPUT_ITEM_DONE,
525+
OutputItemEvent::DONE,
526526
done_payload
527527
);
528528
tx.send(Ok(Bytes::from(done_event))).is_ok()
@@ -541,7 +541,7 @@ pub(super) fn inject_mcp_metadata_streaming(
541541
) {
542542
if let Some(output_array) = response.get_mut("output").and_then(|v| v.as_array_mut()) {
543543
output_array.retain(|item| {
544-
item.get("type").and_then(|t| t.as_str()) != Some(event_types::ITEM_TYPE_MCP_LIST_TOOLS)
544+
item.get("type").and_then(|t| t.as_str()) != Some(ItemType::MCP_LIST_TOOLS)
545545
});
546546

547547
let list_tools_item = build_mcp_list_tools_item(mcp, server_label);
@@ -782,9 +782,7 @@ pub(super) fn build_incomplete_response(
782782
let mut mcp_call_items = Vec::new();
783783
for item in output_array.iter() {
784784
let item_type = item.get("type").and_then(|t| t.as_str());
785-
if item_type == Some(event_types::ITEM_TYPE_FUNCTION_TOOL_CALL)
786-
|| item_type == Some(event_types::ITEM_TYPE_FUNCTION_CALL)
787-
{
785+
if item_type.is_some_and(is_function_call_type) {
788786
let tool_name = item.get("name").and_then(|v| v.as_str()).unwrap_or("");
789787
let args = item
790788
.get("arguments")
@@ -870,7 +868,7 @@ pub(super) fn build_mcp_list_tools_item(mcp: &Arc<mcp::McpManager>, server_label
870868

871869
json!({
872870
"id": generate_id("mcpl"),
873-
"type": event_types::ITEM_TYPE_MCP_LIST_TOOLS,
871+
"type": ItemType::MCP_LIST_TOOLS,
874872
"server_label": server_label,
875873
"tools": tools_json
876874
})
@@ -887,7 +885,7 @@ pub(super) fn build_mcp_call_item(
887885
) -> Value {
888886
json!({
889887
"id": generate_id("mcp"),
890-
"type": event_types::ITEM_TYPE_MCP_CALL,
888+
"type": ItemType::MCP_CALL,
891889
"status": if success { "completed" } else { "failed" },
892890
"approval_request_id": Value::Null,
893891
"arguments": arguments,
@@ -906,7 +904,7 @@ pub(super) fn build_executed_mcp_call_items(
906904
let mut mcp_call_items = Vec::new();
907905

908906
for item in conversation_history {
909-
if item.get("type").and_then(|t| t.as_str()) == Some(event_types::ITEM_TYPE_FUNCTION_CALL) {
907+
if item.get("type").and_then(|t| t.as_str()) == Some(ItemType::FUNCTION_CALL) {
910908
let call_id = item.get("call_id").and_then(|v| v.as_str()).unwrap_or("");
911909
let tool_name = item.get("name").and_then(|v| v.as_str()).unwrap_or("");
912910
let args = item
@@ -958,9 +956,7 @@ pub(super) fn extract_function_call(resp: &Value) -> Option<(String, String, Str
958956
for item in output {
959957
let obj = item.as_object()?;
960958
let t = obj.get("type")?.as_str()?;
961-
if t == event_types::ITEM_TYPE_FUNCTION_TOOL_CALL
962-
|| t == event_types::ITEM_TYPE_FUNCTION_CALL
963-
{
959+
if is_function_call_type(t) {
964960
let call_id = obj
965961
.get("call_id")
966962
.and_then(|v| v.as_str())

0 commit comments

Comments
 (0)