
feat(core): implement mid-turn queue drain for agent execution #2854

Merged
wenshao merged 6 commits into QwenLM:main from wenshao:feat/mid-turn-queue-drain
Apr 7, 2026


Collaborator

@wenshao wenshao commented Apr 3, 2026

Summary

Implement mid-turn queue drain for the main session, allowing the model to see user messages immediately during tool execution instead of waiting for the entire round to complete.

Before (Between-Round Drain)

User messages sent during tool execution are buffered locally in React state. They are only flushed after the entire round completes, then processed as a new round:

Round start → [API→Tool→API→Tool→...→Done] → dequeue → New Round(user msg) → ...
                                                 ↑ user must wait until here

The user cannot redirect or supplement the agent mid-execution — they must wait for all tool steps to finish.

After (Mid-Turn Queue Drain)

After each tool execution step, the queue is drained and messages are injected as text parts alongside tool results, so the model sees them in the very next API call:

Turn start → [API→Tool] → queue drain → [API(user msg + tool results)→Tool] → ...
                               ↑ user message injected here, model sees it immediately
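
A minimal sketch of the loop shape described above. It assumes hypothetical `callModel`, `runTool`, and `drainQueue` helpers and a simplified `Part` type; none of these names come from the PR, and the ordering of tool results before injected text is an assumption.

```typescript
// Hypothetical part shapes for illustration; not the actual qwen-code types.
type Part = { text: string } | { toolResult: string };

interface LoopDeps {
  // Returns the tool calls the model wants; an empty list ends the turn.
  callModel: (parts: Part[]) => string[];
  runTool: (call: string) => string;
  // Removes and returns every message queued so far.
  drainQueue: () => string[];
}

// Runs one turn and returns the parts sent on each API call.
function runTurn(initial: string, deps: LoopDeps): Part[][] {
  const requests: Part[][] = [];
  let parts: Part[] = [{ text: initial }];
  for (;;) {
    requests.push(parts);
    const toolCalls = deps.callModel(parts);
    if (toolCalls.length === 0) return requests;
    const results: Part[] = toolCalls.map((call) => ({
      toolResult: deps.runTool(call),
    }));
    // Mid-turn drain: queued user messages ride along with the tool results,
    // so the next API call in the same turn already contains them.
    const injected: Part[] = deps.drainQueue().map((text) => ({ text }));
    parts = [...results, ...injected];
  }
}
```

With a between-round drain, the `drainQueue()` call would instead sit after `runTurn` returns, which is why the user previously had to wait for every tool step.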

Changes

| File | Change |
| --- | --- |
| useGeminiStream.ts | Add midTurnDrainRef parameter; drain queue in handleCompletedTools before submitQuery |
| AppContainer.tsx | Create drain ref; bridge useMessageQueue to it via ref pattern (avoids stale closures) |
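
The ref pattern mentioned for AppContainer.tsx can be illustrated without React. The sketch below is an assumption-laden stand-in: `Ref`, `makeToolCompletionHandler`, and `render` are hypothetical names, and a plain `{ current }` object plays the role of a React ref.

```typescript
// Plain-object stand-in for a React ref (hypothetical, for illustration).
interface Ref<T> {
  current: T;
}

type DrainFn = () => string[];

// Consumer side: the handler captures the ref object once, like a long-lived
// callback would, but reads `.current` only at call time.
function makeToolCompletionHandler(midTurnDrainRef: Ref<DrainFn | null>) {
  return function handleCompletedTools(toolResults: string[]): string[] {
    const drained = midTurnDrainRef.current?.() ?? [];
    return [...toolResults, ...drained];
  };
}

// Provider side: every "render" re-points the ref at a drain function that
// closes over the latest queue snapshot.
function render(queueSnapshot: string[], ref: Ref<DrainFn | null>): void {
  ref.current = () => [...queueSnapshot];
}
```

Because the handler never closes over the queue itself, it cannot hold a stale copy; it always goes through whatever function the latest render stored in the ref.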

Test plan

Automated tests

  • npx vitest run packages/cli/src/ui/hooks/useGeminiStream.test.tsx — 56 tests pass (no regression)
  • npx vitest run packages/cli/src/ui/AppContainer.test.tsx — 32 tests pass (no regression)
  • npx tsc --noEmit passes for both core and cli packages

Manual test case

  1. Send a multi-step task with a slow tool call:
    Run this command: sleep 15 && echo "step1 done", then after that create a file src/result.ts
    
  2. While sleep 15 is executing (you have a 15-second window), type:
    Write "modified by user" in the file instead
    
  3. After sleep finishes, check src/result.ts:
    • Mid-turn drain working: file contains "modified by user" (model saw the message before creating the file)
    • Not working: file has model-chosen content, user message processed in a separate round

🤖 Generated with Claude Code

Inject queued user messages between tool execution steps within a single
turn, so the model sees them immediately instead of waiting for the
entire round to complete.

- Add `dequeueAll()` to AsyncMessageQueue
- Add `midTurnDrain` callback to ReasoningLoopOptions
- Drain queue after processFunctionCalls, inject as text parts
- AgentComposer always enqueues directly (no local buffering)
- Add QUEUE_MESSAGES_CONSUMED event for UI sync

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Contributor

github-actions bot commented Apr 3, 2026

📋 Review Summary

This PR implements mid-turn queue drain functionality, allowing user messages queued during tool execution to be injected into the current reasoning turn rather than waiting for the entire round to complete. The implementation is well-structured with comprehensive test coverage and clean separation of concerns across the queue utility, agent core, interactive agent, and UI components.

🔍 General Feedback

  • Clean architecture: The implementation follows a clear layered approach - AsyncMessageQueue provides the primitive, AgentInteractive orchestrates the drain logic, AgentCore injects messages into the conversation, and AgentComposer handles UI display.
  • Good test coverage: New tests cover the dequeueAll() method, mid-turn drain callback passing, message draining, event emission, and edge cases (empty queue drain).
  • Consistent patterns: Event-driven architecture aligns with existing patterns (QUEUE_MESSAGES_CONSUMED event follows the same structure as other agent events).
  • Type safety: Proper TypeScript typing throughout, including the new AgentQueueDrainEvent interface and AgentEventMap extension.
  • Positive: The midTurnDrain callback pattern in ReasoningLoopOptions is a clean, non-invasive way to inject this behavior into the core reasoning loop.

🎯 Specific Feedback

🟢 Medium

  • File: packages/core/src/agents/runtime/agent-core.ts:504-509 - The injected user message format uses a plain text prefix ([User message received during tool execution]) which could potentially be confused with actual user input by the model. Consider using a more structured approach (e.g., a separate message role or metadata field) to distinguish injected messages from genuine user messages.

  • File: packages/core/src/agents/runtime/agent-interactive.ts:189 - The midTurnDrain callback is created inline in the options object. While functional, this creates a new function reference on each call. Consider defining it as a class method bound in the constructor or using an arrow property to avoid potential memory churn in tight loops.

  • File: packages/cli/src/ui/components/agent-view/AgentComposer.tsx:194-204 - The useEffect for clearing display on QUEUE_MESSAGES_CONSUMED and STATUS_CHANGE events doesn't include streamingState in dependencies, but accesses it in the closure. While not causing bugs here (the event handlers don't use it), adding it would make the dependency array more explicit and future-proof.
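
To make the second point concrete, here is a small sketch contrasting an inline callback with an arrow property. `InteractiveAgentSketch` and its methods are hypothetical and are not the PR's `AgentInteractive` class.

```typescript
class InteractiveAgentSketch {
  private readonly queue: string[] = [];

  // Arrow property: created once per instance, so the reference is stable.
  readonly midTurnDrain = (): string[] => this.queue.splice(0);

  enqueue(message: string): void {
    this.queue.push(message);
  }

  // Inline variant: allocates a fresh closure on every call.
  inlineOptions(): { midTurnDrain: () => string[] } {
    return { midTurnDrain: () => this.queue.splice(0) };
  }

  // Stable variant: hands out the same function reference each time.
  stableOptions(): { midTurnDrain: () => string[] } {
    return { midTurnDrain: this.midTurnDrain };
  }
}
```

Both variants behave the same when called; the difference is only whether a new function object is allocated per call.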

🔵 Low

  • File: packages/core/src/agents/runtime/agent-events.ts:177-183 - The AgentQueueDrainEvent interface includes subagentId but other similar events (like AgentStatusChangeEvent) use agentId. Consider standardizing on one naming convention for consistency across event types.

  • File: packages/core/src/utils/asyncMessageQueue.ts:41-44 - The dequeueAll() implementation uses this.items.splice(0) which mutates the internal array. While correct, adding a JSDoc comment noting that it returns a copy (or explicitly returning a copy with [...this.items]) would clarify the ownership semantics for callers.

  • File: packages/cli/src/ui/components/agent-view/AgentComposer.tsx:285 - The variable rename from messageQueue to pendingMessages is clear, but the comment "Track for display when agent is busy" could benefit from explaining why we track pending messages separately (i.e., to show users their queued messages are waiting to be processed).
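
On the `dequeueAll()` ownership question: `Array.prototype.splice(0)` removes every element from the source array and returns them in a new array, so the caller already receives an independent array. The class below is a hypothetical minimal queue, not the PR's `AsyncMessageQueue`.

```typescript
class QueueSketch<T> {
  private items: T[] = [];

  enqueue(item: T): void {
    this.items.push(item);
  }

  /**
   * Removes and returns all queued items. The returned array is a new array
   * owned by the caller; the internal array is left empty.
   */
  dequeueAll(): T[] {
    return this.items.splice(0);
  }

  get size(): number {
    return this.items.length;
  }
}
```

Returning `[...this.items]` instead would copy without clearing, so a separate clearing step would be needed to keep the drain semantics.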

✅ Highlights

  • Excellent test coverage: The four new test cases in agent-interactive.test.ts thoroughly cover the mid-turn drain functionality, including the callback passing, message draining with metadata, event emission, and empty queue edge case.
  • Clean event-driven design: The QUEUE_MESSAGES_CONSUMED event allows the UI to stay synchronized with backend state changes without polling or tight coupling.
  • Minimally invasive changes: The midTurnDrain callback pattern in ReasoningLoopOptions is an elegant way to add this feature without modifying the core reasoning loop logic significantly.
  • Good metadata tracking: Messages injected mid-turn are marked with metadata: { midTurnInjected: true }, enabling potential debugging or analytics down the line.
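
A rough sketch of how a drain step could tag messages and notify listeners. `DrainNotifier`, `QueuedMessage`, and `onConsumed` are hypothetical names with a hand-rolled listener list; only the `midTurnInjected: true` metadata flag is taken from the review text, and skipping the notification on an empty drain is an assumption.

```typescript
interface QueuedMessage {
  text: string;
  metadata: { midTurnInjected: boolean };
}

type ConsumedListener = (consumed: QueuedMessage[]) => void;

class DrainNotifier {
  private readonly listeners: ConsumedListener[] = [];
  private readonly queue: string[] = [];

  // Register a listener, e.g. a UI component that clears its pending display.
  onConsumed(listener: ConsumedListener): void {
    this.listeners.push(listener);
  }

  enqueue(text: string): void {
    this.queue.push(text);
  }

  // Drains the queue, tags each message, and notifies listeners.
  drain(): QueuedMessage[] {
    const consumed = this.queue
      .splice(0)
      .map((text) => ({ text, metadata: { midTurnInjected: true } }));
    // Assumption: nothing is emitted when there was nothing to consume.
    if (consumed.length > 0) {
      for (const listener of this.listeners) listener(consumed);
    }
    return consumed;
  }
}
```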

Extend mid-turn queue drain to the main session's tool execution path
(useGeminiStream). Previously only agent tabs had this feature.

- Add midTurnDrainRef parameter to useGeminiStream
- Inject queued messages in handleCompletedTools before submitQuery
- Bridge useMessageQueue to drain ref in AppContainer via ref pattern

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Contributor

Copilot AI left a comment


Pull request overview

Adds “mid-turn” draining of queued user messages so that messages typed during tool execution can be injected into the very next model call (instead of waiting until the entire round completes), improving interactivity for both in-process agents and the main CLI stream loop.

Changes:

  • Add dequeueAll() to AsyncMessageQueue to atomically drain pending items.
  • Introduce midTurnDrain callback plumbing through AgentCore.runReasoningLoop and implement queue draining + event emission in AgentInteractive.
  • Wire CLI UI/streaming to enqueue immediately and drain mid-turn (via new event + a ref bridge).

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| packages/core/src/utils/asyncMessageQueue.ts | Adds dequeueAll() for draining all queued items. |
| packages/core/src/utils/asyncMessageQueue.test.ts | Adds tests for dequeueAll() behavior. |
| packages/core/src/agents/runtime/agent-interactive.ts | Implements queue draining + event emission; passes midTurnDrain into core loop. |
| packages/core/src/agents/runtime/agent-interactive.test.ts | Adds tests for mid-turn draining and event emission. |
| packages/core/src/agents/runtime/agent-events.ts | Adds QUEUE_MESSAGES_CONSUMED event and payload type. |
| packages/core/src/agents/runtime/agent-core.ts | Calls midTurnDrain() after tool execution to inject messages into the next API call. |
| packages/cli/src/ui/hooks/useGeminiStream.ts | Injects drained queued messages into the next tool-result submission. |
| packages/cli/src/ui/components/agent-view/AgentComposer.tsx | Always enqueues messages directly; clears pending display on drain/status events. |
| packages/cli/src/ui/AppContainer.tsx | Bridges useMessageQueue to useGeminiStream via a midTurnDrainRef. |


- Guard midTurnDrain with abort check to prevent message loss on cancel
- Synchronously clear messageQueueRef to prevent duplicate drains
- Only clear pending display on IDLE status, not all status changes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
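
One way to picture the "synchronously clear the ref" fix, sketched without React. `DeferredState` is a hypothetical stand-in for state whose updates only land on the next render, and `drain` is an illustrative helper, not the PR's code.

```typescript
interface Ref<T> {
  current: T;
}

// Stand-in for React state: `set` is deferred until `flush` (the next render).
class DeferredState<T> {
  value: T;
  private pending: { next: T } | null = null;

  constructor(initial: T) {
    this.value = initial;
  }

  set(next: T): void {
    this.pending = { next };
  }

  flush(): void {
    if (this.pending) {
      this.value = this.pending.next;
      this.pending = null;
    }
  }
}

function drain(queueRef: Ref<string[]>, state: DeferredState<string[]>): string[] {
  const drained = queueRef.current;
  // Synchronous clear: a second drain before the next render sees nothing.
  queueRef.current = [];
  // Deferred clear: the displayed state catches up on the next render.
  state.set([]);
  return drained;
}
```

If the drain read from the deferred state instead of the ref, two drains in quick succession could both return the same messages.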
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated no new comments.



Revert subagent-path changes (AgentCore, AgentInteractive,
AgentComposer, AsyncMessageQueue, agent-events) to keep the PR
focused on the main session, which is easier to test and validate.

Subagent mid-turn drain can be added in a follow-up PR.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Collaborator

@tanzhenxin tanzhenxin left a comment


LGTM!

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.



- Move synchronous queue ref into useMessageQueue itself, expose
  drainQueue() for atomic drain (fixes race between addMessage and drain)
- Record drained messages as USER history items so the transcript
  stays complete
- Simplify AppContainer bridge to just midTurnDrainRef.current = drainQueue

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
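
A sketch of the transcript-recording idea from this commit: drained messages are appended to the history as user items and also sent along with the tool results. `HistoryItem` and `submitToolResults` are hypothetical names, and the real history item shape in the CLI may differ.

```typescript
// Hypothetical history item shape for illustration.
type HistoryItem = { type: "user" | "tool_group"; text: string };

function submitToolResults(
  toolResults: string[],
  drainQueue: () => string[],
  history: HistoryItem[],
): string[] {
  const drained = drainQueue();
  // Record each injected message so the visible transcript stays complete.
  for (const text of drained) {
    history.push({ type: "user", text });
  }
  // The next request carries the tool results plus the injected messages.
  return [...toolResults, ...drained];
}
```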
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.



- Skip drain when turnCancelledRef or abortController signal is set,
  so queued messages stay for the next turn instead of being lost
- Restore ref-based queue bridge (drainQueue removed from useMessageQueue)
- Keep synchronous ref clear to prevent duplicate drains

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
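
The cancel guard described in this commit might look roughly like the following. `maybeDrain` and `AbortLike` are hypothetical; `AbortLike` mirrors only the `aborted` flag of a standard `AbortSignal`, so a real signal could be passed in as well.

```typescript
interface Ref<T> {
  current: T;
}

// Minimal view of an abort signal: only the `aborted` flag is needed here.
interface AbortLike {
  aborted: boolean;
}

function maybeDrain(
  queueRef: Ref<string[]>,
  turnCancelledRef: Ref<boolean>,
  signal: AbortLike,
): string[] {
  // On cancel or abort, leave the queue untouched so the messages are
  // picked up by the next turn instead of being lost.
  if (turnCancelledRef.current || signal.aborted) return [];
  const drained = queueRef.current;
  queueRef.current = []; // synchronous clear prevents duplicate drains
  return drained;
}
```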
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated no new comments.



wenshao added a commit to wenshao/codeagents that referenced this pull request Apr 3, 2026
Major additions to qwen-code-improvement-report.md:

1. New P0 item: Mid-Turn Queue Drain (PR QwenLM/qwen-code#2854 open)
2. Upgraded P2→P1: Tool parallelism with Kind-based batching
   (PR QwenLM/qwen-code#2864 open)
3. New P1 items: startup optimization, CLAUDE.md conditional rules
4. New P2 items: shell security, MDM enterprise, API token counting
5. Added Fork Subagent deep-dive cross-reference link
6. Expanded summary table from 10→16 dimensions with "进展" (Progress) column
   tracking Qwen Code PR status
7. Fixed forkSubagent.ts line count 211→210

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
wenshao added a commit to wenshao/codeagents that referenced this pull request Apr 3, 2026
…rt (#34)

* docs: add English terms to technical concepts, fix leaked source reference and relative paths

- Remove 'leaked' reference in qwen-code-improvement-report.md
- Fix 14 relative paths from '../claude-code-leaked/' to 'claude-code/'
- Add English annotations to technical terms across 10 comparison docs
  (Speculation, Context Compression, Subagent, Telemetry, etc.)

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

* docs: convert source code references to clickable local links in qwen-code-improvement-report.md

- Add local source paths section (claude-code-leaked + qwen-code)
- Convert 14 Claude Code source refs to clickable links
- Convert 7 Qwen Code source refs to clickable links

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

* docs: fix qwen-code-improvement-report.md local links

- Replace external source code links with internal article links
- Convert 21 external file links to internal documentation links
- Add related article references for all 5 Top 5 improvement sections
- Use '源码:' ('Source code:') format for source code references instead of clickable links

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

* fix: correct improvement report accuracy issues

1. reactiveCompact.ts does NOT exist — changed to apiMicrocompact.ts
2. Qwen Code speculation is complete in v0.15.0 (563 lines + overlayFs +
   speculationToolGate), only default-disabled — updated description,
   matrix row, summary table, and suggestions
3. Claude Code speculation.ts: 992 → 991 lines

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: replace 'leaked 源码' ('leaked source code') with '源码分析' ('source code analysis') in startup-optimization disclaimer

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: address Copilot review — rename claude-code-leaked path reference

Changed `../claude-code-leaked/` → `../claude-code/` (源码快照, "source snapshot") in the
source path hint to avoid inconsistency with the "源码分析" ("source code analysis") description.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: enhance improvement report with deep-dive findings and PR status

Major additions to qwen-code-improvement-report.md:

1. New P0 item: Mid-Turn Queue Drain (PR QwenLM/qwen-code#2854 open)
2. Upgraded P2→P1: Tool parallelism with Kind-based batching
   (PR QwenLM/qwen-code#2864 open)
3. New P1 items: startup optimization, CLAUDE.md conditional rules
4. New P2 items: shell security, MDM enterprise, API token counting
5. Added Fork Subagent deep-dive cross-reference link
6. Expanded summary table from 10→16 dimensions with "进展" (Progress) column
   tracking Qwen Code PR status
7. Fixed forkSubagent.ts line count 211→210

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: simplify improvement matrix table to 5 columns, add deep-dive links

- Reduced matrix from 7 columns to 5 (优先级/改进点/现状/难度/进展: priority / improvement / current state / difficulty / progress)
  to eliminate GitHub horizontal scrollbar
- Added section 五 (five) with 12 deep-dive cross-reference links
- Tracked Qwen Code PR status: #2854 (mid-turn drain), #2864 (parallelism),
  #2525 (speculation, merged)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: correct line counts and anchor link in improvement report

- compact.ts: 1706→1705
- AgentTool.tsx: 1398→1397
- runAgent.ts: 974→973
- autoDream.ts: 325→324
- Fix anchor: #相关-deep-dive-文章 → #五相关-deep-dive-文章

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@wenshao wenshao added the DDAR DataWorks Data Agent Ready label Apr 4, 2026
@wenshao wenshao merged commit b6373ac into QwenLM:main Apr 7, 2026
19 checks passed
