Skip to content
This repository was archived by the owner on Feb 21, 2026. It is now read-only.

feat: dual-lane queue for concurrent message and task execution#43

Merged
Peyton-Spencer merged 1 commit intomainfrom
feat/dual-lane-queue
Feb 14, 2026
Merged

feat: dual-lane queue for concurrent message and task execution#43
Peyton-Spencer merged 1 commit intomainfrom
feat/dual-lane-queue

Conversation

@Peyton-Spencer
Copy link
Copy Markdown

@Peyton-Spencer Peyton-Spencer commented Feb 14, 2026

Summary

  • Dual-lane GroupQueue: Messages and tasks now run on independent lanes per group — a 30-40 min scheduled task no longer blocks user messages
  • IPC input isolation: Task containers mount input-task/ as their IPC input dir, preventing cross-lane message interference
  • Context injection: Messaging agent gets [Background Task Running: "..." — started N min ago] prepended to its prompt so it can inform users
  • Concurrency controls: MAX_TASK_CONTAINERS limits task parallelism while always reserving capacity for messages; global MAX_CONCURRENT_CONTAINERS still caps total containers
  • Isolated task sessions: Tasks always use fresh sessions to prevent session ID conflicts with the message container
  • All backends updated: closeStdin() accepts optional inputSubdir param across local, Sprites, Daytona, Railway, and Hetzner backends

Files changed

