Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions codex-rs/core/src/unified_exec/process_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::process::UnifiedExecProcess;
use crate::unified_exec::UnifiedExecError;
use async_trait::async_trait;
use codex_exec_server::ExecProcess;
use codex_exec_server::ExecProcessEventReceiver;
use codex_exec_server::ExecServerError;
use codex_exec_server::ProcessId;
use codex_exec_server::ReadResponse;
Expand Down Expand Up @@ -33,6 +34,10 @@ impl ExecProcess for MockExecProcess {
self.wake_tx.subscribe()
}

/// Event-subscription hook for the mock process.
///
/// Hands out `ExecProcessEventReceiver::empty()` instead of a live
/// subscription.
// NOTE(review): presumably `empty()` yields a receiver that never produces
// events — confirm against `ExecProcessEventReceiver`.
fn subscribe_events(&self) -> ExecProcessEventReceiver {
ExecProcessEventReceiver::empty()
}

async fn read(
&self,
_after_seq: Option<u64>,
Expand Down
260 changes: 254 additions & 6 deletions codex-rs/exec-server/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::time::Duration;

use arc_swap::ArcSwap;
Expand All @@ -16,6 +18,9 @@ use crate::ProcessId;
use crate::client_api::ExecServerClientConnectOptions;
use crate::client_api::RemoteExecServerConnectArgs;
use crate::connection::JsonRpcConnection;
use crate::process::ExecProcessEvent;
use crate::process::ExecProcessEventLog;
use crate::process::ExecProcessEventReceiver;
use crate::protocol::EXEC_CLOSED_METHOD;
use crate::protocol::EXEC_EXITED_METHOD;
use crate::protocol::EXEC_METHOD;
Expand Down Expand Up @@ -53,6 +58,7 @@ use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeParams;
use crate::protocol::InitializeResponse;
use crate::protocol::ProcessOutputChunk;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::TerminateParams;
Expand All @@ -65,6 +71,8 @@ use crate::rpc::RpcClientEvent;

const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10);
// First argument to `ExecProcessEventLog::new` for every session.
// NOTE(review): presumably the per-session event channel capacity, counted in
// events — confirm against `ExecProcessEventLog`.
const PROCESS_EVENT_CHANNEL_CAPACITY: usize = 256;
// Second argument to `ExecProcessEventLog::new` for every session (1 MiB).
// NOTE(review): presumably the byte budget of retained event payloads —
// confirm against `ExecProcessEventLog`.
const PROCESS_EVENT_RETAINED_BYTES: usize = 1024 * 1024;

