Skip to content

feat(resilience): add rate limiting, circuit breaker, and backpressure#3036

Closed
rareba wants to merge 8 commits intozeroclaw-labs:masterfrom
rareba:feature/resilience
Closed

feat(resilience): add rate limiting, circuit breaker, and backpressure#3036
rareba wants to merge 8 commits intozeroclaw-labs:masterfrom
rareba:feature/resilience

Conversation

@rareba
Copy link
Copy Markdown
Contributor

@rareba rareba commented Mar 8, 2026

Summary

  • Base branch target (master for all contributions): master
  • Problem: No built-in resilience primitives for protecting the runtime against overload, cascading provider failures, or burst traffic.
  • Why it matters: Production deployments need rate limiting, circuit breaking, and backpressure to maintain stability under adverse conditions.
  • What changed:
    • Add token-bucket rate limiter (src/security/rate_limiter.rs) with per-key tracking, configurable burst/window, and bounded key cardinality
    • Add circuit breaker (src/providers/circuit_breaker.rs) with Closed/Open/HalfOpen state machine for wrapping fallible async provider calls
    • Add backpressure controller (src/agent/backpressure.rs) with priority-based adaptive load shedding (Critical, High, Normal, Low, Background)
    • Add ResilienceConfig to src/config/schema.rs with #[serde(default)] on all fields for full backward compatibility
    • Wire resilience rate limiter into the gateway webhook handler (opt-in via [resilience] rate_limit_enabled = true)
  • What did not change (scope boundary): No external crates added — uses parking_lot::Mutex and std::sync::atomic. All resilience features are disabled by default (safe for existing deployments).

Label Snapshot (required)

  • Risk label (risk: low|medium|high): high
  • Size label (size: XS|S|M|L|XL, auto-managed/read-only): L
  • Scope labels: security, provider, agent, gateway, config
  • Module labels: security: rate_limiter, provider: circuit_breaker, agent: backpressure
  • Contributor tier label: auto-managed
  • If any auto-label is incorrect, note requested correction: N/A

Change Metadata

  • Change type (bug|feature|refactor|docs|security|chore): feature
  • Primary scope (runtime|provider|channel|memory|security|ci|docs|multi): multi (security + provider + agent + gateway)

Linked Issue

  • Closes #
  • Related # N/A
  • Depends on #
  • Supersedes #

