feat(resilience): add rate limiting, circuit breaker, and backpressure #3036
rareba wants to merge 8 commits into zeroclaw-labs:master from
Conversation
| Cohort / File(s) | Summary |
|---|---|
| Backpressure (`src/agent/backpressure.rs`, `src/agent/mod.rs`, `src/agent/loop_.rs`) | New priority enum, `BackpressureController`, `LoadShed` error, and `BackpressureConfig`; admit/complete/depth APIs with atomic queue-depth tracking. Integrated into the agent run loop and single-message flow with admit/completion calls and tests. |
| Rate Limiter (`src/security/rate_limiter.rs`, `src/security/mod.rs`, `src/gateway/mod.rs`) | New `TokenBucketRateLimiter` with per-key buckets, eviction, refill logic, `RateLimited` error, and `RateLimiterConfig`; wired into the Gateway `AppState` as an optional `resilience_rate_limiter` and checked in webhook handling. |
| Circuit Breaker (`src/providers/circuit_breaker.rs`, `src/providers/mod.rs`, `src/agent/loop_.rs`) | New async `CircuitBreaker` (Closed/Open/HalfOpen), `CircuitOpen` error, `CircuitBreakerConfig`, and a `CircuitBreakerProvider` wrapper for `Provider` implementations. Optionally integrated in the agent loop to wrap providers; includes state transitions and unit tests. |
| Configuration (`src/config/schema.rs`, `src/config/mod.rs`, `src/onboard/wizard.rs`) | Adds `ResilienceConfig` to the schema and `Config` (serde defaults and `Default` impl), validation rules for resilience fields, and resilience defaults in the onboarding and quick-setup flows. |
| Module Exports & Tests (`src/agent/mod.rs`, `src/providers/mod.rs`, `src/security/mod.rs`, tests...*) | Public module exports added for backpressure, circuit_breaker, and rate_limiter. Extensive unit tests added for the new subsystems; test setup adjusted to accommodate the new `AppState` field. |
Sequence Diagrams
```mermaid
sequenceDiagram
    participant Client
    participant Gateway
    participant RateLimiter
    participant WebhookHandler
    Client->>Gateway: HTTP webhook request (api_key)
    Gateway->>RateLimiter: check(api_key)
    alt Token Available
        RateLimiter-->>Gateway: Ok
        Gateway->>WebhookHandler: Process request
        WebhookHandler-->>Gateway: 200 OK
        Gateway-->>Client: 200 OK
    else Rate Limited
        RateLimiter-->>Gateway: Err(RateLimited { retry_after })
        Gateway-->>Client: 429 Too Many Requests (Retry-After)
    end
```
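The rate-limited branch in the diagram can be illustrated with a minimal single-key token bucket. This is a hedged sketch, not the PR's `TokenBucketRateLimiter` (which keeps one bucket per key and evicts idle keys); all names and numbers here are illustrative assumptions.

```rust
use std::time::{Duration, Instant};

/// Minimal single-key token bucket (illustrative only).
struct TokenBucket {
    capacity: f64,       // steady-state requests plus burst headroom
    tokens: f64,         // tokens currently available
    refill_per_sec: f64, // refill rate derived from the window
    last_refill: Instant,
}

impl TokenBucket {
    fn new(requests_per_window: u32, burst: u32, window: Duration) -> Self {
        let capacity = f64::from(requests_per_window + burst);
        Self {
            capacity,
            tokens: capacity,
            refill_per_sec: f64::from(requests_per_window) / window.as_secs_f64(),
            last_refill: Instant::now(),
        }
    }

    /// Ok(()) consumes one token; Err(retry_after) mirrors the diagram's
    /// RateLimited { retry_after } branch.
    fn check(&mut self) -> Result<(), Duration> {
        let now = Instant::now();
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last_refill = now;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            Ok(())
        } else {
            // Time until one whole token refills.
            Err(Duration::from_secs_f64((1.0 - self.tokens) / self.refill_per_sec))
        }
    }
}

fn main() {
    // 60 requests per minute with a burst of 2: 62 back-to-back calls pass.
    let mut bucket = TokenBucket::new(60, 2, Duration::from_secs(60));
    let allowed = (0..100).filter(|_| bucket.check().is_ok()).count();
    println!("allowed {allowed} of 100 back-to-back requests");
}
```

The `retry_after` value returned on the error path is what a gateway would surface as a `Retry-After` header on the 429 response.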
```mermaid
sequenceDiagram
    participant Request
    participant BackpressureCtl
    participant Queue
    Request->>BackpressureCtl: admit(priority)
    alt Critical
        BackpressureCtl->>Queue: increment depth
        BackpressureCtl-->>Request: Ok
    else Non-Critical
        BackpressureCtl->>BackpressureCtl: compute load = depth/max
        alt load < threshold(priority)
            BackpressureCtl->>Queue: increment depth
            BackpressureCtl-->>Request: Ok
        else
            BackpressureCtl-->>Request: Err(LoadShed)
        end
    end
    Request->>BackpressureCtl: complete()
    BackpressureCtl->>Queue: decrement depth (saturating)
```
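The admission rule in this diagram (critical always passes; everything else is shed once `depth/max` reaches a per-priority threshold) can be sketched as follows. The threshold values are invented for illustration; the real ones live in `BackpressureConfig`.

```rust
/// Request priorities, lowest first (conceptually mirrors the PR's enum).
#[derive(Clone, Copy, PartialEq, Debug)]
enum Priority {
    Background,
    Low,
    Normal,
    High,
    Critical,
}

/// Illustrative per-priority shed thresholds as a fraction of max queue
/// depth. These numbers are made up for the sketch.
fn shed_threshold(p: Priority) -> f64 {
    match p {
        Priority::Background => 0.50,
        Priority::Low => 0.70,
        Priority::Normal => 0.85,
        Priority::High => 0.95,
        Priority::Critical => 1.00, // never shed by load
    }
}

/// The diagram's admission rule: critical always passes; others are shed
/// once load = depth/max reaches their priority's threshold.
fn admitted(depth: usize, max: usize, p: Priority) -> bool {
    if p == Priority::Critical {
        return true;
    }
    depth < max && (depth as f64 / max as f64) < shed_threshold(p)
}

fn main() {
    for p in [Priority::Background, Priority::Low, Priority::Normal, Priority::High] {
        println!("{:?} admitted at depth 8/10: {}", p, admitted(8, 10, p));
    }
}
```

At 80% load, only `Normal` and above are still admitted under these sample thresholds; lower priorities are shed first, which is the point of graduated backpressure.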
```mermaid
sequenceDiagram
    participant Client
    participant CircuitBreaker
    participant Provider
    Client->>CircuitBreaker: call(async operation)
    CircuitBreaker->>CircuitBreaker: check state
    alt Closed
        CircuitBreaker->>Provider: execute operation
        alt Success
            CircuitBreaker->>CircuitBreaker: reset failures
            CircuitBreaker-->>Client: return result
        else Failure
            CircuitBreaker->>CircuitBreaker: increment failures
            alt failures >= threshold
                CircuitBreaker->>CircuitBreaker: open circuit (set opened_at)
                CircuitBreaker-->>Client: Err(CircuitOpen { retry_after })
            else
                CircuitBreaker-->>Client: Err(inner error)
            end
        end
    else Open
        CircuitBreaker->>CircuitBreaker: if timeout elapsed -> HalfOpen
        CircuitBreaker-->>Client: Err(CircuitOpen { retry_after })
    else HalfOpen
        CircuitBreaker->>Provider: execute probe (limited by probes)
        alt Probe success -> enough successes
            CircuitBreaker->>CircuitBreaker: close circuit
            CircuitBreaker-->>Client: return result
        else Probe failure
            CircuitBreaker->>CircuitBreaker: open circuit
            CircuitBreaker-->>Client: Err(CircuitOpen)
        end
    end
```
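The three-state flow above can be condensed into a small synchronous sketch. The PR's `CircuitBreaker` is async, mutex-guarded, and limits concurrent half-open probes; this standalone version only illustrates the Closed/Open/HalfOpen transitions, and all names are assumptions.

```rust
use std::time::{Duration, Instant};

#[derive(Clone, Copy, Debug)]
enum State {
    Closed,
    Open { opened_at: Instant },
    HalfOpen { successes: u32 },
}

struct Breaker {
    state: State,
    failures: u32,
    failure_threshold: u32,
    recovery_timeout: Duration,
    half_open_required_successes: u32,
}

impl Breaker {
    fn new(failure_threshold: u32, recovery_timeout: Duration, required: u32) -> Self {
        Self {
            state: State::Closed,
            failures: 0,
            failure_threshold,
            recovery_timeout,
            half_open_required_successes: required,
        }
    }

    /// May a call proceed right now? Open flips to HalfOpen once the
    /// recovery timeout elapses, as in the diagram.
    fn allow(&mut self) -> bool {
        if let State::Open { opened_at } = self.state {
            if opened_at.elapsed() >= self.recovery_timeout {
                self.state = State::HalfOpen { successes: 0 };
                return true; // probe allowed
            }
            return false; // still open: fail fast
        }
        true // Closed or HalfOpen
    }

    fn record_success(&mut self) {
        match self.state {
            State::Closed => self.failures = 0,
            State::HalfOpen { successes } => {
                if successes + 1 >= self.half_open_required_successes {
                    self.state = State::Closed; // enough probes succeeded
                    self.failures = 0;
                } else {
                    self.state = State::HalfOpen { successes: successes + 1 };
                }
            }
            State::Open { .. } => {}
        }
    }

    fn record_failure(&mut self) {
        match self.state {
            // A failed probe re-opens the circuit immediately.
            State::HalfOpen { .. } => self.state = State::Open { opened_at: Instant::now() },
            State::Closed => {
                self.failures += 1;
                if self.failures >= self.failure_threshold {
                    self.state = State::Open { opened_at: Instant::now() };
                }
            }
            State::Open { .. } => {}
        }
    }
}

fn main() {
    let mut b = Breaker::new(3, Duration::from_millis(20), 1);
    for _ in 0..3 {
        b.record_failure(); // three consecutive failures trip the breaker
    }
    println!("allowed while open: {}", b.allow());
    std::thread::sleep(Duration::from_millis(30));
    println!("allowed after recovery timeout: {}", b.allow());
}
```

Note how the sketch makes the review's later point concrete: if `half_open_required_successes` were 0 or the probe quota were exhausted immediately, the breaker could never transition back to Closed.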
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~75 minutes
Possibly related PRs
- feat(agent): add primary allowed_tools/denied_tools filtering #2663 — modifies the agent run loop and single-message flow; likely to conflict with the backpressure admission/completion changes.
- feat(session): Add channel session persistence support #2093 — also changes `src/agent/loop_.rs` (process_message/run loop), relevant to integration of resilience controls.
- feat(gateway): add per-endpoint rate limiting and webhook idempotency #188 — adds gateway rate-limiting features; overlaps with the gateway `resilience_rate_limiter` wiring.
Suggested labels
size: XL, risk: high, core, security, provider, agent, config: core, gateway, tests
Suggested reviewers
- JordanTheJet
- theonlyhennygod
🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | Title clearly and concisely summarizes the main feature additions: rate limiting, circuit breaker, and backpressure components are directly reflected in the changeset. |
| Description check | ✅ Passed | Description covers the primary changes, design philosophy, test coverage, and risk assessment; most required template sections are addressed or implicitly complete (features are opt-in, configs backward-compatible). |
| Docstring Coverage | ✅ Passed | Docstring coverage is 87.50% which is sufficient. The required threshold is 80.00%. |
Actionable comments posted: 4
🧹 Nitpick comments (4)
src/gateway/mod.rs (2)
1631-1631: Consider adding an integration test for the resilience rate limiter in the webhook handler. All test updates correctly set `resilience_rate_limiter: None`, but there's no test exercising the enabled path where the resilience rate limiter rejects a request in `handle_webhook`. Based on learnings: "For security/runtime/gateway changes... add/update tests or validation evidence for failure modes and boundaries."
A minimal test would verify that when `resilience_rate_limiter` is enabled and the limit is exceeded, the webhook returns 429 with the correct `retry_after` value. Would you like me to generate an integration test for the resilience rate limiter rejection path?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/gateway/mod.rs` at line 1631, Add an integration test that enables the resilience_rate_limiter and simulates an exceeded limit so handle_webhook returns a 429 and provides the correct retry_after value; specifically, construct the handler/config with resilience_rate_limiter set to an active limiter (not None), trigger a webhook request through handle_webhook (or the same HTTP entry point used by other tests), mock or configure the limiter to indicate "limit exceeded" for that request, then assert the response status is 429 and that the response contains the expected retry_after header/body value matching the limiter's backoff. Use the existing test harness and types around resilience_rate_limiter and handle_webhook to locate where to plug in the enabled limiter and the webhook request invocation.
650-663: Use existing constants instead of hardcoded values. Lines 656-657 use hardcoded `60` and `10_000` when module-level constants already exist at lines 51 and 53: `RATE_LIMIT_WINDOW_SECS` and `RATE_LIMIT_MAX_KEYS_DEFAULT`.
♻️ Proposed fix to use existing constants

```diff
 resilience_rate_limiter: if config.resilience.rate_limit_enabled {
     Some(Arc::new(
         crate::security::rate_limiter::TokenBucketRateLimiter::new(
             crate::security::rate_limiter::RateLimiterConfig {
                 requests_per_window: config.resilience.requests_per_minute,
                 burst: config.resilience.burst,
-                window: Duration::from_secs(60),
-                max_keys: 10_000,
+                window: Duration::from_secs(RATE_LIMIT_WINDOW_SECS),
+                max_keys: RATE_LIMIT_MAX_KEYS_DEFAULT,
             },
         ),
     ))
 } else {
     None
 },
```

As per coding guidelines: "Use Rust standard casing consistently... constants/statics SCREAMING_SNAKE_CASE" - maintaining consistency with existing constants improves maintainability.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/gateway/mod.rs` around lines 650 - 663, The code instantiates TokenBucketRateLimiter::new with a RateLimiterConfig that uses hardcoded Duration::from_secs(60) and max_keys: 10_000; replace those magic literals with the module constants RATE_LIMIT_WINDOW_SECS and RATE_LIMIT_MAX_KEYS_DEFAULT (use Duration::from_secs(RATE_LIMIT_WINDOW_SECS) for the window) inside the RateLimiterConfig passed to TokenBucketRateLimiter::new to keep consistent SCREAMING_SNAKE_CASE constants usage.
src/config/schema.rs (2)
5260-5260: Add focused contract tests for `[resilience]`. These fixture updates keep older roundtrip tests compiling, but they still do not assert the new section's behavior. Please add a small test for "missing `[resilience]` uses defaults" and one explicit roundtrip/assertion test for the resilience keys themselves.
Based on learnings: "Treat config schema as public API contract. Document defaults, compatibility impact, and migration/rollback path for all config key changes."
Also applies to: 5443-5443
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/config/schema.rs` at line 5260, Add two focused contract tests around ResilienceConfig: one test that loads/serializes a config without a [resilience] section and asserts that the parsed ResilienceConfig equals ResilienceConfig::default() (reference the ResilienceConfig type and the top-level config parse/deserialize function used in other tests), and a second explicit roundtrip test that constructs a Config with a non-default ResilienceConfig, serializes it to TOML, deserializes it back, and asserts equality on the resilience fields (use the same serialize/deserialize helpers and equality/assert macros used elsewhere). Ensure tests cover each public resilience key so the contract documents defaults and that future schema changes will be caught.
451-488: Avoid a second source of truth for resilience defaults. These helpers duplicate values already defined in `src/security/rate_limiter.rs`, `src/providers/circuit_breaker.rs`, and `src/agent/backpressure.rs`. They match now, but the next default tweak can desync the schema contract from runtime behavior.
As per coding guidelines: "Duplicate small, local logic when it preserves clarity; extract shared utilities only after repeated, stable patterns (rule-of-three); when extracting, preserve module boundaries and avoid hidden coupling".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/config/schema.rs` around lines 451 - 488, The default helper functions (default_resilience_requests_per_minute, default_resilience_burst, default_resilience_failure_threshold, default_resilience_recovery_timeout_secs, default_resilience_half_open_max, default_resilience_max_queue_depth, default_resilience_graceful_shutdown_timeout_secs) duplicate defaults defined elsewhere and should be removed; update ResilienceConfig::default to reference the canonical defaults from their modules (e.g. use the rate limiter defaults from src/security/rate_limiter.rs for requests_per_minute and burst, the circuit breaker defaults from src/providers/circuit_breaker.rs for failure_threshold, recovery_timeout_secs and half_open_max_requests, and the backpressure defaults from src/agent/backpressure.rs for max_queue_depth and graceful_shutdown_timeout_secs), import or re-export those constant names or getter functions as needed, and delete the local default_* helper functions so there is a single source of truth.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/agent/backpressure.rs`:
- Around line 94-128: The admit function currently reads self.current_depth then
later increments, causing a race that can exceed config.max_queue_depth; replace
the non-atomic snapshot-and-add with a CAS loop on self.current_depth using
compare_exchange (Ordering::AcqRel / Acquire) to atomically reserve a slot only
when depth < max_queue_depth and below the per-priority threshold (apply the
same CAS logic for Priority::Critical if you want critical to still bypass
shedding but must still respect the hard cap reservation), returning LoadShed on
failed reservation or threshold breach; also add a concurrent regression test
that sets max_queue_depth = 1 and spawns many threads that call admit to assert
self.current_depth never surpasses max_queue_depth and that load-shed errors are
returned instead of allowing over-capacity increments.
In `@src/config/schema.rs`:
- Around line 409-449: Config::validate() must reject degenerate circuit-breaker
settings instead of accepting zeros; add explicit validation for
ResilienceConfig fields: ensure circuit_breaker_failure_threshold >= 1,
circuit_breaker_recovery_timeout_secs >= 1, and
circuit_breaker_half_open_max_requests >= 1 (and optionally enforce sensible
upper bounds if desired), and return/bail with a clear error message naming the
offending field when any check fails; update the validation logic that currently
inspects ResilienceConfig to perform these checks and use explicit error returns
(e.g., bail!/Err) rather than silently normalizing or leaving the config running
in an unsafe state.
In `@src/providers/circuit_breaker.rs`:
- Around line 96-106: The constructor CircuitBreaker::new should validate
config.half_open_max_requests and reject zero to avoid an unrecoverable HalfOpen
state; update CircuitBreaker::new to check config.half_open_max_requests > 0
(either by returning a Result with an Err describing invalid config or by
asserting/panicking with a clear message) and add a unit test that constructs
CircuitBreakerConfig with half_open_max_requests = 0 and verifies the
constructor rejects it (pinning the boundary behavior). Ensure references:
CircuitBreaker::new and CircuitBreakerConfig.half_open_max_requests.
In `@src/security/rate_limiter.rs`:
- Around line 78-83: The constructor RateLimiter::new currently converts a zero
Duration window into an enormous refill_rate; instead validate the
RateLimiterConfig early: if config.window == Duration::ZERO and
(config.requests_per_window + config.burst) > 0 return an error (e.g., change
new to return Result<Self, RateLimiterError> or panic/assert with a clear
message) rather than computing f64::MAX; update the logic around refill_rate to
assume non-zero window after validation, and add a unit test that constructs a
config with window == Duration::ZERO and positive requests_per_window or burst
and asserts that new fails with the expected error.
---
Nitpick comments:
In `@src/config/schema.rs`:
- Line 5260: Add two focused contract tests around ResilienceConfig: one test
that loads/serializes a config without a [resilience] section and asserts that
the parsed ResilienceConfig equals ResilienceConfig::default() (reference the
ResilienceConfig type and the top-level config parse/deserialize function used
in other tests), and a second explicit roundtrip test that constructs a Config
with a non-default ResilienceConfig, serializes it to TOML, deserializes it
back, and asserts equality on the resilience fields (use the same
serialize/deserialize helpers and equality/assert macros used elsewhere). Ensure
tests cover each public resilience key so the contract documents defaults and
that future schema changes will be caught.
- Around line 451-488: The default helper functions
(default_resilience_requests_per_minute, default_resilience_burst,
default_resilience_failure_threshold, default_resilience_recovery_timeout_secs,
default_resilience_half_open_max, default_resilience_max_queue_depth,
default_resilience_graceful_shutdown_timeout_secs) duplicate defaults defined
elsewhere and should be removed; update ResilienceConfig::default to reference
the canonical defaults from their modules (e.g. use the rate limiter defaults
from src/security/rate_limiter.rs for requests_per_minute and burst, the circuit
breaker defaults from src/providers/circuit_breaker.rs for failure_threshold,
recovery_timeout_secs and half_open_max_requests, and the backpressure defaults
from src/agent/backpressure.rs for max_queue_depth and
graceful_shutdown_timeout_secs), import or re-export those constant names or
getter functions as needed, and delete the local default_* helper functions so
there is a single source of truth.
In `@src/gateway/mod.rs`:
- Line 1631: Add an integration test that enables the resilience_rate_limiter
and simulates an exceeded limit so handle_webhook returns a 429 and provides the
correct retry_after value; specifically, construct the handler/config with
resilience_rate_limiter set to an active limiter (not None), trigger a webhook
request through handle_webhook (or the same HTTP entry point used by other
tests), mock or configure the limiter to indicate "limit exceeded" for that
request, then assert the response status is 429 and that the response contains
the expected retry_after header/body value matching the limiter's backoff. Use
the existing test harness and types around resilience_rate_limiter and
handle_webhook to locate where to plug in the enabled limiter and the webhook
request invocation.
- Around line 650-663: The code instantiates TokenBucketRateLimiter::new with a
RateLimiterConfig that uses hardcoded Duration::from_secs(60) and max_keys:
10_000; replace those magic literals with the module constants
RATE_LIMIT_WINDOW_SECS and RATE_LIMIT_MAX_KEYS_DEFAULT (use
Duration::from_secs(RATE_LIMIT_WINDOW_SECS) for the window) inside the
RateLimiterConfig passed to TokenBucketRateLimiter::new to keep consistent
SCREAMING_SNAKE_CASE constants usage.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 19c3de7d-6771-4037-93d0-3c7446fef4bc
📒 Files selected for processing (10)
src/agent/backpressure.rs, src/agent/mod.rs, src/config/mod.rs, src/config/schema.rs, src/gateway/mod.rs, src/onboard/wizard.rs, src/providers/circuit_breaker.rs, src/providers/mod.rs, src/security/mod.rs, src/security/rate_limiter.rs
```rust
pub fn admit(&self, priority: Priority) -> Result<(), LoadShed> {
    let depth = self.current_depth.load(Ordering::Relaxed);

    // Critical requests are never shed.
    if priority == Priority::Critical {
        self.current_depth.fetch_add(1, Ordering::Relaxed);
        return Ok(());
    }

    // Hard cap: reject everything non-critical at max.
    if depth >= self.config.max_queue_depth {
        return Err(LoadShed {
            queue_depth: depth,
            priority,
        });
    }

    let load_fraction = depth as f64 / self.config.max_queue_depth as f64;
    let threshold = match priority {
        Priority::Background => self.config.background_shed_threshold,
        Priority::Low => self.config.low_shed_threshold,
        Priority::Normal => self.config.normal_shed_threshold,
        Priority::High => self.config.high_shed_threshold,
        Priority::Critical => unreachable!(),
    };

    if load_fraction >= threshold {
        return Err(LoadShed {
            queue_depth: depth,
            priority,
        });
    }

    self.current_depth.fetch_add(1, Ordering::Relaxed);
    Ok(())
}
```
Implement atomic admission to prevent exceeding `max_queue_depth` under contention.
The admit path has a classic load-check-increment race: line 95 snapshots `current_depth`, checks thresholds, then line 127 increments later. Under concurrent load, multiple threads can observe the same depth, all pass the checks, and all increment, pushing the queue past `max_queue_depth`. The Critical-priority path (lines 98-101) is equally vulnerable: it increments without atomic reservation.
Replace with a CAS loop using `compare_exchange(depth, depth + 1, Ordering::AcqRel, Ordering::Acquire)` to atomically reserve depth before applying threshold checks. Add a concurrent regression test with `max_queue_depth = 1` spawning multiple threads to verify the hard cap is never exceeded.
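A standalone sketch of such a CAS-based admit, with a simplified signature and error type (these are assumptions for illustration, not the PR's `BackpressureController` API). The capacity check and the increment are fused into one `compare_exchange`, so no interleaving can overshoot the cap:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Atomically reserve a queue slot. Err carries the depth observed when
/// the request was shed (simplified stand-in for a LoadShed error).
fn admit(depth: &AtomicUsize, max_queue_depth: usize, shed_threshold: f64) -> Result<(), usize> {
    let mut cur = depth.load(Ordering::Acquire);
    loop {
        // Hard cap: applies even to critical traffic in the fixed version.
        if cur >= max_queue_depth {
            return Err(cur);
        }
        // Per-priority shedding on the load fraction observed in this attempt.
        if (cur as f64 / max_queue_depth as f64) >= shed_threshold {
            return Err(cur);
        }
        match depth.compare_exchange(cur, cur + 1, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) => return Ok(()),         // slot reserved atomically
            Err(observed) => cur = observed, // lost the race: retry with fresh depth
        }
    }
}

fn main() {
    // Concurrent regression check: with a cap of 1, many racing threads
    // must produce exactly one successful admission.
    let depth = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..16)
        .map(|_| {
            let d = Arc::clone(&depth);
            thread::spawn(move || admit(&d, 1, 1.0).is_ok())
        })
        .collect();
    let admitted = handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|&ok| ok)
        .count();
    println!("admitted = {}, depth = {}", admitted, depth.load(Ordering::SeqCst));
}
```

`main` doubles as the suggested regression test shape: a cap of 1 and many threads, asserting the depth never passes the cap.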
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/agent/backpressure.rs` around lines 94 - 128, The admit function
currently reads self.current_depth then later increments, causing a race that
can exceed config.max_queue_depth; replace the non-atomic snapshot-and-add with
a CAS loop on self.current_depth using compare_exchange (Ordering::AcqRel /
Acquire) to atomically reserve a slot only when depth < max_queue_depth and
below the per-priority threshold (apply the same CAS logic for
Priority::Critical if you want critical to still bypass shedding but must still
respect the hard cap reservation), returning LoadShed on failed reservation or
threshold breach; also add a concurrent regression test that sets
max_queue_depth = 1 and spawns many threads that call admit to assert
self.current_depth never surpasses max_queue_depth and that load-shed errors are
returned instead of allowing over-capacity increments.
```rust
pub fn new(config: CircuitBreakerConfig) -> Self {
    Self {
        config,
        inner: Mutex::new(Inner {
            state: CircuitState::Closed,
            consecutive_failures: 0,
            half_open_successes: 0,
            half_open_in_flight: 0,
            opened_at: None,
        }),
    }
}
```
Validate `half_open_max_requests` before constructing the breaker.
If this is 0, the breaker trips once, transitions to HalfOpen, and then line 144 rejects every call forever because the quota check is immediately true. That leaves the provider unable to recover after the first open. Please reject this value up front and pin it with a boundary test.
🧯 Suggested guard

```diff
 pub fn new(config: CircuitBreakerConfig) -> Self {
+    assert!(
+        config.half_open_max_requests > 0,
+        "half_open_max_requests must be greater than zero"
+    );
+
     Self {
         config,
         inner: Mutex::new(Inner {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/providers/circuit_breaker.rs` around lines 96 - 106, The constructor
CircuitBreaker::new should validate config.half_open_max_requests and reject
zero to avoid an unrecovable HalfOpen state; update CircuitBreaker::new to check
config.half_open_max_requests > 0 (either by returning a Result with an Err
describing invalid config or by asserting/panicking with a clear message) and
add a unit test that constructs CircuitBreakerConfig with half_open_max_requests
= 0 and verifies the constructor rejects it (pinning the boundary behavior).
Ensure references: CircuitBreaker::new and
CircuitBreakerConfig.half_open_max_requests.
Thank you for the contribution! We appreciate the effort. We are closing this PR because it introduces a standalone module with no integration into the existing codebase — the new types/functions are not called by any existing code path. ZeroClaw follows a trait-driven architecture where new features must be wired through factory registration and have at least one active caller. To reopen, please:
We welcome smaller, integrated contributions. See CLAUDE.md §7 for playbooks on adding providers, channels, tools, and peripherals.

Review: This PR introduces a standalone module that is not wired into any existing code path. The new types/functions have no callers in the codebase. To make this mergeable:
See CLAUDE.md §7 for playbooks. Recommend closing and resubmitting as smaller, integrated PRs.

Thanks for the review feedback. I've addressed the integration concerns: Changes pushed:
All tests pass.
Actionable comments posted: 4
♻️ Duplicate comments (2)
src/agent/backpressure.rs (1)
94-136: ⚠️ Potential issue | 🔴 Critical
Race condition allows exceeding `max_queue_depth` under concurrent load. The admit path loads `current_depth` at line 95, performs threshold checks, then increments at line 135. Under concurrent requests, multiple threads can observe the same depth, all pass checks, and all increment — pushing the queue past `max_queue_depth`.
The `Priority::Critical` path (lines 98-101) is also unbounded — it increments without any capacity reservation.
Use a CAS loop with `compare_exchange` to atomically reserve a slot before applying threshold logic, and add a concurrent regression test with `max_queue_depth = 1` spawning multiple threads to verify the hard cap is never exceeded.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/backpressure.rs` around lines 94 - 136, The admit() path has a race: it reads current_depth then later increments, allowing concurrent admits to exceed config.max_queue_depth (and Priority::Critical currently increments unbounded); replace the non-atomic read-then-add with a CAS loop using AtomicUsize::compare_exchange on current_depth inside admit() that repeatedly reads cur, computes next = cur + 1, checks next against hard cap (config.max_queue_depth == 0 => Err, next > max_queue_depth => Err) and checks the priority-specific threshold using load_fraction = cur / max_queue_depth (or use cur before increment) so you only reserve a slot when the checks pass, then attempt compare_exchange(cur, next) and on success return Ok() (or Err(LoadShed) when checks fail); treat Priority::Critical as still allowed to bypass per-priority thresholds but not the hard cap (enforce max_queue_depth in the same CAS loop); add a concurrent regression test that sets max_queue_depth = 1, spawns many threads concurrently calling admit(), and asserts current_depth never exceeds max_queue_depth and that excess admits return Err(LoadShed).
src/providers/circuit_breaker.rs (1)
96-106: ⚠️ Potential issue | 🟠 Major
Still need to reject invalid breaker configs in `new()`.
`CircuitBreaker::new` is public, so schema-level validation elsewhere does not protect direct callers here. In particular, `half_open_max_requests == 0` leaves the breaker permanently rejecting probes at line 144 after the first open, and `recovery_timeout == Duration::ZERO` is another degenerate state worth rejecting locally.
🧯 Minimal guard
```diff
 pub fn new(config: CircuitBreakerConfig) -> Self {
+    assert!(
+        config.half_open_max_requests > 0,
+        "half_open_max_requests must be greater than zero"
+    );
+    assert!(
+        config.recovery_timeout > Duration::ZERO,
+        "recovery_timeout must be greater than zero"
+    );
+
     Self {
         config,
         inner: Mutex::new(Inner {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/providers/circuit_breaker.rs` around lines 96 - 106, CircuitBreaker::new must validate CircuitBreakerConfig and reject degenerate values instead of constructing a broken breaker; change the signature of CircuitBreaker::new to return Result<Self, CircuitBreakerConfigError> (or another crate error type), validate fields like config.half_open_max_requests > 0 and config.recovery_timeout > Duration::ZERO (and any other invariants you care about, e.g., max_consecutive_failures > 0), and return an Err with a clear error when validation fails; on success construct and return Ok(Self { config, inner: Mutex::new(Inner { state: CircuitState::Closed, consecutive_failures: 0, half_open_successes: 0, half_open_in_flight: 0, opened_at: None, }), }). Ensure callers of CircuitBreaker::new are updated to handle the Result.
🧹 Nitpick comments (3)
src/agent/backpressure.rs (1)
155-293: Good functional test coverage, but missing a concurrent regression test. The tests validate admission semantics, priority-based shedding, and the admit/complete cycle. However, given the race condition in `admit()`, adding a concurrent test with multiple threads contending for a low `max_queue_depth` would help validate the fix and prevent regression.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/backpressure.rs` around lines 155 - 293, Add a concurrent regression test in the tests module that stresses BackpressureController::admit and complete under contention: create a BackpressureController via make_controller with a small max_queue_depth (e.g., 2), wrap it in Arc and spawn many threads that repeatedly call ctrl.admit(priority) and on success call ctrl.complete(), tracking successes and failures; join threads and assert final depth is 0, total admitted never exceeded max at any time (or that admitted count <= total completes), and that non-critical requests are shed as expected—this will exercise the admit() race and prevent regressions.
src/security/rate_limiter.rs (1)
77-78: Remove duplicate doc comment. Lines 77 and 78 contain the same doc comment text.
✂️ Suggested fix
```diff
 impl TokenBucketRateLimiter {
     /// Create a new rate limiter with the given configuration.
-    /// Create a new rate limiter with the given configuration.
     ///
     /// # Panics
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/security/rate_limiter.rs` around lines 77 - 78, There are two identical doc comment lines above the constructor—remove the duplicate so only a single doc comment remains for the rate limiter constructor (e.g., the doc above RateLimiter::new or the new() function); keep one "Create a new rate limiter with the given configuration." line and delete the redundant copy.
src/config/schema.rs (1)
221-223: Document the rollback path for `[resilience]` explicitly. This new public config contract covers defaults and backward compatibility, but it still omits the migration/rollback story. A short note that removing `[resilience]` or leaving all flags `false` restores legacy behavior would make the contract complete for operators.
As per coding guidelines: "For config/schema changes, treat keys as public contract: document defaults, compatibility impact, and migration/rollback path".
Also applies to: 401-408
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/config/schema.rs` around lines 221 - 223, Update the public doc comment for the resilience field and the ResilienceConfig struct to include an explicit migration/rollback note: state that removing the [resilience] table entirely or leaving all resilience feature flags set to false will restore legacy behavior, and document the default values used for each flag for backward compatibility; apply the same explicit rollback sentence to the other related doc comments referenced around the ResilienceConfig usage so operators see defaults, compatibility impact, and the rollback path.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/agent/loop_.rs`:
- Around line 3350-3357: The branch that checks
config.resilience.backpressure_enabled only logs and then proceeds to call
agent_turn(...) without creating or passing any BackpressureController, so the
setting is a no-op for channel (single-message) paths; modify the branch to
either construct/share a BackpressureController (Arc/shared) and thread it into
the single-message path by passing it into agent_turn (and any downstream call
sites that expect admission/completion accounting), or if you cannot create a
controller here, return an Err or panic to fail fast when backpressure is
enabled; reference config.resilience.backpressure_enabled, the
BackpressureController type (or equivalent), and the callsite agent_turn(...) to
locate where to create/propagate or reject the request.
- Around line 3334-3349: The circuit breaker is being recreated inside the
per-message path (where provider is built) so its internal counters reset on
each call; move the wrapping out of process_message and create a single shared
provider instance once at startup (e.g., in your runtime/factory or Agent
initialization) that uses
providers::circuit_breaker::CircuitBreakerProvider::new with the
CircuitBreakerConfig (failure_threshold, recovery_timeout,
half_open_max_requests), expose it as a clonable/shared type (Arc<dyn Provider +
Send + Sync> or a factory that returns clones) and have process_message use that
shared provider instead; add a multi-message integration test that sends
multiple messages through the same pinned provider to verify the breaker
transitions to Open.
In `@src/providers/circuit_breaker.rs`:
- Around line 332-349: stream_chat_with_system currently delegates directly to
self.inner, bypassing the circuit-breaker logic used by chat/chat_with_system;
change stream_chat_with_system to first check the breaker state (the same
"circuit open" check used by chat/chat_with_system) and if open return a stream
that immediately yields a CircuitOpen error (or otherwise return an explicit
unsupported error) rather than calling self.inner.stream_chat_with_system, so
stream startup is gated by the breaker until you implement full streaming
outcome tracking.
- Around line 120-172: The call to f().await can be dropped and currently leaves
inner.half_open_in_flight stuck because decrement happens only after await; fix
by introducing an RAII permit guard (e.g., HalfOpenPermit) created when entering
CircuitState::HalfOpen in call(), incrementing inner.half_open_in_flight on
creation and decrementing in Drop, and pass a flag into
record_success/record_failure or explicitly disarm the guard there so the
successful/failure path doesn't double-decrement; update call() to create this
guard before releasing the lock and remove the manual decrement after await.
Also add a regression test that starts a half-open probe and drops the future
(simulate tokio::select! cancellation) to assert half_open_in_flight is
decremented and breaker can transition out of HalfOpen. Ensure references to
half_open_in_flight, call(), maybe_transition_to_half_open, record_success, and
record_failure are used when locating the changes.
---
Duplicate comments:
In `@src/agent/backpressure.rs`:
- Around line 94-136: The admit() path has a race: it reads current_depth then
later increments, allowing concurrent admits to exceed config.max_queue_depth
(and Priority::Critical currently increments unbounded); replace the non-atomic
read-then-add with a CAS loop using AtomicUsize::compare_exchange on
current_depth inside admit() that repeatedly reads cur, computes next = cur + 1,
checks next against hard cap (config.max_queue_depth == 0 => Err, next >
max_queue_depth => Err) and checks the priority-specific threshold using
load_fraction = cur / max_queue_depth (or use cur before increment) so you only
reserve a slot when the checks pass, then attempt compare_exchange(cur, next)
and on success return Ok() (or Err(LoadShed) when checks fail); treat
Priority::Critical as still allowed to bypass per-priority thresholds but not
the hard cap (enforce max_queue_depth in the same CAS loop); add a concurrent
regression test that sets max_queue_depth = 1, spawns many threads concurrently
calling admit(), and asserts current_depth never exceeds max_queue_depth and
that Excess admits return Err(LoadShed).
In `@src/providers/circuit_breaker.rs`:
- Around line 96-106: CircuitBreaker::new must validate CircuitBreakerConfig and
reject degenerate values instead of constructing a broken breaker; change the
signature of CircuitBreaker::new to return Result<Self,
CircuitBreakerConfigError> (or another crate error type), validate fields like
config.half_open_max_requests > 0 and config.recovery_timeout > Duration::ZERO
(and any other invariants you care about, e.g., max_consecutive_failures > 0),
and return an Err with a clear error when validation fails; on success construct
and return Ok(Self { config, inner: Mutex::new(Inner { state:
CircuitState::Closed, consecutive_failures: 0, half_open_successes: 0,
half_open_in_flight: 0, opened_at: None, }), }). Ensure callers of
CircuitBreaker::new are updated to handle the Result.
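A minimal sketch of the validated constructor the comment asks for, assuming simplified config and error types (the PR's real `CircuitBreakerConfig` and internal state differ):

```rust
use std::time::Duration;

#[derive(Debug)]
struct CircuitBreakerConfig {
    failure_threshold: u32,
    recovery_timeout: Duration,
    half_open_max_requests: u32,
}

#[derive(Debug, PartialEq)]
enum ConfigError {
    ZeroFailureThreshold,   // breaker would never trip
    ZeroRecoveryTimeout,    // Open state would be skipped entirely
    ZeroHalfOpenRequests,   // breaker could never probe its way closed
}

struct CircuitBreaker {
    config: CircuitBreakerConfig,
}

impl CircuitBreaker {
    /// Reject degenerate configs up front instead of constructing a breaker
    /// that can never trip or never recover.
    fn new(config: CircuitBreakerConfig) -> Result<Self, ConfigError> {
        if config.failure_threshold == 0 {
            return Err(ConfigError::ZeroFailureThreshold);
        }
        if config.recovery_timeout == Duration::ZERO {
            return Err(ConfigError::ZeroRecoveryTimeout);
        }
        if config.half_open_max_requests == 0 {
            return Err(ConfigError::ZeroHalfOpenRequests);
        }
        Ok(Self { config })
    }
}

fn main() {
    let bad = CircuitBreakerConfig {
        failure_threshold: 5,
        recovery_timeout: Duration::ZERO,
        half_open_max_requests: 1,
    };
    assert_eq!(CircuitBreaker::new(bad).err(), Some(ConfigError::ZeroRecoveryTimeout));

    let good = CircuitBreakerConfig {
        failure_threshold: 5,
        recovery_timeout: Duration::from_secs(30),
        half_open_max_requests: 1,
    };
    let b = CircuitBreaker::new(good).unwrap();
    println!("failure_threshold = {}", b.config.failure_threshold);
}
```

Callers then handle the `Result` once at construction, so the hot path stays free of validation branches.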
---
Nitpick comments:
In `@src/agent/backpressure.rs`:
- Around line 155-293: Add a concurrent regression test in the tests module that
stresses BackpressureController::admit and complete under contention: create a
BackpressureController via make_controller with a small max_queue_depth (e.g.,
2), wrap it in Arc and spawn many threads that repeatedly call
ctrl.admit(priority) and on success call ctrl.complete(), tracking successes and
failures; join threads and assert final depth is 0, total admitted never
exceeded max at any time (or that admitted count <= total completes), and that
non-critical requests are shed as expected—this will exercise the admit() race
and prevent regressions.
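A stripped-down version of that contention test, using a hypothetical `Ctrl` with the same admit/complete shape (std threads instead of tokio, a plain `bool` instead of the real `LoadShed` error):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

struct Ctrl { depth: AtomicUsize, max: usize }

impl Ctrl {
    fn admit(&self) -> bool {
        let mut cur = self.depth.load(Ordering::Acquire);
        loop {
            if cur >= self.max { return false; } // shed
            match self.depth.compare_exchange_weak(
                cur, cur + 1, Ordering::AcqRel, Ordering::Acquire,
            ) {
                Ok(_) => return true,
                Err(observed) => cur = observed,
            }
        }
    }
    fn complete(&self) { self.depth.fetch_sub(1, Ordering::AcqRel); }
}

fn main() {
    let ctrl = Arc::new(Ctrl { depth: AtomicUsize::new(0), max: 2 });
    let max_seen = Arc::new(AtomicUsize::new(0));
    let shed = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..8).map(|_| {
        let (ctrl, max_seen, shed) = (ctrl.clone(), max_seen.clone(), shed.clone());
        thread::spawn(move || {
            for _ in 0..1_000 {
                if ctrl.admit() {
                    // Record the highest depth ever observed while admitted.
                    let d = ctrl.depth.load(Ordering::Acquire);
                    max_seen.fetch_max(d, Ordering::AcqRel);
                    ctrl.complete();
                } else {
                    shed.fetch_add(1, Ordering::Relaxed);
                }
            }
        })
    }).collect();
    for h in handles { h.join().unwrap(); }

    assert_eq!(ctrl.depth.load(Ordering::Acquire), 0); // every admit completed
    assert!(max_seen.load(Ordering::Acquire) <= 2);    // cap never exceeded
    println!("shed {} requests", shed.load(Ordering::Relaxed));
}
```

Eight threads hammering a capacity of two makes the window between read and reserve large enough that a non-atomic implementation would fail the `max_seen` assertion.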
In `@src/config/schema.rs`:
- Around line 221-223: Update the public doc comment for the resilience field
and the ResilienceConfig struct to include an explicit migration/rollback note:
state that removing the [resilience] table entirely or leaving all resilience
feature flags set to false will restore legacy behavior, and document the
default values used for each flag for backward compatibility; apply the same
explicit rollback sentence to the other related doc comments referenced around
the ResilienceConfig usage so operators see defaults, compatibility impact, and
the rollback path.
In `@src/security/rate_limiter.rs`:
- Around line 77-78: There are two identical doc comment lines above the
constructor—remove the duplicate so only a single doc comment remains for the
rate limiter constructor (e.g., the doc above RateLimiter::new or the new()
function); keep one "Create a new rate limiter with the given configuration."
line and delete the redundant copy.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 04a70f26-07d7-4970-8c98-0d7d8b2cfd5e
📒 Files selected for processing (5)
src/agent/backpressure.rs, src/agent/loop_.rs, src/config/schema.rs, src/providers/circuit_breaker.rs, src/security/rate_limiter.rs
```rust
// ── Circuit breaker (wrap provider when enabled) ────────────────────
let provider: Box<dyn Provider> = if config.resilience.circuit_breaker_enabled {
    let cb_config = providers::circuit_breaker::CircuitBreakerConfig {
        failure_threshold: config.resilience.circuit_breaker_failure_threshold,
        recovery_timeout: std::time::Duration::from_secs(
            config.resilience.circuit_breaker_recovery_timeout_secs,
        ),
        half_open_max_requests: config.resilience.circuit_breaker_half_open_max_requests,
    };
    Box::new(providers::circuit_breaker::CircuitBreakerProvider::new(
        provider, cb_config,
    ))
} else {
    provider
};
```
process_message recreates the circuit breaker for every inbound message.
This entrypoint handles one channel message at a time, so the wrapper's counters disappear as soon as the call returns. A provider outage that shows up as one failed provider call per Discord/Telegram message never accumulates enough consecutive failures to trip Open, which makes channel mode largely immune to the breaker. Hoist the wrapped provider into a shared runtime/factory object and pin it with a multi-message integration test.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/agent/loop_.rs` around lines 3334 - 3349, The circuit breaker is being
recreated inside the per-message path (where provider is built) so its internal
counters reset on each call; move the wrapping out of process_message and create
a single shared provider instance once at startup (e.g., in your runtime/factory
or Agent initialization) that uses
providers::circuit_breaker::CircuitBreakerProvider::new with the
CircuitBreakerConfig (failure_threshold, recovery_timeout,
half_open_max_requests), expose it as a clonable/shared type (Arc<dyn Provider +
Send + Sync> or a factory that returns clones) and have process_message use that
shared provider instead; add a multi-message integration test that sends
multiple messages through the same pinned provider to verify the breaker
transitions to Open.
```rust
// ── Backpressure check (single-message path) ────────────────────────
if config.resilience.backpressure_enabled {
    // For single-message processing, a static controller would be needed
    // for cross-request coordination. Here we validate config is enabled
    // but actual admission control happens in the long-running `run` loop
    // and `Agent::turn`. Log for observability.
    tracing::debug!("Backpressure enabled (single-message path — no queue tracking)");
}
```
Backpressure is still a no-op in the channel path.
When backpressure_enabled is true, this branch only logs and then immediately calls agent_turn(...) without any admission or completion accounting. Because nothing downstream receives a BackpressureController, the config is silently ineffective for channel traffic. Please thread a shared controller into this path, or fail fast here instead of ignoring the setting.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/agent/loop_.rs` around lines 3350 - 3357, The branch that checks
config.resilience.backpressure_enabled only logs and then proceeds to call
agent_turn(...) without creating or passing any BackpressureController, so the
setting is a no-op for channel (single-message) paths; modify the branch to
either construct/share a BackpressureController (Arc/shared) and thread it into
the single-message path by passing it into agent_turn (and any downstream call
sites that expect admission/completion accounting), or if you cannot create a
controller here, return an Err or panic to fail fast when backpressure is
enabled; reference config.resilience.backpressure_enabled, the
BackpressureController type (or equivalent), and the callsite agent_turn(...) to
locate where to create/propagate or reject the request.
```rust
pub async fn call<F, Fut, T>(&self, f: F) -> anyhow::Result<T>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<T>>,
{
    // Acquire permission to call.
    let is_half_open_probe = {
        let mut inner = self.inner.lock();
        self.maybe_transition_to_half_open(&mut inner);

        match inner.state {
            CircuitState::Closed => false,
            CircuitState::Open => {
                let elapsed = inner
                    .opened_at
                    .map(|t| t.elapsed())
                    .unwrap_or(Duration::ZERO);
                let remaining = self.config.recovery_timeout.saturating_sub(elapsed);
                return Err(CircuitOpen {
                    retry_after: remaining,
                }
                .into());
            }
            CircuitState::HalfOpen => {
                if inner.half_open_in_flight + inner.half_open_successes
                    >= self.config.half_open_max_requests
                {
                    return Err(CircuitOpen {
                        retry_after: Duration::from_secs(1),
                    }
                    .into());
                }
                inner.half_open_in_flight += 1;
                true
            }
        }
    };

    // Execute the call outside the lock.
    let result = f().await;

    // Record outcome.
    let mut inner = self.inner.lock();
    if is_half_open_probe {
        inner.half_open_in_flight = inner.half_open_in_flight.saturating_sub(1);
    }

    match &result {
        Ok(_) => self.record_success(&mut inner, is_half_open_probe),
        Err(_) => self.record_failure(&mut inner),
    }

    result
```
Dropping a half-open call can wedge the breaker.
half_open_in_flight is decremented only after f().await returns. If the future is dropped while awaiting the provider call — which the agent loop does when cancellation wins its tokio::select! — that decrement never runs, and the breaker can stay stuck in HalfOpen, rejecting every later probe. Please move the half-open permit accounting behind a drop guard/RAII permit and pin it with a regression test that drops a half-open call before completion.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/providers/circuit_breaker.rs` around lines 120 - 172, The call to
f().await can be dropped and currently leaves inner.half_open_in_flight stuck
because decrement happens only after await; fix by introducing an RAII permit
guard (e.g., HalfOpenPermit) created when entering CircuitState::HalfOpen in
call(), incrementing inner.half_open_in_flight on creation and decrementing in
Drop, and pass a flag into record_success/record_failure or explicitly disarm
the guard there so the successful/failure path doesn't double-decrement; update
call() to create this guard before releasing the lock and remove the manual
decrement after await. Also add a regression test that starts a half-open probe
and drops the future (simulate tokio::select! cancellation) to assert
half_open_in_flight is decremented and breaker can transition out of HalfOpen.
Ensure references to half_open_in_flight, call(), maybe_transition_to_half_open,
record_success, and record_failure are used when locating the changes.
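One way to sketch the suggested RAII permit. Here a shared atomic counter stands in for the mutex-guarded `half_open_in_flight` field, and the `HalfOpenPermit` name comes from the review comment, not the PR's code:

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

/// RAII permit for a half-open probe: the in-flight counter is bumped on
/// creation and released on Drop, so a cancelled future (e.g. the losing
/// branch of a tokio::select!) still gives its slot back.
struct HalfOpenPermit {
    in_flight: Arc<AtomicU32>,
    armed: bool,
}

impl HalfOpenPermit {
    fn acquire(in_flight: Arc<AtomicU32>) -> Self {
        in_flight.fetch_add(1, Ordering::AcqRel);
        Self { in_flight, armed: true }
    }

    /// Called once the outcome has been recorded (record_success /
    /// record_failure in the real breaker), so Drop cannot double-decrement.
    fn disarm(&mut self) {
        if self.armed {
            self.in_flight.fetch_sub(1, Ordering::AcqRel);
            self.armed = false;
        }
    }
}

impl Drop for HalfOpenPermit {
    fn drop(&mut self) {
        self.disarm(); // runs even if the probe future is dropped mid-await
    }
}

fn main() {
    let in_flight = Arc::new(AtomicU32::new(0));
    {
        let _permit = HalfOpenPermit::acquire(in_flight.clone());
        assert_eq!(in_flight.load(Ordering::Acquire), 1);
        // Simulate cancellation: the scope ends without any explicit
        // success/failure path running.
    }
    assert_eq!(in_flight.load(Ordering::Acquire), 0);
    println!("permit released on drop");
}
```

Because release lives in `Drop`, the accounting holds on every exit path — normal return, early `?`, panic unwind, or a dropped future.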
```rust
fn stream_chat_with_system(
    &self,
    _system_prompt: Option<&str>,
    _message: &str,
    _model: &str,
    _temperature: f64,
    _options: StreamOptions,
) -> stream::BoxStream<'static, StreamResult<StreamChunk>> {
    // Circuit breaker does not wrap streaming calls (they have their own
    // error handling). Delegate directly.
    self.inner.stream_chat_with_system(
        _system_prompt,
        _message,
        _model,
        _temperature,
        _options,
    )
}
```
Don't silently bypass the breaker for streaming calls.
This decorator fail-fast rejects chat/chat_with_system when the circuit is open, but stream_chat_with_system still goes straight to the downstream provider. Any streaming caller will keep hitting a failing backend during an outage, so circuit_breaker_enabled only works on part of the trait surface. Either gate stream startup through the breaker and surface CircuitOpen, or return an explicit unsupported error until stream outcomes are tracked.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/providers/circuit_breaker.rs` around lines 332 - 349,
stream_chat_with_system currently delegates directly to self.inner, bypassing
the circuit-breaker logic used by chat/chat_with_system; change
stream_chat_with_system to first check the breaker state (the same "circuit
open" check used by chat/chat_with_system) and if open return a stream that
immediately yields a CircuitOpen error (or otherwise return an explicit
unsupported error) rather than calling self.inner.stream_chat_with_system, so
stream startup is gated by the breaker until you implement full streaming
outcome tracking.
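To illustrate the gating the comment asks for without pulling in the futures crate, here is a sketch where a boxed `Iterator` stands in for `stream::BoxStream` (the real fix would return a stream whose first item is the `CircuitOpen` error; all names below are simplified assumptions):

```rust
// Stand-in for the provider's chunk stream.
type ChunkStream = Box<dyn Iterator<Item = Result<String, String>>>;

#[derive(Clone, Copy, PartialEq)]
enum CircuitState { Closed, Open }

struct Breaker {
    state: CircuitState,
}

impl Breaker {
    /// Gate stream startup: when the circuit is open, return a stream that
    /// immediately yields a CircuitOpen error instead of touching the
    /// failing backend.
    fn stream_chat(&self, prompt: &str) -> ChunkStream {
        if self.state == CircuitState::Open {
            return Box::new(std::iter::once(Err("CircuitOpen".to_string())));
        }
        // Closed path: delegate to the inner provider (simulated here).
        let reply = format!("echo: {prompt}");
        Box::new(std::iter::once(Ok(reply)))
    }
}

fn main() {
    let open = Breaker { state: CircuitState::Open };
    assert_eq!(
        open.stream_chat("hi").next().unwrap(),
        Err("CircuitOpen".to_string())
    );

    let closed = Breaker { state: CircuitState::Closed };
    assert_eq!(
        closed.stream_chat("hi").next().unwrap(),
        Ok("echo: hi".to_string())
    );
}
```

This only gates stream startup; recording streaming success/failure back into the breaker (the "full streaming outcome tracking" the comment defers) would still need per-chunk bookkeeping.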
Add production resilience patterns for enterprise deployments:
- Token-bucket rate limiter (security/rate_limiter.rs): per-key limiting with configurable requests/window, burst, and eviction. Wired into the gateway webhook handler when [resilience].rate_limit_enabled=true.
- Circuit breaker (providers/circuit_breaker.rs): Closed/Open/HalfOpen state machine with configurable failure threshold, recovery timeout, and half-open probe limits. Generic async call wrapper.
- Backpressure controller (agent/backpressure.rs): queue depth tracking with adaptive load shedding across five priority levels (Critical, High, Normal, Low, Background).
- ResilienceConfig in config/schema.rs with #[serde(default)] for full backward compatibility. All features disabled by default.
- No external crates added. Uses parking_lot::Mutex and std atomics.
…sure division
- Add config validation for circuit_breaker_half_open_max_requests >= 1 (0 would prevent recovery from open state)
- Add config validation for rate limiter: requests_per_minute and burst cannot both be 0 when enabled (all requests would be rejected)
- Add config validation for backpressure_max_queue_depth >= 1 when enabled
- Guard BackpressureController::admit against max_queue_depth=0 to prevent potential division by zero, with explicit early rejection path
- Add test for zero max_queue_depth behavior
…meout
- Assert non-zero window in TokenBucketRateLimiter::new() to prevent silently failing open (infinite refill rate) on zero-length window
- Validate circuit_breaker_recovery_timeout_secs > 0 when circuit breaker is enabled (0 would skip the open state entirely)
- Add test for zero-window panic
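The zero-window guard above matters because the refill rate divides by the window length. A minimal token-bucket sketch showing that relationship (field and method names are illustrative, not the PR's exact TokenBucketRateLimiter API):

```rust
use std::time::{Duration, Instant};

/// Per-key token bucket: `burst` sets capacity, and tokens refill at
/// requests_per_window / window per second — hence the non-zero-window
/// assertion: a zero window would make the refill rate infinite.
struct Bucket {
    tokens: f64,
    capacity: f64,
    refill_per_sec: f64,
    last_refill: Instant,
}

impl Bucket {
    fn new(requests_per_window: u32, window: Duration, burst: u32) -> Self {
        assert!(!window.is_zero(), "zero window => infinite refill rate");
        let capacity = burst.max(1) as f64;
        Self {
            tokens: capacity,
            capacity,
            refill_per_sec: requests_per_window as f64 / window.as_secs_f64(),
            last_refill: Instant::now(),
        }
    }

    /// Spend one token, or return how long the caller should wait
    /// (the basis for a retry_after hint).
    fn check(&mut self) -> Result<(), Duration> {
        let now = Instant::now();
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last_refill = now;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            Ok(())
        } else {
            let deficit = 1.0 - self.tokens;
            Err(Duration::from_secs_f64(deficit / self.refill_per_sec))
        }
    }
}

fn main() {
    // 60 requests per 60-second window, burst of 2.
    let mut b = Bucket::new(60, Duration::from_secs(60), 2);
    assert!(b.check().is_ok());
    assert!(b.check().is_ok());
    // Burst exhausted: the third immediate request is limited.
    let retry = b.check().unwrap_err();
    assert!(retry > Duration::ZERO);
    println!("retry after {:?}", retry);
}
```

A per-key limiter would hold one such bucket per API key in a bounded map, evicting stale keys to cap cardinality.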
…r into provider calls
Addresses reviewer feedback: standalone modules are now integrated into existing code paths with active callers.
Summary
- Target branch (`master` for all contributions): `master`
- Token-bucket rate limiter (`src/security/rate_limiter.rs`) with per-key tracking, configurable burst/window, and bounded key cardinality
- Circuit breaker (`src/providers/circuit_breaker.rs`) with Closed/Open/HalfOpen state machine for wrapping fallible async provider calls
- Backpressure controller (`src/agent/backpressure.rs`) with priority-based adaptive load shedding (Critical, High, Normal, Low, Background)
- `ResilienceConfig` added to `src/config/schema.rs` with `#[serde(default)]` on all fields for full backward compatibility
- Opt-in via `[resilience] rate_limit_enabled = true`
- No new dependencies: `parking_lot::Mutex` and `std::sync::atomic`. All resilience features are disabled by default (safe for existing deployments).
- (`risk: low|medium|high`): high
- (`size: XS|S|M|L|XL`, auto-managed/read-only): L
- `security: rate_limiter`, `provider: circuit_breaker`, `agent: backpressure`
- (`bug|feature|refactor|docs|security|chore`): feature
- (`runtime|provider|channel|memory|security|ci|docs|multi`): multi (security + provider + agent + gateway)
Supersede Attribution (required when `Supersedes #` is used)
N/A
Validation Evidence (required)
Commands and result summary:
Security Impact (required)
Yes, describe risk and mitigation: Rate limiting is opt-in (`rate_limit_enabled = false` by default). Configurable `requests_per_minute` and `burst` parameters. Provides `retry_after` header for graceful client backoff.
Privacy and Data Hygiene (required)
(`pass|needs-follow-up`): pass
Compatibility / Migration
New `[resilience]` config section with `#[serde(default)]`
i18n Follow-Through (required when docs or user-facing wording changes)
Human Verification (required)
What was personally validated beyond CI:
Side Effects / Blast Radius (required)
`retry_after` for client-side backoff; circuit breaker state transitions are deterministic and testable.
Agent Collaboration Notes (recommended)
(`AGENTS.md` + `CONTRIBUTING.md`): Yes
Rollback Plan (required)
- `git revert <commit-sha>`
- Config kill switches: `[resilience] rate_limit_enabled`, `circuit_breaker` threshold, `backpressure` max_queue_depth — all disabled/inactive by default
`recovery_timeout` is configurable per deployment; half-open state allows gradual recovery.