Skip to content

Commit e89f9e2

Browse files
aibrahim-oaicodex
andcommitted
Add pushed exec process events
Co-authored-by: Codex <noreply@openai.com>
1 parent 8978634 commit e89f9e2

6 files changed

Lines changed: 199 additions & 10 deletions

File tree

codex-rs/exec-server/src/client.rs

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use arc_swap::ArcSwap;
66
use codex_app_server_protocol::JSONRPCNotification;
77
use serde_json::Value;
88
use tokio::sync::Mutex;
9+
use tokio::sync::broadcast;
910
use tokio::sync::watch;
1011

1112
use tokio::time::timeout;
@@ -16,6 +17,7 @@ use crate::ProcessId;
1617
use crate::client_api::ExecServerClientConnectOptions;
1718
use crate::client_api::RemoteExecServerConnectArgs;
1819
use crate::connection::JsonRpcConnection;
20+
use crate::process::ExecProcessEvent;
1921
use crate::protocol::EXEC_CLOSED_METHOD;
2022
use crate::protocol::EXEC_EXITED_METHOD;
2123
use crate::protocol::EXEC_METHOD;
@@ -53,6 +55,7 @@ use crate::protocol::INITIALIZE_METHOD;
5355
use crate::protocol::INITIALIZED_METHOD;
5456
use crate::protocol::InitializeParams;
5557
use crate::protocol::InitializeResponse;
58+
use crate::protocol::ProcessOutputChunk;
5659
use crate::protocol::ReadParams;
5760
use crate::protocol::ReadResponse;
5861
use crate::protocol::TerminateParams;
@@ -65,6 +68,7 @@ use crate::rpc::RpcClientEvent;
6568

6669
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
6770
const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10);
71+
const PROCESS_EVENT_CHANNEL_CAPACITY: usize = 256;
6872

