Skip to content

Commit 6612840

Browse files
cooper-oai and codex committed
[codex-core] Clean up server-side compaction handling [ci changed_files]
Co-authored-by: Codex <noreply@openai.com>
1 parent f92ce48 commit 6612840

3 files changed

Lines changed: 41 additions & 139 deletions

File tree

codex-rs/core/src/codex.rs

Lines changed: 37 additions & 50 deletions
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,6 @@ use crate::realtime_conversation::handle_start as handle_realtime_conversation_s
4141
use crate::realtime_conversation::handle_text as handle_realtime_conversation_text;
4242
use crate::rollout::session_index;
4343
use crate::stream_events_utils::HandleOutputCtx;
44-
use crate::stream_events_utils::ServerSideCompaction;
4544
use crate::stream_events_utils::handle_non_tool_response_item;
4645
use crate::stream_events_utils::handle_output_item_done;
4746
use crate::stream_events_utils::last_assistant_message_from_item;
@@ -81,6 +80,7 @@ use codex_protocol::config_types::Settings;
8180
use codex_protocol::config_types::WebSearchMode;
8281
use codex_protocol::dynamic_tools::DynamicToolResponse;
8382
use codex_protocol::dynamic_tools::DynamicToolSpec;
83+
use codex_protocol::items::ContextCompactionItem;
8484
use codex_protocol::items::PlanItem;
8585
use codex_protocol::items::TurnItem;
8686
use codex_protocol::items::UserMessageItem;
@@ -3487,7 +3487,7 @@ impl Session {
34873487
pub(crate) async fn record_context_updates_and_set_reference_context_item(
34883488
&self,
34893489
turn_context: &TurnContext,
3490-
) -> Vec<ResponseItem> {
3490+
) {
34913491
let reference_context_item = {
34923492
let state = self.state.lock().await;
34933493
state.reference_context_item()
@@ -3514,7 +3514,6 @@ impl Session {
35143514
// context items. This keeps later runtime diffing aligned with the current turn state.
35153515
let mut state = self.state.lock().await;
35163516
state.set_reference_context_item(Some(turn_context_item));
3517-
context_items
35183517
}
35193518

35203519
pub(crate) async fn update_token_usage_info(
@@ -5467,12 +5466,8 @@ pub(crate) async fn run_turn(
54675466

54685467
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
54695468
let response_item: ResponseItem = initial_input_for_turn.clone().into();
5470-
sess.record_user_prompt_and_emit_turn_item(
5471-
turn_context.as_ref(),
5472-
&input,
5473-
response_item.clone(),
5474-
)
5475-
.await;
5469+
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
5470+
.await;
54765471
// Track the previous-turn baseline from the regular user-turn path only so
54775472
// standalone tasks (compact/shell/review/undo) cannot suppress future
54785473
// model/realtime injections.
@@ -5567,22 +5562,9 @@ pub(crate) async fn run_turn(
55675562
.await
55685563
{
55695564
Ok(sampling_request_output) => {
5570-
if inline_server_side_compaction_threshold(&sess, &turn_context).is_some() {
5571-
let result = if sampling_request_output.observed_server_side_compaction {
5572-
"applied"
5573-
} else {
5574-
"skipped"
5575-
};
5576-
sess.services.session_telemetry.counter(
5577-
"codex.inline_compaction",
5578-
1,
5579-
&[("result", result)],
5580-
);
5581-
}
55825565
let SamplingRequestResult {
55835566
needs_follow_up,
55845567
last_agent_message: sampling_request_last_agent_message,
5585-
observed_server_side_compaction: _,
55865568
} = sampling_request_output;
55875569
let total_usage_tokens = sess.get_total_token_usage().await;
55885570
let token_limit_reached = total_usage_tokens >= auto_compact_limit;
@@ -5601,19 +5583,18 @@ pub(crate) async fn run_turn(
56015583
);
56025584

56035585
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
5604-
if token_limit_reached && needs_follow_up {
5605-
if inline_server_side_compaction_threshold(&sess, &turn_context).is_some() {
5606-
} else if run_auto_compact(
5586+
if token_limit_reached
5587+
&& needs_follow_up
5588+
&& inline_server_side_compaction_threshold(&sess, &turn_context).is_none()
5589+
&& run_auto_compact(
56075590
&sess,
56085591
&turn_context,
56095592
InitialContextInjection::BeforeLastUserMessage,
56105593
)
56115594
.await
56125595
.is_err()
5613-
{
5614-
return None;
5615-
}
5616-
continue;
5596+
{
5597+
return None;
56175598
}
56185599

56195600
if !needs_follow_up {
@@ -5723,9 +5704,8 @@ fn inline_server_side_compaction_threshold(
57235704
if !should_use_remote_compact_task(&turn_context.provider) {
57245705
return None;
57255706
}
5726-
// OpenAI inline auto-compaction uses Responses `context_management`, which has no
5727-
// compaction-prompt field. Auto-compaction therefore ignores `compact_prompt`, while manual
5728-
// `/compact` still uses the point-in-time compact endpoint.
5707+
// Inline Responses compaction has no prompt override; manual `/compact` still uses the
5708+
// point-in-time compact endpoint.
57295709
turn_context.model_info.auto_compact_token_limit()
57305710
}
57315711

@@ -6217,7 +6197,6 @@ async fn built_tools(
62176197
struct SamplingRequestResult {
62186198
needs_follow_up: bool,
62196199
last_agent_message: Option<String>,
6220-
observed_server_side_compaction: bool,
62216200
}
62226201

62236202
/// Ephemeral per-response state for streaming a single proposed plan.
@@ -6806,7 +6785,6 @@ async fn try_run_sampling_request(
68066785
FuturesOrdered::new();
68076786
let mut needs_follow_up = false;
68086787
let mut last_agent_message: Option<String> = None;
6809-
let mut observed_server_side_compaction = false;
68106788
let mut active_item: Option<TurnItem> = None;
68116789
let mut should_emit_turn_diff = false;
68126790
let plan_mode = turn_context.collaboration_mode.mode == ModeKind::Plan;
@@ -6850,7 +6828,6 @@ async fn try_run_sampling_request(
68506828
match event {
68516829
ResponseEvent::Created => {}
68526830
ResponseEvent::OutputItemDone(item) => {
6853-
let saw_server_side_compaction = matches!(item, ResponseItem::Compaction { .. });
68546831
let previously_active_item = active_item.take();
68556832
if let Some(previous) = previously_active_item.as_ref()
68566833
&& matches!(previous, TurnItem::AgentMessage(_))
@@ -6879,6 +6856,28 @@ async fn try_run_sampling_request(
68796856
continue;
68806857
}
68816858

6859+
if matches!(item, ResponseItem::Compaction { .. }) {
6860+
let turn_item = TurnItem::ContextCompaction(match previously_active_item {
6861+
Some(TurnItem::ContextCompaction(item)) => item,
6862+
_ => ContextCompactionItem::new(),
6863+
});
6864+
debug!(
6865+
turn_id = %turn_context.sub_id,
6866+
"emitting streamed server-side raw compaction item for immediate local checkpoint apply"
6867+
);
6868+
sess.send_event(
6869+
&turn_context,
6870+
EventMsg::RawResponseItem(RawResponseItemEvent { item: item.clone() }),
6871+
)
6872+
.await;
6873+
sess.apply_server_side_compaction(turn_context.as_ref(), item)
6874+
.await;
6875+
sess.emit_turn_item_started(&turn_context, &turn_item).await;
6876+
sess.emit_turn_item_completed(&turn_context, turn_item)
6877+
.await;
6878+
continue;
6879+
}
6880+
68826881
let mut ctx = HandleOutputCtx {
68836882
sess: sess.clone(),
68846883
turn_context: turn_context.clone(),
@@ -6889,16 +6888,6 @@ async fn try_run_sampling_request(
68896888
let output_result = handle_output_item_done(&mut ctx, item, previously_active_item)
68906889
.instrument(handle_responses)
68916890
.await?;
6892-
observed_server_side_compaction |= saw_server_side_compaction;
6893-
if let Some(ServerSideCompaction { item, turn_item }) =
6894-
output_result.server_side_compaction
6895-
{
6896-
sess.apply_server_side_compaction(turn_context.as_ref(), item)
6897-
.await;
6898-
sess.emit_turn_item_started(&turn_context, &turn_item).await;
6899-
sess.emit_turn_item_completed(&turn_context, turn_item)
6900-
.await;
6901-
}
69026891
if let Some(tool_future) = output_result.tool_future {
69036892
in_flight.push_back(tool_future);
69046893
}
@@ -6908,14 +6897,13 @@ async fn try_run_sampling_request(
69086897
needs_follow_up |= output_result.needs_follow_up;
69096898
}
69106899
ResponseEvent::OutputItemAdded(item) => {
6900+
if matches!(item, ResponseItem::Compaction { .. }) {
6901+
continue;
6902+
}
69116903
if let Some(turn_item) =
69126904
handle_non_tool_response_item(&item, plan_mode, Some(&turn_context.cwd)).await
69136905
{
69146906
let mut turn_item = turn_item;
6915-
if matches!(turn_item, TurnItem::ContextCompaction(_)) {
6916-
active_item = Some(turn_item);
6917-
continue;
6918-
}
69196907
let mut seeded_parsed: Option<ParsedAssistantTextDelta> = None;
69206908
let mut seeded_item_id: Option<String> = None;
69216909
if matches!(turn_item, TurnItem::AgentMessage(_))
@@ -7005,7 +6993,6 @@ async fn try_run_sampling_request(
70056993
break Ok(SamplingRequestResult {
70066994
needs_follow_up,
70076995
last_agent_message,
7008-
observed_server_side_compaction,
70096996
});
70106997
}
70116998
ResponseEvent::OutputTextDelta(delta) => {

codex-rs/core/src/codex_tests.rs

Lines changed: 0 additions & 56 deletions
Original file line number | Diff line number | Diff line change
@@ -3703,62 +3703,6 @@ async fn task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input()
37033703
));
37043704
}
37053705

3706-
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
3707-
async fn inline_compaction_output_item_returns_immediate_compaction_apply_signal() {
3708-
let (sess, tc, rx) = make_session_and_context_with_rx().await;
3709-
while rx.try_recv().is_ok() {}
3710-
3711-
let router = Arc::new(ToolRouter::from_config(
3712-
&tc.tools_config,
3713-
None,
3714-
None,
3715-
tc.dynamic_tools.as_slice(),
3716-
));
3717-
let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
3718-
let mut ctx = crate::stream_events_utils::HandleOutputCtx {
3719-
sess: Arc::clone(&sess),
3720-
turn_context: Arc::clone(&tc),
3721-
tool_runtime: crate::tools::parallel::ToolCallRuntime::new(
3722-
router,
3723-
Arc::clone(&sess),
3724-
Arc::clone(&tc),
3725-
tracker,
3726-
),
3727-
cancellation_token: tokio_util::sync::CancellationToken::new(),
3728-
};
3729-
let item = ResponseItem::Compaction {
3730-
encrypted_content: "INLINE_SERVER_SUMMARY".to_string(),
3731-
};
3732-
3733-
let output = crate::stream_events_utils::handle_output_item_done(
3734-
&mut ctx,
3735-
item.clone(),
3736-
Some(TurnItem::ContextCompaction(
3737-
codex_protocol::items::ContextCompactionItem::new(),
3738-
)),
3739-
)
3740-
.await
3741-
.expect("handle output item");
3742-
3743-
assert!(output.server_side_compaction.is_some());
3744-
assert!(output.tool_future.is_none());
3745-
assert!(!output.needs_follow_up);
3746-
assert!(output.last_agent_message.is_none());
3747-
3748-
let first = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
3749-
.await
3750-
.expect("expected raw response item event")
3751-
.expect("channel open");
3752-
assert!(matches!(
3753-
first.msg,
3754-
EventMsg::RawResponseItem(raw) if raw.item == item
3755-
));
3756-
assert!(
3757-
rx.try_recv().is_err(),
3758-
"expected the caller to decide when to commit the compaction lifecycle"
3759-
);
3760-
}
3761-
37623706
#[tokio::test]
37633707
async fn steer_input_requires_active_turn() {
37643708
let (sess, _tc, _rx) = make_session_and_context_with_rx().await;

codex-rs/core/src/stream_events_utils.rs

Lines changed: 4 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,6 @@ use std::sync::Arc;
66
use base64::Engine;
77
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
88
use codex_protocol::config_types::ModeKind;
9-
use codex_protocol::items::ContextCompactionItem;
109
use codex_protocol::items::TurnItem;
1110
use codex_utils_stream_parser::strip_citations;
1211
use tokio_util::sync::CancellationToken;
@@ -25,8 +24,6 @@ use codex_protocol::models::FunctionCallOutputBody;
2524
use codex_protocol::models::FunctionCallOutputPayload;
2625
use codex_protocol::models::ResponseInputItem;
2726
use codex_protocol::models::ResponseItem;
28-
use codex_protocol::protocol::EventMsg;
29-
use codex_protocol::protocol::RawResponseItemEvent;
3027
use codex_utils_stream_parser::strip_proposed_plan_blocks;
3128
use futures::Future;
3229
use tracing::debug;
@@ -147,15 +144,9 @@ pub(crate) type InFlightFuture<'f> =
147144
pub(crate) struct OutputItemResult {
148145
pub last_agent_message: Option<String>,
149146
pub needs_follow_up: bool,
150-
pub server_side_compaction: Option<ServerSideCompaction>,
151147
pub tool_future: Option<InFlightFuture<'static>>,
152148
}
153149

154-
pub(crate) struct ServerSideCompaction {
155-
pub item: ResponseItem,
156-
pub turn_item: TurnItem,
157-
}
158-
159150
pub(crate) struct HandleOutputCtx {
160151
pub sess: Arc<Session>,
161152
pub turn_context: Arc<TurnContext>,
@@ -171,27 +162,10 @@ pub(crate) async fn handle_output_item_done(
171162
) -> Result<OutputItemResult> {
172163
let mut output = OutputItemResult::default();
173164
let plan_mode = ctx.turn_context.collaboration_mode.mode == ModeKind::Plan;
174-
175-
if matches!(item, ResponseItem::Compaction { .. }) {
176-
let turn_item = TurnItem::ContextCompaction(match previously_active_item {
177-
Some(TurnItem::ContextCompaction(item)) => item,
178-
_ => ContextCompactionItem::new(),
179-
});
180-
// Preserve the raw wire event immediately; the caller rewrites local history inline and
181-
// then continues appending later same-turn output after the compaction item.
182-
debug!(
183-
turn_id = %ctx.turn_context.sub_id,
184-
"emitting streamed server-side raw compaction item for immediate local checkpoint apply"
185-
);
186-
ctx.sess
187-
.send_event(
188-
&ctx.turn_context,
189-
EventMsg::RawResponseItem(RawResponseItemEvent { item: item.clone() }),
190-
)
191-
.await;
192-
output.server_side_compaction = Some(ServerSideCompaction { item, turn_item });
193-
return Ok(output);
194-
}
165+
debug_assert!(
166+
!matches!(item, ResponseItem::Compaction { .. }),
167+
"compaction items should be handled before handle_output_item_done"
168+
);
195169

196170
match ToolRouter::build_tool_call(ctx.sess.as_ref(), item.clone()).await {
197171
// The model emitted a tool call; log it, persist the item immediately, and queue the tool execution.
@@ -350,9 +324,6 @@ pub(crate) async fn handle_non_tool_response_item(
350324
}
351325
Some(turn_item)
352326
}
353-
ResponseItem::Compaction { .. } => {
354-
Some(TurnItem::ContextCompaction(ContextCompactionItem::new()))
355-
}
356327
ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } => {
357328
debug!("unexpected tool output from stream");
358329
None

0 commit comments

Comments
 (0)