diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index b2d5c0a75b7..4a6425659dc 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -2979,7 +2979,7 @@ impl CodexMessageProcessor { } } } else { - let Some(thread) = loaded_thread else { + let Some(thread) = loaded_thread.as_ref() else { self.send_invalid_request_error( request_id, format!("thread not loaded: {thread_uuid}"), @@ -3033,11 +3033,21 @@ impl CodexMessageProcessor { } } - thread.status = resolve_thread_status( - self.thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await, - false, + let has_live_in_progress_turn = if let Some(loaded_thread) = loaded_thread.as_ref() { + matches!(loaded_thread.agent_status().await, AgentStatus::Running) + } else { + false + }; + + let thread_status = self + .thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await; + + set_thread_status_and_interrupt_stale_turns( + &mut thread, + thread_status, + has_live_in_progress_turn, ); let response = ThreadReadResponse { thread }; self.outgoing.send_response(request_id, response).await; @@ -3245,12 +3255,12 @@ impl CodexMessageProcessor { .upsert_thread(thread.clone()) .await; - thread.status = resolve_thread_status( - self.thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await, - false, - ); + let thread_status = self + .thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await; + + set_thread_status_and_interrupt_stale_turns(&mut thread, thread_status, false); let response = ThreadResumeResponse { thread, @@ -6350,6 +6360,7 @@ impl CodexMessageProcessor { }; handle_thread_listener_command( conversation_id, + &conversation, codex_home.as_path(), &thread_state_manager, &thread_state, @@ -6712,8 +6723,10 @@ impl CodexMessageProcessor { } } +#[allow(clippy::too_many_arguments)] async fn handle_thread_listener_command( conversation_id: ThreadId, + conversation: &Arc, codex_home: &Path, thread_state_manager: &ThreadStateManager, thread_state: &Arc>, @@ -6725,6 +6738,7 @@ async fn handle_thread_listener_command( ThreadListenerCommand::SendThreadResumeResponse(resume_request) => { handle_pending_thread_resume_request( conversation_id, + conversation, codex_home, thread_state_manager, thread_state, @@ -6750,8 +6764,10 @@ async fn handle_thread_listener_command( } } +#[allow(clippy::too_many_arguments)] async fn handle_pending_thread_resume_request( conversation_id: ThreadId, + conversation: &Arc, codex_home: &Path, thread_state_manager: &ThreadStateManager, thread_state: &Arc>, @@ -6771,9 +6787,11 @@ async fn handle_pending_thread_resume_request( active_turn_status = ?active_turn.as_ref().map(|turn| &turn.status), "composing running thread resume response" ); - let mut has_in_progress_turn = active_turn - .as_ref() - .is_some_and(|turn| matches!(turn.status, TurnStatus::InProgress)); + let has_live_in_progress_turn = + matches!(conversation.agent_status().await, AgentStatus::Running) + || active_turn + .as_ref() + .is_some_and(|turn| matches!(turn.status, TurnStatus::InProgress)); let request_id = pending.request_id; let connection_id = request_id.connection_id; @@ -6798,19 +6816,15 @@ async fn handle_pending_thread_resume_request( return; } - has_in_progress_turn = has_in_progress_turn - || thread - .turns - .iter() - .any(|turn| matches!(turn.status, TurnStatus::InProgress)); + let thread_status = thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await; - let status = resolve_thread_status( - thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await, - has_in_progress_turn, + set_thread_status_and_interrupt_stale_turns( + &mut thread, + thread_status, + has_live_in_progress_turn, ); - thread.status = status; match find_thread_name_by_id(codex_home, &conversation_id).await { Ok(thread_name) => thread.name = thread_name, @@ -6908,6 +6922,22 @@ fn merge_turn_history_with_active_turn(turns: &mut Vec, active_turn: Turn) turns.push(active_turn); } +fn set_thread_status_and_interrupt_stale_turns( + thread: &mut Thread, + loaded_status: ThreadStatus, + has_live_in_progress_turn: bool, +) { + let status = resolve_thread_status(loaded_status, has_live_in_progress_turn); + if !matches!(status, ThreadStatus::Active { .. }) { + for turn in &mut thread.turns { + if matches!(turn.status, TurnStatus::InProgress) { + turn.status = TurnStatus::Interrupted; + } + } + } + thread.status = status; +} + fn collect_resume_override_mismatches( request: &ThreadResumeParams, config_snapshot: &ThreadConfigSnapshot, diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 7a3c04f572c..eb8887bde38 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -25,6 +25,8 @@ use codex_app_server_protocol::SessionSource; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadMetadataGitInfoUpdateParams; use codex_app_server_protocol::ThreadMetadataUpdateParams; +use codex_app_server_protocol::ThreadReadParams; +use codex_app_server_protocol::ThreadReadResponse; use codex_app_server_protocol::ThreadResumeParams; use codex_app_server_protocol::ThreadResumeResponse; use codex_app_server_protocol::ThreadStartParams; @@ -38,9 +40,12 @@ use codex_protocol::ThreadId; use codex_protocol::config_types::Personality; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::AgentMessageEvent; +use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource as RolloutSessionSource; +use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::user_input::ByteRange; use codex_protocol::user_input::TextElement; use codex_state::StateRuntime; @@ -398,6 +403,120 @@ stream_max_retries = 0 Ok(()) } +#[tokio::test] +async fn thread_resume_and_read_interrupt_incomplete_rollout_turn_when_thread_is_idle() -> Result<()> +{ + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let filename_ts = "2025-01-05T12-00-00"; + let meta_rfc3339 = "2025-01-05T12:00:00Z"; + let conversation_id = create_fake_rollout_with_text_elements( + codex_home.path(), + filename_ts, + meta_rfc3339, + "Saved user message", + Vec::new(), + Some("mock_provider"), + None, + )?; + let rollout_file_path = rollout_path(codex_home.path(), filename_ts, &conversation_id); + let persisted_rollout = std::fs::read_to_string(&rollout_file_path)?; + let turn_id = "incomplete-turn"; + let appended_rollout = [ + json!({ + "timestamp": meta_rfc3339, + "type": "event_msg", + "payload": serde_json::to_value(EventMsg::TurnStarted(TurnStartedEvent { + turn_id: turn_id.to_string(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }))?, + }) + .to_string(), + json!({ + "timestamp": meta_rfc3339, + "type": "event_msg", + "payload": serde_json::to_value(EventMsg::AgentMessage(AgentMessageEvent { + message: "Still running".to_string(), + phase: None, + }))?, + }) + .to_string(), + ] + .join("\n"); + std::fs::write( + &rollout_file_path, + format!("{persisted_rollout}{appended_rollout}\n"), + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let resume_id = mcp + .send_thread_resume_request(ThreadResumeParams { + thread_id: conversation_id, + ..Default::default() + }) + .await?; + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + let ThreadResumeResponse { thread, .. } = to_response::(resume_resp)?; + + assert_eq!(thread.status, ThreadStatus::Idle); + assert_eq!(thread.turns.len(), 2); + assert_eq!(thread.turns[0].status, TurnStatus::Completed); + assert_eq!(thread.turns[1].id, turn_id); + assert_eq!(thread.turns[1].status, TurnStatus::Interrupted); + + let second_resume_id = mcp + .send_thread_resume_request(ThreadResumeParams { + thread_id: thread.id.clone(), + ..Default::default() + }) + .await?; + let second_resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(second_resume_id)), + ) + .await??; + let ThreadResumeResponse { + thread: resumed_again, + .. + } = to_response::(second_resume_resp)?; + + assert_eq!(resumed_again.status, ThreadStatus::Idle); + assert_eq!(resumed_again.turns.len(), 2); + assert_eq!(resumed_again.turns[1].id, turn_id); + assert_eq!(resumed_again.turns[1].status, TurnStatus::Interrupted); + + let read_id = mcp + .send_thread_read_request(ThreadReadParams { + thread_id: resumed_again.id, + include_turns: true, + }) + .await?; + let read_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(read_id)), + ) + .await??; + let ThreadReadResponse { + thread: read_thread, + } = to_response::(read_resp)?; + + assert_eq!(read_thread.status, ThreadStatus::Idle); + assert_eq!(read_thread.turns.len(), 2); + assert_eq!(read_thread.turns[1].id, turn_id); + assert_eq!(read_thread.turns[1].status, TurnStatus::Interrupted); + + Ok(()) +} + #[tokio::test] async fn thread_resume_without_overrides_does_not_change_updated_at_or_mtime() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await;