File Change
src/group-queue.ts Dual-lane state (message + task), independent drain logic
src/config.ts MAX_TASK_CONTAINERS constant
src/backends/types.ts closeStdin optional inputSubdir param
src/backends/local-backend.ts input-task/ dir creation, task mount isolation
src/backends/*.ts All backends: inputSubdir param on closeStdin
src/task-scheduler.ts Forced isolated sessions, promptPreview for context injection, task lane params
src/db.ts advanceTaskNextRun() for pre-enqueue next_run advancement
src/index.ts Context injection in processGroupMessages(), lane params
src/group-queue.test.ts 11 tests covering dual-lane concurrency, isolation, limits, and context tracking

Test plan

  • bun run build — no type errors
  • bun test ./src/group-queue.test.ts — 11/11 pass
  • Manual: start service, trigger scheduled task, send message — message gets immediate response
  • Verify logs show two containers for same group simultaneously
  • Verify context injection: message response mentions background task

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Added multi-lane concurrent execution separating message and task processing for improved responsiveness.
    • Introduced scheduled task advancement with configurable concurrency limits and prompt preview tracking.
    • Added active task context injection into prompts during concurrent operations.
  • Tests

    • Restructured test suite to verify message and task lane isolation, concurrency limits, and simultaneous lane activity.

Messages and scheduled tasks now run on independent lanes per group,
so a long-running background task never blocks user messages. Each
group can have both a message container and a task container active
simultaneously, with separate IPC input directories to prevent
cross-lane interference. The messaging agent gets context injection
about active background tasks so it can inform users.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Feb 14, 2026

📝 Walkthrough

Walkthrough

The PR introduces lane-based concurrency management that separates message and task processing into independent execution lanes with distinct container lifecycle management, per-lane concurrency limits, updated backend APIs to support configurable IPC subdirectories, and task scheduling with prompt context injection.

Changes

Cohort / File(s) Summary
Backend API Updates
src/backends/daytona-backend.ts, src/backends/hetzner-backend.ts, src/backends/railway-backend.ts, src/backends/sprites-backend.ts, src/backends/types.ts
Updated closeStdin method signatures across all backends to accept optional inputSubdir parameter (defaulting to 'input'), enabling configurable IPC subdirectory paths for lane-isolated task execution; interface definition in types.ts reflects the new signature.
Local Backend Lane Support
src/backends/local-backend.ts
Extended buildVolumeMounts with isScheduledTask parameter to conditionally mount per-task IPC subdirectories and specific top-level files; updated closeStdin to accept inputSubdir parameter for task-lane isolation.
Configuration & Database
src/config.ts, src/db.ts
Added new exported MAX_TASK_CONTAINERS constant for task-specific concurrency limits; introduced advanceTaskNextRun function to update scheduled task timing and completion status prior to enqueue.
Group Queue Core
src/group-queue.ts
Refactored queue to support dual execution lanes (message vs. task) with separate container state, process registration, and draining logic per lane; added ActiveTaskInfo tracking for prompt context injection; extended enqueueTask with promptPreview parameter; updated registerProcess and closeStdin to accept lane parameter.
Task Scheduler & Execution
src/task-scheduler.ts, src/index.ts
Updated task scheduler to use advanceTaskNextRun for cron-based scheduling, compute task-specific prompt previews, and invoke process callbacks with 'task' lane identifier; main entry point now injects active task context into prompts and passes 'message' lane to process registration.
Comprehensive Test Coverage
src/group-queue.test.ts
Restructured tests to verify lane isolation, separate concurrency limits for message vs. task lanes, per-lane activation/drainage behavior, task context tracking, and concurrent activity across both lanes.

Sequence Diagrams

sequenceDiagram
    participant User as User/API
    participant Queue as GroupQueue<br/>(Message Lane)
    participant QueueTask as GroupQueue<br/>(Task Lane)
    participant Backend as Backend
    participant DB as Database
    
    User->>Queue: sendMessage()
    activate Queue
    Queue->>Queue: Check message concurrency
    Queue->>Backend: registerProcess(lane='message')
    Backend->>Backend: Start container
    Queue->>Queue: Process messages
    deactivate Queue
    
    User->>QueueTask: enqueueTask(promptPreview)
    activate QueueTask
    QueueTask->>DB: advanceTaskNextRun()
    DB->>DB: Update next_run, status
    QueueTask->>QueueTask: Check task concurrency<br/>(MAX_TASK_CONTAINERS)
    QueueTask->>Backend: registerProcess(lane='task')
    Backend->>Backend: Start container<br/>(isolated IPC subdir)
    QueueTask->>QueueTask: Store ActiveTaskInfo
    QueueTask->>QueueTask: Execute task with context
    deactivate QueueTask
    
    Queue->>Queue: Inject active task context<br/>into message prompt
Loading
sequenceDiagram
    participant Scheduler as TaskScheduler
    participant Queue as GroupQueue
    participant Backend as Backend
    participant DB as Database
    
    Scheduler->>Scheduler: Check due tasks
    activate Scheduler
    Scheduler->>DB: Get scheduled_tasks
    Scheduler->>Scheduler: Compute next_run<br/>(cron/interval)
    Scheduler->>Scheduler: Generate promptPreview<br/>(Heartbeat or 100-char)
    Scheduler->>DB: advanceTaskNextRun()
    DB->>DB: Update next_run
    Scheduler->>Queue: enqueueTask(promptPreview,<br/>lane='task')
    activate Queue
    Queue->>Backend: registerProcess(lane='task')
    Backend->>Backend: Start container
    Queue->>Queue: Execute task
    deactivate Queue
    Scheduler->>Queue: closeStdin(lane='task')
    Queue->>Backend: Write sentinel to<br/>workspace/ipc/tasks/_close
    deactivate Scheduler
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Poem

🐰 Two lanes of work now run with grace,
Messages and tasks keep their own pace,
Containers dance in perfect time,
With prompts injected, code sublime—
The scheduler's heartbeat ticks along,
While queues keep everything strong!

🚥 Pre-merge checks | ✅ 2 | ❌ 2
❌ Failed checks (2 warnings)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 20.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Merge Conflict Detection ⚠️ Warning ❌ Merge conflicts detected (12 files):

⚔️ src/backends/daytona-backend.ts (content)
⚔️ src/backends/hetzner-backend.ts (content)
⚔️ src/backends/local-backend.ts (content)
⚔️ src/backends/railway-backend.ts (content)
⚔️ src/backends/sprites-backend.ts (content)
⚔️ src/backends/types.ts (content)
⚔️ src/config.ts (content)
⚔️ src/db.ts (content)
⚔️ src/group-queue.test.ts (content)
⚔️ src/group-queue.ts (content)
⚔️ src/index.ts (content)
⚔️ src/task-scheduler.ts (content)

These conflicts must be resolved before merging into main.
Resolve conflicts locally and push changes to this branch.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and specifically describes the main change: introducing a dual-lane queue architecture that enables concurrent execution of messages and tasks, which is the core feature across all modified files.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/dual-lane-queue
⚔️ Resolve merge conflicts (beta)
  • Auto-commit resolved conflicts to branch feat/dual-lane-queue
  • Create stacked PR with resolved conflicts
  • Post resolved changes as copyable diffs in a comment

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@omarzanji omarzanji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Approved - Excellent Architecture Improvement!

This is a major responsiveness upgrade for NanoClaw. Great work!

Key Improvements

Dual-Lane Queue Architecture 🎯

  • ✅ Message lane + Task lane run independently per group
  • ✅ Long-running tasks (30-40 min) no longer block user messages
  • ✅ Immediate user message processing even during background tasks
  • ✅ Proper resource limits (MAX_TASK_CONTAINERS) with message priority

Isolation & Safety 🔒

  • ✅ Task containers mount input-task/ preventing cross-lane interference
  • ✅ Tasks use fresh sessions (no session ID conflicts)
  • ✅ IPC input isolation between message and task lanes
  • ✅ All backends updated (local, Sprites, Daytona, Railway, Hetzner)

Context Awareness 💡

  • ✅ Message agent knows when task is running
  • ✅ Context injection: "[Background Task Running: ... — started N min ago]"
  • ✅ Can inform users about ongoing background work

Testing

  • 11/11 tests pass
  • Comprehensive coverage of dual-lane concurrency
  • Tests verify isolation, limits, and context tracking
  • No type errors

Code Quality

  • Clean separation of concerns (message vs task state)
  • Backward compatible (optional inputSubdir param)
  • Well-documented state interfaces
  • Proper error handling and retry logic

Real-World Impact

Before: User sends message during 30-min task → waits 30 minutes
After: User sends message during 30-min task → instant response! 🚀

This solves a critical UX issue. Ready to merge!

@Peyton-Spencer Peyton-Spencer merged commit d0f5ea3 into main Feb 14, 2026
1 of 3 checks passed
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@src/index.ts`:
- Around line 895-896: onProcess currently calls queue.registerProcess(...,
undefined, ...) leaving taskBackend unset so closeStdin uses local fallback;
thread the actual backend from task-scheduler's runTask into onProcess (or pass
the backend instance directly to registerProcess) so queue.registerProcess
receives the real taskBackend instead of undefined; update the call site in
runTask to supply the backend argument and adjust
onProcess/queue.registerProcess signatures to accept and store taskBackend
(references: onProcess, queue.registerProcess, runTask, taskBackend,
closeStdin).

In `@src/task-scheduler.ts`:
- Around line 360-378: Pre-advancing next_run in the scheduling block causes
one-shot tasks to be lost if the process crashes before execution; instead, set
a "running" state on the task before enqueuing to prevent duplicate discovery
and only update next_run (or clear it for one-shot) after runTask completes.
Modify the logic around advanceTaskNextRun(currentTask.id, nextRun) to first
mark the task as running (e.g., setTaskRunning or
updateTaskStatus(currentTask.id, 'running')) then enqueue, and after runTask
finishes successfully update next_run/clear it (or compute and persist the cron
nextRun post-execution); ensure any cron tasks still compute and persist their
future next_run atomically or by using a transaction that sets running plus
next_run as needed.
🧹 Nitpick comments (4)
src/backends/sprites-backend.ts (1)

251-252: Consider parameterizing the stale sentinel cleanup for task lane support.

The stale close sentinel cleanup is hardcoded to /workspace/ipc/input/_close, but closeStdin now supports dynamic inputSubdir (e.g., input-task for scheduled tasks). If Sprites backend is used for scheduled tasks, the task lane's stale sentinel at /workspace/ipc/input-task/_close won't be cleaned up before a task run.

♻️ Suggested fix

Consider passing isScheduledTask to runAgent context and cleaning up the appropriate sentinel:

     // Clean up any stale close sentinel
-    try { await sprite.exec('rm -f /workspace/ipc/input/_close'); } catch { /* ignore */ }
+    const inputSubdir = input.isScheduledTask ? 'input-task' : 'input';
+    try { await sprite.exec(`rm -f /workspace/ipc/${inputSubdir}/_close`); } catch { /* ignore */ }
src/backends/daytona-backend.ts (1)