Supersede Attribution (required when Supersedes # is used)

N/A

Validation Evidence (required)

Commands and result summary:

cargo fmt --all -- --check   # pass
cargo clippy --all-targets -- -D warnings  # pass
cargo test                   # pass
  • Evidence provided (test/log/trace/screenshot/perf): Rate limiter tests (capacity enforcement, burst, independent keys, eviction, retry_after, zero-capacity edge case); circuit breaker tests (closed-on-success, opens-after-threshold, resets-on-success, half-open transitions, half-open failure reopens); backpressure tests (admission below threshold, depth tracking, priority-based shedding, critical-never-shed)
  • If any command is intentionally skipped, explain why: N/A

Security Impact (required)

  • New permissions/capabilities? Yes — rate limiter can reject requests with 429 Too Many Requests
  • New external network calls? No
  • Secrets/tokens handling changed? No
  • File system access scope changed? No
  • If any Yes, describe risk and mitigation: Rate limiting is opt-in (rate_limit_enabled = false by default). Configurable requests_per_minute and burst parameters. Provides retry_after header for graceful client backoff.

Privacy and Data Hygiene (required)

  • Data-hygiene status (pass|needs-follow-up): pass
  • Redaction/anonymization notes: Rate limiter tracks keys (e.g., IP or user ID) in memory only; no persistent storage.
  • Neutral wording confirmation: Confirmed — uses project-native labels throughout.

Compatibility / Migration

  • Backward compatible? Yes
  • Config/env changes? Yes — adds [resilience] config section with #[serde(default)]
  • Migration needed? No
  • If yes, exact upgrade steps: N/A

i18n Follow-Through (required when docs or user-facing wording changes)

  • i18n follow-through triggered? No

Human Verification (required)

What was personally validated beyond CI:

  • Verified scenarios: Unit tests for all three resilience primitives — rate limiter, circuit breaker, backpressure controller
  • Edge cases checked: Zero-capacity rate limiter, critical-priority-never-shed, half-open circuit breaker failure re-opens, key eviction under cardinality bounds
  • What was not verified: Load testing under real traffic; integration with actual provider failures

Side Effects / Blast Radius (required)

  • Affected subsystems/workflows: Gateway webhook handler (rate limiter check added), agent loop (backpressure depth tracking), provider module (circuit breaker submodule), security module (rate limiter submodule), config schema
  • Potential unintended effects: None when disabled (default). When enabled, rate limiter may reject legitimate requests if limits are too low; circuit breaker may block provider calls during recovery window.
  • Guardrails/monitoring for early detection: All features gated behind config toggles; rate limiter returns retry_after for client-side backoff; circuit breaker state transitions are deterministic and testable.

Agent Collaboration Notes (recommended)

  • Agent tools used (if any): Claude Code
  • Workflow/plan summary (if any): Implemented rate limiter, then circuit breaker, then backpressure controller, then config and gateway wiring.
  • Verification focus: State machine correctness (circuit breaker), priority ordering (backpressure), token bucket accuracy (rate limiter).
  • Confirmation: naming + architecture boundaries followed (AGENTS.md + CONTRIBUTING.md): Yes

Rollback Plan (required)

  • Fast rollback command/path: git revert <commit-sha>
  • Feature flags or config toggles (if any): [resilience] rate_limit_enabled, circuit_breaker threshold, backpressure max_queue_depth — all disabled/inactive by default
  • Observable failure symptoms: Unexpected 429 responses from gateway; circuit breaker stuck in Open state blocking all provider calls; backpressure shedding normal-priority requests prematurely.

Risks and Mitigations

  • Risk: Rate limiter key cardinality could grow unbounded without eviction.
    • Mitigation: Bounded key cardinality with LRU-style eviction implemented.
  • Risk: Circuit breaker recovery timeout too aggressive or too conservative for specific providers.
    • Mitigation: recovery_timeout is configurable per deployment; half-open state allows gradual recovery.
  • Risk: Backpressure priority classification may not match all use cases.
    • Mitigation: Five priority levels (Critical through Background) cover common patterns; Critical is never shed.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Mar 8, 2026

Note

.coderabbit.yaml has unrecognized properties

CodeRabbit is using all valid settings from your configuration. Unrecognized properties (listed below) have been ignored and may indicate typos or deprecated fields that can be removed.

⚠️ Parsing warnings (1)
Validation error: Unrecognized key(s) in object: 'tools', 'path_filters', 'review_instructions'
⚙️ Configuration instructions
  • Please see the configuration documentation for more information.
  • You can also validate your configuration using the online YAML validator.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json
📝 Walkthrough

Walkthrough

Adds a resilience subsystem: backpressure (priority-based shedding), per-key token-bucket rate limiting, and an async circuit breaker; wires resilience config into Config, gateway, onboarding, and the agent run loop.

Changes

Cohort / File(s) Summary
Backpressure
src/agent/backpressure.rs, src/agent/mod.rs, src/agent/loop_.rs
New priority enum, BackpressureController, LoadShed error and BackpressureConfig; admit/complete/depth APIs with atomic queue-depth tracking. Integrated into agent run loop and single-message flow with admit/completion calls and tests.
Rate Limiter
src/security/rate_limiter.rs, src/security/mod.rs, src/gateway/mod.rs
New TokenBucketRateLimiter with per-key buckets, eviction, refill logic, RateLimited error and RateLimiterConfig; wired into Gateway AppState as optional resilience_rate_limiter and checked in webhook handling.
Circuit Breaker
src/providers/circuit_breaker.rs, src/providers/mod.rs, src/agent/loop_.rs
New async CircuitBreaker (Closed/Open/HalfOpen), CircuitOpen error, CircuitBreakerConfig, and CircuitBreakerProvider wrapper for Provider implementations. Integrated optionally in agent loop to wrap providers; includes state transitions and unit tests.
Configuration
src/config/schema.rs, src/config/mod.rs, src/onboard/wizard.rs
Adds ResilienceConfig to schema and Config (serde defaults and Default impl), validation rules for resilience fields, and initializes resilience defaults in onboarding and quick-setup flows.
Module Exports & Tests
src/agent/mod.rs, src/providers/mod.rs, src/security/mod.rs, tests...*
Public module exports added for backpressure, circuit_breaker, and rate_limiter. Extensive unit tests added for the new subsystems; test setup adjustments to accommodate new AppState field.

Sequence Diagrams

sequenceDiagram
    participant Client
    participant Gateway
    participant RateLimiter
    participant WebhookHandler

    Client->>Gateway: HTTP webhook request (api_key)
    Gateway->>RateLimiter: check(api_key)
    alt Token Available
        RateLimiter-->>Gateway: Ok
        Gateway->>WebhookHandler: Process request
        WebhookHandler-->>Gateway: 200 OK
        Gateway-->>Client: 200 OK
    else Rate Limited
        RateLimiter-->>Gateway: Err(RateLimited { retry_after })
        Gateway-->>Client: 429 Too Many Requests (Retry-After)
    end
Loading
sequenceDiagram
    participant Request
    participant BackpressureCtl
    participant Queue

    Request->>BackpressureCtl: admit(priority)
    alt Critical
        BackpressureCtl->>Queue: increment depth
        BackpressureCtl-->>Request: Ok
    else Non-Critical
        BackpressureCtl->>BackpressureCtl: compute load = depth/max
        alt load < threshold(priority)
            BackpressureCtl->>Queue: increment depth
            BackpressureCtl-->>Request: Ok
        else
            BackpressureCtl-->>Request: Err(LoadShed)
        end
    end
    Request->>BackpressureCtl: complete()
    BackpressureCtl->>Queue: decrement depth (saturating)
Loading
sequenceDiagram
    participant Client
    participant CircuitBreaker
    participant Provider

    Client->>CircuitBreaker: call(async operation)
    CircuitBreaker->>CircuitBreaker: check state
    alt Closed
        CircuitBreaker->>Provider: execute operation
        alt Success
            CircuitBreaker->>CircuitBreaker: reset failures
            CircuitBreaker-->>Client: return result
        else Failure
            CircuitBreaker->>CircuitBreaker: increment failures
            alt failures >= threshold
                CircuitBreaker->>CircuitBreaker: open circuit (set opened_at)
                CircuitBreaker-->>Client: Err(CircuitOpen { retry_after })
            else
                CircuitBreaker-->>Client: Err(inner error)
            end
        end
    else Open
        CircuitBreaker->>CircuitBreaker: if timeout elapsed -> HalfOpen
        CircuitBreaker-->>Client: Err(CircuitOpen { retry_after })
    else HalfOpen
        CircuitBreaker->>Provider: execute probe (limited by probes)
        alt Probe success -> enough successes
            CircuitBreaker->>CircuitBreaker: close circuit
            CircuitBreaker-->>Client: return result
        else Probe failure
            CircuitBreaker->>CircuitBreaker: open circuit
            CircuitBreaker-->>Client: Err(CircuitOpen)
        end
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

Suggested labels

size: XL, risk: high, core, security, provider, agent, config: core, gateway, tests

Suggested reviewers

  • JordanTheJet
  • theonlyhennygod
🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed Title clearly and concisely summarizes the main feature additions: rate limiting, circuit breaker, and backpressure components are directly reflected in the changeset.
Description check ✅ Passed Description covers the primary changes, design philosophy, test coverage, and risk assessment; most required template sections are addressed or implicitly complete (features are opt-in, configs backward-compatible).
Docstring Coverage ✅ Passed Docstring coverage is 87.50% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (4)
src/gateway/mod.rs (2)

1631-1631: Consider adding an integration test for resilience rate limiter in webhook handler.

All test updates correctly set resilience_rate_limiter: None, but there's no test exercising the enabled path where the resilience rate limiter rejects a request in handle_webhook. Based on learnings: "For security/runtime/gateway changes... add/update tests or validation evidence for failure modes and boundaries."

A minimal test would verify that when resilience_rate_limiter is enabled and the limit is exceeded, the webhook returns 429 with the correct retry_after value.

Would you like me to generate an integration test for the resilience rate limiter rejection path?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/gateway/mod.rs` at line 1631, Add an integration test that enables the
resilience_rate_limiter and simulates an exceeded limit so handle_webhook
returns a 429 and provides the correct retry_after value; specifically,
construct the handler/config with resilience_rate_limiter set to an active
limiter (not None), trigger a webhook request through handle_webhook (or the
same HTTP entry point used by other tests), mock or configure the limiter to
indicate "limit exceeded" for that request, then assert the response status is
429 and that the response contains the expected retry_after header/body value
matching the limiter's backoff. Use the existing test harness and types around
resilience_rate_limiter and handle_webhook to locate where to plug in the
enabled limiter and the webhook request invocation.

650-663: Use existing constants instead of hardcoded values.

Lines 656-657 use hardcoded 60 and 10_000 when module-level constants already exist at lines 51 and 53: RATE_LIMIT_WINDOW_SECS and RATE_LIMIT_MAX_KEYS_DEFAULT.

♻️ Proposed fix to use existing constants
         resilience_rate_limiter: if config.resilience.rate_limit_enabled {
             Some(Arc::new(
                 crate::security::rate_limiter::TokenBucketRateLimiter::new(
                     crate::security::rate_limiter::RateLimiterConfig {
                         requests_per_window: config.resilience.requests_per_minute,
                         burst: config.resilience.burst,
-                        window: Duration::from_secs(60),
-                        max_keys: 10_000,
+                        window: Duration::from_secs(RATE_LIMIT_WINDOW_SECS),
+                        max_keys: RATE_LIMIT_MAX_KEYS_DEFAULT,
                     },
                 ),
             ))
         } else {
             None
         },

As per coding guidelines: "Use Rust standard casing consistently... constants/statics SCREAMING_SNAKE_CASE" - maintaining consistency with existing constants improves maintainability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/gateway/mod.rs` around lines 650 - 663, The code instantiates
TokenBucketRateLimiter::new with a RateLimiterConfig that uses hardcoded
Duration::from_secs(60) and max_keys: 10_000; replace those magic literals with
the module constants RATE_LIMIT_WINDOW_SECS and RATE_LIMIT_MAX_KEYS_DEFAULT (use
Duration::from_secs(RATE_LIMIT_WINDOW_SECS) for the window) inside the
RateLimiterConfig passed to TokenBucketRateLimiter::new to keep consistent
SCREAMING_SNAKE_CASE constants usage.
src/config/schema.rs (2)

5260-5260: Add focused contract tests for [resilience].

These fixture updates keep older roundtrip tests compiling, but they still do not assert the new section's behavior. Please add a small test for “missing [resilience] uses defaults” and one explicit roundtrip/assertion test for the resilience keys themselves.

Based on learnings "Treat config schema as public API contract. Document defaults, compatibility impact, and migration/rollback path for all config key changes."

Also applies to: 5443-5443

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/config/schema.rs` at line 5260, Add two focused contract tests around
ResilienceConfig: one test that loads/serializes a config without a [resilience]
section and asserts that the parsed ResilienceConfig equals
ResilienceConfig::default() (reference the ResilienceConfig type and the
top-level config parse/deserialize function used in other tests), and a second
explicit roundtrip test that constructs a Config with a non-default
ResilienceConfig, serializes it to TOML, deserializes it back, and asserts
equality on the resilience fields (use the same serialize/deserialize helpers
and equality/assert macros used elsewhere). Ensure tests cover each public
resilience key so the contract documents defaults and that future schema changes
will be caught.

451-488: Avoid a second source of truth for resilience defaults.

These helpers duplicate values already defined in src/security/rate_limiter.rs, src/providers/circuit_breaker.rs, and src/agent/backpressure.rs. They match now, but the next default tweak can desync the schema contract from runtime behavior.

As per coding guidelines "Duplicate small, local logic when it preserves clarity; extract shared utilities only after repeated, stable patterns (rule-of-three); when extracting, preserve module boundaries and avoid hidden coupling".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/config/schema.rs` around lines 451 - 488, The default helper functions
(default_resilience_requests_per_minute, default_resilience_burst,
default_resilience_failure_threshold, default_resilience_recovery_timeout_secs,
default_resilience_half_open_max, default_resilience_max_queue_depth,
default_resilience_graceful_shutdown_timeout_secs) duplicate defaults defined
elsewhere and should be removed; update ResilienceConfig::default to reference
the canonical defaults from their modules (e.g. use the rate limiter defaults
from src/security/rate_limiter.rs for requests_per_minute and burst, the circuit
breaker defaults from src/providers/circuit_breaker.rs for failure_threshold,
recovery_timeout_secs and half_open_max_requests, and the backpressure defaults
from src/agent/backpressure.rs for max_queue_depth and
graceful_shutdown_timeout_secs), import or re-export those constant names or
getter functions as needed, and delete the local default_* helper functions so
there is a single source of truth.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/agent/backpressure.rs`:
- Around line 94-128: The admit function currently reads self.current_depth then
later increments, causing a race that can exceed config.max_queue_depth; replace
the non-atomic snapshot-and-add with a CAS loop on self.current_depth using
compare_exchange (Ordering::AcqRel / Acquire) to atomically reserve a slot only
when depth < max_queue_depth and below the per-priority threshold (apply the
same CAS logic for Priority::Critical if you want critical to still bypass
shedding but must still respect the hard cap reservation), returning LoadShed on
failed reservation or threshold breach; also add a concurrent regression test
that sets max_queue_depth = 1 and spawns many threads that call admit to assert
self.current_depth never surpasses max_queue_depth and that load-shed errors are
returned instead of allowing over-capacity increments.

In `@src/config/schema.rs`:
- Around line 409-449: Config::validate() must reject degenerate circuit-breaker
settings instead of accepting zeros; add explicit validation for
ResilienceConfig fields: ensure circuit_breaker_failure_threshold >= 1,
circuit_breaker_recovery_timeout_secs >= 1, and
circuit_breaker_half_open_max_requests >= 1 (and optionally enforce sensible
upper bounds if desired), and return/bail with a clear error message naming the
offending field when any check fails; update the validation logic that currently
inspects ResilienceConfig to perform these checks and use explicit error returns
(e.g., bail!/Err) rather than silently normalizing or leaving the config running
in an unsafe state.

In `@src/providers/circuit_breaker.rs`:
- Around line 96-106: The constructor CircuitBreaker::new should validate
config.half_open_max_requests and reject zero to avoid an unrecovable HalfOpen
state; update CircuitBreaker::new to check config.half_open_max_requests > 0
(either by returning a Result with an Err describing invalid config or by
asserting/panicking with a clear message) and add a unit test that constructs
CircuitBreakerConfig with half_open_max_requests = 0 and verifies the
constructor rejects it (pinning the boundary behavior). Ensure references:
CircuitBreaker::new and CircuitBreakerConfig.half_open_max_requests.

In `@src/security/rate_limiter.rs`:
- Around line 78-83: The constructor RateLimiter::new currently converts a zero
Duration window into an enormous refill_rate; instead validate the
RateLimiterConfig early: if config.window == Duration::ZERO and
(config.requests_per_window + config.burst) > 0 return an error (e.g., change
new to return Result<Self, RateLimiterError> or panic/assert with a clear
message) rather than computing f64::MAX; update the logic around refill_rate to
assume non-zero window after validation, and add a unit test that constructs a
config with window == Duration::ZERO and positive requests_per_window or burst
and asserts that new fails with the expected error.

---

Nitpick comments:
In `@src/config/schema.rs`:
- Line 5260: Add two focused contract tests around ResilienceConfig: one test
that loads/serializes a config without a [resilience] section and asserts that
the parsed ResilienceConfig equals ResilienceConfig::default() (reference the
ResilienceConfig type and the top-level config parse/deserialize function used
in other tests), and a second explicit roundtrip test that constructs a Config
with a non-default ResilienceConfig, serializes it to TOML, deserializes it
back, and asserts equality on the resilience fields (use the same
serialize/deserialize helpers and equality/assert macros used elsewhere). Ensure
tests cover each public resilience key so the contract documents defaults and
that future schema changes will be caught.
- Around line 451-488: The default helper functions
(default_resilience_requests_per_minute, default_resilience_burst,
default_resilience_failure_threshold, default_resilience_recovery_timeout_secs,
default_resilience_half_open_max, default_resilience_max_queue_depth,
default_resilience_graceful_shutdown_timeout_secs) duplicate defaults defined
elsewhere and should be removed; update ResilienceConfig::default to reference
the canonical defaults from their modules (e.g. use the rate limiter defaults
from src/security/rate_limiter.rs for requests_per_minute and burst, the circuit
breaker defaults from src/providers/circuit_breaker.rs for failure_threshold,
recovery_timeout_secs and half_open_max_requests, and the backpressure defaults
from src/agent/backpressure.rs for max_queue_depth and
graceful_shutdown_timeout_secs), import or re-export those constant names or
getter functions as needed, and delete the local default_* helper functions so
there is a single source of truth.

In `@src/gateway/mod.rs`:
- Line 1631: Add an integration test that enables the resilience_rate_limiter
and simulates an exceeded limit so handle_webhook returns a 429 and provides the
correct retry_after value; specifically, construct the handler/config with
resilience_rate_limiter set to an active limiter (not None), trigger a webhook
request through handle_webhook (or the same HTTP entry point used by other
tests), mock or configure the limiter to indicate "limit exceeded" for that
request, then assert the response status is 429 and that the response contains
the expected retry_after header/body value matching the limiter's backoff. Use
the existing test harness and types around resilience_rate_limiter and
handle_webhook to locate where to plug in the enabled limiter and the webhook
request invocation.
- Around line 650-663: The code instantiates TokenBucketRateLimiter::new with a
RateLimiterConfig that uses hardcoded Duration::from_secs(60) and max_keys:
10_000; replace those magic literals with the module constants
RATE_LIMIT_WINDOW_SECS and RATE_LIMIT_MAX_KEYS_DEFAULT (use
Duration::from_secs(RATE_LIMIT_WINDOW_SECS) for the window) inside the
RateLimiterConfig passed to TokenBucketRateLimiter::new to keep consistent
SCREAMING_SNAKE_CASE constants usage.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 19c3de7d-6771-4037-93d0-3c7446fef4bc

📥 Commits

Reviewing files that changed from the base of the PR and between f7fefd4 and a46ca6b.

📒 Files selected for processing (10)
  • src/agent/backpressure.rs
  • src/agent/mod.rs
  • src/config/mod.rs
  • src/config/schema.rs
  • src/gateway/mod.rs
  • src/onboard/wizard.rs
  • src/providers/circuit_breaker.rs
  • src/providers/mod.rs
  • src/security/mod.rs
  • src/security/rate_limiter.rs

Comment thread src/agent/backpressure.rs
Comment on lines +94 to +128
pub fn admit(&self, priority: Priority) -> Result<(), LoadShed> {
let depth = self.current_depth.load(Ordering::Relaxed);

// Critical requests are never shed.
if priority == Priority::Critical {
self.current_depth.fetch_add(1, Ordering::Relaxed);
return Ok(());
}

// Hard cap: reject everything non-critical at max.
if depth >= self.config.max_queue_depth {
return Err(LoadShed {
queue_depth: depth,
priority,
});
}

let load_fraction = depth as f64 / self.config.max_queue_depth as f64;
let threshold = match priority {
Priority::Background => self.config.background_shed_threshold,
Priority::Low => self.config.low_shed_threshold,
Priority::Normal => self.config.normal_shed_threshold,
Priority::High => self.config.high_shed_threshold,
Priority::Critical => unreachable!(),
};

if load_fraction >= threshold {
return Err(LoadShed {
queue_depth: depth,
priority,
});
}

self.current_depth.fetch_add(1, Ordering::Relaxed);
Ok(())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

cd /tmp && find . -name "backpressure.rs" -type f 2>/dev/null | head -5

Repository: zeroclaw-labs/zeroclaw

Length of output: 48


🏁 Script executed:

fd backpressure.rs

Repository: zeroclaw-labs/zeroclaw

Length of output: 91


🏁 Script executed:

cat -n src/agent/backpressure.rs | head -150

Repository: zeroclaw-labs/zeroclaw

Length of output: 6021


🏁 Script executed:

wc -l src/agent/backpressure.rs

Repository: zeroclaw-labs/zeroclaw

Length of output: 95


🏁 Script executed:

git ls-files "**/*backpressure*"

Repository: zeroclaw-labs/zeroclaw

Length of output: 91


🏁 Script executed:

cat -n src/agent/backpressure.rs | sed -n '147,242p'

Repository: zeroclaw-labs/zeroclaw

Length of output: 3403


🏁 Script executed:

rg "BackpressureController" --type rust -B 2 -A 2

Repository: zeroclaw-labs/zeroclaw

Length of output: 1556


🏁 Script executed:

rg "BackpressureController" --type rust -l

Repository: zeroclaw-labs/zeroclaw

Length of output: 91


🏁 Script executed:

rg "admit\(" src/agent --type rust -B 3 -A 3

Repository: zeroclaw-labs/zeroclaw

Length of output: 4138


🏁 Script executed:

rg "use.*backpressure" --type rust

Repository: zeroclaw-labs/zeroclaw

Length of output: 48


🏁 Script executed:

rg "BackpressureController\|from.*backpressure" src --type rust

Repository: zeroclaw-labs/zeroclaw

Length of output: 48


🏁 Script executed:

grep -r "backpressure" src --include="*.rs" | grep -v "^src/agent/backpressure.rs"

Repository: zeroclaw-labs/zeroclaw

Length of output: 701


🏁 Script executed:

rg "BackpressureController::new|BackpressureController\s*\{" --type rust

Repository: zeroclaw-labs/zeroclaw

Length of output: 353


🏁 Script executed:

rg "admit\(" src/agent --type rust | grep -v test | grep -v "^[^:]*:[[:space:]]*//"

Repository: zeroclaw-labs/zeroclaw

Length of output: 1517


🏁 Script executed:

ast-grep --pattern 'let $depth = self.current_depth.load($_);
$$$
self.current_depth.fetch_add($_, $_);'

Repository: zeroclaw-labs/zeroclaw

Length of output: 48


🏁 Script executed:

cat src/agent/backpressure.rs | sed -n '94,129p' | cat -n

Repository: zeroclaw-labs/zeroclaw

Length of output: 1568


Implement atomic admission to prevent exceeding max_queue_depth under contention.

The admit path has a classic load-check-increment race: Line 95 snapshots current_depth, checks thresholds, then Line 127 increments later. Under concurrent load, multiple threads can observe the same depth, all pass the checks, and all increment, pushing the queue past max_queue_depth. The Critical-priority path (lines 98–101) is equally vulnerable—it increments without atomic reservation.

Replace with a CAS loop using compare_exchange(depth, depth + 1, Ordering::AcqRel, Ordering::Acquire) to atomically reserve depth before applying threshold checks. Add a concurrent regression test with max_queue_depth = 1 spawning multiple threads to verify the hard cap is never exceeded.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/agent/backpressure.rs` around lines 94 - 128, The admit function
currently reads self.current_depth then later increments, causing a race that
can exceed config.max_queue_depth; replace the non-atomic snapshot-and-add with
a CAS loop on self.current_depth using compare_exchange (Ordering::AcqRel /
Acquire) to atomically reserve a slot only when depth < max_queue_depth and
below the per-priority threshold (apply the same CAS logic for
Priority::Critical if you want critical to still bypass shedding but must still
respect the hard cap reservation), returning LoadShed on failed reservation or
threshold breach; also add a concurrent regression test that sets
max_queue_depth = 1 and spawns many threads that call admit to assert
self.current_depth never surpasses max_queue_depth and that load-shed errors are
returned instead of allowing over-capacity increments.

Comment thread src/config/schema.rs
Comment on lines +96 to +106
pub fn new(config: CircuitBreakerConfig) -> Self {
Self {
config,
inner: Mutex::new(Inner {
state: CircuitState::Closed,
consecutive_failures: 0,
half_open_successes: 0,
half_open_in_flight: 0,
opened_at: None,
}),
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Validate half_open_max_requests before constructing the breaker.

If this is 0, the breaker trips once, transitions to HalfOpen, and then Line 144 rejects every call forever because the quota check is immediately true. That leaves the provider unable to recover after the first open. Please reject this value up front and pin it with a boundary test.

🧯 Suggested guard
 pub fn new(config: CircuitBreakerConfig) -> Self {
+    assert!(
+        config.half_open_max_requests > 0,
+        "half_open_max_requests must be greater than zero"
+    );
+
     Self {
         config,
         inner: Mutex::new(Inner {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/providers/circuit_breaker.rs` around lines 96 - 106, The constructor
CircuitBreaker::new should validate config.half_open_max_requests and reject
zero to avoid an unrecovable HalfOpen state; update CircuitBreaker::new to check
config.half_open_max_requests > 0 (either by returning a Result with an Err
describing invalid config or by asserting/panicking with a clear message) and
add a unit test that constructs CircuitBreakerConfig with half_open_max_requests
= 0 and verifies the constructor rejects it (pinning the boundary behavior).
Ensure references: CircuitBreaker::new and
CircuitBreakerConfig.half_open_max_requests.

Comment thread src/security/rate_limiter.rs Outdated
@rikitrader
Copy link
Copy Markdown

Thank you for the contribution! We appreciate the effort.

We are closing this PR because it introduces a standalone module with no integration into the existing codebase — the new types/functions are not called by any existing code path. ZeroClaw follows a trait-driven architecture where new features must be wired through factory registration and have at least one active caller.

To reopen, please:

  1. Wire the feature into an existing subsystem (channel, provider, tool factory, or agent loop)
  2. Add integration tests that exercise the end-to-end flow
  3. Keep PR scope focused — one feature per PR, ideally under 500 lines

We welcome smaller, integrated contributions. See CLAUDE.md §7 for playbooks on adding providers, channels, tools, and peripherals.

@rikitrader
Copy link
Copy Markdown

Review: This PR introduces a standalone module that is not wired into any existing code path. The new types/functions have no callers in the codebase.

To make this mergeable:

  1. Wire into an existing subsystem (channel, provider, tool factory, or agent loop)
  2. Add integration tests exercising the end-to-end flow
  3. Keep scope under 500 lines per PR

See CLAUDE.md §7 for playbooks. Recommend closing and resubmitting as smaller, integrated PRs.

@rareba
Copy link
Copy Markdown
Contributor Author

rareba commented Mar 9, 2026

Thanks for the review feedback. I've addressed the integration concerns:

Changes pushed:

  • Backpressure controller is now wired into the agent message processing loop (src/agent/loop_.rs). Before each message, admit(Priority::Normal) is called; on load shed, the message is rejected. After processing, complete() decrements queue depth.
  • Circuit breaker is now a Provider decorator (CircuitBreakerProvider) that wraps provider calls. When config.resilience.circuit_breaker_enabled is true, the provider is wrapped automatically.
  • Rate limiter was already wired in the gateway middleware.
  • Added integration tests for both backpressure lifecycle and circuit breaker open/close behavior.

All tests pass. cargo check clean.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

♻️ Duplicate comments (2)
src/agent/backpressure.rs (1)

94-136: ⚠️ Potential issue | 🔴 Critical

Race condition allows exceeding max_queue_depth under concurrent load.

The admit path loads current_depth at line 95, performs threshold checks, then increments at line 135. Under concurrent requests, multiple threads can observe the same depth, all pass checks, and all increment — pushing the queue past max_queue_depth.

The Priority::Critical path (lines 98–101) is also unbounded — it increments without any capacity reservation.

Use a CAS loop with compare_exchange to atomically reserve a slot before applying threshold logic, and add a concurrent regression test with max_queue_depth = 1 spawning multiple threads to verify the hard cap is never exceeded.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/agent/backpressure.rs` around lines 94 - 136, The admit() path has a
race: it reads current_depth then later increments, allowing concurrent admits
to exceed config.max_queue_depth (and Priority::Critical currently increments
unbounded); replace the non-atomic read-then-add with a CAS loop using
AtomicUsize::compare_exchange on current_depth inside admit() that repeatedly
reads cur, computes next = cur + 1, checks next against hard cap
(config.max_queue_depth == 0 => Err, next > max_queue_depth => Err) and checks
the priority-specific threshold using load_fraction = cur / max_queue_depth (or
use cur before increment) so you only reserve a slot when the checks pass, then
attempt compare_exchange(cur, next) and on success return Ok() (or Err(LoadShed)
when checks fail); treat Priority::Critical as still allowed to bypass
per-priority thresholds but not the hard cap (enforce max_queue_depth in the
same CAS loop); add a concurrent regression test that sets max_queue_depth = 1,
spawns many threads concurrently calling admit(), and asserts current_depth
never exceeds max_queue_depth and that Excess admits return Err(LoadShed).
src/providers/circuit_breaker.rs (1)

96-106: ⚠️ Potential issue | 🟠 Major

Still need to reject invalid breaker configs in new().

CircuitBreaker::new is public, so schema-level validation elsewhere does not protect direct callers here. In particular, half_open_max_requests == 0 leaves the breaker permanently rejecting probes at Line 144 after the first open, and recovery_timeout == Duration::ZERO is another degenerate state worth rejecting locally.

🧯 Minimal guard
 pub fn new(config: CircuitBreakerConfig) -> Self {
+    assert!(
+        config.half_open_max_requests > 0,
+        "half_open_max_requests must be greater than zero"
+    );
+    assert!(
+        config.recovery_timeout > Duration::ZERO,
+        "recovery_timeout must be greater than zero"
+    );
+
     Self {
         config,
         inner: Mutex::new(Inner {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/providers/circuit_breaker.rs` around lines 96 - 106, CircuitBreaker::new
must validate CircuitBreakerConfig and reject degenerate values instead of
constructing a broken breaker; change the signature of CircuitBreaker::new to
return Result<Self, CircuitBreakerConfigError> (or another crate error type),
validate fields like config.half_open_max_requests > 0 and
config.recovery_timeout > Duration::ZERO (and any other invariants you care
about, e.g., max_consecutive_failures > 0), and return an Err with a clear error
when validation fails; on success construct and return Ok(Self { config, inner:
Mutex::new(Inner { state: CircuitState::Closed, consecutive_failures: 0,
half_open_successes: 0, half_open_in_flight: 0, opened_at: None, }), }). Ensure
callers of CircuitBreaker::new are updated to handle the Result.
🧹 Nitpick comments (3)
src/agent/backpressure.rs (1)

155-293: Good functional test coverage, but missing concurrent regression test.

The tests validate admission semantics, priority-based shedding, and the admit/complete cycle. However, given the race condition in admit(), adding a concurrent test with multiple threads contending for a low max_queue_depth would help validate the fix and prevent regression.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/agent/backpressure.rs` around lines 155 - 293, Add a concurrent
regression test in the tests module that stresses BackpressureController::admit
and complete under contention: create a BackpressureController via
make_controller with a small max_queue_depth (e.g., 2), wrap it in Arc and spawn
many threads that repeatedly call ctrl.admit(priority) and on success call
ctrl.complete(), tracking successes and failures; join threads and assert final
depth is 0, total admitted never exceeded max at any time (or that admitted
count <= total completes), and that non-critical requests are shed as
expected—this will exercise the admit() race and prevent regressions.
src/security/rate_limiter.rs (1)

77-78: Remove duplicate doc comment.

Line 77 and 78 contain the same doc comment text.

✂️ Suggested fix
 impl TokenBucketRateLimiter {
     /// Create a new rate limiter with the given configuration.
-    /// Create a new rate limiter with the given configuration.
     ///
     /// # Panics
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/security/rate_limiter.rs` around lines 77 - 78, There are two identical
doc comment lines above the constructor—remove the duplicate so only a single
doc comment remains for the rate limiter constructor (e.g., the doc above
RateLimiter::new or the new() function); keep one "Create a new rate limiter
with the given configuration." line and delete the redundant copy.
src/config/schema.rs (1)

221-223: Document the rollback path for [resilience] explicitly.

This new public config contract covers defaults and backward compatibility, but it still omits the migration/rollback story. A short note that removing [resilience] or leaving all flags false restores legacy behavior would make the contract complete for operators.

As per coding guidelines "For config/schema changes, treat keys as public contract: document defaults, compatibility impact, and migration/rollback path".

Also applies to: 401-408

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/config/schema.rs` around lines 221 - 223, Update the public doc comment
for the resilience field and the ResilienceConfig struct to include an explicit
migration/rollback note: state that removing the [resilience] table entirely or
leaving all resilience feature flags set to false will restore legacy behavior,
and document the default values used for each flag for backward compatibility;
apply the same explicit rollback sentence to the other related doc comments
referenced around the ResilienceConfig usage so operators see defaults,
compatibility impact, and the rollback path.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/agent/loop_.rs`:
- Around line 3350-3357: The branch that checks
config.resilience.backpressure_enabled only logs and then proceeds to call
agent_turn(...) without creating or passing any BackpressureController, so the
setting is a no-op for channel (single-message) paths; modify the branch to
either construct/shared a BackpressureController (Arc/Shared) and thread it into
the single-message path by passing it into agent_turn (and any downstream call
sites that expect admission/completion accounting), or if you cannot create a
controller here, return an Err or panic to fail fast when backpressure is
enabled; reference config.resilience.backpressure_enabled, the
BackpressureController type (or equivalent), and the callsite agent_turn(...) to
locate where to create/propagate or reject the request.
- Around line 3334-3349: The circuit breaker is being recreated inside the
per-message path (where provider is built) so its internal counters reset on
each call; move the wrapping out of process_message and create a single shared
provider instance once at startup (e.g., in your runtime/factory or Agent
initialization) that uses
providers::circuit_breaker::CircuitBreakerProvider::new with the
CircuitBreakerConfig (failure_threshold, recovery_timeout,
half_open_max_requests), expose it as a clonable/shared type (Arc<dyn Provider +
Send + Sync> or a factory that returns clones) and have process_message use that
shared provider instead; add a multi-message integration test that sends
multiple messages through the same pinned provider to verify the breaker
transitions to Open.

In `@src/providers/circuit_breaker.rs`:
- Around line 332-349: stream_chat_with_system currently delegates directly to
self.inner, bypassing the circuit-breaker logic used by chat/chat_with_system;
change stream_chat_with_system to first check the breaker state (the same
"circuit open" check used by chat/chat_with_system) and if open return a stream
that immediately yields a CircuitOpen error (or otherwise return an explicit
unsupported error) rather than calling self.inner.stream_chat_with_system, so
stream startup is gated by the breaker until you implement full streaming
outcome tracking.
- Around line 120-172: The call to f().await can be dropped and currently leaves
inner.half_open_in_flight stuck because decrement happens only after await; fix
by introducing an RAII permit guard (e.g., HalfOpenPermit) created when entering
CircuitState::HalfOpen in call(), incrementing inner.half_open_in_flight on
creation and decrementing in Drop, and pass a flag into
record_success/record_failure or explicitly disarm the guard there so the
successful/failure path doesn't double-decrement; update call() to create this
guard before releasing the lock and remove the manual decrement after await.
Also add a regression test that starts a half-open probe and drops the future
(simulate tokio::select! cancellation) to assert half_open_in_flight is
decremented and breaker can transition out of HalfOpen. Ensure references to
half_open_in_flight, call(), maybe_transition_to_half_open, record_success, and
record_failure are used when locating the changes.

---

Duplicate comments:
In `@src/agent/backpressure.rs`:
- Around line 94-136: The admit() path has a race: it reads current_depth then
later increments, allowing concurrent admits to exceed config.max_queue_depth
(and Priority::Critical currently increments unbounded); replace the non-atomic
read-then-add with a CAS loop using AtomicUsize::compare_exchange on
current_depth inside admit() that repeatedly reads cur, computes next = cur + 1,
checks next against hard cap (config.max_queue_depth == 0 => Err, next >
max_queue_depth => Err) and checks the priority-specific threshold using
load_fraction = cur / max_queue_depth (or use cur before increment) so you only
reserve a slot when the checks pass, then attempt compare_exchange(cur, next)
and on success return Ok() (or Err(LoadShed) when checks fail); treat
Priority::Critical as still allowed to bypass per-priority thresholds but not
the hard cap (enforce max_queue_depth in the same CAS loop); add a concurrent
regression test that sets max_queue_depth = 1, spawns many threads concurrently
calling admit(), and asserts current_depth never exceeds max_queue_depth and
that Excess admits return Err(LoadShed).

In `@src/providers/circuit_breaker.rs`:
- Around line 96-106: CircuitBreaker::new must validate CircuitBreakerConfig and
reject degenerate values instead of constructing a broken breaker; change the
signature of CircuitBreaker::new to return Result<Self,
CircuitBreakerConfigError> (or another crate error type), validate fields like
config.half_open_max_requests > 0 and config.recovery_timeout > Duration::ZERO
(and any other invariants you care about, e.g., max_consecutive_failures > 0),
and return an Err with a clear error when validation fails; on success construct
and return Ok(Self { config, inner: Mutex::new(Inner { state:
CircuitState::Closed, consecutive_failures: 0, half_open_successes: 0,
half_open_in_flight: 0, opened_at: None, }), }). Ensure callers of
CircuitBreaker::new are updated to handle the Result.

---

Nitpick comments:
In `@src/agent/backpressure.rs`:
- Around line 155-293: Add a concurrent regression test in the tests module that
stresses BackpressureController::admit and complete under contention: create a
BackpressureController via make_controller with a small max_queue_depth (e.g.,
2), wrap it in Arc and spawn many threads that repeatedly call
ctrl.admit(priority) and on success call ctrl.complete(), tracking successes and
failures; join threads and assert final depth is 0, total admitted never
exceeded max at any time (or that admitted count <= total completes), and that
non-critical requests are shed as expected—this will exercise the admit() race
and prevent regressions.

In `@src/config/schema.rs`:
- Around line 221-223: Update the public doc comment for the resilience field
and the ResilienceConfig struct to include an explicit migration/rollback note:
state that removing the [resilience] table entirely or leaving all resilience
feature flags set to false will restore legacy behavior, and document the
default values used for each flag for backward compatibility; apply the same
explicit rollback sentence to the other related doc comments referenced around
the ResilienceConfig usage so operators see defaults, compatibility impact, and
the rollback path.

In `@src/security/rate_limiter.rs`:
- Around line 77-78: There are two identical doc comment lines above the
constructor—remove the duplicate so only a single doc comment remains for the
rate limiter constructor (e.g., the doc above RateLimiter::new or the new()
function); keep one "Create a new rate limiter with the given configuration."
line and delete the redundant copy.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 04a70f26-07d7-4970-8c98-0d7d8b2cfd5e

📥 Commits

Reviewing files that changed from the base of the PR and between a46ca6b and 53e0fb9.

📒 Files selected for processing (5)
  • src/agent/backpressure.rs
  • src/agent/loop_.rs
  • src/config/schema.rs
  • src/providers/circuit_breaker.rs
  • src/security/rate_limiter.rs

Comment thread src/agent/loop_.rs
Comment on lines +3334 to +3349
// ── Circuit breaker (wrap provider when enabled) ────────────────────
let provider: Box<dyn Provider> = if config.resilience.circuit_breaker_enabled {
let cb_config = providers::circuit_breaker::CircuitBreakerConfig {
failure_threshold: config.resilience.circuit_breaker_failure_threshold,
recovery_timeout: std::time::Duration::from_secs(
config.resilience.circuit_breaker_recovery_timeout_secs,
),
half_open_max_requests: config.resilience.circuit_breaker_half_open_max_requests,
};
Box::new(providers::circuit_breaker::CircuitBreakerProvider::new(
provider, cb_config,
))
} else {
provider
};

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

process_message recreates the circuit breaker for every inbound message.

This entrypoint handles one channel message at a time, so the wrapper's counters disappear as soon as the call returns. A provider outage that shows up as one failed provider call per Discord/Telegram message never accumulates enough consecutive failures to trip Open, which makes channel mode largely immune to the breaker. Hoist the wrapped provider into a shared runtime/factory object and pin it with a multi-message integration test.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/agent/loop_.rs` around lines 3334 - 3349, The circuit breaker is being
recreated inside the per-message path (where provider is built) so its internal
counters reset on each call; move the wrapping out of process_message and create
a single shared provider instance once at startup (e.g., in your runtime/factory
or Agent initialization) that uses
providers::circuit_breaker::CircuitBreakerProvider::new with the
CircuitBreakerConfig (failure_threshold, recovery_timeout,
half_open_max_requests), expose it as a clonable/shared type (Arc<dyn Provider +
Send + Sync> or a factory that returns clones) and have process_message use that
shared provider instead; add a multi-message integration test that sends
multiple messages through the same pinned provider to verify the breaker
transitions to Open.

Comment thread src/agent/loop_.rs
Comment on lines +3350 to +3357
// ── Backpressure check (single-message path) ────────────────────────
if config.resilience.backpressure_enabled {
// For single-message processing, a static controller would be needed
// for cross-request coordination. Here we validate config is enabled
// but actual admission control happens in the long-running `run` loop
// and `Agent::turn`. Log for observability.
tracing::debug!("Backpressure enabled (single-message path — no queue tracking)");
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Backpressure is still a no-op in the channel path.

When backpressure_enabled is true, this branch only logs and then immediately calls agent_turn(...) without any admission or completion accounting. Because nothing downstream receives a BackpressureController, the config is silently ineffective for channel traffic. Please thread a shared controller into this path, or fail fast here instead of ignoring the setting.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/agent/loop_.rs` around lines 3350 - 3357, The branch that checks
config.resilience.backpressure_enabled only logs and then proceeds to call
agent_turn(...) without creating or passing any BackpressureController, so the
setting is a no-op for channel (single-message) paths; modify the branch to
either construct/shared a BackpressureController (Arc/Shared) and thread it into
the single-message path by passing it into agent_turn (and any downstream call
sites that expect admission/completion accounting), or if you cannot create a
controller here, return an Err or panic to fail fast when backpressure is
enabled; reference config.resilience.backpressure_enabled, the
BackpressureController type (or equivalent), and the callsite agent_turn(...) to
locate where to create/propagate or reject the request.

Comment on lines +120 to +172
pub async fn call<F, Fut, T>(&self, f: F) -> anyhow::Result<T>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = anyhow::Result<T>>,
{
// Acquire permission to call.
let is_half_open_probe = {
let mut inner = self.inner.lock();
self.maybe_transition_to_half_open(&mut inner);

match inner.state {
CircuitState::Closed => false,
CircuitState::Open => {
let elapsed = inner
.opened_at
.map(|t| t.elapsed())
.unwrap_or(Duration::ZERO);
let remaining = self.config.recovery_timeout.saturating_sub(elapsed);
return Err(CircuitOpen {
retry_after: remaining,
}
.into());
}
CircuitState::HalfOpen => {
if inner.half_open_in_flight + inner.half_open_successes
>= self.config.half_open_max_requests
{
return Err(CircuitOpen {
retry_after: Duration::from_secs(1),
}
.into());
}
inner.half_open_in_flight += 1;
true
}
}
};

// Execute the call outside the lock.
let result = f().await;

// Record outcome.
let mut inner = self.inner.lock();
if is_half_open_probe {
inner.half_open_in_flight = inner.half_open_in_flight.saturating_sub(1);
}

match &result {
Ok(_) => self.record_success(&mut inner, is_half_open_probe),
Err(_) => self.record_failure(&mut inner),
}

result
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Dropping a half-open call can wedge the breaker.

half_open_in_flight is decremented only after f().await returns. If the future is dropped while awaiting the provider call — which the agent loop does when cancellation wins its tokio::select! — that decrement never runs, and the breaker can stay stuck in HalfOpen, rejecting every later probe. Please move the half-open permit accounting behind a drop guard/RAII permit and pin it with a regression test that drops a half-open call before completion.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/providers/circuit_breaker.rs` around lines 120 - 172, The call to
f().await can be dropped and currently leaves inner.half_open_in_flight stuck
because decrement happens only after await; fix by introducing an RAII permit
guard (e.g., HalfOpenPermit) created when entering CircuitState::HalfOpen in
call(), incrementing inner.half_open_in_flight on creation and decrementing in
Drop, and pass a flag into record_success/record_failure or explicitly disarm
the guard there so the successful/failure path doesn't double-decrement; update
call() to create this guard before releasing the lock and remove the manual
decrement after await. Also add a regression test that starts a half-open probe
and drops the future (simulate tokio::select! cancellation) to assert
half_open_in_flight is decremented and breaker can transition out of HalfOpen.
Ensure references to half_open_in_flight, call(), maybe_transition_to_half_open,
record_success, and record_failure are used when locating the changes.

Comment on lines +332 to +349
fn stream_chat_with_system(
&self,
_system_prompt: Option<&str>,
_message: &str,
_model: &str,
_temperature: f64,
_options: StreamOptions,
) -> stream::BoxStream<'static, StreamResult<StreamChunk>> {
// Circuit breaker does not wrap streaming calls (they have their own
// error handling). Delegate directly.
self.inner.stream_chat_with_system(
_system_prompt,
_message,
_model,
_temperature,
_options,
)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't silently bypass the breaker for streaming calls.

This decorator fail-fast rejects chat/chat_with_system when the circuit is open, but stream_chat_with_system still goes straight to the downstream provider. Any streaming caller will keep hitting a failing backend during an outage, so circuit_breaker_enabled only works on part of the trait surface. Either gate stream startup through the breaker and surface CircuitOpen, or return an explicit unsupported error until stream outcomes are tracked.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/providers/circuit_breaker.rs` around lines 332 - 349,
stream_chat_with_system currently delegates directly to self.inner, bypassing
the circuit-breaker logic used by chat/chat_with_system; change
stream_chat_with_system to first check the breaker state (the same "circuit
open" check used by chat/chat_with_system) and if open return a stream that
immediately yields a CircuitOpen error (or otherwise return an explicit
unsupported error) rather than calling self.inner.stream_chat_with_system, so
stream startup is gated by the breaker until you implement full streaming
outcome tracking.

@rareba rareba force-pushed the feature/resilience branch from 1ed6f6c to c775a78 Compare March 12, 2026 08:22
rareba added 7 commits March 15, 2026 15:46
Add production resilience patterns for enterprise deployments:

- Token-bucket rate limiter (security/rate_limiter.rs): per-key limiting
  with configurable requests/window, burst, and eviction. Wired into
  the gateway webhook handler when [resilience].rate_limit_enabled=true.

- Circuit breaker (providers/circuit_breaker.rs): Closed/Open/HalfOpen
  state machine with configurable failure threshold, recovery timeout,
  and half-open probe limits. Generic async call wrapper.

- Backpressure controller (agent/backpressure.rs): queue depth tracking
  with adaptive load shedding across five priority levels (Critical,
  High, Normal, Low, Background).

- ResilienceConfig in config/schema.rs with #[serde(default)] for full
  backward compatibility. All features disabled by default.

No external crates added. Uses parking_lot::Mutex and std atomics.
…sure division

- Add config validation for circuit_breaker_half_open_max_requests >= 1
  (0 would prevent recovery from open state)
- Add config validation for rate limiter: requests_per_minute and burst
  cannot both be 0 when enabled (all requests would be rejected)
- Add config validation for backpressure_max_queue_depth >= 1 when enabled
- Guard BackpressureController::admit against max_queue_depth=0 to prevent
  potential division by zero, with explicit early rejection path
- Add test for zero max_queue_depth behavior
…meout

- Assert non-zero window in TokenBucketRateLimiter::new() to prevent
  silently failing open (infinite refill rate) on zero-length window
- Validate circuit_breaker_recovery_timeout_secs > 0 when circuit
  breaker is enabled (0 would skip the open state entirely)
- Add test for zero-window panic
…r into provider calls

Addresses reviewer feedback: standalone modules are now integrated into
existing code paths with active callers.
@rareba rareba force-pushed the feature/resilience branch from 3b2d65c to a431578 Compare March 15, 2026 14:50
@rareba rareba closed this Mar 15, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants