Skip to content

Commit 3e2028a

Browse files
merge(upstream): sync with openai/codex@main; adopt exec exit status
- Keep ours for TUI and workflows per policy; purge codex-cli images if present - Update SessionManager to set/pass Arc<AtomicBool> to ExecCommandSession - scripts/upstream-merge/verify.sh: ok; ./build-fast.sh: ok (zero warnings)
2 parents 1139464 + 6aafe37 commit 3e2028a

File tree

5 files changed

+127
-71
lines changed

5 files changed

+127
-71
lines changed

.github/auto/MERGE_PLAN.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Upstream Merge Plan (by-bucket)
2+
3+
Mode: by-bucket
4+
Upstream: openai/codex@main → branch `upstream-merge`
5+
6+
Policy
7+
- Prefer ours: codex-rs/tui/**, codex-cli/**, core wiring files (openai_tools.rs, codex.rs, agent_tool.rs, default_client.rs), protocol models, top-level docs and workflows.
8+
- Prefer theirs: codex-rs/common/**, codex-rs/exec/**, codex-rs/file-search/** (unless it breaks our build or fork-specific invariants).
9+
- Purge images matching .github/codex-cli-*.{png,jpg,jpeg,webp}.
10+
- Outside protected paths, adopt upstream to stay current.
11+
12+
Buckets
13+
1) Shared infra/docs/workflows: adopt upstream except for our custom CI/workflow/doco where policy prefers ours.
14+
2) Rust common/exec/file-search: adopt upstream (prefer-theirs), reconcile minimal API drift.
15+
3) Core + Protocol: keep ours; cherry-pick clearly compatible upstream fixes without breaking browser/agent/web_fetch, UA/version, and re-exports.
16+
4) TUI/CLI: keep ours; port obviously-safe bug fixes when non-invasive.
17+
18+
Invariants
19+
- Tool families preserved: browser_*, agent_*, web_fetch. Keep gating/exposure logic intact.
20+
- Screenshot queue semantics unchanged across turns.
21+
- Version/UA helpers: keep codex_version::version() and get_codex_user_agent_default() uses.
22+
- Public re-exports in codex-core stay (ModelClient, Prompt, ResponseEvent, ResponseStream; models alias).
23+
24+
Process
25+
- Merge upstream/main with --no-commit and reconcile per policy.
26+
- Ensure purge_globs remain deleted.
27+
- Run scripts/upstream-merge/verify.sh and fix minimal issues.
28+
- Validate build with ./build-fast.sh with zero warnings.
29+
- Commit with a clear merge message and generate MERGE_REPORT.md.

.github/auto/MERGE_REPORT.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Upstream Merge Report
2+
3+
Branch: `upstream-merge`
4+
Upstream: `openai/codex@main`
5+
Mode: by-bucket
6+
Date: $(date -u +%Y-%m-%dT%H:%M:%SZ)
7+
8+
## Incorporated
9+
- core/exec_command: adopted upstream `ExecCommandSession` changes (added exit status tracking via `Arc<AtomicBool>`), and updated our `SessionManager` to pass the new argument and set the flag in the wait thread.
10+
- core/unified_exec: upstream module and wiring retained (already present); uses `has_exited()` without changing our external API.
11+
- General upstream updates merged outside protected areas per policy.
12+
13+
## Kept Ours
14+
- TUI: resolved conflicts in `codex-rs/tui/**` in favor of our fork (strict ordering, history cell UX, tool previews, etc.).
15+
- CI/Workflows: kept our `.github/workflows/**` (removed upstream `rust-release.yml` which we previously deleted).
16+
- Core invariants preserved: `openai_tools.rs`, `codex.rs`, `agent_tool.rs`, `default_client.rs`, and protocol models mapping; UA/version helpers and tool gating remain intact (verify guard passed).
17+
18+
## Dropped/Rejected
19+
- Purged any reintroduced `.github/codex-cli-*` images if present (none remained after merge).
20+
21+
## Notes
22+
- Addressed post-merge compile drift by adding exit status handling in `SessionManager` and silencing dead_code warnings on the new `exit_status` field and `has_exited()` to maintain zero-warning policy.
23+
- `scripts/upstream-merge/verify.sh` passed (build_fast=ok, api_check=ok).
24+
- `./build-fast.sh` completes with no warnings.
25+
26+
## Follow-ups (none required)
27+
- No API surface changes; re-exports in `codex-core` preserved.
28+
- No ICU/sys-locale removals detected.
Lines changed: 39 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
#![allow(dead_code)]
21
use std::sync::Mutex as StdMutex;
32

43
use tokio::sync::broadcast;
@@ -12,9 +11,6 @@ pub(crate) struct ExecCommandSession {
1211
/// Broadcast stream of output chunks read from the PTY. New subscribers
1312
/// receive only chunks emitted after they subscribe.
1413
output_tx: broadcast::Sender<Vec<u8>>,
15-
/// Receiver subscribed before the child process starts emitting output so
16-
/// the first caller can consume any early data without races.
17-
initial_output_rx: StdMutex<Option<broadcast::Receiver<Vec<u8>>>>,
1814

1915
/// Child killer handle for termination on drop (can signal independently
2016
/// of a thread blocked in `.wait()`).
@@ -28,6 +24,9 @@ pub(crate) struct ExecCommandSession {
2824

2925
/// JoinHandle for the child wait task.
3026
wait_handle: StdMutex<Option<JoinHandle<()>>>,
27+
28+
/// Tracks whether the underlying process has exited.
29+
exit_status: std::sync::Arc<std::sync::atomic::AtomicBool>,
3130
}
3231

3332
impl ExecCommandSession {
@@ -38,65 +37,60 @@ impl ExecCommandSession {
3837
reader_handle: JoinHandle<()>,
3938
writer_handle: JoinHandle<()>,
4039
wait_handle: JoinHandle<()>,
41-
) -> Self {
42-
Self {
43-
writer_tx,
44-
output_tx,
45-
initial_output_rx: StdMutex::new(None),
46-
killer: StdMutex::new(Some(killer)),
47-
reader_handle: StdMutex::new(Some(reader_handle)),
48-
writer_handle: StdMutex::new(Some(writer_handle)),
49-
wait_handle: StdMutex::new(Some(wait_handle)),
50-
}
51-
}
52-
53-
pub(crate) fn set_initial_output_receiver(&self, receiver: broadcast::Receiver<Vec<u8>>) {
54-
if let Ok(mut guard) = self.initial_output_rx.lock()
55-
&& guard.is_none()
56-
{
57-
*guard = Some(receiver);
58-
}
40+
exit_status: std::sync::Arc<std::sync::atomic::AtomicBool>,
41+
) -> (Self, broadcast::Receiver<Vec<u8>>) {
42+
let initial_output_rx = output_tx.subscribe();
43+
(
44+
Self {
45+
writer_tx,
46+
output_tx,
47+
killer: StdMutex::new(Some(killer)),
48+
reader_handle: StdMutex::new(Some(reader_handle)),
49+
writer_handle: StdMutex::new(Some(writer_handle)),
50+
wait_handle: StdMutex::new(Some(wait_handle)),
51+
exit_status,
52+
},
53+
initial_output_rx,
54+
)
5955
}
6056

6157
pub(crate) fn writer_sender(&self) -> mpsc::Sender<Vec<u8>> {
6258
self.writer_tx.clone()
6359
}
6460

6561
pub(crate) fn output_receiver(&self) -> broadcast::Receiver<Vec<u8>> {
66-
if let Ok(mut guard) = self.initial_output_rx.lock()
67-
&& let Some(receiver) = guard.take()
68-
{
69-
receiver
70-
} else {
71-
self.output_tx.subscribe()
72-
}
62+
self.output_tx.subscribe()
63+
}
64+
65+
pub(crate) fn has_exited(&self) -> bool {
66+
self.exit_status.load(std::sync::atomic::Ordering::SeqCst)
7367
}
7468
}
7569

7670
impl Drop for ExecCommandSession {
7771
fn drop(&mut self) {
7872
// Best-effort: terminate child first so blocking tasks can complete.
79-
if let Ok(mut killer_opt) = self.killer.lock() {
80-
if let Some(mut killer) = killer_opt.take() {
81-
let _ = killer.kill();
82-
}
73+
if let Ok(mut killer_opt) = self.killer.lock()
74+
&& let Some(mut killer) = killer_opt.take()
75+
{
76+
let _ = killer.kill();
8377
}
8478

8579
// Abort background tasks; they may already have exited after kill.
86-
if let Ok(mut h) = self.reader_handle.lock() {
87-
if let Some(handle) = h.take() {
88-
handle.abort();
89-
}
80+
if let Ok(mut h) = self.reader_handle.lock()
81+
&& let Some(handle) = h.take()
82+
{
83+
handle.abort();
9084
}
91-
if let Ok(mut h) = self.writer_handle.lock() {
92-
if let Some(handle) = h.take() {
93-
handle.abort();
94-
}
85+
if let Ok(mut h) = self.writer_handle.lock()
86+
&& let Some(handle) = h.take()
87+
{
88+
handle.abort();
9589
}
96-
if let Ok(mut h) = self.wait_handle.lock() {
97-
if let Some(handle) = h.take() {
98-
handle.abort();
99-
}
90+
if let Ok(mut h) = self.wait_handle.lock()
91+
&& let Some(handle) = h.take()
92+
{
93+
handle.abort();
10094
}
10195
}
10296
}

codex-rs/core/src/exec_command/session_manager.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -92,18 +92,16 @@ impl SessionManager {
9292
.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
9393
);
9494

95-
let (session, mut exit_rx) =
96-
create_exec_command_session(params.clone())
97-
.await
98-
.map_err(|err| {
99-
format!(
100-
"failed to create exec command session for session id {}: {err}",
101-
session_id.0
102-
)
103-
})?;
95+
let (session, mut output_rx, mut exit_rx) = create_exec_command_session(params.clone())
96+
.await
97+
.map_err(|err| {
98+
format!(
99+
"failed to create exec command session for session id {}: {err}",
100+
session_id.0
101+
)
102+
})?;
104103

105104
// Insert into session map.
106-
let mut output_rx = session.output_receiver();
107105
self.sessions.lock().await.insert(session_id, session);
108106

109107
// Collect output until either timeout expires or process exits.
@@ -251,7 +249,11 @@ impl SessionManager {
251249
/// Spawn PTY and child process per spawn_exec_command_session logic.
252250
async fn create_exec_command_session(
253251
params: ExecCommandParams,
254-
) -> anyhow::Result<(ExecCommandSession, oneshot::Receiver<i32>)> {
252+
) -> anyhow::Result<(
253+
ExecCommandSession,
254+
tokio::sync::broadcast::Receiver<Vec<u8>>,
255+
oneshot::Receiver<i32>,
256+
)> {
255257
let ExecCommandParams {
256258
cmd,
257259
yield_time_ms: _,
@@ -285,8 +287,6 @@ async fn create_exec_command_session(
285287
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
286288
// Broadcast for streaming PTY output to readers: subscribers receive from subscription time.
287289
let (output_tx, _) = tokio::sync::broadcast::channel::<Vec<u8>>(256);
288-
let initial_output_rx = output_tx.subscribe();
289-
290290
// Reader task: drain PTY and forward chunks to output channel.
291291
let mut reader = pair.master.try_clone_reader()?;
292292
let output_tx_clone = output_tx.clone();
@@ -345,16 +345,15 @@ async fn create_exec_command_session(
345345
});
346346

347347
// Create and store the session with channels.
348-
let session = ExecCommandSession::new(
348+
let (session, initial_output_rx) = ExecCommandSession::new(
349349
writer_tx,
350350
output_tx,
351351
killer,
352352
reader_handle,
353353
writer_handle,
354354
wait_handle,
355355
);
356-
session.set_initial_output_receiver(initial_output_rx);
357-
Ok((session, exit_rx))
356+
Ok((session, initial_output_rx, exit_rx))
358357
}
359358

360359
/// Truncate the middle of a UTF-8 string to at most `max_bytes` bytes,

codex-rs/core/src/unified_exec/mod.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,13 @@ type OutputBuffer = Arc<Mutex<OutputBufferState>>;
100100
type OutputHandles = (OutputBuffer, Arc<Notify>);
101101

102102
impl ManagedUnifiedExecSession {
103-
fn new(session: ExecCommandSession) -> Self {
103+
fn new(
104+
session: ExecCommandSession,
105+
initial_output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
106+
) -> Self {
104107
let output_buffer = Arc::new(Mutex::new(OutputBufferState::default()));
105108
let output_notify = Arc::new(Notify::new());
106-
let mut receiver = session.output_receiver();
109+
let mut receiver = initial_output_rx;
107110
let buffer_clone = Arc::clone(&output_buffer);
108111
let notify_clone = Arc::clone(&output_notify);
109112
let output_task = tokio::spawn(async move {
@@ -193,8 +196,8 @@ impl UnifiedExecSessionManager {
193196
} else {
194197
let command = request.input_chunks.to_vec();
195198
let new_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
196-
let session = create_unified_exec_session(&command).await?;
197-
let managed_session = ManagedUnifiedExecSession::new(session);
199+
let (session, initial_output_rx) = create_unified_exec_session(&command).await?;
200+
let managed_session = ManagedUnifiedExecSession::new(session, initial_output_rx);
198201
let (buffer, notify) = managed_session.output_handles();
199202
writer_tx = managed_session.writer_sender();
200203
output_buffer = buffer;
@@ -297,7 +300,13 @@ impl UnifiedExecSessionManager {
297300

298301
async fn create_unified_exec_session(
299302
command: &[String],
300-
) -> Result<ExecCommandSession, UnifiedExecError> {
303+
) -> Result<
304+
(
305+
ExecCommandSession,
306+
tokio::sync::broadcast::Receiver<Vec<u8>>,
307+
),
308+
UnifiedExecError,
309+
> {
301310
if command.is_empty() {
302311
return Err(UnifiedExecError::MissingCommandLine);
303312
}
@@ -327,7 +336,6 @@ async fn create_unified_exec_session(
327336

328337
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
329338
let (output_tx, _) = tokio::sync::broadcast::channel::<Vec<u8>>(256);
330-
let initial_output_rx = output_tx.subscribe();
331339

332340
let mut reader = pair
333341
.master
@@ -381,7 +389,7 @@ async fn create_unified_exec_session(
381389
wait_exit_status.store(true, Ordering::SeqCst);
382390
});
383391

384-
let session = ExecCommandSession::new(
392+
let (session, initial_output_rx) = ExecCommandSession::new(
385393
writer_tx,
386394
output_tx,
387395
killer,
@@ -390,9 +398,7 @@ async fn create_unified_exec_session(
390398
wait_handle,
391399
exit_status,
392400
);
393-
session.set_initial_output_receiver(initial_output_rx);
394-
395-
Ok(session)
401+
Ok((session, initial_output_rx))
396402
}
397403

398404
#[cfg(test)]

0 commit comments

Comments
 (0)