219-221: Same consideration as Sprites: stale sentinel cleanup is hardcoded.

The cleanup at line 220 is hardcoded to /workspace/ipc/input/_close, but scheduled tasks would use input-task. Consider parameterizing this based on input.isScheduledTask if Daytona is used for scheduled tasks.

src/group-queue.test.ts (1)

30-58: Avoid fixed 10ms waits to reduce test flakiness.

These timing-based sleeps can be racy under CI load; consider waiting on a resolver/condition instead of a fixed delay.

src/group-queue.ts (1)

226-233: Consider returning a defensive copy from getActiveTaskInfo.

Returning the internal object allows external mutation; a shallow copy avoids accidental state corruption.

♻️ Suggested tweak
-    const state = this.groups.get(groupJid);
-    return state?.activeTaskInfo ?? null;
+    const state = this.groups.get(groupJid);
+    return state?.activeTaskInfo ? { ...state.activeTaskInfo } : null;

Comment thread src/index.ts
Comment on lines +895 to 896
onProcess: (groupJid, proc, containerName, groupFolder, lane) => queue.registerProcess(groupJid, proc, containerName, groupFolder, undefined, lane),
sendMessage: async (jid, rawText) => {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Task containers never register their backend, so closeStdin can’t reach remote tasks.

Line 895 passes undefined as the backend when registering task processes. That leaves taskBackend unset, so queue.closeStdin(chatJid, 'task') falls back to local filesystem writes, which won’t signal remote backends. Please thread the backend from task-scheduler’s runTask into onProcess or register the process directly with the backend instance.

🤖 Prompt for AI Agents
In `@src/index.ts` around lines 895 - 896, onProcess currently calls
queue.registerProcess(..., undefined, ...) leaving taskBackend unset so
closeStdin uses local fallback; thread the actual backend from task-scheduler's
runTask into onProcess (or pass the backend instance directly to
registerProcess) so queue.registerProcess receives the real taskBackend instead
of undefined; update the call site in runTask to supply the backend argument and
adjust onProcess/queue.registerProcess signatures to accept and store
taskBackend (references: onProcess, queue.registerProcess, runTask, taskBackend,
closeStdin).

Comment thread src/task-scheduler.ts
Comment on lines +360 to +378
// Advance next_run BEFORE enqueuing so this task isn't
// re-discovered on subsequent ticks while it's running/queued.
// 'once' tasks (no recurrence) get next_run set to null which
// also removes them from getDueTasks results.
let nextRun: string | null = null;
if (currentTask.schedule_type === 'cron') {
try {
const interval = CronExpressionParser.parse(currentTask.schedule_value, { tz: TIMEZONE });
nextRun = interval.next().toISOString();
} catch {
// Invalid cron — leave null so it completes as a one-shot
}
} else if (currentTask.schedule_type === 'interval') {
const ms = parseInt(currentTask.schedule_value, 10);
if (!isNaN(ms) && ms > 0) {
nextRun = new Date(Date.now() + ms).toISOString();
}
}
advanceTaskNextRun(currentTask.id, nextRun);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Pre-advancing next_run can drop one-shot tasks on crash.

Line 360 advances next_run (and marks tasks completed when null) before execution. If the process crashes after this update but before the task runs, one-shot tasks are lost and won’t retry. Consider tracking a “running” status or only marking completed after runTask finishes, while still preventing duplicate discovery.

🤖 Prompt for AI Agents
In `@src/task-scheduler.ts` around lines 360 - 378, Pre-advancing next_run in the
scheduling block causes one-shot tasks to be lost if the process crashes before
execution; instead, set a "running" state on the task before enqueuing to
prevent duplicate discovery and only update next_run (or clear it for one-shot)
after runTask completes. Modify the logic around
advanceTaskNextRun(currentTask.id, nextRun) to first mark the task as running
(e.g., setTaskRunning or updateTaskStatus(currentTask.id, 'running')) then
enqueue, and after runTask finishes successfully update next_run/clear it (or
compute and persist the cron nextRun post-execution); ensure any cron tasks
still compute and persist their future next_run atomically or by using a
transaction that sets running plus next_run as needed.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants