diff --git a/codex-rs/utils/pty/src/tests.rs b/codex-rs/utils/pty/src/tests.rs index 528bdf9890f..2b2bacb1051 100644 --- a/codex-rs/utils/pty/src/tests.rs +++ b/codex-rs/utils/pty/src/tests.rs @@ -56,7 +56,13 @@ fn echo_sleep_command(marker: &str) -> String { } fn split_stdout_stderr_command() -> String { - "printf 'split-out\\n'; printf 'split-err\\n' >&2".to_string() + if cfg!(windows) { + // Keep this in cmd.exe syntax so the test does not depend on a runner-local + // PowerShell/Python setup just to produce deterministic split output. + "(echo split-out)&(>&2 echo split-err)".to_string() + } else { + "printf 'split-out\\n'; printf 'split-err\\n' >&2".to_string() + } } async fn collect_split_output(mut output_rx: tokio::sync::mpsc::Receiver>) -> Vec { @@ -130,52 +136,36 @@ async fn collect_output_until_exit( } async fn wait_for_python_repl_ready( - writer: &tokio::sync::mpsc::Sender>, output_rx: &mut tokio::sync::broadcast::Receiver>, timeout_ms: u64, - newline: &str, + ready_marker: &str, ) -> anyhow::Result> { let mut collected = Vec::new(); - let marker = "__codex_pty_ready__"; let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout_ms); - let probe_window = tokio::time::Duration::from_millis(if cfg!(windows) { 750 } else { 250 }); while tokio::time::Instant::now() < deadline { - writer - .send(format!("print('{marker}'){newline}").into_bytes()) - .await?; - - let probe_deadline = tokio::time::Instant::now() + probe_window; - loop { - let now = tokio::time::Instant::now(); - if now >= deadline || now >= probe_deadline { - break; - } - let remaining = std::cmp::min( - deadline.saturating_duration_since(now), - probe_deadline.saturating_duration_since(now), - ); - match tokio::time::timeout(remaining, output_rx.recv()).await { - Ok(Ok(chunk)) => { - collected.extend_from_slice(&chunk); - if String::from_utf8_lossy(&collected).contains(marker) { - return Ok(collected); - } - } - Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue, - Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => { - anyhow::bail!( - "PTY output closed while waiting for Python REPL readiness: {:?}", - String::from_utf8_lossy(&collected) - ); + let now = tokio::time::Instant::now(); + let remaining = deadline.saturating_duration_since(now); + match tokio::time::timeout(remaining, output_rx.recv()).await { + Ok(Ok(chunk)) => { + collected.extend_from_slice(&chunk); + if String::from_utf8_lossy(&collected).contains(ready_marker) { + return Ok(collected); } - Err(_) => break, } + Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue, + Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => { + anyhow::bail!( + "PTY output closed while waiting for Python REPL readiness: {:?}", + String::from_utf8_lossy(&collected) + ); + } + Err(_) => break, } } anyhow::bail!( - "timed out waiting for Python REPL readiness in PTY: {:?}", + "timed out waiting for Python REPL readiness marker {ready_marker:?} in PTY: {:?}", String::from_utf8_lossy(&collected) ); } @@ -254,10 +244,17 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> { return Ok(()); }; + let ready_marker = "__codex_pty_ready__"; + let args = vec![ + "-i".to_string(), + "-q".to_string(), + "-c".to_string(), + format!("print('{ready_marker}')"), + ]; let env_map: HashMap = std::env::vars().collect(); let spawned = spawn_pty_process( &python, - &[], + &args, Path::new("."), &env_map, &None, @@ -269,7 +266,7 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> { let newline = if cfg!(windows) { "\r\n" } else { "\n" }; let startup_timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 }; let mut output = - wait_for_python_repl_ready(&writer, &mut output_rx, startup_timeout_ms, newline).await?; + wait_for_python_repl_ready(&mut output_rx, startup_timeout_ms, ready_marker).await?; writer .send(format!("print('hello from pty'){newline}").into_bytes()) .await?; @@ -427,21 +424,7 @@ async fn pipe_drains_stderr_without_stdout_activity() -> anyhow::Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()> { let env_map: HashMap = std::env::vars().collect(); - let (program, args) = if cfg!(windows) { - let Some(python) = find_python() else { - eprintln!("python not found; skipping pipe_process_can_expose_split_stdout_and_stderr"); - return Ok(()); - }; - ( - python, - vec![ - "-c".to_string(), - "import sys; sys.stdout.buffer.write(b'split-out\\n'); sys.stdout.buffer.flush(); sys.stderr.buffer.write(b'split-err\\n'); sys.stderr.buffer.flush()".to_string(), - ], - ) - } else { - shell_command(&split_stdout_stderr_command()) - }; + let (program, args) = shell_command(&split_stdout_stderr_command()); let spawned = spawn_pipe_process_no_stdin(&program, &args, Path::new("."), &env_map, &None).await?; let SpawnedProcess { @@ -464,8 +447,19 @@ async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()> .await .map_err(|_| anyhow::anyhow!("timed out waiting to drain split stderr"))??; - assert_eq!(stdout, b"split-out\n".to_vec()); - assert_eq!(stderr, b"split-err\n".to_vec()); + let expected_stdout = if cfg!(windows) { + b"split-out\r\n".to_vec() + } else { + b"split-out\n".to_vec() + }; + let expected_stderr = if cfg!(windows) { + b"split-err\r\n".to_vec() + } else { + b"split-err\n".to_vec() + }; + + assert_eq!(stdout, expected_stdout); + assert_eq!(stderr, expected_stderr); assert_eq!(code, 0); Ok(())