feat(beta): add CircuitBreakerMiddleware for LLM failure isolation#2532
amabito wants to merge 12 commits into ag2ai:main
    def __init__(self, config: CircuitBreakerConfig | None = None) -> None:
        self._config = config or CircuitBreakerConfig()
        self._circuit_breaker = _CircuitBreaker(self._config)
We need to move _CircuitBreaker creation to __call__ instead of __init__. That gives us a unique lock per conversation. Right now your implementation has a shared lock (counter, state, etc.) for all agents using the middleware and for all concurrent executions of the same agent.
    if state == CircuitState.OPEN:
        logger.debug("CircuitBreaker is OPEN -- blocking LLM call")
        return _blocked_response()
I thought CircuitBreakerMiddleware would automatically retry the connection and gently wait for the breaker state to become open. But you return an empty message to the user, which is not the behavior I expect from this feature.
Also, it looks to me like the OPEN and CLOSED states are handled incorrectly here. The common implementation is:
- OPEN - allow execution
- CLOSED - wait for release
- HALF_OPEN - make an attempt and fall back to CLOSED on any error
Hey @Lancetnik, appreciate you digging in. A few of these I want to push back on and a few I'll just fix.

State names

The CLOSED / OPEN / HALF_OPEN labels in this PR follow the canonical definition of the pattern, and flipping them would break consistency with every other CB library AG2 users have touched. The electrical metaphor is the anchor: a closed circuit conducts (normal operation), an open circuit is broken (calls are rejected). Sources:

The implementation matches this:

    if self._failure_count < self._config.failure_threshold:
        return CircuitState.CLOSED  # normal pass-through
    if self._opened_at is None:
        return CircuitState.CLOSED
    elapsed = time.monotonic() - self._opened_at
    if elapsed < self._config.recovery_timeout_s:
        return CircuitState.OPEN  # blocked, fail-fast
    return CircuitState.HALF_OPEN  # single probe allowed

If you have a source that uses the inverted definition I'd genuinely like to see it, because then we're talking about something other than a Fowler-style circuit breaker and I'd want to rename the class rather than redefine standard terminology.

"Auto-retry and gently wait"

That's a different pattern -- Retry (with backoff), not Circuit Breaker. resilience4j treats them as separate decorators for a reason: one answers "should I attempt this call at all?", the other answers "should I re-attempt a failed call?". Merging them conflates two concerns and makes the CB impossible to compose with an external retry layer. The legitimate UX gap in what I shipped is that

The parts I agree with

I'll push the mechanical fixes (
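For anyone following the state-name discussion, the derivation quoted in the reply above can be exercised in isolation. This is a minimal sketch, not the PR's code: `BreakerSnapshot` and `derive_state` are hypothetical names, the default threshold and timeout are made up, and the clock is passed in so the three outcomes are reproducible.

```python
import enum
import time
from dataclasses import dataclass
from typing import Optional


class CircuitState(enum.Enum):
    CLOSED = "closed"        # normal pass-through
    OPEN = "open"            # blocked, fail-fast
    HALF_OPEN = "half_open"  # single probe allowed


@dataclass
class BreakerSnapshot:
    """Hypothetical container for the fields the quoted snippet reads."""

    failure_count: int
    opened_at: Optional[float]
    failure_threshold: int = 3         # assumed default, for illustration only
    recovery_timeout_s: float = 30.0   # assumed default, for illustration only


def derive_state(snap: BreakerSnapshot, now: Optional[float] = None) -> CircuitState:
    """Same branch order as the quoted snippet, with an injectable clock."""
    now = time.monotonic() if now is None else now
    if snap.failure_count < snap.failure_threshold:
        return CircuitState.CLOSED
    if snap.opened_at is None:
        return CircuitState.CLOSED
    if now - snap.opened_at < snap.recovery_timeout_s:
        return CircuitState.OPEN
    return CircuitState.HALF_OPEN


healthy = derive_state(BreakerSnapshot(failure_count=0, opened_at=None), now=0.0)
tripped = derive_state(BreakerSnapshot(failure_count=3, opened_at=100.0), now=110.0)
probing = derive_state(BreakerSnapshot(failure_count=3, opened_at=100.0), now=130.0)
```

Under these assumptions a breaker below the threshold conducts (CLOSED), a tripped breaker inside the recovery window rejects calls (OPEN), and once the window has elapsed a probe is allowed (HALF_OPEN).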
Address review feedback from @Lancetnik on ag2ai#2532:

- Flatten constructor API: pass failure_threshold/recovery_timeout_s/fallback directly instead of requiring callers to build a CircuitBreakerConfig
- Drop __post_init__ validation from CircuitBreakerConfig (mypy covers the type guarantees and the runtime checks were redundant)
- Add optional fallback callable so callers can customize the blocked-state ModelResponse (addresses the "empty message" UX concern while keeping the CB and Retry patterns cleanly separated)

The state machine semantics (CLOSED=pass, OPEN=block, HALF_OPEN=probe) are unchanged -- they follow the canonical Fowler/Hystrix/resilience4j definition and are left as-is pending scope-of-sharing clarification.
Fair, we should have agent-level locks, and the current design covers that:

    m = CircuitBreakerMiddleware()
    agent = Agent(..., middleware=(m,))
    # concurrent conversations should reuse the lock
    asyncio.create_task(agent.ask(...))
    asyncio.create_task(agent.ask(...))

But the following code could lead to a problem:

    m = CircuitBreakerMiddleware()
    agent1 = Agent(..., middleware=(m,))
    agent2 = Agent(..., middleware=(m,))  # reuses agent1's lock

I suggest we just describe the problem in the documentation (or in a docstring at least) and always create the middleware per agent.
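To make the shared-instance concern concrete, here is a toy reproduction. It is a sketch under stated assumptions: `TinyBreaker` and `FakeAgent` are hypothetical stand-ins that only count consecutive failures, and neither is the AG2 `Agent` or the middleware from this PR.

```python
class TinyBreaker:
    """Hypothetical stand-in: counts failures and opens at a threshold."""

    def __init__(self, failure_threshold: int = 2) -> None:
        self.failure_threshold = failure_threshold
        self.failure_count = 0

    def record_failure(self) -> None:
        self.failure_count += 1

    @property
    def is_open(self) -> bool:
        return self.failure_count >= self.failure_threshold


class FakeAgent:
    """Hypothetical agent that consults its breaker before every call."""

    def __init__(self, name: str, breaker: TinyBreaker) -> None:
        self.name = name
        self.breaker = breaker

    def call(self, ok: bool) -> str:
        if self.breaker.is_open:
            return "blocked"
        if not ok:
            self.breaker.record_failure()
            return "failed"
        return "ok"


# One breaker shared by two agents: agent1's failures block agent2.
shared = TinyBreaker()
agent1 = FakeAgent("agent1", shared)
agent2 = FakeAgent("agent2", shared)
agent1.call(ok=False)
agent1.call(ok=False)
shared_result = agent2.call(ok=True)

# One breaker per agent: agent3's failures stay local to agent3.
agent3 = FakeAgent("agent3", TinyBreaker())
agent4 = FakeAgent("agent4", TinyBreaker())
agent3.call(ok=False)
agent3.call(ok=False)
isolated_result = agent4.call(ok=True)
```

With the shared breaker the healthy call from `agent2` is rejected, while the per-agent setup lets `agent4` through. Whether coupling like this is wanted (for example, two agents hitting the same provider) is a design decision, which is why documenting it seems like the right minimum.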
I don't like I imagine the final API like this:

    agent = Agent(..., middleware=(CircuitBreakerMiddleware(),))
    while True:
        try:
            result = await agent.ask(...)
        except CircuitBreakerOpenedError as er:
            await er.wait_release()
        else:
            break

But such an API looks like the regular boilerplate I want to avoid. A circuit breaker needs retry logic pretty often, but that is separate logic; I agree with you here. So I suggest introducing two middlewares:

    agent = Agent(..., middleware=(
        CircuitRetryMiddleware(max_retries=3),
        CircuitBreakerMiddleware(),
    ))
    try:
        result = await agent.ask(...)
    except TooManyAttempts:
        ...
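The two-layer split proposed above can be sketched without any framework. Everything here is hypothetical: `Breaker`, `with_retry`, `CircuitOpen` and `TooManyAttempts` are illustrative names, the breaker is deliberately simplified (no single-probe gate), and an open-circuit rejection is assumed to consume one attempt, which is a choice a real design would have to make explicitly.

```python
import asyncio
import time


class CircuitOpen(Exception):
    """Hypothetical stand-in for the breaker's 'open' error."""

    def __init__(self, remaining_s: float) -> None:
        super().__init__(f"circuit open for another {remaining_s:.3f}s")
        self.remaining_s = remaining_s


class TooManyAttempts(Exception):
    """Raised by the retry layer when every attempt has been used."""


class Breaker:
    """Simplified breaker: opens at the threshold, admits calls after the timeout."""

    def __init__(self, failure_threshold: int = 1, recovery_timeout_s: float = 0.05) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout_s = recovery_timeout_s
        self.failures = 0
        self.opened_at = None

    async def call(self, fn):
        if self.opened_at is not None:
            remaining = self.recovery_timeout_s - (time.monotonic() - self.opened_at)
            if remaining > 0:
                raise CircuitOpen(remaining)  # fail fast, no provider call
        try:
            result = await fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0
        self.opened_at = None
        return result


async def with_retry(breaker: Breaker, fn, max_retries: int = 3):
    """Retry layer: owns waiting and attempt counting, nothing else."""
    for _ in range(max_retries):
        try:
            return await breaker.call(fn)
        except CircuitOpen as exc:
            await asyncio.sleep(exc.remaining_s)  # wait out the recovery window
        except Exception:
            pass  # provider error; the breaker has already counted it
    raise TooManyAttempts(f"gave up after {max_retries} attempts")


async def main():
    calls = {"n": 0}

    async def flaky():
        calls["n"] += 1
        if calls["n"] == 1:
            raise ConnectionError("provider unavailable")
        return "ok"

    async def always_down():
        raise ConnectionError("provider unavailable")

    recovered = await with_retry(Breaker(), flaky, max_retries=5)
    try:
        await with_retry(Breaker(), always_down, max_retries=3)
        gave_up = False
    except TooManyAttempts:
        gave_up = True
    return recovered, gave_up


recovered, gave_up = asyncio.run(main())
```

The breaker only decides whether a call may be attempted; the retry layer only decides whether to try again. That keeps the caller's code down to a single `try`/`except TooManyAttempts`, as in the sketch above.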
I prefer end-to-end tests here that use the public API (or at least do not touch non-public objects). Such tests are less tied to the implementation and less fragile. They also cover real use cases and can serve as a kind of documentation.
Please try to implement each test using TestClient / TracingClient. Reference: https://github.com/ag2ai/ag2/blob/main/test/beta/middleware/test_tool_execution.py
Address Lancetnik review on ag2ai#2532:

1. Replace _blocked_response sentinel with explicit CircuitBreakerOpenError. Empty ModelResponse looked like error suppression; raising makes the blocked state visible to callers.
2. Add CircuitBreakerOpenError.wait_release() which sleeps the exact remaining recovery time (no polling). Callers can retry after awaiting release:

       try:
           result = await agent.ask(...)
       except CircuitBreakerOpenError as exc:
           await exc.wait_release()
           result = await agent.ask(...)

3. Drop the fallback= parameter. Suppressing the error via fallback conflicts with the explicit-failure design; retry/fallback belongs in a separate CircuitRetryMiddleware (follow-up PR).
4. Add docstring warning about sharing one middleware instance across multiple agents -- internal lock/counter are coupled when shared.

Also bump flaky TestTimerRefresh sleep from 10ms to 50ms (Windows monotonic clock resolution is ~15ms).

17/17 tests pass, pre-commit clean.
@Lancetnik Thanks for the review. Addressed both points in 12509b2:

1. Shared instance foot-gun -> documented
   Added a
2. You were right, the empty

Usage matches what you sketched:

    agent = Agent(..., middleware=(CircuitBreakerMiddleware(),))
    while True:
        try:
            result = await agent.ask(...)
            break
        except CircuitBreakerOpenError as exc:
            await exc.wait_release()

On

I agree the boilerplate above is the reason people reach for a retry layer, and keeping Breaker and Retry as two composable middlewares is the right factoring (same as resilience4j). I'd like to ship that as a follow-up PR rather than bundle it here -- it needs its own backoff policy surface (constant/exponential/jitter), interaction rules with the breaker (does a retry count as one failure or N?), and a separate test matrix. Happy to open it immediately after this one merges if that works for you.

Tests: 17/17 pass, pre-commit clean.
Force-pushed: 3b6822f to 6d67e35
Sorry for the back-to-back updates -- one more pass caught a couple of things worth pushing together. Since the last round:
31 tests, all passing. Pre-commit clean.
Force-pushed: fd3fff9 to ba133e9
Single-responsibility middleware implementing the circuit breaker pattern for LLM calls. Blocks calls after consecutive failures and allows recovery probes after a configurable timeout.

- CircuitBreakerConfig: failure_threshold, recovery_timeout_s
- Three states: CLOSED, OPEN, HALF_OPEN with atomic probe claiming
- CancelledError releases probe without recording failure
- Thread-safe state machine with mutex
…BreakerMiddleware

CB-1 (stale success / OPEN -> CLOSED without HALF_OPEN gate): record_success() previously reset failure_count and opened_at unconditionally. A delayed success arriving while the circuit was OPEN could bypass the HALF_OPEN probe requirement and jump directly to CLOSED.

Fix: record_success() now reads the current state under the lock and acts per state:
- CLOSED: reset failure_count only
- HALF_OPEN: full reset to CLOSED (probe validated)
- OPEN: no state change; log debug and return

Adds regression tests: test_record_success_in_open_state_does_not_close and test_record_success_in_closed_state_resets_failure_count.

CB-2 (CircuitBreakerOpenError not picklable): The exception stored a reference to _CircuitBreaker which holds a threading.Lock -- threading.Lock cannot be pickled, so the exception could not be serialised or forwarded across process boundaries.

Fix: change CircuitBreakerOpenError.__init__ to accept a 'remaining_s' float snapshot instead of the breaker object. wait_release() sleeps on the snapshot. __reduce__ enables pickle roundtrips. Update both raise sites in _CircuitBreakerInstance.on_llm_call to pass self._cb.remaining_recovery_s().

Adds regression tests: test_circuit_breaker_open_error_picklable, test_wait_release_sleeps_for_remaining_time, and test_wait_release_returns_immediately_when_zero.

Also add top-level export test for autogen.beta.middleware (export regression guard).
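The CB-2 fix described above follows a common shape for picklable exceptions, sketched here under assumptions. `OpenErrorSketch` is a hypothetical class that mirrors the commit message (a `remaining_s` snapshot, `wait_release()`, `__reduce__`); it is not the PR's actual `CircuitBreakerOpenError`, and the message format and the clamping to zero are my own additions.

```python
import asyncio
import pickle


class OpenErrorSketch(Exception):
    """Hypothetical error that carries a plain float instead of the breaker."""

    def __init__(self, remaining_s: float) -> None:
        super().__init__(f"circuit breaker is open; retry in {remaining_s:.2f}s")
        self.remaining_s = max(0.0, float(remaining_s))

    def __reduce__(self):
        # Without this override, unpickling would call the class with
        # self.args, i.e. the formatted message string, not the float.
        return (type(self), (self.remaining_s,))

    async def wait_release(self) -> None:
        # Sleep on the snapshot taken when the error was raised.
        if self.remaining_s > 0:
            await asyncio.sleep(self.remaining_s)


def roundtrip(err: OpenErrorSketch) -> OpenErrorSketch:
    """Serialise and restore the error, as a process boundary would."""
    return pickle.loads(pickle.dumps(err))


err = OpenErrorSketch(0.01)
rebuild, rebuild_args = err.__reduce__()
clone = rebuild(*rebuild_args)       # what pickle does on load
asyncio.run(clone.wait_release())    # sleeps roughly 10 ms
asyncio.run(OpenErrorSketch(0.0).wait_release())  # returns immediately
```

One trade-off of the snapshot approach is that `remaining_s` is fixed at raise time, so a caller who waits before calling `wait_release()` sleeps longer than strictly necessary. In exchange the exception holds no lock and can cross process boundaries via `roundtrip`.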
…dleware

CB-1 (stale non-probe success): record_success() now requires a probe_token parameter. Closing from HALF_OPEN is only allowed when the token matches the current _probe_generation, preventing calls admitted while CLOSED from prematurely closing a later HALF_OPEN circuit.

CB-2 (pickle safety): CircuitBreakerOpenError now stores a remaining_s float snapshot rather than a _CircuitBreaker reference. Added __reduce__ for full pickle/multiprocessing support.

CB-3 (stale claimed probe): Added _probe_generation int to _CircuitBreaker. The counter increments on every record_failure() that opens or refreshes the circuit. check_and_claim() returns (state, generation_token). record_success() closes only when the token matches, so an old probe admitted before a later failure cannot close the refreshed circuit.

Export: CircuitBreakerConfig, CircuitBreakerMiddleware, CircuitBreakerOpenError, and CircuitState are now re-exported from autogen.beta.middleware.

Tests: 13 new tests covering stale non-probe success, stale claimed probe regression, generation invalidation, failed probe reopen, cancellation re-claim, pickling, wait_release mock, and export regression. Replaced real-sleep flaky test with asyncio.sleep mock.
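The generation-token idea from CB-1 and CB-3 can be illustrated with a small state machine. This is a sketch, not the PR's `_CircuitBreaker`: `GenerationBreaker` is a hypothetical class, the clock is injected for determinism, and returning `None` as the token for non-probe calls is an assumption of this example.

```python
import threading
import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class GenerationBreaker:
    """Hypothetical breaker whose probes carry a generation token."""

    def __init__(self, failure_threshold=1, recovery_timeout_s=30.0, clock=time.monotonic):
        self._lock = threading.Lock()
        self._threshold = failure_threshold
        self._timeout = recovery_timeout_s
        self._clock = clock
        self._failure_count = 0
        self._opened_at = None
        self._generation = 0
        self._probe_claimed = False

    def _state(self):
        if self._failure_count < self._threshold or self._opened_at is None:
            return State.CLOSED
        if self._clock() - self._opened_at < self._timeout:
            return State.OPEN
        return State.HALF_OPEN

    def state(self):
        with self._lock:
            return self._state()

    def check_and_claim(self):
        """Return (state, token); only the admitted probe gets a token."""
        with self._lock:
            state = self._state()
            if state is not State.HALF_OPEN:
                return state, None
            if self._probe_claimed:
                return State.OPEN, None  # another probe is already in flight
            self._probe_claimed = True
            return state, self._generation

    def record_failure(self):
        with self._lock:
            self._failure_count += 1
            if self._failure_count >= self._threshold:
                self._opened_at = self._clock()  # open or refresh the window
                self._generation += 1            # invalidate older probes
                self._probe_claimed = False

    def record_success(self, probe_token=None):
        with self._lock:
            state = self._state()
            if state is State.CLOSED:
                self._failure_count = 0
            elif state is State.HALF_OPEN and probe_token == self._generation:
                self._failure_count = 0
                self._opened_at = None
                self._probe_claimed = False
            # OPEN, or a stale token: leave the circuit as it is


now = [0.0]
breaker = GenerationBreaker(clock=lambda: now[0])
breaker.record_failure()                      # opens, generation 1
now[0] = 31.0
_, stale_token = breaker.check_and_claim()    # probe P admitted
breaker.record_failure()                      # later failure, generation 2
now[0] = 62.0
breaker.record_success(stale_token)           # P finishes late: ignored
after_stale = breaker.state()
_, fresh_token = breaker.check_and_claim()    # a new probe is admitted
breaker.record_success(fresh_token)
after_fresh = breaker.state()
```

The timeline at the bottom follows the stale-probe scenario from the commit message: the old probe's success does not close the circuit, and only a probe holding the current generation does.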
…n tests

- TestProbeGeneration.test_probe_generation_advances_on_first_failure: asserts _probe_generation goes from 0 to 1 on the threshold-crossing failure.
- TestProbeGeneration.test_probe_generation_advances_again_on_subsequent_failure: asserts generation increments a second time while _opened_at is already set.
- TestProbeGeneration.test_middleware_stale_claimed_probe_does_not_close_circuit: middleware-level timeline -- probe P admitted, intervening failure bumps generation, P completes successfully, circuit stays HALF_OPEN/OPEN.
Replace the private state-machine calls with a coroutine-based test that exercises the full on_llm_call wiring: probe token capture, probe_token pass-through to record_success, and generation-mismatch rejection. All three test_middleware_stale_claimed_probe timeline steps now go through CircuitBreakerMiddleware.on_llm_call() rather than _CircuitBreaker internals, so a wiring regression would be caught.
…dleware regression test

If _CircuitBreakerInstance.on_llm_call ever regresses to calling record_success() without the captured probe_token, the spy assertion now fails explicitly instead of silently passing because the state-machine would reject None anyway.
Force-pushed: 3f0162c to dc6714d
Why are these changes needed?
Extracts the circuit breaker from GovernanceMiddleware (#2501, closed) into
a standalone middleware. Three states (CLOSED, OPEN, HALF_OPEN), atomic probe
claiming for the recovery path, and proper CancelledError handling that
releases the probe without recording a failure.
Same motivation as the BudgetMiddleware split (#2531): one middleware per
concern, composable via the middleware chain. Prior art in #2430 (circuit
breaker docs, merged).
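The cancellation behavior mentioned above (release the probe, record no failure) might look roughly like the following. It is an asyncio-only sketch with hypothetical names (`ProbeSlot`, `run_probe`); the PR describes a mutex-protected state machine, which this single-threaded example does not reproduce.

```python
import asyncio


class ProbeSlot:
    """Hypothetical single-probe gate with a failure counter."""

    def __init__(self) -> None:
        self.probe_claimed = False
        self.failures = 0

    def claim(self) -> bool:
        # No await between check and set, so this is atomic within one event loop.
        if self.probe_claimed:
            return False
        self.probe_claimed = True
        return True

    def release(self) -> None:
        self.probe_claimed = False


async def run_probe(slot: ProbeSlot, call):
    if not slot.claim():
        raise RuntimeError("a probe is already in flight")
    try:
        return await call()
    except asyncio.CancelledError:
        # Cancellation says nothing about provider health: do not count it.
        raise
    except Exception:
        slot.failures += 1
        raise
    finally:
        slot.release()  # always free the slot so a later probe can run


async def demo():
    slot = ProbeSlot()

    async def slow_call():
        await asyncio.sleep(10)

    task = asyncio.create_task(run_probe(slot, slow_call))
    await asyncio.sleep(0)  # let the task start and claim the probe
    claimed_mid_flight = slot.probe_claimed
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return claimed_mid_flight, slot.probe_claimed, slot.failures


claimed_mid_flight, claimed_after, failures = asyncio.run(demo())
```

On Python 3.8 and later `CancelledError` derives from `BaseException`, so the `except Exception` branch would not catch it anyway; the explicit clause is kept to make the intent visible.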
Related issue number
Replaces the circuit breaker portion of #2501 (closed).
Related: #2430 (docs, merged), #2531 (BudgetMiddleware).
Checks