diff --git a/crates/rmcp/Cargo.toml b/crates/rmcp/Cargo.toml index ea2a3f264..96c319dc4 100644 --- a/crates/rmcp/Cargo.toml +++ b/crates/rmcp/Cargo.toml @@ -210,6 +210,11 @@ name = "test_streamable_http_priming" required-features = ["server", "client", "transport-streamable-http-server", "reqwest"] path = "tests/test_streamable_http_priming.rs" +[[test]] +name = "test_streamable_http_json_response" +required-features = ["server", "client", "transport-streamable-http-server", "reqwest"] +path = "tests/test_streamable_http_json_response.rs" + [[test]] name = "test_custom_request" diff --git a/crates/rmcp/src/transport/streamable_http_server/tower.rs b/crates/rmcp/src/transport/streamable_http_server/tower.rs index 0fbe98769..f6cffb0bb 100644 --- a/crates/rmcp/src/transport/streamable_http_server/tower.rs +++ b/crates/rmcp/src/transport/streamable_http_server/tower.rs @@ -38,6 +38,11 @@ pub struct StreamableHttpServerConfig { /// If true, the server will create a session for each request and keep it alive. /// When enabled, SSE priming events are sent to enable client reconnection. pub stateful_mode: bool, + /// When true and `stateful_mode` is false, the server returns + /// `Content-Type: application/json` directly instead of `text/event-stream`. + /// This eliminates SSE framing overhead for simple request-response tools, + /// allowed by the MCP Streamable HTTP spec (2025-06-18). + pub json_response: bool, /// Cancellation token for the Streamable HTTP server. /// /// When this token is cancelled, all active sessions are terminated and @@ -51,6 +56,7 @@ impl Default for StreamableHttpServerConfig { sse_keep_alive: Some(Duration::from_secs(15)), sse_retry: Some(Duration::from_secs(3)), stateful_mode: true, + json_response: false, cancellation_token: CancellationToken::new(), } } @@ -585,27 +591,56 @@ where match message { ClientJsonRpcMessage::Request(mut request) => { request.request.extensions_mut().insert(part); - let (transport, receiver) = + let (transport, mut receiver) = OneshotTransport::::new(ClientJsonRpcMessage::Request(request)); let service = serve_directly(service, transport, None); tokio::spawn(async move { // on service created let _ = service.waiting().await; }); - // Stateless mode: no priming (no session to resume) - let stream = ReceiverStream::new(receiver).map(|message| { - tracing::info!(?message); - ServerSseMessage { - event_id: None, - message: Some(Arc::new(message)), - retry: None, + if self.config.json_response { + // JSON-direct mode: await the single response and return as + // application/json, eliminating SSE framing overhead. + // Allowed by MCP Streamable HTTP spec (2025-06-18). + let cancel = self.config.cancellation_token.child_token(); + match tokio::select! { + res = receiver.recv() => res, + _ = cancel.cancelled() => None, + } { + Some(message) => { + tracing::info!(?message); + let body = serde_json::to_vec(&message).map_err(|e| { + internal_error_response("serialize json response")(e) + })?; + Ok(Response::builder() + .status(http::StatusCode::OK) + .header(http::header::CONTENT_TYPE, JSON_MIME_TYPE) + .body(Full::new(Bytes::from(body)).boxed()) + .expect("valid response")) + } + None => Err(internal_error_response("empty response")( + std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "no response message received from handler", + ), + )), } - }); - Ok(sse_stream_response( - stream, - self.config.sse_keep_alive, - self.config.cancellation_token.child_token(), - )) + } else { + // SSE mode (default): original behaviour preserved unchanged + let stream = ReceiverStream::new(receiver).map(|message| { + tracing::info!(?message); + ServerSseMessage { + event_id: None, + message: Some(Arc::new(message)), + retry: None, + } + }); + Ok(sse_stream_response( + stream, + self.config.sse_keep_alive, + self.config.cancellation_token.child_token(), + )) + } } ClientJsonRpcMessage::Notification(_notification) => { // ignore diff --git a/crates/rmcp/tests/test_sse_concurrent_streams.rs b/crates/rmcp/tests/test_sse_concurrent_streams.rs index 9a7204ab9..b54ed5562 100644 --- a/crates/rmcp/tests/test_sse_concurrent_streams.rs +++ b/crates/rmcp/tests/test_sse_concurrent_streams.rs @@ -84,6 +84,7 @@ async fn start_test_server(ct: CancellationToken, trigger: Arc) -> Strin sse_keep_alive: Some(Duration::from_secs(15)), sse_retry: Some(Duration::from_secs(3)), cancellation_token: ct.child_token(), + ..Default::default() }, ); diff --git a/crates/rmcp/tests/test_streamable_http_json_response.rs b/crates/rmcp/tests/test_streamable_http_json_response.rs new file mode 100644 index 000000000..e5b3323a9 --- /dev/null +++ b/crates/rmcp/tests/test_streamable_http_json_response.rs @@ -0,0 +1,155 @@ +use rmcp::transport::streamable_http_server::{ + StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager, +}; +use tokio_util::sync::CancellationToken; + +mod common; +use common::calculator::Calculator; + +const INIT_BODY: &str = r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}"#; + +async fn spawn_server( + config: StreamableHttpServerConfig, +) -> (reqwest::Client, String, CancellationToken) { + let ct = config.cancellation_token.clone(); + let service: StreamableHttpService = + StreamableHttpService::new(|| Ok(Calculator::new()), Default::default(), config); + + let router = axum::Router::new().nest_service("/mcp", service); + let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = tcp_listener.local_addr().unwrap(); + + tokio::spawn({ + let ct = ct.clone(); + async move { + let _ = axum::serve(tcp_listener, router) + .with_graceful_shutdown(async move { ct.cancelled_owned().await }) + .await; + } + }); + + let client = reqwest::Client::new(); + let base_url = format!("http://{addr}/mcp"); + (client, base_url, ct) +} + +#[tokio::test] +async fn stateless_json_response_returns_application_json() -> anyhow::Result<()> { + let ct = CancellationToken::new(); + let (client, url, ct) = spawn_server(StreamableHttpServerConfig { + stateful_mode: false, + json_response: true, + sse_keep_alive: None, + cancellation_token: ct.child_token(), + ..Default::default() + }) + .await; + + let response = client + .post(&url) + .header("Content-Type", "application/json") + .header("Accept", "application/json, text/event-stream") + .body(INIT_BODY) + .send() + .await?; + + assert_eq!(response.status(), 200); + + let content_type = response + .headers() + .get("content-type") + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + assert!( + content_type.contains("application/json"), + "Expected application/json, got: {content_type}" + ); + + let body = response.text().await?; + let parsed: serde_json::Value = serde_json::from_str(&body)?; + assert_eq!(parsed["jsonrpc"], "2.0"); + assert_eq!(parsed["id"], 1); + assert!(parsed["result"].is_object(), "Expected result object"); + + ct.cancel(); + Ok(()) +} + +#[tokio::test] +async fn stateless_sse_mode_default_unchanged() -> anyhow::Result<()> { + let ct = CancellationToken::new(); + let (client, url, ct) = spawn_server(StreamableHttpServerConfig { + stateful_mode: false, + json_response: false, + sse_keep_alive: None, + cancellation_token: ct.child_token(), + ..Default::default() + }) + .await; + + let response = client + .post(&url) + .header("Content-Type", "application/json") + .header("Accept", "application/json, text/event-stream") + .body(INIT_BODY) + .send() + .await?; + + assert_eq!(response.status(), 200); + + let content_type = response + .headers() + .get("content-type") + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + assert!( + content_type.contains("text/event-stream"), + "Expected text/event-stream, got: {content_type}" + ); + + let body = response.text().await?; + assert!( + body.contains("data:"), + "Expected SSE framing (data: prefix), got: {body}" + ); + + ct.cancel(); + Ok(()) +} + +#[tokio::test] +async fn json_response_ignored_in_stateful_mode() -> anyhow::Result<()> { + let ct = CancellationToken::new(); + // json_response: true has no effect when stateful_mode: true — server still uses SSE + let (client, url, ct) = spawn_server(StreamableHttpServerConfig { + stateful_mode: true, + json_response: true, + sse_keep_alive: None, + cancellation_token: ct.child_token(), + ..Default::default() + }) + .await; + + let response = client + .post(&url) + .header("Content-Type", "application/json") + .header("Accept", "application/json, text/event-stream") + .body(INIT_BODY) + .send() + .await?; + + assert_eq!(response.status(), 200); + + let content_type = response + .headers() + .get("content-type") + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + assert!( + content_type.contains("text/event-stream"), + "Stateful mode should always use SSE regardless of json_response, got: {content_type}" + ); + + ct.cancel(); + Ok(()) +}