Skip to content

Commit 7c6544f

Browse files
aibrahim-oai and codex committed
Fail exec client operations after disconnect
Reject new exec-server operations once the transport disconnects and convert pending RPC calls into closed errors. This lets remote MCP stdio calls surface executor loss immediately instead of waiting for the MCP tool timeout. Co-authored-by: Codex <noreply@openai.com>
1 parent 5615470 commit 7c6544f

File tree

3 files changed

+193
-94
lines changed

3 files changed

+193
-94
lines changed

codex-rs/exec-server/src/client.rs

Lines changed: 121 additions & 76 deletions
Original file line number | Diff line number | Diff line change
@@ -140,6 +140,11 @@ struct Inner {
140140
// need serialization so concurrent register/remove operations do not
141141
// overwrite each other's copy-on-write updates.
142142
sessions_write_lock: Mutex<()>,
143+
// Once the transport closes, every executor operation should fail quickly
144+
// with the same message. This process/filesystem-level latch prevents
145+
// callers from waiting on request-specific timeouts after the environment
146+
// is gone.
147+
disconnected: std::sync::RwLock<Option<String>>,
143148
session_id: std::sync::RwLock<Option<String>>,
144149
reader_task: tokio::task::JoinHandle<()>,
145150
}
@@ -171,6 +176,8 @@ pub enum ExecServerError {
171176
InitializeTimedOut { timeout: Duration },
172177
#[error("exec-server transport closed")]
173178
Closed,
179+
#[error("{0}")]
180+
Disconnected(String),
174181
#[error("failed to serialize or deserialize exec-server JSON: {0}")]
175182
Json(#[from] serde_json::Error),
176183
#[error("exec-server protocol error: {0}")]
@@ -246,127 +253,85 @@ impl ExecServerClient {
246253
}
247254

248255
pub async fn exec(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError> {
249-
self.inner
250-
.client
251-
.call(EXEC_METHOD, &params)
252-
.await
253-
.map_err(Into::into)
256+
self.call(EXEC_METHOD, &params).await
254257
}
255258

256259
pub async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError> {
257-
self.inner
258-
.client
259-
.call(EXEC_READ_METHOD, &params)
260-
.await
261-
.map_err(Into::into)
260+
self.call(EXEC_READ_METHOD, &params).await
262261
}
263262

264263
pub async fn write(
265264
&self,
266265
process_id: &ProcessId,
267266
chunk: Vec<u8>,
268267
) -> Result<WriteResponse, ExecServerError> {
269-
self.inner
270-
.client
271-
.call(
272-
EXEC_WRITE_METHOD,
273-
&WriteParams {
274-
process_id: process_id.clone(),
275-
chunk: chunk.into(),
276-
},
277-
)
278-
.await
279-
.map_err(Into::into)
268+
self.call(
269+
EXEC_WRITE_METHOD,
270+
&WriteParams {
271+
process_id: process_id.clone(),
272+
chunk: chunk.into(),
273+
},
274+
)
275+
.await
280276
}
281277

282278
pub async fn terminate(
283279
&self,
284280
process_id: &ProcessId,
285281
) -> Result<TerminateResponse, ExecServerError> {
286-
self.inner
287-
.client
288-
.call(
289-
EXEC_TERMINATE_METHOD,
290-
&TerminateParams {
291-
process_id: process_id.clone(),
292-
},
293-
)
294-
.await
295-
.map_err(Into::into)
282+
self.call(
283+
EXEC_TERMINATE_METHOD,
284+
&TerminateParams {
285+
process_id: process_id.clone(),
286+
},
287+
)
288+
.await
296289
}
297290

298291
pub async fn fs_read_file(
299292
&self,
300293
params: FsReadFileParams,
301294
) -> Result<FsReadFileResponse, ExecServerError> {
302-
self.inner
303-
.client
304-
.call(FS_READ_FILE_METHOD, &params)
305-
.await
306-
.map_err(Into::into)
295+
self.call(FS_READ_FILE_METHOD, &params).await
307296
}
308297

309298
pub async fn fs_write_file(
310299
&self,
311300
params: FsWriteFileParams,
312301
) -> Result<FsWriteFileResponse, ExecServerError> {
313-
self.inner
314-
.client
315-
.call(FS_WRITE_FILE_METHOD, &params)
316-
.await
317-
.map_err(Into::into)
302+
self.call(FS_WRITE_FILE_METHOD, &params).await
318303
}
319304

320305
pub async fn fs_create_directory(
321306
&self,
322307
params: FsCreateDirectoryParams,
323308
) -> Result<FsCreateDirectoryResponse, ExecServerError> {
324-
self.inner
325-
.client
326-
.call(FS_CREATE_DIRECTORY_METHOD, &params)
327-
.await
328-
.map_err(Into::into)
309+
self.call(FS_CREATE_DIRECTORY_METHOD, &params).await
329310
}
330311

331312
pub async fn fs_get_metadata(
332313
&self,
333314
params: FsGetMetadataParams,
334315
) -> Result<FsGetMetadataResponse, ExecServerError> {
335-
self.inner
336-
.client
337-
.call(FS_GET_METADATA_METHOD, &params)
338-
.await
339-
.map_err(Into::into)
316+
self.call(FS_GET_METADATA_METHOD, &params).await
340317
}
341318

342319
pub async fn fs_read_directory(
343320
&self,
344321
params: FsReadDirectoryParams,
345322
) -> Result<FsReadDirectoryResponse, ExecServerError> {
346-
self.inner
347-
.client
348-
.call(FS_READ_DIRECTORY_METHOD, &params)
349-
.await
350-
.map_err(Into::into)
323+
self.call(FS_READ_DIRECTORY_METHOD, &params).await
351324
}
352325

353326
pub async fn fs_remove(
354327
&self,
355328
params: FsRemoveParams,
356329
) -> Result<FsRemoveResponse, ExecServerError> {
357-
self.inner
358-
.client
359-
.call(FS_REMOVE_METHOD, &params)
360-
.await
361-
.map_err(Into::into)
330+
self.call(FS_REMOVE_METHOD, &params).await
362331
}
363332

364333
pub async fn fs_copy(&self, params: FsCopyParams) -> Result<FsCopyResponse, ExecServerError> {
365-
self.inner
366-
.client
367-
.call(FS_COPY_METHOD, &params)
368-
.await
369-
.map_err(Into::into)
334+
self.call(FS_COPY_METHOD, &params).await
370335
}
371336

372337
pub(crate) async fn register_session(
@@ -411,7 +376,7 @@ impl ExecServerClient {
411376
&& let Err(err) =
412377
handle_server_notification(&inner, notification).await
413378
{
414-
fail_all_sessions(
379+
mark_disconnected(
415380
&inner,
416381
format!("exec-server notification handling failed: {err}"),
417382
)
@@ -421,7 +386,7 @@ impl ExecServerClient {
421386
}
422387
RpcClientEvent::Disconnected { reason } => {
423388
if let Some(inner) = weak.upgrade() {
424-
fail_all_sessions(&inner, disconnected_message(reason.as_deref()))
389+
mark_disconnected(&inner, disconnected_message(reason.as_deref()))
425390
.await;
426391
}
427392
return;
@@ -434,6 +399,7 @@ impl ExecServerClient {
434399
client: rpc_client,
435400
sessions: ArcSwap::from_pointee(HashMap::new()),
436401
sessions_write_lock: Mutex::new(()),
402+
disconnected: std::sync::RwLock::new(None),
437403
session_id: std::sync::RwLock::new(None),
438404
reader_task,
439405
}
@@ -451,6 +417,37 @@ impl ExecServerClient {
451417
.await
452418
.map_err(ExecServerError::Json)
453419
}
420+
421+
async fn call<P, T>(&self, method: &str, params: &P) -> Result<T, ExecServerError>
422+
where
423+
P: serde::Serialize,
424+
T: serde::de::DeserializeOwned,
425+
{
426+
// Reject new work before allocating a JSON-RPC request id. MCP tool
427+
// calls, process writes, and fs operations all pass through here, so
428+
// this is the shared low-level failure path after executor disconnect.
429+
if let Some(error) = self.inner.disconnected_error() {
430+
return Err(error);
431+
}
432+
433+
match self.inner.client.call(method, params).await {
434+
Ok(response) => Ok(response),
435+
Err(error) => {
436+
let error = ExecServerError::from(error);
437+
if is_transport_closed_error(&error) {
438+
// A call can race with disconnect after the preflight
439+
// check. Latch the disconnect once and fail every
440+
// registered process session before returning this call
441+
// error.
442+
let message = disconnected_message(/*reason*/ None);
443+
let message = mark_disconnected(&self.inner, message).await;
444+
Err(ExecServerError::Disconnected(message))
445+
} else {
446+
Err(error)
447+
}
448+
}
449+
}
450+
}
454451
}
455452

456453
impl From<RpcCallError> for ExecServerError {
@@ -630,6 +627,26 @@ impl Session {
630627
}
631628

632629
impl Inner {
630+
fn disconnected_error(&self) -> Option<ExecServerError> {
631+
self.disconnected
632+
.read()
633+
.unwrap_or_else(std::sync::PoisonError::into_inner)
634+
.clone()
635+
.map(ExecServerError::Disconnected)
636+
}
637+
638+
fn set_disconnected(&self, message: String) -> Option<String> {
639+
let mut disconnected = self
640+
.disconnected
641+
.write()
642+
.unwrap_or_else(std::sync::PoisonError::into_inner);
643+
if disconnected.is_some() {
644+
return None;
645+
}
646+
*disconnected = Some(message.clone());
647+
Some(message)
648+
}
649+
633650
fn get_session(&self, process_id: &ProcessId) -> Option<Arc<SessionState>> {
634651
self.sessions.load().get(process_id).cloned()
635652
}
@@ -640,6 +657,12 @@ impl Inner {
640657
session: Arc<SessionState>,
641658
) -> Result<(), ExecServerError> {
642659
let _sessions_write_guard = self.sessions_write_lock.lock().await;
660+
// Do not register a process session that can never receive executor
661+
// notifications. Without this check, remote MCP startup could create a
662+
// dead session and wait for process output that will never arrive.
663+
if let Some(error) = self.disconnected_error() {
664+
return Err(error);
665+
}
643666
let sessions = self.sessions.load();
644667
if sessions.contains_key(process_id) {
645668
return Err(ExecServerError::Protocol(format!(
@@ -680,20 +703,42 @@ fn disconnected_message(reason: Option<&str>) -> String {
680703
}
681704

682705
fn is_transport_closed_error(error: &ExecServerError) -> bool {
683-
matches!(error, ExecServerError::Closed)
684-
|| matches!(
685-
error,
686-
ExecServerError::Server {
687-
code: -32000,
688-
message,
689-
} if message == "JSON-RPC transport closed"
690-
)
706+
matches!(
707+
error,
708+
ExecServerError::Closed | ExecServerError::Disconnected(_)
709+
) || matches!(
710+
error,
711+
ExecServerError::Server {
712+
code: -32000,
713+
message,
714+
} if message == "JSON-RPC transport closed"
715+
)
716+
}
717+
718+
async fn mark_disconnected(inner: &Arc<Inner>, message: String) -> String {
719+
// The first observer records the canonical disconnect reason and wakes all
720+
// sessions. Later observers reuse that message so concurrent tool calls
721+
// report one consistent environment failure.
722+
if let Some(message) = inner.set_disconnected(message.clone()) {
723+
fail_all_sessions(inner, message.clone()).await;
724+
message
725+
} else {
726+
inner
727+
.disconnected
728+
.read()
729+
.unwrap_or_else(std::sync::PoisonError::into_inner)
730+
.clone()
731+
.unwrap_or(message)
732+
}
691733
}
692734

693735
async fn fail_all_sessions(inner: &Arc<Inner>, message: String) {
694736
let sessions = inner.take_all_sessions().await;
695737

696738
for (_, session) in sessions {
739+
// Sessions synthesize a closed read response and emit a pushed Failed
740+
// event. That covers both polling consumers and streaming consumers
741+
// such as executor-backed MCP stdio.
697742
session.set_failure(message.clone()).await;
698743
}
699744
}

0 commit comments

Comments
 (0)