6973
impl Default for ExecServerClientConnectOptions {
7074
fn default() -> Self {
@@ -100,6 +104,7 @@ impl RemoteExecServerConnectArgs {
100104

101105
pub(crate) struct SessionState {
102106
wake_tx: watch::Sender<u64>,
107+
event_tx: broadcast::Sender<ExecProcessEvent>,
103108
failure: Mutex<Option<String>>,
104109
}
105110

@@ -450,8 +455,10 @@ impl From<RpcCallError> for ExecServerError {
450455
impl SessionState {
451456
fn new() -> Self {
452457
let (wake_tx, _wake_rx) = watch::channel(0);
458+
let (event_tx, _event_rx) = broadcast::channel(PROCESS_EVENT_CHANNEL_CAPACITY);
453459
Self {
454460
wake_tx,
461+
event_tx,
455462
failure: Mutex::new(None),
456463
}
457464
}
@@ -460,19 +467,31 @@ impl SessionState {
460467
self.wake_tx.subscribe()
461468
}
462469

470+
pub(crate) fn subscribe_events(&self) -> broadcast::Receiver<ExecProcessEvent> {
471+
self.event_tx.subscribe()
472+
}
473+
463474
fn note_change(&self, seq: u64) {
464475
let next = (*self.wake_tx.borrow()).max(seq);
465476
let _ = self.wake_tx.send(next);
466477
}
467478

479+
fn publish_event(&self, event: ExecProcessEvent) {
480+
let _ = self.event_tx.send(event);
481+
}
482+
468483
async fn set_failure(&self, message: String) {
469484
let mut failure = self.failure.lock().await;
470-
if failure.is_none() {
471-
*failure = Some(message);
485+
let should_publish = failure.is_none();
486+
if should_publish {
487+
*failure = Some(message.clone());
472488
}
473489
drop(failure);
474490
let next = (*self.wake_tx.borrow()).saturating_add(1);
475491
let _ = self.wake_tx.send(next);
492+
if should_publish {
493+
self.publish_event(ExecProcessEvent::Failed(message));
494+
}
476495
}
477496

478497
async fn failed_response(&self) -> Option<ReadResponse> {
@@ -505,6 +524,10 @@ impl Session {
505524
self.state.subscribe()
506525
}
507526

527+
pub(crate) fn subscribe_events(&self) -> broadcast::Receiver<ExecProcessEvent> {
528+
self.state.subscribe_events()
529+
}
530+
508531
pub(crate) async fn read(
509532
&self,
510533
after_seq: Option<u64>,
@@ -628,13 +651,22 @@ async fn handle_server_notification(
628651
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
629652
if let Some(session) = inner.get_session(&params.process_id) {
630653
session.note_change(params.seq);
654+
session.publish_event(ExecProcessEvent::Output(ProcessOutputChunk {
655+
seq: params.seq,
656+
stream: params.stream,
657+
chunk: params.chunk,
658+
}));
631659
}
632660
}
633661
EXEC_EXITED_METHOD => {
634662
let params: ExecExitedNotification =
635663
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
636664
if let Some(session) = inner.get_session(&params.process_id) {
637665
session.note_change(params.seq);
666+
session.publish_event(ExecProcessEvent::Exited {
667+
seq: params.seq,
668+
exit_code: params.exit_code,
669+
});
638670
}
639671
}
640672
EXEC_CLOSED_METHOD => {
@@ -645,6 +677,7 @@ async fn handle_server_notification(
645677
let session = inner.remove_session(&params.process_id).await;
646678
if let Some(session) = session {
647679
session.note_change(params.seq);
680+
session.publish_event(ExecProcessEvent::Closed { seq: params.seq });
648681
}
649682
}
650683
other => {

codex-rs/exec-server/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub use local_file_system::LOCAL_FS;
3939
pub use local_file_system::LocalFileSystem;
4040
pub use process::ExecBackend;
4141
pub use process::ExecProcess;
42+
pub use process::ExecProcessEvent;
4243
pub use process::StartedExecProcess;
4344
pub use process_id::ProcessId;
4445
pub use protocol::ExecClosedNotification;
@@ -66,6 +67,7 @@ pub use protocol::FsWriteFileParams;
6667
pub use protocol::FsWriteFileResponse;
6768
pub use protocol::InitializeParams;
6869
pub use protocol::InitializeResponse;
70+
pub use protocol::ProcessOutputChunk;
6971
pub use protocol::ReadParams;
7072
pub use protocol::ReadResponse;
7173
pub use protocol::TerminateParams;

codex-rs/exec-server/src/local_process.rs

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ use codex_utils_pty::ExecCommandSession;
1212
use codex_utils_pty::TerminalSize;
1313
use tokio::sync::Mutex;
1414
use tokio::sync::Notify;
15+
use tokio::sync::broadcast;
1516
use tokio::sync::mpsc;
1617
use tokio::sync::watch;
1718

1819
use crate::ExecBackend;
1920
use crate::ExecProcess;
21+
use crate::ExecProcessEvent;
2022
use crate::ExecServerError;
2123
use crate::ProcessId;
2224
use crate::StartedExecProcess;
@@ -45,6 +47,7 @@ use crate::rpc::invalid_request;
4547

4648
const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024;
4749
const NOTIFICATION_CHANNEL_CAPACITY: usize = 256;
50+
const PROCESS_EVENT_CHANNEL_CAPACITY: usize = 256;
4851
#[cfg(test)]
4952
const EXITED_PROCESS_RETENTION: Duration = Duration::from_millis(25);
5053
#[cfg(not(test))]
@@ -66,6 +69,7 @@ struct RunningProcess {
6669
next_seq: u64,
6770
exit_code: Option<i32>,
6871
wake_tx: watch::Sender<u64>,
72+
event_tx: broadcast::Sender<ExecProcessEvent>,
6973
output_notify: Arc<Notify>,
7074
open_streams: usize,
7175
closed: bool,
@@ -90,6 +94,7 @@ struct LocalExecProcess {
9094
process_id: ProcessId,
9195
backend: LocalProcess,
9296
wake_tx: watch::Sender<u64>,
97+
event_tx: broadcast::Sender<ExecProcessEvent>,
9398
}
9499

95100
impl Default for LocalProcess {
@@ -139,7 +144,14 @@ impl LocalProcess {
139144
async fn start_process(
140145
&self,
141146
params: ExecParams,
142-
) -> Result<(ExecResponse, watch::Sender<u64>), JSONRPCErrorError> {
147+
) -> Result<
148+
(
149+
ExecResponse,
150+
watch::Sender<u64>,
151+
broadcast::Sender<ExecProcessEvent>,
152+
),
153+
JSONRPCErrorError,
154+
> {
143155
let process_id = params.process_id.clone();
144156
let (program, args) = params
145157
.argv
@@ -199,6 +211,7 @@ impl LocalProcess {
199211

200212
let output_notify = Arc::new(Notify::new());
201213
let (wake_tx, _wake_rx) = watch::channel(0);
214+
let (event_tx, _event_rx) = broadcast::channel(PROCESS_EVENT_CHANNEL_CAPACITY);
202215
{
203216
let mut process_map = self.inner.processes.lock().await;
204217
process_map.insert(
@@ -212,6 +225,7 @@ impl LocalProcess {
212225
next_seq: 1,
213226
exit_code: None,
214227
wake_tx: wake_tx.clone(),
228+
event_tx: event_tx.clone(),
215229
output_notify: Arc::clone(&output_notify),
216230
open_streams: 2,
217231
closed: false,
@@ -248,13 +262,13 @@ impl LocalProcess {
248262
output_notify,
249263
));
250264

251-
Ok((ExecResponse { process_id }, wake_tx))
265+
Ok((ExecResponse { process_id }, wake_tx, event_tx))
252266
}
253267

254268
pub(crate) async fn exec(&self, params: ExecParams) -> Result<ExecResponse, JSONRPCErrorError> {
255269
self.start_process(params)
256270
.await
257-
.map(|(response, _)| response)
271+
.map(|(response, _, _)| response)
258272
}
259273

260274
pub(crate) async fn exec_read(
@@ -425,7 +439,7 @@ fn shell_environment_policy(env_policy: &ExecEnvPolicy) -> ShellEnvironmentPolic
425439
#[async_trait]
426440
impl ExecBackend for LocalProcess {
427441
async fn start(&self, params: ExecParams) -> Result<StartedExecProcess, ExecServerError> {
428-
let (response, wake_tx) = self
442+
let (response, wake_tx, event_tx) = self
429443
.start_process(params)
430444
.await
431445
.map_err(map_handler_error)?;
@@ -434,6 +448,7 @@ impl ExecBackend for LocalProcess {
434448
process_id: response.process_id,
435449
backend: self.clone(),
436450
wake_tx,
451+
event_tx,
437452
}),
438453
})
439454
}
@@ -449,6 +464,10 @@ impl ExecProcess for LocalExecProcess {
449464
self.wake_tx.subscribe()
450465
}
451466

467+
fn subscribe_events(&self) -> broadcast::Receiver<ExecProcessEvent> {
468+
self.event_tx.subscribe()
469+
}
470+
452471
async fn read(
453472
&self,
454473
after_seq: Option<u64>,
@@ -549,11 +568,19 @@ async fn stream_output(
549568
process.retained_bytes = process.retained_bytes.saturating_sub(evicted.chunk.len());
550569
}
551570
let _ = process.wake_tx.send(seq);
571+
let output = ProcessOutputChunk {
572+
seq,
573+
stream,
574+
chunk: chunk.into(),
575+
};
576+
let _ = process
577+
.event_tx
578+
.send(ExecProcessEvent::Output(output.clone()));
552579
ExecOutputDeltaNotification {
553580
process_id: process_id.clone(),
554581
seq,
555582
stream,
556-
chunk: chunk.into(),
583+
chunk: output.chunk,
557584
}
558585
};
559586
output_notify.notify_waiters();
@@ -581,6 +608,9 @@ async fn watch_exit(
581608
process.next_seq += 1;
582609
process.exit_code = Some(exit_code);
583610
let _ = process.wake_tx.send(seq);
611+
let _ = process
612+
.event_tx
613+
.send(ExecProcessEvent::Exited { seq, exit_code });
584614
Some(ExecExitedNotification {
585615
process_id: process_id.clone(),
586616
seq,
@@ -641,6 +671,7 @@ async fn maybe_emit_closed(process_id: ProcessId, inner: Arc<Inner>) {
641671
let seq = process.next_seq;
642672
process.next_seq += 1;
643673
let _ = process.wake_tx.send(seq);
674+
let _ = process.event_tx.send(ExecProcessEvent::Closed { seq });
644675
Some(ExecClosedNotification {
645676
process_id: process_id.clone(),
646677
seq,

codex-rs/exec-server/src/process.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,50 @@
11
use std::sync::Arc;
22

33
use async_trait::async_trait;
4+
use tokio::sync::broadcast;
45
use tokio::sync::watch;
56

67
use crate::ExecServerError;
78
use crate::ProcessId;
89
use crate::protocol::ExecParams;
10+
use crate::protocol::ProcessOutputChunk;
911
use crate::protocol::ReadResponse;
1012
use crate::protocol::WriteResponse;
1113

1214
pub struct StartedExecProcess {
1315
pub process: Arc<dyn ExecProcess>,
1416
}
1517

18+
/// Pushed process events for consumers that want to follow process output as it
19+
/// arrives instead of polling retained output with [`ExecProcess::read`].
20+
///
21+
/// The stream is scoped to one [`ExecProcess`] handle. `Output` events carry
22+
/// stdout, stderr, or pty bytes. `Exited` reports the process exit status, while
23+
/// `Closed` means all output streams have ended and no more output events will
24+
/// arrive. `Failed` is used when the process session cannot continue, for
25+
/// example because the remote executor connection disconnected.
26+
#[derive(Debug, Clone, PartialEq, Eq)]
27+
pub enum ExecProcessEvent {
28+
Output(ProcessOutputChunk),
29+
Exited { seq: u64, exit_code: i32 },
30+
Closed { seq: u64 },
31+
Failed(String),
32+
}
33+
34+
/// Handle for an executor-managed process.
35+
///
36+
/// Implementations must support both retained-output reads and pushed events:
37+
/// `read` is the request/response API for callers that want to page through
38+
/// buffered output, while `subscribe_events` is the streaming API for callers
39+
/// that want output and lifecycle changes delivered as they happen.
1640
#[async_trait]
1741
pub trait ExecProcess: Send + Sync {
1842
fn process_id(&self) -> &ProcessId;
1943

2044
fn subscribe_wake(&self) -> watch::Receiver<u64>;
2145

46+
fn subscribe_events(&self) -> broadcast::Receiver<ExecProcessEvent>;
47+
2248
async fn read(
2349
&self,
2450
after_seq: Option<u64>,

codex-rs/exec-server/src/remote_process.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
use std::sync::Arc;
22

33
use async_trait::async_trait;
4+
use tokio::sync::broadcast;
45
use tokio::sync::watch;
56
use tracing::trace;
67

78
use crate::ExecBackend;
89
use crate::ExecProcess;
10+
use crate::ExecProcessEvent;
911
use crate::ExecServerError;
1012
use crate::StartedExecProcess;
1113
use crate::client::ExecServerClient;
@@ -56,6 +58,10 @@ impl ExecProcess for RemoteExecProcess {
5658
self.session.subscribe_wake()
5759
}
5860

61+
fn subscribe_events(&self) -> broadcast::Receiver<ExecProcessEvent> {
62+
self.session.subscribe_events()
63+
}
64+
5965
async fn read(
6066
&self,
6167
after_seq: Option<u64>,

0 commit comments

Comments
 (0)