Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 56 additions & 26 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2979,7 +2979,7 @@ impl CodexMessageProcessor {
}
}
} else {
let Some(thread) = loaded_thread else {
let Some(thread) = loaded_thread.as_ref() else {
self.send_invalid_request_error(
request_id,
format!("thread not loaded: {thread_uuid}"),
Expand Down Expand Up @@ -3033,11 +3033,21 @@ impl CodexMessageProcessor {
}
}

thread.status = resolve_thread_status(
self.thread_watch_manager
.loaded_status_for_thread(&thread.id)
.await,
false,
let has_live_in_progress_turn = if let Some(loaded_thread) = loaded_thread.as_ref() {
matches!(loaded_thread.agent_status().await, AgentStatus::Running)
} else {
false
};

let thread_status = self
.thread_watch_manager
.loaded_status_for_thread(&thread.id)
.await;

set_thread_status_and_interrupt_stale_turns(
&mut thread,
thread_status,
has_live_in_progress_turn,
);
let response = ThreadReadResponse { thread };
self.outgoing.send_response(request_id, response).await;
Expand Down Expand Up @@ -3245,12 +3255,12 @@ impl CodexMessageProcessor {
.upsert_thread(thread.clone())
.await;

thread.status = resolve_thread_status(
self.thread_watch_manager
.loaded_status_for_thread(&thread.id)
.await,
false,
);
let thread_status = self
.thread_watch_manager
.loaded_status_for_thread(&thread.id)
.await;

set_thread_status_and_interrupt_stale_turns(&mut thread, thread_status, false);

let response = ThreadResumeResponse {
thread,
Expand Down Expand Up @@ -6350,6 +6360,7 @@ impl CodexMessageProcessor {
};
handle_thread_listener_command(
conversation_id,
&conversation,
codex_home.as_path(),
&thread_state_manager,
&thread_state,
Expand Down Expand Up @@ -6712,8 +6723,10 @@ impl CodexMessageProcessor {
}
}

#[allow(clippy::too_many_arguments)]
async fn handle_thread_listener_command(
conversation_id: ThreadId,
conversation: &Arc<CodexThread>,
codex_home: &Path,
thread_state_manager: &ThreadStateManager,
thread_state: &Arc<Mutex<ThreadState>>,
Expand All @@ -6725,6 +6738,7 @@ async fn handle_thread_listener_command(
ThreadListenerCommand::SendThreadResumeResponse(resume_request) => {
handle_pending_thread_resume_request(
conversation_id,
conversation,
codex_home,
thread_state_manager,
thread_state,
Expand All @@ -6750,8 +6764,10 @@ async fn handle_thread_listener_command(
}
}

#[allow(clippy::too_many_arguments)]
async fn handle_pending_thread_resume_request(
conversation_id: ThreadId,
conversation: &Arc<CodexThread>,
codex_home: &Path,
thread_state_manager: &ThreadStateManager,
thread_state: &Arc<Mutex<ThreadState>>,
Expand All @@ -6771,9 +6787,11 @@ async fn handle_pending_thread_resume_request(
active_turn_status = ?active_turn.as_ref().map(|turn| &turn.status),
"composing running thread resume response"
);
let mut has_in_progress_turn = active_turn
.as_ref()
.is_some_and(|turn| matches!(turn.status, TurnStatus::InProgress));
let has_live_in_progress_turn =
matches!(conversation.agent_status().await, AgentStatus::Running)
|| active_turn
.as_ref()
.is_some_and(|turn| matches!(turn.status, TurnStatus::InProgress));

let request_id = pending.request_id;
let connection_id = request_id.connection_id;
Expand All @@ -6798,19 +6816,15 @@ async fn handle_pending_thread_resume_request(
return;
}

has_in_progress_turn = has_in_progress_turn
|| thread
.turns
.iter()
.any(|turn| matches!(turn.status, TurnStatus::InProgress));
let thread_status = thread_watch_manager
.loaded_status_for_thread(&thread.id)
.await;

let status = resolve_thread_status(
thread_watch_manager
.loaded_status_for_thread(&thread.id)
.await,
has_in_progress_turn,
set_thread_status_and_interrupt_stale_turns(
&mut thread,
thread_status,
has_live_in_progress_turn,
);
thread.status = status;

match find_thread_name_by_id(codex_home, &conversation_id).await {
Ok(thread_name) => thread.name = thread_name,
Expand Down Expand Up @@ -6908,6 +6922,22 @@ fn merge_turn_history_with_active_turn(turns: &mut Vec<Turn>, active_turn: Turn)
turns.push(active_turn);
}

fn set_thread_status_and_interrupt_stale_turns(
thread: &mut Thread,
loaded_status: ThreadStatus,
has_live_in_progress_turn: bool,
) {
let status = resolve_thread_status(loaded_status, has_live_in_progress_turn);
if !matches!(status, ThreadStatus::Active { .. }) {
for turn in &mut thread.turns {
if matches!(turn.status, TurnStatus::InProgress) {
turn.status = TurnStatus::Interrupted;
}
}
}
thread.status = status;
}

fn collect_resume_override_mismatches(
request: &ThreadResumeParams,
config_snapshot: &ThreadConfigSnapshot,
Expand Down
119 changes: 119 additions & 0 deletions codex-rs/app-server/tests/suite/v2/thread_resume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadMetadataGitInfoUpdateParams;
use codex_app_server_protocol::ThreadMetadataUpdateParams;
use codex_app_server_protocol::ThreadReadParams;
use codex_app_server_protocol::ThreadReadResponse;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
Expand All @@ -38,9 +40,12 @@ use codex_protocol::ThreadId;
use codex_protocol::config_types::Personality;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource as RolloutSessionSource;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::user_input::ByteRange;
use codex_protocol::user_input::TextElement;
use codex_state::StateRuntime;
Expand Down Expand Up @@ -398,6 +403,120 @@ stream_max_retries = 0
Ok(())
}

#[tokio::test]
async fn thread_resume_and_read_interrupt_incomplete_rollout_turn_when_thread_is_idle() -> Result<()>
{
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;

let filename_ts = "2025-01-05T12-00-00";
let meta_rfc3339 = "2025-01-05T12:00:00Z";
let conversation_id = create_fake_rollout_with_text_elements(
codex_home.path(),
filename_ts,
meta_rfc3339,
"Saved user message",
Vec::new(),
Some("mock_provider"),
None,
)?;
let rollout_file_path = rollout_path(codex_home.path(), filename_ts, &conversation_id);
let persisted_rollout = std::fs::read_to_string(&rollout_file_path)?;
let turn_id = "incomplete-turn";
let appended_rollout = [
json!({
"timestamp": meta_rfc3339,
"type": "event_msg",
"payload": serde_json::to_value(EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn_id.to_string(),
model_context_window: None,
collaboration_mode_kind: Default::default(),
}))?,
})
.to_string(),
json!({
"timestamp": meta_rfc3339,
"type": "event_msg",
"payload": serde_json::to_value(EventMsg::AgentMessage(AgentMessageEvent {
message: "Still running".to_string(),
phase: None,
}))?,
})
.to_string(),
]
.join("\n");
std::fs::write(
&rollout_file_path,
format!("{persisted_rollout}{appended_rollout}\n"),
)?;

let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: conversation_id,
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;

assert_eq!(thread.status, ThreadStatus::Idle);
assert_eq!(thread.turns.len(), 2);
assert_eq!(thread.turns[0].status, TurnStatus::Completed);
assert_eq!(thread.turns[1].id, turn_id);
assert_eq!(thread.turns[1].status, TurnStatus::Interrupted);

let second_resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread.id.clone(),
..Default::default()
})
.await?;
let second_resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(second_resume_id)),
)
.await??;
let ThreadResumeResponse {
thread: resumed_again,
..
} = to_response::<ThreadResumeResponse>(second_resume_resp)?;

assert_eq!(resumed_again.status, ThreadStatus::Idle);
assert_eq!(resumed_again.turns.len(), 2);
assert_eq!(resumed_again.turns[1].id, turn_id);
assert_eq!(resumed_again.turns[1].status, TurnStatus::Interrupted);

let read_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: resumed_again.id,
include_turns: true,
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse {
thread: read_thread,
} = to_response::<ThreadReadResponse>(read_resp)?;

assert_eq!(read_thread.status, ThreadStatus::Idle);
assert_eq!(read_thread.turns.len(), 2);
assert_eq!(read_thread.turns[1].id, turn_id);
assert_eq!(read_thread.turns[1].status, TurnStatus::Interrupted);

Ok(())
}

#[tokio::test]
async fn thread_resume_without_overrides_does_not_change_updated_at_or_mtime() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
Expand Down
Loading