Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions codex-rs/.config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,9 @@ test-group = 'app_server_protocol_codegen'
# Keep the library unit tests parallel.
filter = 'package(codex-app-server) & kind(test)'
test-group = 'app_server_integration'

[[profile.default.overrides]]
# Schema fixture generation can take longer than the default timeout on slower
# Windows runners when app-server protocol fixture sets grow.
filter = 'test(schema_fixtures_match_generated)'
slow-timeout = { period = "1m", terminate-after = 2 }
4 changes: 4 additions & 0 deletions codex-rs/app-server/tests/suite/v2/thread_unsubscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ async fn thread_unsubscribe_during_turn_interrupts_turn_and_emits_thread_closed(
wait_for_command_execution_item_started(&mut mcp),
)
.await??;
// `item/started` can arrive before the spawned command reports a process id.
// Give the runtime a brief moment to finish wiring the command so unsubscribe
// consistently exercises the shutdown path on slower CI runners.
tokio::time::sleep(std::time::Duration::from_millis(250)).await;

let unsubscribe_id = mcp
.send_thread_unsubscribe_request(ThreadUnsubscribeParams {
Expand Down
130 changes: 112 additions & 18 deletions codex-rs/tui/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,8 +938,10 @@ impl SubagentRegistry {
#[cfg_attr(not(test), allow(dead_code))]
fn on_close_end(&mut self, event: &CollabCloseEndEvent) -> Option<Box<dyn HistoryCell>> {
let receiver_id = event.receiver_thread_id;
let info = self.agents.get_mut(&receiver_id)?;
info.status = event.status.clone();
let mut info = self.agents.remove(&receiver_id)?;
self.order.retain(|thread_id| *thread_id != receiver_id);
self.pending_events.remove(&receiver_id);
info.status = AgentStatus::Shutdown;
info.latest_update_at = Instant::now();

if is_terminal_status(&info.status) && !info.notified_terminal {
Expand Down Expand Up @@ -2237,9 +2239,18 @@ impl App {
_ => {}
}
} else {
let updates = self.subagents.on_agent_event(thread_id, &event.msg);
for cell in updates {
self.emit_or_queue_subagent_history(cell);
match &event.msg {
EventMsg::CollabCloseEnd(ev) => {
if let Some(cell) = self.subagents.on_close_end(ev) {
self.emit_or_queue_subagent_history(cell);
}
}
_ => {
let updates = self.subagents.on_agent_event(thread_id, &event.msg);
for cell in updates {
self.emit_or_queue_subagent_history(cell);
}
}
}
}

Expand Down Expand Up @@ -2270,7 +2281,7 @@ impl App {

let ThreadItem::CollabAgentToolCall {
id,
tool: CollabAgentTool::SpawnAgent,
tool,
sender_thread_id,
receiver_thread_ids,
prompt,
Expand Down Expand Up @@ -2303,18 +2314,38 @@ impl App {
.map(app_server_collab_state_to_agent_status)
.unwrap_or(AgentStatus::PendingInit);

let _ = self.subagents.on_spawn_end(&CollabAgentSpawnEndEvent {
call_id: id.clone(),
sender_thread_id,
new_thread_id: Some(new_thread_id),
new_agent_nickname: entry.and_then(|entry| entry.agent_nickname.clone()),
new_agent_role: entry.and_then(|entry| entry.agent_role.clone()),
prompt: prompt.clone().unwrap_or_default(),
model: String::new(),
reasoning_effort: ReasoningEffortConfig::Medium,
spawn_mode,
status,
});
match tool {
CollabAgentTool::SpawnAgent => {
let _ = self.subagents.on_spawn_end(&CollabAgentSpawnEndEvent {
call_id: id.clone(),
sender_thread_id,
new_thread_id: Some(new_thread_id),
new_agent_nickname: entry.and_then(|entry| entry.agent_nickname.clone()),
new_agent_role: entry.and_then(|entry| entry.agent_role.clone()),
prompt: prompt.clone().unwrap_or_default(),
model: String::new(),
reasoning_effort: ReasoningEffortConfig::Medium,
spawn_mode,
status,
});
}
CollabAgentTool::CloseAgent => {
if let ServerNotification::ItemCompleted(_) = notification
&& let Some(cell) = self.subagents.on_close_end(&CollabCloseEndEvent {
call_id: id.clone(),
sender_thread_id,
receiver_thread_id: new_thread_id,
receiver_agent_nickname: entry
.and_then(|entry| entry.agent_nickname.clone()),
receiver_agent_role: entry.and_then(|entry| entry.agent_role.clone()),
status,
})
{
self.emit_or_queue_subagent_history(cell);
}
}
CollabAgentTool::SendInput | CollabAgentTool::ResumeAgent | CollabAgentTool::Wait => {}
}

self.sync_subagent_panel_state();
}
Expand Down Expand Up @@ -7913,6 +7944,69 @@ mod tests {
assert!(screen.contains("watchdog-agent"));
}

#[tokio::test]
async fn process_subagent_side_effects_handles_non_root_watchdog_close_end() {
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;
let root_thread_id = ThreadId::new();
let watchdog_thread_id = ThreadId::new();
let helper_thread_id = ThreadId::new();

app.primary_thread_id = Some(root_thread_id);
app.active_thread_id = Some(root_thread_id);

app.process_subagent_side_effects(
root_thread_id,
&Event {
id: "spawn-watchdog".to_string(),
msg: EventMsg::CollabAgentSpawnEnd(CollabAgentSpawnEndEvent {
call_id: "spawn-watchdog".to_string(),
sender_thread_id: root_thread_id,
new_thread_id: Some(watchdog_thread_id),
new_agent_nickname: Some("Hume".to_string()),
new_agent_role: Some("watchdog".to_string()),
prompt: "watchdog prompt".to_string(),
model: "gpt-test".to_string(),
reasoning_effort: ReasoningEffortConfig::Low,
spawn_mode: CollabAgentSpawnMode::Watchdog,
status: AgentStatus::PendingInit,
}),
},
);
while app_event_rx.try_recv().is_ok() {}

app.process_subagent_side_effects(
helper_thread_id,
&Event {
id: "close-watchdog".to_string(),
msg: EventMsg::CollabCloseEnd(CollabCloseEndEvent {
call_id: "close-watchdog".to_string(),
sender_thread_id: helper_thread_id,
receiver_thread_id: watchdog_thread_id,
receiver_agent_nickname: Some("Hume".to_string()),
receiver_agent_role: Some("watchdog".to_string()),
status: AgentStatus::Shutdown,
}),
},
);

let mut saw_shutdown_update = false;
let mut saw_clear_panel = false;
while let Ok(event) = app_event_rx.try_recv() {
match event {
AppEvent::InsertHistoryCell(cell) => {
let transcript = lines_to_single_string(&cell.transcript_lines(/*width*/ 80));
saw_shutdown_update |= transcript.contains("shutdown");
}
AppEvent::ClearSubagentPanel => saw_clear_panel = true,
_ => {}
}
}

assert!(saw_shutdown_update);
assert!(saw_clear_panel);
assert!(app.subagents.panel_cell().is_none());
}

#[tokio::test]
async fn replay_thread_snapshot_restores_collaboration_mode_for_draft_submit() {
let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await;
Expand Down
1 change: 1 addition & 0 deletions codex-rs/tui/src/app_server_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,7 @@ fn thread_start_params_from_config(
sandbox: sandbox_mode_from_policy(config.permissions.sandbox_policy.get().clone()),
config: config_request_overrides_from_config(config),
ephemeral: Some(config.ephemeral),
experimental_raw_events: true,
persist_extended_history: true,
..ThreadStartParams::default()
}
Expand Down
8 changes: 4 additions & 4 deletions codex-rs/tui/src/chatwidget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ use codex_protocol::protocol::McpToolCallEndEvent;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::PatchApplyBeginEvent;
use codex_protocol::protocol::RateLimitSnapshot;
#[cfg(test)]
use codex_protocol::protocol::RawResponseItemEvent;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::ReviewTarget;
Expand Down Expand Up @@ -994,7 +993,6 @@ pub(crate) struct ChatWidget {
status_line_branch_lookup_complete: bool,
external_editor_state: ExternalEditorState,
realtime_conversation: RealtimeConversationUiState,
#[cfg(test)]
last_replayed_agent_inbox_message: Option<(Option<String>, String)>,
last_rendered_user_message_event: Option<RenderedUserMessageEvent>,
last_non_retry_error: Option<(String, String)>,
Expand Down Expand Up @@ -3743,7 +3741,6 @@ impl ChatWidget {
self.request_redraw();
}

#[cfg(test)]
fn on_raw_response_item(&mut self, event: RawResponseItemEvent, from_replay: bool) {
let Some((sender, message)) = agent_inbox_message_from_item(&event.item) else {
if from_replay {
Expand Down Expand Up @@ -3831,6 +3828,9 @@ impl ChatWidget {
prompt: prompt.unwrap_or_default(),
model: String::new(),
reasoning_effort: ReasoningEffortConfig::Medium,
// Thread history items do not carry spawn_mode yet, so the
// replay path must choose an explicit fallback for reconstructed
// spawn rows. Plain spawn is the least surprising default.
spawn_mode: AgentSpawnMode::Spawn,
status: first_receiver
.as_ref()
Expand Down Expand Up @@ -4814,7 +4814,6 @@ impl ChatWidget {
status_line_branch_lookup_complete: false,
external_editor_state: ExternalEditorState::Closed,
realtime_conversation: RealtimeConversationUiState::default(),
#[cfg(test)]
last_replayed_agent_inbox_message: None,
last_rendered_user_message_event: None,
last_non_retry_error: None,
Expand Down Expand Up @@ -6631,6 +6630,7 @@ impl ChatWidget {
format!("Agent message: {message}"),
hint,
));
self.request_redraw();
}
}
}
Expand Down
Loading