Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 81 additions & 38 deletions codex-rs/core/src/tools/handlers/multi_agents_tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use crate::CodexThread;
use crate::ThreadManager;
use crate::codex::make_session_and_context;
use crate::config::DEFAULT_AGENT_MAX_DEPTH;
Expand Down Expand Up @@ -112,6 +113,73 @@ fn history_contains_inter_agent_communication(
})
}

async fn wait_for_turn_aborted(
thread: &Arc<CodexThread>,
expected_turn_id: &str,
expected_reason: TurnAbortReason,
) {
timeout(Duration::from_secs(5), async {
loop {
let event = thread
.next_event()
.await
.expect("child thread should emit events");
if matches!(
event.msg,
EventMsg::TurnAborted(TurnAbortedEvent {
turn_id: Some(ref turn_id),
ref reason,
}) if turn_id == expected_turn_id && *reason == expected_reason
) {
break;
}
}
})
.await
.expect("expected child turn to be interrupted");
}

async fn wait_for_redirected_envelope_in_history(
thread: &Arc<CodexThread>,
expected: &InterAgentCommunication,
) {
timeout(Duration::from_secs(5), async {
loop {
let history_items = thread
.codex
.session
.clone_history()
.await
.raw_items()
.to_vec();
let saw_envelope =
history_contains_inter_agent_communication(&history_items, expected);
let saw_user_message = history_items.iter().any(|item| {
matches!(
item,
ResponseItem::Message { role, content, .. }
if role == "user"
&& content.iter().any(|content_item| matches!(
content_item,
ContentItem::InputText { text }
if text == &expected.content
))
)
});
if saw_envelope {
assert!(
!saw_user_message,
"redirected followup should be stored as an assistant envelope, not a plain user message"
);
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("redirected followup envelope should appear in history");
}

#[derive(Clone, Copy)]
struct NeverEndingTask;

Expand Down Expand Up @@ -1199,6 +1267,7 @@ async fn multi_agent_v2_followup_task_interrupts_busy_child_without_losing_messa
.expect("worker thread should exist");

let active_turn = thread.codex.session.new_default_turn().await;
let interrupted_turn_id = active_turn.sub_id.clone();
thread
.codex
.session
Expand Down Expand Up @@ -1243,44 +1312,18 @@ async fn multi_agent_v2_followup_task_interrupts_busy_child_without_losing_messa
)
}));

timeout(Duration::from_secs(5), async {
loop {
let history_items = thread
.codex
.session
.clone_history()
.await
.raw_items()
.to_vec();
let saw_envelope = history_contains_inter_agent_communication(
&history_items,
&InterAgentCommunication::new(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
Vec::new(),
"continue".to_string(),
/*trigger_turn*/ true,
),
);
let saw_user_message = history_items.iter().any(|item| {
matches!(
item,
ResponseItem::Message { role, content, .. }
if role == "user"
&& content.iter().any(|content_item| matches!(
content_item,
ContentItem::InputText { text } if text == "continue"
))
)
});
if saw_envelope && !saw_user_message {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("interrupting v2 followup_task should preserve the redirected message");
wait_for_turn_aborted(&thread, &interrupted_turn_id, TurnAbortReason::Interrupted).await;
wait_for_redirected_envelope_in_history(
&thread,
&InterAgentCommunication::new(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
Vec::new(),
"continue".to_string(),
/*trigger_turn*/ true,
),
)
.await;

let _ = thread
.submit(Op::Shutdown {})
Expand Down
Loading