impl Default for ExecServerClientConnectOptions {
fn default() -> Self {
Expand Down Expand Up @@ -100,9 +108,20 @@ impl RemoteExecServerConnectArgs {

/// Client-side state for one registered process session: the wake channel,
/// the process event log with its reordering state, and the first recorded
/// failure.
pub(crate) struct SessionState {
// Wake counter: `note_change` raises it to at least the notified sequence
// number and `set_failure` bumps it by one.
wake_tx: watch::Sender<u64>,
// Event log that `subscribe_events` hands receivers out of.
events: ExecProcessEventLog,
// Reordering state used by `publish_ordered_event`. A std mutex, so it is
// locked synchronously and never held across an `.await`.
ordered_events: StdMutex<OrderedSessionEvents>,
// Failure message recorded by `set_failure`; `None` until a failure is set.
// Locked with `.lock().await`.
failure: Mutex<Option<String>>,
}

/// Reordering state for sequenced process events, guarded by
/// `SessionState::ordered_events`.
#[derive(Default)]
struct OrderedSessionEvents {
// Sequence number of the last event handed to the event log. `Default`
// starts it at 0, so the first publishable event is the one with seq 1.
last_published_seq: u64,
// Server-side output, exit, and closed notifications are emitted by
// different tasks and can reach the client out of order. Keep future events
// here until all lower sequence numbers have been published.
pending: BTreeMap<u64, ExecProcessEvent>,
}

#[derive(Clone)]
pub(crate) struct Session {
client: ExecServerClient,
Expand Down Expand Up @@ -452,6 +471,11 @@ impl SessionState {
let (wake_tx, _wake_rx) = watch::channel(0);
Self {
wake_tx,
events: ExecProcessEventLog::new(
PROCESS_EVENT_CHANNEL_CAPACITY,
PROCESS_EVENT_RETAINED_BYTES,
),
ordered_events: StdMutex::new(OrderedSessionEvents::default()),
failure: Mutex::new(None),
}
}
Expand All @@ -460,19 +484,71 @@ impl SessionState {
self.wake_tx.subscribe()
}

/// Returns a new receiver obtained from this session's process event log.
pub(crate) fn subscribe_events(&self) -> ExecProcessEventReceiver {
self.events.subscribe()
}

/// Raises the wake value to at least `seq` and notifies watchers.
///
/// The send result is ignored on purpose: it only fails when no receiver is
/// currently subscribed.
fn note_change(&self, seq: u64) {
    // Copy the current value out first so the borrow guard is released
    // before `send` needs to update the channel.
    let current = *self.wake_tx.borrow();
    let _ = self.wake_tx.send(current.max(seq));
}

/// Forwards `event` to the event log while enforcing sequence order.
///
/// Events without a sequence number are published immediately. Sequenced
/// events are buffered until every lower sequence number has been
/// published, and an event whose sequence number was already published (or
/// skipped past) is dropped so subscribers never see duplicates.
///
/// Returns `true` only when this call is the one that publishes the ordered
/// `Closed` event. The caller relies on that to remove the session route
/// once the terminal event is visible to subscribers, not when a
/// possibly-early closed notification first arrives.
fn publish_ordered_event(&self, event: ExecProcessEvent) -> bool {
    let seq = match event.seq() {
        Some(seq) => seq,
        None => {
            // Unsequenced events do not take part in ordering.
            self.events.publish(event);
            return false;
        }
    };

    // Decide what is publishable while holding the lock, but publish only
    // after the guard is gone.
    let drained = {
        let mut state = self
            .ordered_events
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        // Already delivered (or moved past): accepting it again would
        // duplicate output or lifecycle events.
        if seq <= state.last_published_seq {
            return false;
        }

        // Keep the first event seen for a given sequence number.
        state.pending.entry(seq).or_insert(event);

        // Drain the contiguous run that starts right after the last
        // published sequence number.
        let mut drained = Vec::new();
        let mut next_seq = state.last_published_seq + 1;
        while let Some(next_event) = state.pending.remove(&next_seq) {
            state.last_published_seq = next_seq;
            drained.push(next_event);
            next_seq += 1;
        }
        drained
    };

    let mut saw_closed = false;
    for ready_event in drained {
        if matches!(&ready_event, ExecProcessEvent::Closed { .. }) {
            saw_closed = true;
        }
        self.events.publish(ready_event);
    }
    saw_closed
}

async fn set_failure(&self, message: String) {
let mut failure = self.failure.lock().await;
if failure.is_none() {
*failure = Some(message);
let should_publish = failure.is_none();
if should_publish {
*failure = Some(message.clone());
}
drop(failure);
let next = (*self.wake_tx.borrow()).saturating_add(1);
let _ = self.wake_tx.send(next);
if should_publish {
let _ = self.publish_ordered_event(ExecProcessEvent::Failed(message));
}
}

async fn failed_response(&self) -> Option<ReadResponse> {
Expand Down Expand Up @@ -505,6 +581,10 @@ impl Session {
self.state.subscribe()
}

/// Subscribes to this session's process events by delegating to
/// `self.state`.
pub(crate) fn subscribe_events(&self) -> ExecProcessEventReceiver {
self.state.subscribe_events()
}

pub(crate) async fn read(
&self,
after_seq: Option<u64>,
Expand Down Expand Up @@ -628,23 +708,44 @@ async fn handle_server_notification(
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
if let Some(session) = inner.get_session(&params.process_id) {
session.note_change(params.seq);
let published_closed =
session.publish_ordered_event(ExecProcessEvent::Output(ProcessOutputChunk {
seq: params.seq,
stream: params.stream,
chunk: params.chunk,
}));
if published_closed {
inner.remove_session(&params.process_id).await;
}
}
}
EXEC_EXITED_METHOD => {
let params: ExecExitedNotification =
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
if let Some(session) = inner.get_session(&params.process_id) {
session.note_change(params.seq);
let published_closed = session.publish_ordered_event(ExecProcessEvent::Exited {
seq: params.seq,
exit_code: params.exit_code,
});
if published_closed {
inner.remove_session(&params.process_id).await;
}
}
}
EXEC_CLOSED_METHOD => {
let params: ExecClosedNotification =
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
// Closed is the terminal lifecycle event for this process, so drop
// the routing entry before forwarding it.
let session = inner.remove_session(&params.process_id).await;
if let Some(session) = session {
if let Some(session) = inner.get_session(&params.process_id) {
session.note_change(params.seq);
// Closed is terminal, but it can arrive before tail output or
// exited. Keep routing this process until the ordered publisher
// says Closed has actually been delivered.
let published_closed =
session.publish_ordered_event(ExecProcessEvent::Closed { seq: params.seq });
if published_closed {
inner.remove_session(&params.process_id).await;
}
}
}
other => {
Expand Down Expand Up @@ -673,14 +774,18 @@ mod tests {
use super::ExecServerClientConnectOptions;
use crate::ProcessId;
use crate::connection::JsonRpcConnection;
use crate::process::ExecProcessEvent;
use crate::protocol::EXEC_CLOSED_METHOD;
use crate::protocol::EXEC_EXITED_METHOD;
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
use crate::protocol::ExecClosedNotification;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
use crate::protocol::ExecOutputStream;
use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeResponse;
use crate::protocol::ProcessOutputChunk;

async fn read_jsonrpc_line<R>(lines: &mut tokio::io::Lines<BufReader<R>>) -> JSONRPCMessage
where
Expand All @@ -705,6 +810,149 @@ mod tests {
.expect("json-rpc line should write");
}

/// Sends closed/output/exited notifications out of sequence order and checks
/// that a subscriber still receives them ordered by `seq` (1, 2, 3, 4).
#[tokio::test]
async fn process_events_are_delivered_in_seq_order_when_notifications_are_reordered() {
// In-memory duplex pipes stand in for the exec server's stdio.
let (client_stdin, server_reader) = duplex(1 << 20);
let (mut server_writer, client_stdout) = duplex(1 << 20);
let (notifications_tx, mut notifications_rx) = mpsc::channel(16);
// Fake server: answers the initialize handshake, then forwards whatever
// the test queues on `notifications_tx` verbatim.
let server = tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
let initialize = read_jsonrpc_line(&mut lines).await;
let request = match initialize {
JSONRPCMessage::Request(request) if request.method == INITIALIZE_METHOD => request,
other => panic!("expected initialize request, got {other:?}"),
};
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: request.id,
result: serde_json::to_value(InitializeResponse {
session_id: "session-1".to_string(),
})
.expect("initialize response should serialize"),
}),
)
.await;

let initialized = read_jsonrpc_line(&mut lines).await;
match initialized {
JSONRPCMessage::Notification(notification)
if notification.method == INITIALIZED_METHOD => {}
other => panic!("expected initialized notification, got {other:?}"),
}

// Runs until every sender for `notifications_rx` has been dropped.
while let Some(message) = notifications_rx.recv().await {
write_jsonrpc_line(&mut server_writer, message).await;
}
});

let client = ExecServerClient::connect(
JsonRpcConnection::from_stdio(
client_stdout,
client_stdin,
"test-exec-server-client".to_string(),
),
ExecServerClientConnectOptions::default(),
)
.await
.expect("client should connect");

let process_id = ProcessId::from("reordered");
let session = client
.register_session(&process_id)
.await
.expect("session should register");
// Subscribe before any notification is queued.
let mut events = session.subscribe_events();

// Arrival order is seq 4 (closed), 1 (stdout), 3 (exited), 2 (stderr).
for message in [
JSONRPCMessage::Notification(JSONRPCNotification {
method: EXEC_CLOSED_METHOD.to_string(),
params: Some(
serde_json::to_value(ExecClosedNotification {
process_id: process_id.clone(),
seq: 4,
})
.expect("closed notification should serialize"),
),
}),
JSONRPCMessage::Notification(JSONRPCNotification {
method: EXEC_OUTPUT_DELTA_METHOD.to_string(),
params: Some(
serde_json::to_value(ExecOutputDeltaNotification {
process_id: process_id.clone(),
seq: 1,
stream: ExecOutputStream::Stdout,
chunk: b"one".to_vec().into(),
})
.expect("output notification should serialize"),
),
}),
JSONRPCMessage::Notification(JSONRPCNotification {
method: EXEC_EXITED_METHOD.to_string(),
params: Some(
serde_json::to_value(ExecExitedNotification {
process_id: process_id.clone(),
seq: 3,
exit_code: 0,
})
.expect("exit notification should serialize"),
),
}),
JSONRPCMessage::Notification(JSONRPCNotification {
method: EXEC_OUTPUT_DELTA_METHOD.to_string(),
params: Some(
serde_json::to_value(ExecOutputDeltaNotification {
process_id: process_id.clone(),
seq: 2,
stream: ExecOutputStream::Stderr,
chunk: b"two".to_vec().into(),
})
.expect("output notification should serialize"),
),
}),
] {
notifications_tx
.send(message)
.await
.expect("notification should queue");
}

// Exactly four events are expected; each must arrive within one second.
let mut delivered = Vec::new();
for _ in 0..4 {
delivered.push(
timeout(Duration::from_secs(1), events.recv())
.await
.expect("process event should not time out")
.expect("process event stream should stay open"),
);
}

// Delivery order must follow `seq`, not arrival order.
assert_eq!(
delivered,
vec![
ExecProcessEvent::Output(ProcessOutputChunk {
seq: 1,
stream: ExecOutputStream::Stdout,
chunk: b"one".to_vec().into(),
}),
ExecProcessEvent::Output(ProcessOutputChunk {
seq: 2,
stream: ExecOutputStream::Stderr,
chunk: b"two".to_vec().into(),
}),
ExecProcessEvent::Exited {
seq: 3,
exit_code: 0,
},
ExecProcessEvent::Closed { seq: 4 },
]
);

// Dropping the sender ends the server's forwarding loop so the task can
// finish.
drop(notifications_tx);
drop(client);
server.await.expect("server task should finish");
}

#[tokio::test]
async fn wake_notifications_do_not_block_other_sessions() {
let (client_stdin, server_reader) = duplex(1 << 20);
Expand Down
Loading
Loading