diff --git a/packages/core/src/core/coreToolScheduler.test.ts b/packages/core/src/core/coreToolScheduler.test.ts index ee53116c48..ebf7dbc570 100644 --- a/packages/core/src/core/coreToolScheduler.test.ts +++ b/packages/core/src/core/coreToolScheduler.test.ts @@ -4,7 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; import type { Mock } from 'vitest'; import type { Config, @@ -3004,7 +3004,20 @@ describe('Fire hook functions integration', () => { }); }); - describe('Concurrent agent tool execution', () => { + describe('Concurrent tool execution', () => { + // Ensure tests are deterministic regardless of environment. + const origEnv = process.env['QWEN_CODE_MAX_TOOL_CONCURRENCY']; + beforeEach(() => { + delete process.env['QWEN_CODE_MAX_TOOL_CONCURRENCY']; + }); + afterEach(() => { + if (origEnv !== undefined) { + process.env['QWEN_CODE_MAX_TOOL_CONCURRENCY'] = origEnv; + } else { + delete process.env['QWEN_CODE_MAX_TOOL_CONCURRENCY']; + } + }); + function createScheduler( tools: Map, onAllToolCallsComplete: Mock, @@ -3132,7 +3145,7 @@ describe('Fire hook functions integration', () => { expect(startIndices.every((i) => i < firstEnd)).toBe(true); }); - it('should run agent tools concurrently while other tools run sequentially', async () => { + it('should run concurrency-safe tools in parallel and unsafe tools sequentially', async () => { const executionLog: string[] = []; const agentTool = new MockTool({ @@ -3151,10 +3164,11 @@ describe('Fire hook functions integration', () => { const readTool = new MockTool({ name: 'read_file', + kind: Kind.Read, execute: async (params) => { const id = (params as { id: string }).id; executionLog.push(`read:start:${id}`); - await new Promise((r) => setTimeout(r, 20)); + await new Promise((r) => setTimeout(r, 50)); executionLog.push(`read:end:${id}`); return { llmContent: `Read ${id} done`, @@ -3176,6 +3190,8 @@ describe('Fire hook functions integration', () => { ); const abortController = new AbortController(); + // All 4 calls are concurrency-safe (read_file=Kind.Read, agent=Agent name) + // so they form one parallel batch and all run concurrently. const requests = [ { callId: '1', @@ -3215,20 +3231,226 @@ describe('Fire hook functions integration', () => { expect(completedCalls).toHaveLength(4); expect(completedCalls.every((c) => c.status === 'success')).toBe(true); - // Non-agent tools should execute sequentially: read:1 finishes before read:2 starts - const read1End = executionLog.indexOf('read:end:1'); - const read2Start = executionLog.indexOf('read:start:2'); - expect(read1End).toBeLessThan(read2Start); - - // Agent tools should execute concurrently: both start before either ends - const agentAStart = executionLog.indexOf('agent:start:A'); - const agentBStart = executionLog.indexOf('agent:start:B'); - const firstAgentEnd = Math.min( + // All 4 tools are concurrency-safe → they should all start + // before any of them finishes (parallel execution). + const allStarts = [ + executionLog.indexOf('read:start:1'), + executionLog.indexOf('agent:start:A'), + executionLog.indexOf('read:start:2'), + executionLog.indexOf('agent:start:B'), + ]; + const firstEnd = Math.min( + executionLog.indexOf('read:end:1'), executionLog.indexOf('agent:end:A'), + executionLog.indexOf('read:end:2'), executionLog.indexOf('agent:end:B'), ); - expect(agentAStart).toBeLessThan(firstAgentEnd); - expect(agentBStart).toBeLessThan(firstAgentEnd); + // Ensure all entries exist before comparing ordering + for (const start of allStarts) { + expect(start).not.toBe(-1); + } + expect(firstEnd).not.toBe(-1); + for (const start of allStarts) { + expect(start).toBeLessThan(firstEnd); + } + }); + + it('should partition mixed safe/unsafe tools into correct batches', async () => { + const executionLog: string[] = []; + + const readTool = new MockTool({ + name: 'read_file', + kind: Kind.Read, + execute: async (params) => { + const id = (params as { id: string }).id; + executionLog.push(`read:start:${id}`); + await new Promise((r) => setTimeout(r, 50)); + executionLog.push(`read:end:${id}`); + return { + llmContent: `Read ${id} done`, + returnDisplay: `Read ${id} done`, + }; + }, + }); + + const editTool = new MockTool({ + name: 'edit', + kind: Kind.Edit, + execute: async (params) => { + const id = (params as { id: string }).id; + executionLog.push(`edit:start:${id}`); + await new Promise((r) => setTimeout(r, 20)); + executionLog.push(`edit:end:${id}`); + return { + llmContent: `Edit ${id} done`, + returnDisplay: `Edit ${id} done`, + }; + }, + }); + + const tools = new Map([ + ['read_file', readTool], + ['edit', editTool], + ]); + const onAllToolCallsComplete = vi.fn(); + const onToolCallsUpdate = vi.fn(); + const scheduler = createScheduler( + tools, + onAllToolCallsComplete, + onToolCallsUpdate, + ); + + // [Read₁, Read₂, Edit, Read₃] + // Expected batches: [Read₁,Read₂](parallel) → [Edit](seq) → [Read₃](seq) + const requests = [ + { + callId: '1', + name: 'read_file', + args: { id: '1' }, + isClientInitiated: false, + prompt_id: 'p1', + }, + { + callId: '2', + name: 'read_file', + args: { id: '2' }, + isClientInitiated: false, + prompt_id: 'p1', + }, + { + callId: '3', + name: 'edit', + args: { id: 'E' }, + isClientInitiated: false, + prompt_id: 'p1', + }, + { + callId: '4', + name: 'read_file', + args: { id: '3' }, + isClientInitiated: false, + prompt_id: 'p1', + }, + ]; + + await scheduler.schedule(requests, new AbortController().signal); + + expect(onAllToolCallsComplete).toHaveBeenCalled(); + const completedCalls = onAllToolCallsComplete.mock + .calls[0][0] as ToolCall[]; + expect(completedCalls).toHaveLength(4); + expect(completedCalls.every((c) => c.status === 'success')).toBe(true); + + // Batch 1: Read₁ and Read₂ run in parallel (both start before either ends) + const read1Start = executionLog.indexOf('read:start:1'); + const read2Start = executionLog.indexOf('read:start:2'); + const firstReadEnd = Math.min( + executionLog.indexOf('read:end:1'), + executionLog.indexOf('read:end:2'), + ); + expect(read1Start).not.toBe(-1); + expect(read2Start).not.toBe(-1); + expect(firstReadEnd).not.toBe(-1); + expect(read1Start).toBeLessThan(firstReadEnd); + expect(read2Start).toBeLessThan(firstReadEnd); + + // Batch 2: Edit starts after both reads complete + const lastReadEnd = Math.max( + executionLog.indexOf('read:end:1'), + executionLog.indexOf('read:end:2'), + ); + const editStart = executionLog.indexOf('edit:start:E'); + expect(editStart).not.toBe(-1); + expect(editStart).toBeGreaterThan(lastReadEnd); + + // Batch 3: Read₃ starts after Edit completes + const editEnd = executionLog.indexOf('edit:end:E'); + const read3Start = executionLog.indexOf('read:start:3'); + expect(editEnd).not.toBe(-1); + expect(read3Start).not.toBe(-1); + expect(read3Start).toBeGreaterThan(editEnd); + }); + + it('should run read-only shell commands concurrently and non-read-only sequentially', async () => { + const executionLog: string[] = []; + + const shellTool = new MockTool({ + name: 'run_shell_command', + kind: Kind.Execute, + execute: async (params) => { + const cmd = (params as { command: string }).command; + executionLog.push(`shell:start:${cmd}`); + await new Promise((r) => setTimeout(r, 50)); + executionLog.push(`shell:end:${cmd}`); + return { + llmContent: `Shell ${cmd} done`, + returnDisplay: `Shell ${cmd} done`, + }; + }, + }); + + const tools = new Map([ + ['run_shell_command', shellTool], + ]); + const onAllToolCallsComplete = vi.fn(); + const onToolCallsUpdate = vi.fn(); + const scheduler = createScheduler( + tools, + onAllToolCallsComplete, + onToolCallsUpdate, + ); + + // "git log" and "ls" are read-only → concurrent + // "npm install" is not read-only → sequential, breaks the batch + const requests = [ + { + callId: '1', + name: 'run_shell_command', + args: { command: 'git log' }, + isClientInitiated: false, + prompt_id: 'p1', + }, + { + callId: '2', + name: 'run_shell_command', + args: { command: 'ls' }, + isClientInitiated: false, + prompt_id: 'p1', + }, + { + callId: '3', + name: 'run_shell_command', + args: { command: 'npm install' }, + isClientInitiated: false, + prompt_id: 'p1', + }, + ]; + + await scheduler.schedule(requests, new AbortController().signal); + + expect(onAllToolCallsComplete).toHaveBeenCalled(); + + // "git log" and "ls" should start concurrently (both before either ends) + const gitStart = executionLog.indexOf('shell:start:git log'); + const lsStart = executionLog.indexOf('shell:start:ls'); + const firstReadOnlyEnd = Math.min( + executionLog.indexOf('shell:end:git log'), + executionLog.indexOf('shell:end:ls'), + ); + expect(gitStart).not.toBe(-1); + expect(lsStart).not.toBe(-1); + expect(firstReadOnlyEnd).not.toBe(-1); + expect(gitStart).toBeLessThan(firstReadOnlyEnd); + expect(lsStart).toBeLessThan(firstReadOnlyEnd); + + // "npm install" should start after both read-only commands complete + const lastReadOnlyEnd = Math.max( + executionLog.indexOf('shell:end:git log'), + executionLog.indexOf('shell:end:ls'), + ); + const npmStart = executionLog.indexOf('shell:start:npm install'); + expect(npmStart).not.toBe(-1); + expect(npmStart).toBeGreaterThan(lastReadOnlyEnd); }); }); }); diff --git a/packages/core/src/core/coreToolScheduler.ts b/packages/core/src/core/coreToolScheduler.ts index 4c82c30952..9bb0fa4cd4 100644 --- a/packages/core/src/core/coreToolScheduler.ts +++ b/packages/core/src/core/coreToolScheduler.ts @@ -49,6 +49,9 @@ import type { PartListUnion, } from '@google/genai'; import { ToolNames } from '../tools/tool-names.js'; +import { CONCURRENCY_SAFE_KINDS } from '../tools/tools.js'; +import { isShellCommandReadOnly } from '../utils/shellReadOnlyChecker.js'; +import { stripShellWrapper } from '../utils/shell-utils.js'; import { buildPermissionCheckContext, evaluatePermissionRules, @@ -330,6 +333,58 @@ interface CoreToolSchedulerOptions { chatRecordingService?: ChatRecordingService; } +// ─── Tool Concurrency Helpers ──────────────────────────────── + +interface ToolBatch { + concurrent: boolean; + calls: ScheduledToolCall[]; +} + +/** + * Returns true if a scheduled tool call can safely execute concurrently + * with other safe tools (no side effects, no shared mutable state). + */ +function isConcurrencySafe(call: ScheduledToolCall): boolean { + // Agent tools spawn independent sub-agents with no shared state. + if (call.request.name === ToolNames.AGENT) return true; + // Shell commands: check if the command is read-only (e.g., git log, cat). + // Uses the synchronous regex+shell-quote checker (not the async AST-based + // one) because partitioning runs synchronously. The sync checker covers + // the same command whitelist and is fail-closed — unknown commands remain + // sequential. The AST version is used separately for permission decisions. + if (call.tool.kind === Kind.Execute) { + const command = (call.request.args as { command?: string }).command; + if (typeof command !== 'string') return false; + try { + return isShellCommandReadOnly(stripShellWrapper(command)); + } catch { + return false; // fail-closed + } + } + return CONCURRENCY_SAFE_KINDS.has(call.tool.kind); +} + +/** + * Partition tool calls into consecutive batches by concurrency safety. + * + * Consecutive safe tools are merged into a single parallel batch. + * Each unsafe tool forms its own sequential batch. + * + * Example: [Read, Read, Edit, Read] → [[Read,Read](parallel), [Edit](seq), [Read](seq)] + */ +function partitionToolCalls(calls: ScheduledToolCall[]): ToolBatch[] { + return calls.reduce((batches, call) => { + const safe = isConcurrencySafe(call); + const lastBatch = batches[batches.length - 1]; + if (safe && lastBatch?.concurrent) { + lastBatch.calls.push(call); + } else { + batches.push({ concurrent: safe, calls: [call] }); + } + return batches; + }, []); +} + export class CoreToolScheduler { private toolRegistry: ToolRegistry; private toolCalls: ToolCall[] = []; @@ -1317,32 +1372,51 @@ export class CoreToolScheduler { if (allCallsFinalOrScheduled) { const callsToExecute = this.toolCalls.filter( - (call) => call.status === 'scheduled', + (call): call is ScheduledToolCall => call.status === 'scheduled', ); - // Task tools are safe to run concurrently — they spawn independent - // sub-agents with no shared mutable state. All other tools run - // sequentially in their original order to preserve any implicit - // ordering the model may rely on. - const taskCalls = callsToExecute.filter( - (call) => call.request.name === ToolNames.AGENT, - ); - const otherCalls = callsToExecute.filter( - (call) => call.request.name !== ToolNames.AGENT, - ); - - const taskPromise = Promise.all( - taskCalls.map((tc) => this.executeSingleToolCall(tc, signal)), - ); + // Partition tool calls into consecutive batches by concurrency safety. + // Consecutive safe tools are grouped into parallel batches; unsafe + // tools each form their own sequential batch. Execute (shell) is safe + // only when isShellCommandReadOnly() returns true; otherwise sequential. + const batches = partitionToolCalls(callsToExecute); - const othersPromise = (async () => { - for (const toolCall of otherCalls) { - await this.executeSingleToolCall(toolCall, signal); + for (const batch of batches) { + if (batch.concurrent && batch.calls.length > 1) { + await this.runConcurrently(batch.calls, signal); + } else { + for (const call of batch.calls) { + await this.executeSingleToolCall(call, signal); + } } - })(); + } + } + } - await Promise.all([taskPromise, othersPromise]); + /** + * Execute multiple tool calls concurrently with a concurrency cap. + */ + private async runConcurrently( + calls: ScheduledToolCall[], + signal: AbortSignal, + ): Promise { + const parsed = parseInt( + process.env['QWEN_CODE_MAX_TOOL_CONCURRENCY'] || '', + 10, + ); + const maxConcurrency = Number.isFinite(parsed) && parsed >= 1 ? parsed : 10; + const executing = new Set>(); + + for (const call of calls) { + const p = this.executeSingleToolCall(call, signal).finally(() => { + executing.delete(p); + }); + executing.add(p); + if (executing.size >= maxConcurrency) { + await Promise.race(executing); + } } + await Promise.all(executing); } private async executeSingleToolCall( diff --git a/packages/core/src/test-utils/mock-tool.ts b/packages/core/src/test-utils/mock-tool.ts index 0e3cf293dc..6a1f245558 100644 --- a/packages/core/src/test-utils/mock-tool.ts +++ b/packages/core/src/test-utils/mock-tool.ts @@ -24,6 +24,7 @@ interface MockToolOptions { name: string; displayName?: string; description?: string; + kind?: Kind; canUpdateOutput?: boolean; isOutputMarkdown?: boolean; getDefaultPermission?: () => Promise; @@ -97,7 +98,7 @@ export class MockTool extends BaseDeclarativeTool< options.name, options.displayName ?? options.name, options.description ?? options.name, - Kind.Other, + options.kind ?? Kind.Other, options.params, options.isOutputMarkdown ?? false, options.canUpdateOutput ?? false, diff --git a/packages/core/src/tools/tools.ts b/packages/core/src/tools/tools.ts index 0d50f351ee..9af5677cb7 100644 --- a/packages/core/src/tools/tools.ts +++ b/packages/core/src/tools/tools.ts @@ -738,6 +738,17 @@ export const MUTATOR_KINDS: Kind[] = [ Kind.Execute, ] as const; +/** + * Tool kinds that are safe to execute concurrently (pure reads, no writes). + * Kind.Think is excluded because some Think tools write to disk + * (e.g., save_memory, todo_write). + */ +export const CONCURRENCY_SAFE_KINDS: ReadonlySet = new Set([ + Kind.Read, + Kind.Search, + Kind.Fetch, +]); + export interface ToolLocation { // Absolute path to the file path: string;