feat: dual-lane queue for concurrent message and task execution#43
feat: dual-lane queue for concurrent message and task execution#43Peyton-Spencer merged 1 commit intomainfrom
Conversation
Messages and scheduled tasks now run on independent lanes per group, so a long-running background task never blocks user messages. Each group can have both a message container and a task container active simultaneously, with separate IPC input directories to prevent cross-lane interference. The messaging agent gets context injection about active background tasks so it can inform users. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
📝 WalkthroughWalkthroughThe PR introduces lane-based concurrency management that separates message and task processing into independent execution lanes with distinct container lifecycle management, per-lane concurrency limits, updated backend APIs to support configurable IPC subdirectories, and task scheduling with prompt context injection. Changes
Sequence DiagramssequenceDiagram
participant User as User/API
participant Queue as GroupQueue<br/>(Message Lane)
participant QueueTask as GroupQueue<br/>(Task Lane)
participant Backend as Backend
participant DB as Database
User->>Queue: sendMessage()
activate Queue
Queue->>Queue: Check message concurrency
Queue->>Backend: registerProcess(lane='message')
Backend->>Backend: Start container
Queue->>Queue: Process messages
deactivate Queue
User->>QueueTask: enqueueTask(promptPreview)
activate QueueTask
QueueTask->>DB: advanceTaskNextRun()
DB->>DB: Update next_run, status
QueueTask->>QueueTask: Check task concurrency<br/>(MAX_TASK_CONTAINERS)
QueueTask->>Backend: registerProcess(lane='task')
Backend->>Backend: Start container<br/>(isolated IPC subdir)
QueueTask->>QueueTask: Store ActiveTaskInfo
QueueTask->>QueueTask: Execute task with context
deactivate QueueTask
Queue->>Queue: Inject active task context<br/>into message prompt
sequenceDiagram
participant Scheduler as TaskScheduler
participant Queue as GroupQueue
participant Backend as Backend
participant DB as Database
Scheduler->>Scheduler: Check due tasks
activate Scheduler
Scheduler->>DB: Get scheduled_tasks
Scheduler->>Scheduler: Compute next_run<br/>(cron/interval)
Scheduler->>Scheduler: Generate promptPreview<br/>(Heartbeat or 100-char)
Scheduler->>DB: advanceTaskNextRun()
DB->>DB: Update next_run
Scheduler->>Queue: enqueueTask(promptPreview,<br/>lane='task')
activate Queue
Queue->>Backend: registerProcess(lane='task')
Backend->>Backend: Start container
Queue->>Queue: Execute task
deactivate Queue
Scheduler->>Queue: closeStdin(lane='task')
Queue->>Backend: Write sentinel to<br/>workspace/ipc/tasks/_close
deactivate Scheduler
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
⚔️ Resolve merge conflicts (beta)
Comment |
omarzanji
left a comment
There was a problem hiding this comment.
✅ Approved - Excellent Architecture Improvement!
This is a major responsiveness upgrade for NanoClaw. Great work!
Key Improvements
Dual-Lane Queue Architecture 🎯
- ✅ Message lane + Task lane run independently per group
- ✅ Long-running tasks (30-40 min) no longer block user messages
- ✅ Immediate user message processing even during background tasks
- ✅ Proper resource limits (MAX_TASK_CONTAINERS) with message priority
Isolation & Safety 🔒
- ✅ Task containers mount
input-task/preventing cross-lane interference - ✅ Tasks use fresh sessions (no session ID conflicts)
- ✅ IPC input isolation between message and task lanes
- ✅ All backends updated (local, Sprites, Daytona, Railway, Hetzner)
Context Awareness 💡
- ✅ Message agent knows when task is running
- ✅ Context injection: "[Background Task Running: ... — started N min ago]"
- ✅ Can inform users about ongoing background work
Testing ✅
- 11/11 tests pass
- Comprehensive coverage of dual-lane concurrency
- Tests verify isolation, limits, and context tracking
- No type errors
Code Quality
- Clean separation of concerns (message vs task state)
- Backward compatible (optional
inputSubdirparam) - Well-documented state interfaces
- Proper error handling and retry logic
Real-World Impact
Before: User sends message during 30-min task → waits 30 minutes
After: User sends message during 30-min task → instant response! 🚀
This solves a critical UX issue. Ready to merge!
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/index.ts`:
- Around line 895-896: onProcess currently calls queue.registerProcess(...,
undefined, ...) leaving taskBackend unset so closeStdin uses local fallback;
thread the actual backend from task-scheduler's runTask into onProcess (or pass
the backend instance directly to registerProcess) so queue.registerProcess
receives the real taskBackend instead of undefined; update the call site in
runTask to supply the backend argument and adjust
onProcess/queue.registerProcess signatures to accept and store taskBackend
(references: onProcess, queue.registerProcess, runTask, taskBackend,
closeStdin).
In `@src/task-scheduler.ts`:
- Around line 360-378: Pre-advancing next_run in the scheduling block causes
one-shot tasks to be lost if the process crashes before execution; instead, set
a "running" state on the task before enqueuing to prevent duplicate discovery
and only update next_run (or clear it for one-shot) after runTask completes.
Modify the logic around advanceTaskNextRun(currentTask.id, nextRun) to first
mark the task as running (e.g., setTaskRunning or
updateTaskStatus(currentTask.id, 'running')) then enqueue, and after runTask
finishes successfully update next_run/clear it (or compute and persist the cron
nextRun post-execution); ensure any cron tasks still compute and persist their
future next_run atomically or by using a transaction that sets running plus
next_run as needed.
🧹 Nitpick comments (4)
src/backends/sprites-backend.ts (1)
251-252: Consider parameterizing the stale sentinel cleanup for task lane support.The stale close sentinel cleanup is hardcoded to
/workspace/ipc/input/_close, butcloseStdinnow supports dynamicinputSubdir(e.g.,input-taskfor scheduled tasks). If Sprites backend is used for scheduled tasks, the task lane's stale sentinel at/workspace/ipc/input-task/_closewon't be cleaned up before a task run.♻️ Suggested fix
Consider passing
isScheduledTasktorunAgentcontext and cleaning up the appropriate sentinel:// Clean up any stale close sentinel - try { await sprite.exec('rm -f /workspace/ipc/input/_close'); } catch { /* ignore */ } + const inputSubdir = input.isScheduledTask ? 'input-task' : 'input'; + try { await sprite.exec(`rm -f /workspace/ipc/${inputSubdir}/_close`); } catch { /* ignore */ }src/backends/daytona-backend.ts (1)
219-221: Same consideration as Sprites: stale sentinel cleanup is hardcoded.The cleanup at line 220 is hardcoded to
/workspace/ipc/input/_close, but scheduled tasks would useinput-task. Consider parameterizing this based oninput.isScheduledTaskif Daytona is used for scheduled tasks.src/group-queue.test.ts (1)
30-58: Avoid fixed 10ms waits to reduce test flakiness.These timing-based sleeps can be racy under CI load; consider waiting on a resolver/condition instead of a fixed delay.
src/group-queue.ts (1)
226-233: Consider returning a defensive copy from getActiveTaskInfo.Returning the internal object allows external mutation; a shallow copy avoids accidental state corruption.
♻️ Suggested tweak
- const state = this.groups.get(groupJid); - return state?.activeTaskInfo ?? null; + const state = this.groups.get(groupJid); + return state?.activeTaskInfo ? { ...state.activeTaskInfo } : null;
| onProcess: (groupJid, proc, containerName, groupFolder, lane) => queue.registerProcess(groupJid, proc, containerName, groupFolder, undefined, lane), | ||
| sendMessage: async (jid, rawText) => { |
There was a problem hiding this comment.
Task containers never register their backend, so closeStdin can’t reach remote tasks.
Line 895 passes undefined as the backend when registering task processes. That leaves taskBackend unset, so queue.closeStdin(chatJid, 'task') falls back to local filesystem writes, which won’t signal remote backends. Please thread the backend from task-scheduler’s runTask into onProcess or register the process directly with the backend instance.
🤖 Prompt for AI Agents
In `@src/index.ts` around lines 895 - 896, onProcess currently calls
queue.registerProcess(..., undefined, ...) leaving taskBackend unset so
closeStdin uses local fallback; thread the actual backend from task-scheduler's
runTask into onProcess (or pass the backend instance directly to
registerProcess) so queue.registerProcess receives the real taskBackend instead
of undefined; update the call site in runTask to supply the backend argument and
adjust onProcess/queue.registerProcess signatures to accept and store
taskBackend (references: onProcess, queue.registerProcess, runTask, taskBackend,
closeStdin).
| // Advance next_run BEFORE enqueuing so this task isn't | ||
| // re-discovered on subsequent ticks while it's running/queued. | ||
| // 'once' tasks (no recurrence) get next_run set to null which | ||
| // also removes them from getDueTasks results. | ||
| let nextRun: string | null = null; | ||
| if (currentTask.schedule_type === 'cron') { | ||
| try { | ||
| const interval = CronExpressionParser.parse(currentTask.schedule_value, { tz: TIMEZONE }); | ||
| nextRun = interval.next().toISOString(); | ||
| } catch { | ||
| // Invalid cron — leave null so it completes as a one-shot | ||
| } | ||
| } else if (currentTask.schedule_type === 'interval') { | ||
| const ms = parseInt(currentTask.schedule_value, 10); | ||
| if (!isNaN(ms) && ms > 0) { | ||
| nextRun = new Date(Date.now() + ms).toISOString(); | ||
| } | ||
| } | ||
| advanceTaskNextRun(currentTask.id, nextRun); |
There was a problem hiding this comment.
Pre-advancing next_run can drop one-shot tasks on crash.
Line 360 advances next_run (and marks tasks completed when null) before execution. If the process crashes after this update but before the task runs, one-shot tasks are lost and won’t retry. Consider tracking a “running” status or only marking completed after runTask finishes, while still preventing duplicate discovery.
🤖 Prompt for AI Agents
In `@src/task-scheduler.ts` around lines 360 - 378, Pre-advancing next_run in the
scheduling block causes one-shot tasks to be lost if the process crashes before
execution; instead, set a "running" state on the task before enqueuing to
prevent duplicate discovery and only update next_run (or clear it for one-shot)
after runTask completes. Modify the logic around
advanceTaskNextRun(currentTask.id, nextRun) to first mark the task as running
(e.g., setTaskRunning or updateTaskStatus(currentTask.id, 'running')) then
enqueue, and after runTask finishes successfully update next_run/clear it (or
compute and persist the cron nextRun post-execution); ensure any cron tasks
still compute and persist their future next_run atomically or by using a
transaction that sets running plus next_run as needed.
Summary
input-task/as their IPC input dir, preventing cross-lane message interference[Background Task Running: "..." — started N min ago]prepended to its prompt so it can inform usersMAX_TASK_CONTAINERSlimits task parallelism while always reserving capacity for messages; globalMAX_CONCURRENT_CONTAINERSstill caps total containerscloseStdin()accepts optionalinputSubdirparam across local, Sprites, Daytona, Railway, and Hetzner backendsFiles changed
src/group-queue.tssrc/config.tsMAX_TASK_CONTAINERSconstantsrc/backends/types.tscloseStdinoptionalinputSubdirparamsrc/backends/local-backend.tsinput-task/dir creation, task mount isolationsrc/backends/*.tsinputSubdirparam oncloseStdinsrc/task-scheduler.tspromptPreviewfor context injection, task lane paramssrc/db.tsadvanceTaskNextRun()for pre-enqueue next_run advancementsrc/index.tsprocessGroupMessages(), lane paramssrc/group-queue.test.tsTest plan
bun run build— no type errorsbun test ./src/group-queue.test.ts— 11/11 pass🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Tests