Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 91 additions & 29 deletions codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7374,6 +7374,9 @@ async fn try_run_sampling_request(
let mut should_emit_turn_diff = false;
let plan_mode = turn_context.collaboration_mode.mode == ModeKind::Plan;
let mut assistant_message_stream_parsers = AssistantMessageStreamParsers::new(plan_mode);
// Some streams send output text deltas before the assistant message item is announced.
// Hold those early bytes so we can replay them once `output_item.added` arrives.
let mut pending_output_text_delta = String::new();
let mut plan_mode_state = plan_mode.then(|| PlanModeStreamState::new(&turn_context.sub_id));
let receiving_span = trace_span!("receiving_stream");
let outcome: CodexResult<SamplingRequestResult> = loop {
Expand Down Expand Up @@ -7413,6 +7416,8 @@ async fn try_run_sampling_request(
match event {
ResponseEvent::Created => {}
ResponseEvent::OutputItemDone(item) => {
let completed_assistant_message =
matches!(&item, ResponseItem::Message { role, .. } if role == "assistant");
let previously_active_item = active_item.take();
if let Some(previous) = previously_active_item.as_ref()
&& matches!(previous, TurnItem::AgentMessage(_))
Expand Down Expand Up @@ -7470,6 +7475,17 @@ async fn try_run_sampling_request(
let output_result = handle_output_item_done(&mut ctx, item, previously_active_item)
.instrument(handle_responses)
.await?;
if completed_assistant_message && !pending_output_text_delta.is_empty() {
// If an assistant message completed while this buffer is still non-empty,
// those bytes could not be matched to that message item. Clear them so they
// don't leak into a later assistant item, but keep the buffer across
// reasoning/tool completions so a delayed message item can still claim it.
warn!(
buffered_len = pending_output_text_delta.len(),
"dropping buffered output text deltas after item completion"
);
pending_output_text_delta.clear();
}
if let Some(tool_future) = output_result.tool_future {
in_flight.push_back(tool_future);
}
Expand All @@ -7495,26 +7511,46 @@ async fn try_run_sampling_request(
.await
{
let mut turn_item = turn_item;
let mut seeded_parsed: Option<ParsedAssistantTextDelta> = None;
let mut seeded_item_id: Option<String> = None;
if matches!(turn_item, TurnItem::AgentMessage(_))
&& let Some(raw_text) = raw_assistant_output_text_from_item(&item)
{
let mut seeded_emit: Option<(String, ParsedAssistantTextDelta)> = None;
let mut pending_emit: Option<(String, ParsedAssistantTextDelta)> = None;
let raw_output_text = matches!(turn_item, TurnItem::AgentMessage(_))
.then(|| raw_assistant_output_text_from_item(&item))
.flatten();
if matches!(turn_item, TurnItem::AgentMessage(_)) {
let item_id = turn_item.id();
let mut seeded =
assistant_message_stream_parsers.seed_item_text(&item_id, &raw_text);
if let TurnItem::AgentMessage(agent_message) = &mut turn_item {
agent_message.content =
vec![codex_protocol::items::AgentMessageContent::Text {
text: if plan_mode {
String::new()
} else {
std::mem::take(&mut seeded.visible_text)
},
}];
if let Some(raw_text) = raw_output_text.as_deref() {
let mut seeded =
assistant_message_stream_parsers.seed_item_text(&item_id, raw_text);
if let TurnItem::AgentMessage(agent_message) = &mut turn_item {
agent_message.content =
vec![codex_protocol::items::AgentMessageContent::Text {
text: if plan_mode {
String::new()
} else {
std::mem::take(&mut seeded.visible_text)
},
}];
}
seeded_emit = plan_mode.then_some((item_id.clone(), seeded));
}
if !pending_output_text_delta.is_empty() {
let mut pending_delta = std::mem::take(&mut pending_output_text_delta);
if let Some(raw_text) = raw_output_text.as_deref() {
// The item payload may already contain all or part of the text we
// buffered from pre-item deltas. Trim the overlap so replayed
// deltas don't duplicate visible output.
if pending_delta.starts_with(raw_text) {
pending_delta.drain(..raw_text.len());
} else if raw_text.starts_with(&pending_delta) {
pending_delta.clear();
}
}
if !pending_delta.is_empty() {
let parsed = assistant_message_stream_parsers
.parse_delta(&item_id, &pending_delta);
pending_emit = Some((item_id, parsed));
}
}
seeded_parsed = plan_mode.then_some(seeded);
seeded_item_id = Some(item_id);
}
if let Some(state) = plan_mode_state.as_mut()
&& matches!(turn_item, TurnItem::AgentMessage(_))
Expand All @@ -7526,16 +7562,22 @@ async fn try_run_sampling_request(
} else {
sess.emit_turn_item_started(&turn_context, &turn_item).await;
}
if let (Some(state), Some(item_id), Some(parsed)) = (
plan_mode_state.as_mut(),
seeded_item_id.as_deref(),
seeded_parsed,
) {
if let Some((item_id, parsed)) = seeded_emit {
emit_streamed_assistant_text_delta(
&sess,
&turn_context,
Some(state),
item_id,
plan_mode_state.as_mut(),
&item_id,
parsed,
)
.await;
}
if let Some((item_id, parsed)) = pending_emit {
emit_streamed_assistant_text_delta(
&sess,
&turn_context,
plan_mode_state.as_mut(),
&item_id,
parsed,
)
.await;
Expand Down Expand Up @@ -7612,7 +7654,12 @@ async fn try_run_sampling_request(
.await;
}
} else {
error_or_panic("OutputTextDelta without active item".to_string());
// This event should normally follow `output_item.added`, but tolerate the
// reversed ordering by buffering text until the item metadata arrives.
if pending_output_text_delta.is_empty() {
warn!("buffering OutputTextDelta without active item");
}
pending_output_text_delta.push_str(&delta);
}
}
ResponseEvent::ReasoningSummaryDelta {
Expand All @@ -7630,7 +7677,12 @@ async fn try_run_sampling_request(
sess.send_event(&turn_context, EventMsg::ReasoningContentDelta(event))
.await;
} else {
error_or_panic("ReasoningSummaryDelta without active item".to_string());
// Without an active reasoning item there is nowhere safe to attach this delta.
// Drop the orphan event and rely on the eventual completed item snapshot.
warn!(
summary_index,
"dropping ReasoningSummaryDelta without active item"
);
}
}
ResponseEvent::ReasoningSummaryPartAdded { summary_index } => {
Expand All @@ -7642,7 +7694,12 @@ async fn try_run_sampling_request(
});
sess.send_event(&turn_context, event).await;
} else {
error_or_panic("ReasoningSummaryPartAdded without active item".to_string());
// Keep section-break handling consistent with summary deltas: a break without
// an active reasoning item is orphaned stream state, not a fatal parser error.
warn!(
summary_index,
"dropping ReasoningSummaryPartAdded without active item"
);
}
}
ResponseEvent::ReasoningContentDelta {
Expand All @@ -7660,7 +7717,12 @@ async fn try_run_sampling_request(
sess.send_event(&turn_context, EventMsg::ReasoningRawContentDelta(event))
.await;
} else {
error_or_panic("ReasoningRawContentDelta without active item".to_string());
// Raw reasoning deltas can also arrive before their item in malformed or
// reordered streams. Preserve liveness by dropping the orphan event.
warn!(
content_index,
"dropping ReasoningRawContentDelta without active item"
);
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions codex-rs/core/tests/common/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,13 @@ pub fn ev_reasoning_summary_text_delta(delta: &str) -> Value {
})
}

pub fn ev_reasoning_summary_part_added(summary_index: i64) -> Value {
serde_json::json!({
"type": "response.reasoning_summary_part.added",
"summary_index": summary_index,
})
}

pub fn ev_reasoning_text_delta(delta: &str) -> Value {
serde_json::json!({
"type": "response.reasoning_text.delta",
Expand Down
181 changes: 181 additions & 0 deletions codex-rs/core/tests/suite/items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use core_test_support::responses::ev_message_item_added;
use core_test_support::responses::ev_output_text_delta;
use core_test_support::responses::ev_reasoning_item;
use core_test_support::responses::ev_reasoning_item_added;
use core_test_support::responses::ev_reasoning_summary_part_added;
use core_test_support::responses::ev_reasoning_summary_text_delta;
use core_test_support::responses::ev_reasoning_text_delta;
use core_test_support::responses::ev_response_created;
Expand Down Expand Up @@ -1061,6 +1062,186 @@ async fn plan_mode_handles_missing_plan_close_tag() -> anyhow::Result<()> {
Ok(())
}

// Regression test: some streams emit `output_text.delta` before the
// `output_item.added` event that announces the assistant message item.
// The early delta ("Hello ") should be buffered and replayed once "msg-1"
// is added, so neither the streamed deltas nor the completed item lose text.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn output_text_delta_before_output_item_added_is_buffered() -> anyhow::Result<()> {
    skip_if_no_network!(Ok(()));

    let server = start_mock_server().await;
    let TestCodex {
        codex,
        session_configured,
        ..
    } = test_codex().build(&server).await?;

    // Note the ordering: the first text delta precedes the message item's
    // `added` event; the second arrives after it, in the normal position.
    let stream = sse(vec![
        ev_response_created("resp-1"),
        ev_output_text_delta("Hello "),
        ev_message_item_added("msg-1", ""),
        ev_output_text_delta("world"),
        ev_assistant_message("msg-1", "Hello world"),
        ev_completed("resp-1"),
    ]);
    mount_sse_once(&server, stream).await;

    codex
        .submit(Op::UserTurn {
            items: vec![UserInput::Text {
                text: "hello".into(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
            cwd: std::env::current_dir()?,
            approval_policy: codex_protocol::protocol::AskForApproval::Never,
            approvals_reviewer: None,
            sandbox_policy: codex_protocol::protocol::SandboxPolicy::DangerFullAccess,
            model: session_configured.model.clone(),
            effort: None,
            summary: None,
            service_tier: None,
            collaboration_mode: None,
            personality: None,
        })
        .await?;

    // Drain events until the turn completes, collecting every streamed
    // agent-message delta plus the final completed assistant item.
    let mut agent_deltas = Vec::new();
    let mut completed = None;
    loop {
        match wait_for_event(&codex, |_| true).await {
            EventMsg::AgentMessageContentDelta(event) => agent_deltas.push(event.delta),
            EventMsg::ItemCompleted(ItemCompletedEvent {
                item: TurnItem::AgentMessage(item),
                ..
            }) => completed = Some(item),
            EventMsg::TurnComplete(_) => break,
            _ => {}
        }
    }

    // The buffered pre-item delta must be replayed exactly once: full text,
    // no duplication of the "Hello " prefix.
    assert_eq!(agent_deltas.concat(), "Hello world");
    let completed_text: String = completed
        .expect("assistant item completion should be emitted")
        .content
        .iter()
        .map(|entry| match entry {
            AgentMessageContent::Text { text } => text.as_str(),
        })
        .collect();
    assert_eq!(completed_text, "Hello world");

    Ok(())
}

// Regression test: a reasoning item completes between the early (pre-item)
// text delta and the assistant message's `added` event. The buffered text
// must survive that unrelated item completion and still be replayed when
// the delayed message item finally arrives.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn buffered_output_text_survives_intervening_reasoning_item_done() -> anyhow::Result<()> {
    skip_if_no_network!(Ok(()));

    let server = start_mock_server().await;
    let TestCodex {
        codex,
        session_configured,
        ..
    } = test_codex().build(&server).await?;

    // "Hello " arrives before any item; a full reasoning item then completes
    // before "msg-1" is announced and the rest of the text streams normally.
    let stream = sse(vec![
        ev_response_created("resp-1"),
        ev_output_text_delta("Hello "),
        ev_reasoning_item("reasoning-1", &["thinking"], &[]),
        ev_message_item_added("msg-1", ""),
        ev_output_text_delta("world"),
        ev_assistant_message("msg-1", "Hello world"),
        ev_completed("resp-1"),
    ]);
    mount_sse_once(&server, stream).await;

    codex
        .submit(Op::UserTurn {
            items: vec![UserInput::Text {
                text: "hello".into(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
            cwd: std::env::current_dir()?,
            approval_policy: codex_protocol::protocol::AskForApproval::Never,
            approvals_reviewer: None,
            sandbox_policy: codex_protocol::protocol::SandboxPolicy::DangerFullAccess,
            model: session_configured.model.clone(),
            effort: None,
            summary: None,
            service_tier: None,
            collaboration_mode: None,
            personality: None,
        })
        .await?;

    // Drain events until the turn completes, collecting streamed deltas and
    // the completed assistant item for verification below.
    let mut agent_deltas = Vec::new();
    let mut completed = None;
    loop {
        match wait_for_event(&codex, |_| true).await {
            EventMsg::AgentMessageContentDelta(event) => agent_deltas.push(event.delta),
            EventMsg::ItemCompleted(ItemCompletedEvent {
                item: TurnItem::AgentMessage(item),
                ..
            }) => completed = Some(item),
            EventMsg::TurnComplete(_) => break,
            _ => {}
        }
    }

    // The reasoning completion must not have dropped the buffered "Hello ".
    assert_eq!(agent_deltas.concat(), "Hello world");
    let completed_text: String = completed
        .expect("assistant item completion should be emitted")
        .content
        .iter()
        .map(|entry| match entry {
            AgentMessageContent::Text { text } => text.as_str(),
        })
        .collect();
    assert_eq!(completed_text, "Hello world");

    Ok(())
}

// Regression test: reasoning-summary events (`part.added` and a summary text
// delta) arrive before any reasoning item has been announced. These orphan
// events should be tolerated (dropped, not fatal), and the reasoning item's
// eventual completed snapshot should still carry the summary text intact.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn orphan_reasoning_summary_events_do_not_break_completed_reasoning_item()
-> anyhow::Result<()> {
    skip_if_no_network!(Ok(()));

    let server = start_mock_server().await;
    let TestCodex { codex, .. } = test_codex().build(&server).await?;

    // Both summary events precede the reasoning item that would own them.
    let stream = sse(vec![
        ev_response_created("resp-1"),
        ev_reasoning_summary_part_added(/*summary_index*/ 0),
        ev_reasoning_summary_text_delta("Summary only"),
        ev_reasoning_item("reasoning-1", &["Summary only"], &[]),
        ev_completed("resp-1"),
    ]);
    mount_sse_once(&server, stream).await;

    codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "think".into(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
        })
        .await?;

    // Wait for the completed reasoning item; if the orphan events had been
    // treated as fatal, the turn would error out before this is emitted.
    let completed = wait_for_event_match(&codex, |ev| match ev {
        EventMsg::ItemCompleted(ItemCompletedEvent {
            item: TurnItem::Reasoning(item),
            ..
        }) => Some(item.clone()),
        _ => None,
    })
    .await;

    // The completed item snapshot supplies the summary on its own.
    assert_eq!(completed.summary_text, vec!["Summary only".to_string()]);

    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn reasoning_content_delta_has_item_metadata() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
Expand Down
Loading
Loading