fix(responses-api): thread creation, GET by ID, streaming delta, context injection#2167
Conversation
…ext injection - Allow new threads from Responses API (metadata.source check) - Session manager adopts external UUID as internal thread ID - Skip duplicate delta when StreamChunks already delivered - x_context field for structured data injection (IronClaw extension) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Code Review
This pull request introduces structured context injection into the Responses API via a new x_context field and allows the API to create threads with specific UUIDs. It also refactors the streaming worker to handle non-streaming responses more consistently. Review feedback suggests using a shared constant for the "responses_api" source string to improve maintainability and ensuring that response item IDs remain stable between the "added" and "done" events to prevent client-side inconsistencies.
| && message.metadata.get("source").and_then(|v| v.as_str()) | ||
| == Some("responses_api") |
There was a problem hiding this comment.
The string literal "responses_api" is also used in src/channels/web/responses_api.rs. To avoid potential typos and improve maintainability, consider defining this as a constant in a shared location, for instance in src/channels/web/responses_api.rs, and reusing it here.
Example:
// in src/channels/web/responses_api.rs
pub const RESPONSES_API_SOURCE: &str = "responses_api";
// then in create_response_handler
... "source": RESPONSES_API_SOURCE, ...
// and in src/agent/thread_ops.rs
use crate::channels::web::responses_api::RESPONSES_API_SOURCE;
... == Some(RESPONSES_API_SOURCE) ...References
- Constants are beneficial when values need to be synchronized across multiple locations to maintain consistency and prevent typos, but should be avoided if used only once.
There was a problem hiding this comment.
Fixed — replaced the metadata string check entirely. Now uses message.channel == "gateway" which is server-set and unforgeable. No string constant needed since the channel name is already a well-known value used throughout the codebase.
| let item = ResponseOutputItem::Message { | ||
| id: make_item_id(), | ||
| role: "assistant".to_string(), | ||
| content: vec![MessageContent::OutputText { text }], | ||
| }; |
There was a problem hiding this comment.
The id of the ResponseOutputItem::Message is being regenerated here using make_item_id(). However, a placeholder for this item was already created with a different ID, either in the StreamChunk handler or earlier in this Response handler. This leads to an inconsistency where the id of the item in the response.output_item.added event is different from the id in the response.output_item.done event. The item ID should remain stable throughout its lifecycle.
To fix this, you should reuse the ID from the placeholder item that already exists in acc.output[idx].
let item_id = if let Some(ResponseOutputItem::Message { id, .. }) = acc.output.get(idx) {
id.clone()
} else {
// This path should not be taken if a placeholder was correctly inserted.
make_item_id()
};
let item = ResponseOutputItem::Message {
id: item_id,
role: "assistant".to_string(),
content: vec![MessageContent::OutputText { text }],
};There was a problem hiding this comment.
Fixed in d79e590 — now reuses the placeholder ID from acc.output[idx] instead of calling make_item_id() again.
There was a problem hiding this comment.
Pull request overview
This PR addresses several correctness gaps in the Responses API integration, focused on thread identity consistency, streaming event behavior, and adding structured request context injection.
Changes:
- Allow “new” gateway threads when the message originates from the Responses API (
metadata.source == "responses_api"). - Adopt caller-provided UUIDs as internal thread IDs (via
create_thread_with_id) soGET /v1/responses/{id}can find the correct conversation. - Improve streaming behavior by suppressing the single-shot
output_text.deltafallback when incrementalStreamChunkdeltas were already emitted. - Add
x_context(aliascontext) request field and inject it as a human-readable prefix into the user message; persist it into message metadata.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
src/channels/web/responses_api.rs |
Adds x_context support, formats/injects context into user content, and adjusts streaming terminal message emission to avoid duplicated deltas. |
src/agent/thread_ops.rs |
Allows gateway thread creation for Responses API requests even when the UUID isn’t yet present in DB metadata. |
src/agent/session.rs |
Adds create_thread_with_id so callers can create threads using a predetermined UUID. |
src/agent/session_manager.rs |
Uses external UUIDs as internal thread IDs on thread creation so response IDs map back to stored conversations. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Create new thread (always create a new one for a new key). | ||
| // If the external_thread_id is a valid UUID, adopt it as the internal | ||
| // thread ID so that callers (e.g. the Responses API) can look up | ||
| // conversations by the same UUID they encoded in the response ID. | ||
| let thread_id = { | ||
| let mut sess = session.lock().await; | ||
| let thread = sess.create_thread(Some(channel)); | ||
| let thread = if let Some(uuid) = ext_uuid { | ||
| sess.create_thread_with_id(uuid, Some(channel)) | ||
| } else { | ||
| sess.create_thread(Some(channel)) | ||
| }; |
There was a problem hiding this comment.
In resolve_thread_with_parsed_uuid, the create-new-thread path uses ext_uuid directly. That bypasses the earlier “UUID adoption check” that ensures the UUID isn’t already mapped to a different ThreadKey (only adoptable_uuid is guarded). If external_thread_id parses as a UUID that is already mapped elsewhere, this will still call create_thread_with_id and effectively create/alias a second thread with the same UUID, corrupting thread_map/undo_managers invariants. Use the guarded adoptable_uuid (or re-check under the write lock) before adopting ext_uuid as the internal thread id on the new-thread path.
There was a problem hiding this comment.
Fixed in d79e590 — re-checks under read lock that the UUID is not already mapped to another ThreadKey before adopting.
| // Finalize the output item with the complete text. | ||
| let item = ResponseOutputItem::Message { | ||
| id: make_item_id(), | ||
| role: "assistant".to_string(), | ||
| content: vec![MessageContent::OutputText { text }], | ||
| }; |
There was a problem hiding this comment.
streaming_worker finalizes the assistant message by creating a new ResponseOutputItem::Message with a freshly generated id (make_item_id()) and then emitting response.output_item.done. When the message item was previously emitted via response.output_item.added (either from StreamChunk handling or from the new placeholder path), this changes the item id mid-stream. Streaming clients often correlate added→done by item.id, so this can break client-side state. Preserve/reuse the original message item id when finalizing (e.g., create the id once and carry it through, or update the existing output item’s content without changing its id).
There was a problem hiding this comment.
Fixed in d79e590 — reuses the placeholder item ID when finalizing.
|
High Severity
That means an older response ID is not a stable snapshot: once later turns are added to the same thread, the same ID can return different data. This breaks the expectation that a response ID refers to a specific response object rather than the mutable conversation behind it. Suggested fix: persist snapshots keyed by Medium Severity The streaming path now emits Clients that correlate stream events by item ID will see two different objects for one logical message. Suggested fix: preserve the same item ID across both events and update the existing item in place. |
zmanian
left a comment
There was a problem hiding this comment.
Review
+153/-40, 4 files -- fixes four Responses API issues: thread creation bypass, GET by ID lookup, streaming deduplication, and context injection. CI passes.
Must Fix
-
Metadata-based auth bypass in thread_ops.rs -- thread creation is allowed when metadata.get("source") == Some("responses_api"), which is a stringly-typed backdoor. Any code path that constructs an IncomingMessage with that metadata key bypasses requires_preexisting_uuid_thread. Use a dedicated field on IncomingMessage (e.g., allow_thread_creation: bool) instead.
-
Item ID changes between streaming events -- make_item_id() is called separately for the placeholder and the final item, so output_item.added and output_item.done events have different IDs. Clients tracking items by ID will break. Reuse the placeholder's ID.
Suggestions
-
x_context prompt injection risk -- user-supplied JSON is formatted and prepended to the user message as [Context: ...]. Verify this passes through the safety layer's injection detection.
-
create_thread_with_id silently no-ops on collision -- or_insert returns the existing thread but still updates active_thread and last_active_at. Document whether idempotency is intended.
-
Minimal test coverage -- only two tests for format_context. No tests for thread creation bypass, streaming dedup, or create_thread_with_id.
serrrfirat
left a comment
There was a problem hiding this comment.
Paranoid Architect Review — Verdict: ❌ BLOCK (1 Critical, 2 High)
The PR addresses four distinct issues (thread creation, GET-by-ID, streaming delta, context injection). Fixes are reasonable in scope but there are security concerns around the metadata-based authorization bypass and unsanitized context injection.
Findings
| # | Severity | Category | File | Finding | Suggested Fix |
|---|---|---|---|---|---|
| 1 | Critical | Security | agent/thread_ops.rs | Metadata-based auth bypass is spoofable. Bypasses requires_preexisting_uuid_thread when metadata["source"] == "responses_api". WASM channels can forge this via apply_emitted_metadata(), creating threads with arbitrary UUIDs without ownership verification. |
Check message.channel against a known channel name instead of spoofable metadata. Or strip source from WASM-originated metadata. |
| 2 | High | Security | web/responses_api.rs | Unsanitized context injection. x_context is caller-supplied JSON prepended directly to user message with no validation. Enables prompt injection via crafted values. Bypasses safety layer's user/system content distinction. |
Validate against allowlist, enforce max lengths, or mark as system-injected metadata |
| 3 | High | Correctness | web/responses_api.rs (streaming) | Streaming delta dedup issue. If Response content differs from concatenated StreamChunks (post-processing), client never receives corrected final text. | Document assumption or emit correcting delta when they differ |
| 4 | Medium | Correctness | web/responses_api.rs | Non-deterministic item IDs. make_item_id() at finalization differs from output_item.added ID. Clients see phantom add/done with no correlation. |
Capture item ID at creation, reuse at finalization |
| 5 | Medium | Edge Case | agent/session.rs | No collision detection for create_thread_with_id. entry(id).or_insert(thread) silently ignores existing UUID. Adversarial collision maps to wrong conversation. |
Return error or log warning if UUID occupied |
| 6 | Medium | Security | web/responses_api.rs | Unbounded x_context stored in metadata. No size limit. Arbitrarily large JSON exhausts context window or inflates DB. |
Enforce max size (e.g., 10KB) |
| 7 | Medium | Test Coverage | agent/thread_ops.rs | No regression test for metadata-based bypass. CLAUDE.md: "Every bug fix must include a test." | Test: source: "responses_api" bypasses check; without it does NOT |
| 8 | Medium | Test Coverage | agent/session_manager.rs | No test for create_thread_with_id via resolve_thread. New codepath unexercised. |
Test verifying returned thread_id matches input UUID |
| 9 | Low | Correctness | web/responses_api.rs | Non-deterministic key ordering in format_context |
Document that key order not guaranteed |
| 10 | Low | Convention | web/responses_api.rs | alias = "context" could shadow future official OpenAI field |
Remove alias or document risk |
| 11 | Low | Docs | web/responses_api.rs | Doc says "Must be flat" but code accepts nesting via .to_string() |
Enforce flatness or change to "should be" |
| 12 | Nit | Style | agent/session.rs | create_thread_with_id duplicates create_thread |
Refactor to single method with Option<Uuid> |
| 13 | Nit | Style | web/responses_api.rs | Unicode em-dash in format string | Intentional, no action |
Summary
Finding #1 is the most critical: WASM channels can forge metadata to bypass thread ownership checks. Finding #2 enables prompt injection via unsanitized context. Both must be fixed before merge.
1. Replace spoofable metadata check with channel-based check: `message.channel == "gateway"` instead of `metadata["source"] == "responses_api"`. Channel names are server-set and unforgeable by WASM channels. 2. Reuse placeholder item ID in streaming worker: capture ID from acc.output[idx] instead of calling make_item_id() again. Fixes added→done ID mismatch that breaks client correlation. 3. Enforce 10 KB size limit on x_context to prevent context window exhaustion and DB bloat. Returns 400 if exceeded. 4. Log warning on UUID collision in create_thread_with_id. 5. Guard ext_uuid adoption: re-check under read lock that UUID isn't already mapped to another ThreadKey before adopting. 6. Update x_context doc: clarify alias collision risk, max size. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Addressed review feedback (zmanian + serrrfirat)Critical — metadata bypass (#1): Replaced Critical — item ID mismatch (#2, #4): Reuse the placeholder's High — unsanitized x_context (#2, #3): Enforced 10 KB size limit — returns 400 if exceeded. The content (including prepended context) still passes through the safety layer's injection detection in the normal agent pipeline. Updated doc comment to note alias collision risk and max size. → High — ext_uuid adoption guard (#5): Re-checks under read lock that the UUID isn't already mapped to another ThreadKey before adopting. Falls back to generating a new UUID if collision detected. → Medium — collision logging (#5): Medium — unbounded x_context (#6): Covered by the 10 KB limit above. Low/Nits:
Missing tests (#7, #8): Will add in a follow-up — these require integration test fixtures that are better suited to the E2E test PR (#2174). |
Summary
Four fixes for the Responses API:
Thread creation: The gateway channel requires pre-existing conversation UUIDs, but the Responses API generates fresh UUIDs. Now allows new threads when
metadata.source == "responses_api".GET by ID: Session manager now adopts the external UUID as the internal thread ID (via new
create_thread_with_id), so DB lookups match the response ID.Streaming delta: Skip the single-chunk fallback when
StreamChunkevents already delivered the text incrementally.Context injection: New
x_contextfield (aliased ascontext) on the request for structured data. Formatted as a human-readable prefix on the user message. Persisted in metadata.Test plan
POST /v1/responseswith new user — no more "Invalid or unauthorized thread ID"GET /v1/responses/{id}— returns the responseresponse.output_text.deltaevents not duplicated{"x_context": {"notification": {"action": "approved"}}}appears in agent conversation🤖 Generated with Claude Code