Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 47 additions & 53 deletions codex-rs/utils/pty/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ fn echo_sleep_command(marker: &str) -> String {
}

fn split_stdout_stderr_command() -> String {
"printf 'split-out\\n'; printf 'split-err\\n' >&2".to_string()
if cfg!(windows) {
// Keep this in cmd.exe syntax so the test does not depend on a runner-local
// PowerShell/Python setup just to produce deterministic split output.
"(echo split-out)&(>&2 echo split-err)".to_string()
} else {
"printf 'split-out\\n'; printf 'split-err\\n' >&2".to_string()
}
}

async fn collect_split_output(mut output_rx: tokio::sync::mpsc::Receiver<Vec<u8>>) -> Vec<u8> {
Expand Down Expand Up @@ -130,52 +136,36 @@ async fn collect_output_until_exit(
}

async fn wait_for_python_repl_ready(
writer: &tokio::sync::mpsc::Sender<Vec<u8>>,
output_rx: &mut tokio::sync::broadcast::Receiver<Vec<u8>>,
timeout_ms: u64,
newline: &str,
ready_marker: &str,
) -> anyhow::Result<Vec<u8>> {
let mut collected = Vec::new();
let marker = "__codex_pty_ready__";
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout_ms);
let probe_window = tokio::time::Duration::from_millis(if cfg!(windows) { 750 } else { 250 });

while tokio::time::Instant::now() < deadline {
writer
.send(format!("print('{marker}'){newline}").into_bytes())
.await?;

let probe_deadline = tokio::time::Instant::now() + probe_window;
loop {
let now = tokio::time::Instant::now();
if now >= deadline || now >= probe_deadline {
break;
}
let remaining = std::cmp::min(
deadline.saturating_duration_since(now),
probe_deadline.saturating_duration_since(now),
);
match tokio::time::timeout(remaining, output_rx.recv()).await {
Ok(Ok(chunk)) => {
collected.extend_from_slice(&chunk);
if String::from_utf8_lossy(&collected).contains(marker) {
return Ok(collected);
}
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
anyhow::bail!(
"PTY output closed while waiting for Python REPL readiness: {:?}",
String::from_utf8_lossy(&collected)
);
let now = tokio::time::Instant::now();
let remaining = deadline.saturating_duration_since(now);
match tokio::time::timeout(remaining, output_rx.recv()).await {
Ok(Ok(chunk)) => {
collected.extend_from_slice(&chunk);
if String::from_utf8_lossy(&collected).contains(ready_marker) {
return Ok(collected);
}
Err(_) => break,
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
anyhow::bail!(
"PTY output closed while waiting for Python REPL readiness: {:?}",
String::from_utf8_lossy(&collected)
);
}
Err(_) => break,
}
}

anyhow::bail!(
"timed out waiting for Python REPL readiness in PTY: {:?}",
"timed out waiting for Python REPL readiness marker {ready_marker:?} in PTY: {:?}",
String::from_utf8_lossy(&collected)
);
}
Expand Down Expand Up @@ -254,10 +244,17 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
return Ok(());
};

let ready_marker = "__codex_pty_ready__";
let args = vec![
"-i".to_string(),
"-q".to_string(),
"-c".to_string(),
format!("print('{ready_marker}')"),
];
let env_map: HashMap<String, String> = std::env::vars().collect();
let spawned = spawn_pty_process(
&python,
&[],
&args,
Path::new("."),
&env_map,
&None,
Expand All @@ -269,7 +266,7 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
let newline = if cfg!(windows) { "\r\n" } else { "\n" };
let startup_timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 };
let mut output =
wait_for_python_repl_ready(&writer, &mut output_rx, startup_timeout_ms, newline).await?;
wait_for_python_repl_ready(&mut output_rx, startup_timeout_ms, ready_marker).await?;
writer
.send(format!("print('hello from pty'){newline}").into_bytes())
.await?;
Expand Down Expand Up @@ -427,21 +424,7 @@ async fn pipe_drains_stderr_without_stdout_activity() -> anyhow::Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()> {
let env_map: HashMap<String, String> = std::env::vars().collect();
let (program, args) = if cfg!(windows) {
let Some(python) = find_python() else {
eprintln!("python not found; skipping pipe_process_can_expose_split_stdout_and_stderr");
return Ok(());
};
(
python,
vec![
"-c".to_string(),
"import sys; sys.stdout.buffer.write(b'split-out\\n'); sys.stdout.buffer.flush(); sys.stderr.buffer.write(b'split-err\\n'); sys.stderr.buffer.flush()".to_string(),
],
)
} else {
shell_command(&split_stdout_stderr_command())
};
let (program, args) = shell_command(&split_stdout_stderr_command());
let spawned =
spawn_pipe_process_no_stdin(&program, &args, Path::new("."), &env_map, &None).await?;
let SpawnedProcess {
Expand All @@ -464,8 +447,19 @@ async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()>
.await
.map_err(|_| anyhow::anyhow!("timed out waiting to drain split stderr"))??;

assert_eq!(stdout, b"split-out\n".to_vec());
assert_eq!(stderr, b"split-err\n".to_vec());
let expected_stdout = if cfg!(windows) {
b"split-out\r\n".to_vec()
} else {
b"split-out\n".to_vec()
};
let expected_stderr = if cfg!(windows) {
b"split-err\r\n".to_vec()
} else {
b"split-err\n".to_vec()
};

assert_eq!(stdout, expected_stdout);
assert_eq!(stderr, expected_stderr);
assert_eq!(code, 0);

Ok(())
Expand Down
Loading