diff --git a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs index 8250d84f317..cbf0ea6bb23 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs @@ -1,4 +1,5 @@ use super::*; +use crate::CodexThread; use crate::ThreadManager; use crate::codex::make_session_and_context; use crate::config::DEFAULT_AGENT_MAX_DEPTH; @@ -112,6 +113,73 @@ fn history_contains_inter_agent_communication( }) } +async fn wait_for_turn_aborted( + thread: &Arc, + expected_turn_id: &str, + expected_reason: TurnAbortReason, +) { + timeout(Duration::from_secs(5), async { + loop { + let event = thread + .next_event() + .await + .expect("child thread should emit events"); + if matches!( + event.msg, + EventMsg::TurnAborted(TurnAbortedEvent { + turn_id: Some(ref turn_id), + ref reason, + }) if turn_id == expected_turn_id && *reason == expected_reason + ) { + break; + } + } + }) + .await + .expect("expected child turn to be interrupted"); +} + +async fn wait_for_redirected_envelope_in_history( + thread: &Arc, + expected: &InterAgentCommunication, +) { + timeout(Duration::from_secs(5), async { + loop { + let history_items = thread + .codex + .session + .clone_history() + .await + .raw_items() + .to_vec(); + let saw_envelope = + history_contains_inter_agent_communication(&history_items, expected); + let saw_user_message = history_items.iter().any(|item| { + matches!( + item, + ResponseItem::Message { role, content, .. } + if role == "user" + && content.iter().any(|content_item| matches!( + content_item, + ContentItem::InputText { text } + if text == &expected.content + )) + ) + }); + if saw_envelope { + assert!( + !saw_user_message, + "redirected followup should be stored as an assistant envelope, not a plain user message" + ); + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .expect("redirected followup envelope should appear in history"); +} + #[derive(Clone, Copy)] struct NeverEndingTask; @@ -1199,6 +1267,7 @@ async fn multi_agent_v2_followup_task_interrupts_busy_child_without_losing_messa .expect("worker thread should exist"); let active_turn = thread.codex.session.new_default_turn().await; + let interrupted_turn_id = active_turn.sub_id.clone(); thread .codex .session @@ -1243,44 +1312,18 @@ async fn multi_agent_v2_followup_task_interrupts_busy_child_without_losing_messa ) })); - timeout(Duration::from_secs(5), async { - loop { - let history_items = thread - .codex - .session - .clone_history() - .await - .raw_items() - .to_vec(); - let saw_envelope = history_contains_inter_agent_communication( - &history_items, - &InterAgentCommunication::new( - AgentPath::root(), - AgentPath::try_from("/root/worker").expect("agent path"), - Vec::new(), - "continue".to_string(), - /*trigger_turn*/ true, - ), - ); - let saw_user_message = history_items.iter().any(|item| { - matches!( - item, - ResponseItem::Message { role, content, .. } - if role == "user" - && content.iter().any(|content_item| matches!( - content_item, - ContentItem::InputText { text } if text == "continue" - )) - ) - }); - if saw_envelope && !saw_user_message { - break; - } - tokio::time::sleep(Duration::from_millis(10)).await; - } - }) - .await - .expect("interrupting v2 followup_task should preserve the redirected message"); + wait_for_turn_aborted(&thread, &interrupted_turn_id, TurnAbortReason::Interrupted).await; + wait_for_redirected_envelope_in_history( + &thread, + &InterAgentCommunication::new( + AgentPath::root(), + AgentPath::try_from("/root/worker").expect("agent path"), + Vec::new(), + "continue".to_string(), + /*trigger_turn*/ true, + ), + ) + .await; let _ = thread .submit(Op::Shutdown {})