diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 50442021747..690dab1b972 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -7374,6 +7374,9 @@ async fn try_run_sampling_request( let mut should_emit_turn_diff = false; let plan_mode = turn_context.collaboration_mode.mode == ModeKind::Plan; let mut assistant_message_stream_parsers = AssistantMessageStreamParsers::new(plan_mode); + // Some streams send output text deltas before the assistant message item is announced. + // Hold those early bytes so we can replay them once `output_item.added` arrives. + let mut pending_output_text_delta = String::new(); let mut plan_mode_state = plan_mode.then(|| PlanModeStreamState::new(&turn_context.sub_id)); let receiving_span = trace_span!("receiving_stream"); let outcome: CodexResult = loop { @@ -7413,6 +7416,8 @@ async fn try_run_sampling_request( match event { ResponseEvent::Created => {} ResponseEvent::OutputItemDone(item) => { + let completed_assistant_message = + matches!(&item, ResponseItem::Message { role, .. } if role == "assistant"); let previously_active_item = active_item.take(); if let Some(previous) = previously_active_item.as_ref() && matches!(previous, TurnItem::AgentMessage(_)) @@ -7470,6 +7475,17 @@ async fn try_run_sampling_request( let output_result = handle_output_item_done(&mut ctx, item, previously_active_item) .instrument(handle_responses) .await?; + if completed_assistant_message && !pending_output_text_delta.is_empty() { + // If an assistant message completed while this buffer is still non-empty, + // those bytes could not be matched to that message item. Clear them so they + // don't leak into a later assistant item, but keep the buffer across + // reasoning/tool completions so a delayed message item can still claim it. 
+ warn!( + buffered_len = pending_output_text_delta.len(), + "dropping buffered output text deltas after item completion" + ); + pending_output_text_delta.clear(); + } if let Some(tool_future) = output_result.tool_future { in_flight.push_back(tool_future); } @@ -7495,26 +7511,46 @@ async fn try_run_sampling_request( .await { let mut turn_item = turn_item; - let mut seeded_parsed: Option = None; - let mut seeded_item_id: Option = None; - if matches!(turn_item, TurnItem::AgentMessage(_)) - && let Some(raw_text) = raw_assistant_output_text_from_item(&item) - { + let mut seeded_emit: Option<(String, ParsedAssistantTextDelta)> = None; + let mut pending_emit: Option<(String, ParsedAssistantTextDelta)> = None; + let raw_output_text = matches!(turn_item, TurnItem::AgentMessage(_)) + .then(|| raw_assistant_output_text_from_item(&item)) + .flatten(); + if matches!(turn_item, TurnItem::AgentMessage(_)) { let item_id = turn_item.id(); - let mut seeded = - assistant_message_stream_parsers.seed_item_text(&item_id, &raw_text); - if let TurnItem::AgentMessage(agent_message) = &mut turn_item { - agent_message.content = - vec![codex_protocol::items::AgentMessageContent::Text { - text: if plan_mode { - String::new() - } else { - std::mem::take(&mut seeded.visible_text) - }, - }]; + if let Some(raw_text) = raw_output_text.as_deref() { + let mut seeded = + assistant_message_stream_parsers.seed_item_text(&item_id, raw_text); + if let TurnItem::AgentMessage(agent_message) = &mut turn_item { + agent_message.content = + vec![codex_protocol::items::AgentMessageContent::Text { + text: if plan_mode { + String::new() + } else { + std::mem::take(&mut seeded.visible_text) + }, + }]; + } + seeded_emit = plan_mode.then_some((item_id.clone(), seeded)); + } + if !pending_output_text_delta.is_empty() { + let mut pending_delta = std::mem::take(&mut pending_output_text_delta); + if let Some(raw_text) = raw_output_text.as_deref() { + // The item payload may already contain all or part of the text we + 
// buffered from pre-item deltas. Trim the overlap so replayed + // deltas don't duplicate visible output. + if pending_delta.starts_with(raw_text) { + pending_delta.drain(..raw_text.len()); + } else if raw_text.starts_with(&pending_delta) { + pending_delta.clear(); + } + } + if !pending_delta.is_empty() { + let parsed = assistant_message_stream_parsers + .parse_delta(&item_id, &pending_delta); + pending_emit = Some((item_id, parsed)); + } } - seeded_parsed = plan_mode.then_some(seeded); - seeded_item_id = Some(item_id); } if let Some(state) = plan_mode_state.as_mut() && matches!(turn_item, TurnItem::AgentMessage(_)) @@ -7526,16 +7562,22 @@ async fn try_run_sampling_request( } else { sess.emit_turn_item_started(&turn_context, &turn_item).await; } - if let (Some(state), Some(item_id), Some(parsed)) = ( - plan_mode_state.as_mut(), - seeded_item_id.as_deref(), - seeded_parsed, - ) { + if let Some((item_id, parsed)) = seeded_emit { emit_streamed_assistant_text_delta( &sess, &turn_context, - Some(state), - item_id, + plan_mode_state.as_mut(), + &item_id, + parsed, + ) + .await; + } + if let Some((item_id, parsed)) = pending_emit { + emit_streamed_assistant_text_delta( + &sess, + &turn_context, + plan_mode_state.as_mut(), + &item_id, parsed, ) .await; @@ -7612,7 +7654,12 @@ async fn try_run_sampling_request( .await; } } else { - error_or_panic("OutputTextDelta without active item".to_string()); + // This event should normally follow `output_item.added`, but tolerate the + // reversed ordering by buffering text until the item metadata arrives. 
+ if pending_output_text_delta.is_empty() { + warn!("buffering OutputTextDelta without active item"); + } + pending_output_text_delta.push_str(&delta); } } ResponseEvent::ReasoningSummaryDelta { @@ -7630,7 +7677,12 @@ async fn try_run_sampling_request( sess.send_event(&turn_context, EventMsg::ReasoningContentDelta(event)) .await; } else { - error_or_panic("ReasoningSummaryDelta without active item".to_string()); + // Without an active reasoning item there is nowhere safe to attach this delta. + // Drop the orphan event and rely on the eventual completed item snapshot. + warn!( + summary_index, + "dropping ReasoningSummaryDelta without active item" + ); } } ResponseEvent::ReasoningSummaryPartAdded { summary_index } => { @@ -7642,7 +7694,12 @@ async fn try_run_sampling_request( }); sess.send_event(&turn_context, event).await; } else { - error_or_panic("ReasoningSummaryPartAdded without active item".to_string()); + // Keep section-break handling consistent with summary deltas: a break without + // an active reasoning item is orphaned stream state, not a fatal parser error. + warn!( + summary_index, + "dropping ReasoningSummaryPartAdded without active item" + ); } } ResponseEvent::ReasoningContentDelta { @@ -7660,7 +7717,12 @@ async fn try_run_sampling_request( sess.send_event(&turn_context, EventMsg::ReasoningRawContentDelta(event)) .await; } else { - error_or_panic("ReasoningRawContentDelta without active item".to_string()); + // Raw reasoning deltas can also arrive before their item in malformed or + // reordered streams. Preserve liveness by dropping the orphan event. 
+ warn!( + content_index, + "dropping ReasoningRawContentDelta without active item" + ); } } }
diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs
index 2e2155ebddf..cf286b4b718 100644
--- a/codex-rs/core/tests/common/responses.rs
+++ b/codex-rs/core/tests/common/responses.rs
@@ -719,6 +719,13 @@ pub fn ev_reasoning_summary_text_delta(delta: &str) -> Value {
     })
 }
 
+pub fn ev_reasoning_summary_part_added(summary_index: i64) -> Value {
+    serde_json::json!({ // payload of a `response.reasoning_summary_part.added` test event
+        "type": "response.reasoning_summary_part.added",
+        "summary_index": summary_index, // forwarded verbatim from the argument
+    })
+}
+
 pub fn ev_reasoning_text_delta(delta: &str) -> Value {
     serde_json::json!({
         "type": "response.reasoning_text.delta",
diff --git a/codex-rs/core/tests/suite/items.rs b/codex-rs/core/tests/suite/items.rs
index f949cd4b975..3980cdad41c 100644
--- a/codex-rs/core/tests/suite/items.rs
+++ b/codex-rs/core/tests/suite/items.rs
@@ -21,6 +21,7 @@ use core_test_support::responses::ev_message_item_added;
 use core_test_support::responses::ev_output_text_delta;
 use core_test_support::responses::ev_reasoning_item;
 use core_test_support::responses::ev_reasoning_item_added;
+use core_test_support::responses::ev_reasoning_summary_part_added;
 use core_test_support::responses::ev_reasoning_summary_text_delta;
 use core_test_support::responses::ev_reasoning_text_delta;
 use core_test_support::responses::ev_response_created;
@@ -1061,6 +1062,186 @@ async fn plan_mode_handles_missing_plan_close_tag() -> anyhow::Result<()> {
     Ok(())
 }
 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn output_text_delta_before_output_item_added_is_buffered() -> anyhow::Result<()> {
+    skip_if_no_network!(Ok(()));
+    // Scenario: the first text delta is streamed before its message item is announced.
+    let server = start_mock_server().await;
+    let TestCodex {
+        codex,
+        session_configured,
+        ..
+    } = test_codex().build(&server).await?;
+
+    let stream = sse(vec![
+        ev_response_created("resp-1"),
+        ev_output_text_delta("Hello "), // sent while no message item is active
+        ev_message_item_added("msg-1", ""),
+        ev_output_text_delta("world"),
+        ev_assistant_message("msg-1", "Hello world"),
+        ev_completed("resp-1"),
+    ]);
+    mount_sse_once(&server, stream).await;
+
+    codex
+        .submit(Op::UserTurn {
+            items: vec![UserInput::Text {
+                text: "hello".into(),
+                text_elements: Vec::new(),
+            }],
+            final_output_json_schema: None,
+            cwd: std::env::current_dir()?,
+            approval_policy: codex_protocol::protocol::AskForApproval::Never,
+            approvals_reviewer: None,
+            sandbox_policy: codex_protocol::protocol::SandboxPolicy::DangerFullAccess,
+            model: session_configured.model.clone(),
+            effort: None,
+            summary: None,
+            service_tier: None,
+            collaboration_mode: None,
+            personality: None,
+        })
+        .await?;
+    // Record every streamed delta and the completed assistant item until the turn ends.
+    let mut agent_deltas = Vec::new();
+    let mut completed = None;
+    loop {
+        match wait_for_event(&codex, |_| true).await {
+            EventMsg::AgentMessageContentDelta(event) => agent_deltas.push(event.delta),
+            EventMsg::ItemCompleted(ItemCompletedEvent {
+                item: TurnItem::AgentMessage(item),
+                ..
+            }) => completed = Some(item),
+            EventMsg::TurnComplete(_) => break,
+            _ => {}
+        }
+    }
+    // The early "Hello " must not be lost: deltas and final item both read "Hello world".
+    assert_eq!(agent_deltas.concat(), "Hello world");
+    let completed_text: String = completed
+        .expect("assistant item completion should be emitted")
+        .content
+        .iter()
+        .map(|entry| match entry {
+            AgentMessageContent::Text { text } => text.as_str(),
+        })
+        .collect();
+    assert_eq!(completed_text, "Hello world");
+
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn buffered_output_text_survives_intervening_reasoning_item_done() -> anyhow::Result<()> {
+    skip_if_no_network!(Ok(()));
+    // Scenario: a reasoning item is delivered between the early delta and the message item.
+    let server = start_mock_server().await;
+    let TestCodex {
+        codex,
+        session_configured,
+        ..
+    } = test_codex().build(&server).await?;
+
+    let stream = sse(vec![
+        ev_response_created("resp-1"),
+        ev_output_text_delta("Hello "), // sent while no message item is active
+        ev_reasoning_item("reasoning-1", &["thinking"], &[]), // must not discard "Hello "
+        ev_message_item_added("msg-1", ""),
+        ev_output_text_delta("world"),
+        ev_assistant_message("msg-1", "Hello world"),
+        ev_completed("resp-1"),
+    ]);
+    mount_sse_once(&server, stream).await;
+
+    codex
+        .submit(Op::UserTurn {
+            items: vec![UserInput::Text {
+                text: "hello".into(),
+                text_elements: Vec::new(),
+            }],
+            final_output_json_schema: None,
+            cwd: std::env::current_dir()?,
+            approval_policy: codex_protocol::protocol::AskForApproval::Never,
+            approvals_reviewer: None,
+            sandbox_policy: codex_protocol::protocol::SandboxPolicy::DangerFullAccess,
+            model: session_configured.model.clone(),
+            effort: None,
+            summary: None,
+            service_tier: None,
+            collaboration_mode: None,
+            personality: None,
+        })
+        .await?;
+    // Record every streamed delta and the completed assistant item until the turn ends.
+    let mut agent_deltas = Vec::new();
+    let mut completed = None;
+    loop {
+        match wait_for_event(&codex, |_| true).await {
+            EventMsg::AgentMessageContentDelta(event) => agent_deltas.push(event.delta),
+            EventMsg::ItemCompleted(ItemCompletedEvent {
+                item: TurnItem::AgentMessage(item),
+                ..
+            }) => completed = Some(item),
+            EventMsg::TurnComplete(_) => break,
+            _ => {}
+        }
+    }
+    // "Hello " is still replayed after the reasoning item, so nothing is lost or duplicated.
+    assert_eq!(agent_deltas.concat(), "Hello world");
+    let completed_text: String = completed
+        .expect("assistant item completion should be emitted")
+        .content
+        .iter()
+        .map(|entry| match entry {
+            AgentMessageContent::Text { text } => text.as_str(),
+        })
+        .collect();
+    assert_eq!(completed_text, "Hello world");
+
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn orphan_reasoning_summary_events_do_not_break_completed_reasoning_item()
+-> anyhow::Result<()> {
+    skip_if_no_network!(Ok(()));
+    // Scenario: summary part/delta events arrive before any reasoning item is announced.
+    let server = start_mock_server().await;
+    let TestCodex { codex, .. } = test_codex().build(&server).await?;
+
+    let stream = sse(vec![
+        ev_response_created("resp-1"),
+        ev_reasoning_summary_part_added(/*summary_index*/ 0),
+        ev_reasoning_summary_text_delta("Summary only"),
+        ev_reasoning_item("reasoning-1", &["Summary only"], &[]),
+        ev_completed("resp-1"),
+    ]);
+    mount_sse_once(&server, stream).await;
+
+    codex
+        .submit(Op::UserInput {
+            items: vec![UserInput::Text {
+                text: "think".into(),
+                text_elements: Vec::new(),
+            }],
+            final_output_json_schema: None,
+        })
+        .await?;
+    // Wait for the completed reasoning item and capture it.
+    let completed = wait_for_event_match(&codex, |ev| match ev {
+        EventMsg::ItemCompleted(ItemCompletedEvent {
+            item: TurnItem::Reasoning(item),
+            ..
+        }) => Some(item.clone()),
+        _ => None,
+    })
+    .await;
+    // The completed item must still expose the whole summary text.
+    assert_eq!(completed.summary_text, vec!["Summary only".to_string()]);
+
+    Ok(())
+}
+
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
 async fn reasoning_content_delta_has_item_metadata() -> anyhow::Result<()> {
     skip_if_no_network!(Ok(()));
diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs
index b0ddb631bd0..96edfcf98df 100644
--- a/codex-rs/tui/src/chatwidget.rs
+++ b/codex-rs/tui/src/chatwidget.rs
@@ -6016,7 +6016,16 @@ impl ChatWidget {
 ThreadItem::Reasoning { summary, content, ..
} => { - if from_replay { + let mut completed_reasoning = summary.join("\n\n"); + if self.config.show_raw_agent_reasoning { + if !completed_reasoning.is_empty() && !content.is_empty() { + completed_reasoning.push_str("\n\n"); + } + completed_reasoning.push_str(&content.join("\n\n")); + } + let streamed_reasoning = + format!("{}{}", self.full_reasoning_buffer, self.reasoning_buffer); + if from_replay || streamed_reasoning.trim().is_empty() { for delta in summary { self.on_agent_reasoning_delta(delta); } @@ -6025,6 +6034,17 @@ impl ChatWidget { self.on_agent_reasoning_delta(delta); } } + } else if !completed_reasoning.is_empty() + && completed_reasoning != streamed_reasoning + && let Some(missing_prefix) = completed_reasoning + .strip_suffix(&streamed_reasoning) + .filter(|missing_prefix| !missing_prefix.is_empty()) + { + // If early summary deltas were dropped before the item became active, prepend + // the missing prefix from the completed item snapshot and keep the streamed + // suffix so we still avoid double-rendering. 
+ self.full_reasoning_buffer = + format!("{missing_prefix}{}", self.full_reasoning_buffer); } self.on_agent_reasoning_final(); } diff --git a/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__live_reasoning_item_completed_backfills_missing_prefix_across_section_breaks.snap b/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__live_reasoning_item_completed_backfills_missing_prefix_across_section_breaks.snap new file mode 100644 index 00000000000..e4192c48056 --- /dev/null +++ b/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__live_reasoning_item_completed_backfills_missing_prefix_across_section_breaks.snap @@ -0,0 +1,7 @@ +--- +source: tui/src/chatwidget/tests/history_replay.rs +expression: rendered +--- +• prefix middle + + tail diff --git a/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__live_reasoning_item_completed_backfills_missing_prefix_before_streamed_suffix.snap b/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__live_reasoning_item_completed_backfills_missing_prefix_before_streamed_suffix.snap new file mode 100644 index 00000000000..dce699b7bd1 --- /dev/null +++ b/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__live_reasoning_item_completed_backfills_missing_prefix_before_streamed_suffix.snap @@ -0,0 +1,5 @@ +--- +source: tui/src/chatwidget/tests/history_replay.rs +expression: rendered +--- +• prefix suffix diff --git a/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__live_reasoning_item_completed_renders_summary_without_prior_delta.snap b/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__live_reasoning_item_completed_renders_summary_without_prior_delta.snap new file mode 100644 index 00000000000..b62a32a1fcb --- /dev/null +++ b/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__live_reasoning_item_completed_renders_summary_without_prior_delta.snap @@ -0,0 +1,5 @@ +--- +source: 
tui/src/chatwidget/tests/history_replay.rs +expression: rendered +--- +• Summary only
diff --git a/codex-rs/tui/src/chatwidget/tests/history_replay.rs b/codex-rs/tui/src/chatwidget/tests/history_replay.rs
index 58993d67e89..e22b8002120 100644
--- a/codex-rs/tui/src/chatwidget/tests/history_replay.rs
+++ b/codex-rs/tui/src/chatwidget/tests/history_replay.rs
@@ -1,4 +1,5 @@
 use super::*;
+use codex_app_server_protocol::ReasoningSummaryPartAddedNotification;
 use pretty_assertions::assert_eq;
 
 #[tokio::test]
@@ -725,6 +726,183 @@ async fn live_reasoning_summary_is_not_rendered_twice_when_item_completes() {
     assert_eq!(rendered.matches("Summary only").count(), 1);
 }
 
+#[tokio::test]
+async fn live_reasoning_item_completed_renders_summary_without_prior_delta() {
+    let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
+    chat.show_welcome_banner = false;
+    // Start a live turn (replay_kind None) and discard the result of drain_insert_history.
+    chat.handle_server_notification(
+        ServerNotification::TurnStarted(TurnStartedNotification {
+            thread_id: "thread-1".to_string(),
+            turn: AppServerTurn {
+                id: "turn-1".to_string(),
+                items: Vec::new(),
+                status: AppServerTurnStatus::InProgress,
+                error: None,
+            },
+        }),
+        /*replay_kind*/ None,
+    );
+    let _ = drain_insert_history(&mut rx);
+    // Complete a reasoning item even though no summary delta was streamed for it.
+    chat.handle_server_notification(
+        ServerNotification::ItemCompleted(ItemCompletedNotification {
+            thread_id: "thread-1".to_string(),
+            turn_id: "turn-1".to_string(),
+            item: AppServerThreadItem::Reasoning {
+                id: "reasoning-1".to_string(),
+                summary: vec!["Summary only".to_string()],
+                content: Vec::new(),
+            },
+        }),
+        /*replay_kind*/ None,
+    );
+    // The next app event must be a history cell that renders the summary text.
+    let rendered = match rx.try_recv() {
+        Ok(AppEvent::InsertHistoryCell(cell)) => {
+            lines_to_single_string(&cell.transcript_lines(/*width*/ 80))
+        }
+        other => panic!("expected InsertHistoryCell, got {other:?}"),
+    };
+    assert!(rendered.contains("Summary only"));
+    assert_chatwidget_snapshot!(
+        "live_reasoning_item_completed_renders_summary_without_prior_delta",
+        rendered
+    );
+}
+
+#[tokio::test]
+async fn live_reasoning_item_completed_backfills_missing_prefix_before_streamed_suffix() {
+    let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
+    chat.show_welcome_banner = false;
+    // Start a live turn (replay_kind None) and discard the result of drain_insert_history.
+    chat.handle_server_notification(
+        ServerNotification::TurnStarted(TurnStartedNotification {
+            thread_id: "thread-1".to_string(),
+            turn: AppServerTurn {
+                id: "turn-1".to_string(),
+                items: Vec::new(),
+                status: AppServerTurnStatus::InProgress,
+                error: None,
+            },
+        }),
+        /*replay_kind*/ None,
+    );
+    let _ = drain_insert_history(&mut rx);
+    // Stream only "suffix" as a live summary delta.
+    chat.handle_server_notification(
+        ServerNotification::ReasoningSummaryTextDelta(ReasoningSummaryTextDeltaNotification {
+            thread_id: "thread-1".to_string(),
+            turn_id: "turn-1".to_string(),
+            item_id: "reasoning-1".to_string(),
+            delta: "suffix".to_string(),
+            summary_index: 0,
+        }),
+        /*replay_kind*/ None,
+    );
+    // The completed item's summary is "prefix suffix"; "prefix " was never streamed.
+    chat.handle_server_notification(
+        ServerNotification::ItemCompleted(ItemCompletedNotification {
+            thread_id: "thread-1".to_string(),
+            turn_id: "turn-1".to_string(),
+            item: AppServerThreadItem::Reasoning {
+                id: "reasoning-1".to_string(),
+                summary: vec!["prefix suffix".to_string()],
+                content: Vec::new(),
+            },
+        }),
+        /*replay_kind*/ None,
+    );
+    // The rendered cell must contain "prefix suffix" exactly once.
+    let rendered = match rx.try_recv() {
+        Ok(AppEvent::InsertHistoryCell(cell)) => {
+            lines_to_single_string(&cell.transcript_lines(/*width*/ 80))
+        }
+        other => panic!("expected InsertHistoryCell, got {other:?}"),
+    };
+    assert_eq!(rendered.matches("prefix suffix").count(), 1);
+    assert_chatwidget_snapshot!(
+        "live_reasoning_item_completed_backfills_missing_prefix_before_streamed_suffix",
+        rendered
+    );
+}
+
+#[tokio::test]
+async fn live_reasoning_item_completed_backfills_missing_prefix_across_section_breaks() {
+    let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
+    chat.show_welcome_banner = false;
+    // Start a live turn (replay_kind None) and discard the result of drain_insert_history.
+    chat.handle_server_notification(
+        ServerNotification::TurnStarted(TurnStartedNotification {
+            thread_id: "thread-1".to_string(),
+            turn: AppServerTurn {
+                id: "turn-1".to_string(),
+                items: Vec::new(),
+                status: AppServerTurnStatus::InProgress,
+                error: None,
+            },
+        }),
+        /*replay_kind*/ None,
+    );
+    let _ = drain_insert_history(&mut rx);
+    // Stream "middle", a part-added event for summary_index 1, then "tail".
+    chat.handle_server_notification(
+        ServerNotification::ReasoningSummaryTextDelta(ReasoningSummaryTextDeltaNotification {
+            thread_id: "thread-1".to_string(),
+            turn_id: "turn-1".to_string(),
+            item_id: "reasoning-1".to_string(),
+            delta: "middle".to_string(),
+            summary_index: 0,
+        }),
+        /*replay_kind*/ None,
+    );
+    chat.handle_server_notification(
+        ServerNotification::ReasoningSummaryPartAdded(ReasoningSummaryPartAddedNotification {
+            thread_id: "thread-1".to_string(),
+            turn_id: "turn-1".to_string(),
+            item_id: "reasoning-1".to_string(),
+            summary_index: 1,
+        }),
+        /*replay_kind*/ None,
+    );
+    chat.handle_server_notification(
+        ServerNotification::ReasoningSummaryTextDelta(ReasoningSummaryTextDeltaNotification {
+            thread_id: "thread-1".to_string(),
+            turn_id: "turn-1".to_string(),
+            item_id: "reasoning-1".to_string(),
+            delta: "tail".to_string(),
+            summary_index: 1,
+        }),
+        /*replay_kind*/ None,
+    );
+    // The completed summary is ["prefix middle", "tail"]; "prefix " was never streamed.
+    chat.handle_server_notification(
+        ServerNotification::ItemCompleted(ItemCompletedNotification {
+            thread_id: "thread-1".to_string(),
+            turn_id: "turn-1".to_string(),
+            item: AppServerThreadItem::Reasoning {
+                id: "reasoning-1".to_string(),
+                summary: vec!["prefix middle".to_string(), "tail".to_string()],
+                content: Vec::new(),
+            },
+        }),
+        /*replay_kind*/ None,
+    );
+    // "prefix middle" must render exactly once and "tail" must still be present.
+    let rendered = match rx.try_recv() {
+        Ok(AppEvent::InsertHistoryCell(cell)) => {
+            lines_to_single_string(&cell.transcript_lines(/*width*/ 80))
+        }
+        other => panic!("expected InsertHistoryCell, got {other:?}"),
+    };
+    assert_eq!(rendered.matches("prefix middle").count(), 1);
+    assert!(rendered.contains("tail"));
+    assert_chatwidget_snapshot!(
+        "live_reasoning_item_completed_backfills_missing_prefix_across_section_breaks",
+        rendered
+    );
+}
+
 #[tokio::test]
 async fn replayed_turn_started_does_not_mark_task_running() {
     let (mut chat, _rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;