Skip to content

Fix SubscribeAsync silently failing when Dapr sidecar is unavailable#1744

Open
GeorgeTsiokos wants to merge 6 commits intodapr:masterfrom
GeorgeTsiokos:fix/subscribe-async-silent-failure-1663
Open

Fix SubscribeAsync silently failing when Dapr sidecar is unavailable#1744
GeorgeTsiokos wants to merge 6 commits intodapr:masterfrom
GeorgeTsiokos:fix/subscribe-async-silent-failure-1663

Conversation

@GeorgeTsiokos
Copy link
Copy Markdown

@GeorgeTsiokos GeorgeTsiokos commented Mar 13, 2026

Summary

Fixes #1663

PublishSubscribeReceiver.SubscribeAsync used to silently fail when the Dapr sidecar was unavailable: the initial gRPC call could throw, and three background continuations could fault, with all of those errors disappearing into unobserved task exceptions. This PR closes both halves of that hole and makes the receiver safely re-subscribable.

  • Initial-subscribe failures throw: SubscribeAsync catches RpcException from the gRPC stream setup and throws a DaprException with topic / pubsub context, then resets hasInitialized so the caller can retry once the sidecar is back.
  • Background-task faults are no longer silent: HandleTaskCompletion no longer throws (which would have faulted the unobserved inner Task returned by _ = ContinueWith(HandleTaskCompletion, ...) — the exact bug class this PR set out to fix). Instead it caches the first fault per subscribe cycle in pendingBackgroundFault; the next call to SubscribeAsync drains and rethrows it, so callers that don't configure an ErrorHandler still observe the error.
  • Single sidecar outage produces exactly one report: a new hasFaulted flag (CAS) dedupes across the three sibling continuations (FetchDataFromSidecarAsync, ProcessAcknowledgementChannelMessagesAsync, ProcessTopicChannelMessagesAsync), which previously could each fire HandleTaskCompletion for the same logical failure.
  • Stream reset is race-free and leak-free: new ResetStreamStateAsync runs under the same SemaphoreSlim as GetStreamAsync, nulls clientStream before resetting hasInitialized, and disposes the prior AsyncDuplexStreamingCall<,> to release the gRPC call. This closes the race where a concurrent SubscribeAsync could observe a reset hasInitialized while still seeing a stale stream reference.
  • SubscriptionErrorHandler-thrown exceptions are surfaced: if a user-supplied error handler itself throws, the original DaprException and the handler exception are combined into an AggregateException and cached for the next SubscribeAsync instead of being silently swallowed by an empty catch.
  • Cancellation is detected across all inner exceptions: the OperationCanceledException short-circuit now inspects every entry in task.Exception.InnerExceptions rather than only the first, so a fault wrapping both an OCE and a real RpcException still surfaces the real error rather than being suppressed as a clean cancel.
  • ErrorHandler delegate is invoked safely: ErrorHandler?.Invoke() is guarded by try/catch, with the catch now routing through the cache-and-rethrow path described above instead of being a silent drop.
  • Continuations use CancellationToken.None: ensures error-handling continuations always execute even after the caller's token is cancelled, so faults aren't lost to OnlyOnFaulted not running.
  • Doc comments updated: XML docs on DaprSubscriptionOptions.ErrorHandler and SubscriptionErrorHandler now describe the cache-and-rethrow contract accurately (the prior comment claimed errors "surface as unobserved task exceptions", which was both inaccurate and the original bug).

Test plan

Original test plan (unchanged, still passing):

  • HandleTaskCompletion invokes ErrorHandler with DaprException wrapping the original exception
  • HandleTaskCompletion does not throw synchronously when no ErrorHandler is set (now: caches for next subscribe)
  • HandleTaskCompletion does not propagate when ErrorHandler itself throws (now: caches combined fault)
  • HandleTaskCompletion does not invoke ErrorHandler for successful tasks
  • SubscribeAsync throws DaprException wrapping RpcException when sidecar unavailable
  • SubscribeAsync allows retry after RpcException failure (hasInitialized reset)
  • SubscribeAsync resets hasInitialized for non-RPC exceptions (e.g. ObjectDisposedException)
  • Tests use xUnit v3 patterns (TestContext.Current.CancellationToken)

New end-to-end repro tests (drive the real SubscribeAsyncContinueWithHandleTaskCompletion chain via a mock IAsyncStreamReader.MoveNext that faults — not by awaiting HandleTaskCompletion directly):

  • SubscribeAsync_WhenBackgroundFetchFaults_WithoutHandler_NextSubscribeRethrows — the no-handler cache-and-rethrow path
  • SubscribeAsync_WhenBackgroundFetchFaults_InvokesErrorHandlerExactlyOncehasFaulted dedupe across the three continuations
  • SubscribeAsync_AfterBackgroundFault_NextSubscribeRecreatesStream — reset + retry obtains a fresh stream end-to-end (verified via Mock.Verify(..., Times.Exactly(2)))
  • SubscribeAsync_WhenBackgroundFetchCancelled_DoesNotCachePendingFault — cancellation short-circuit doesn't pollute the pending-fault slot
  • SubscribeAsync_WhenErrorHandlerThrows_CachesCombinedFaultForNextSubscribe — handler-faulted path produces an AggregateException with both the original DaprException and the handler's exception
  • Verified the new tests genuinely reproduce the bug: with the source fix temporarily reverted, 5 of 6 fail with the exact "background fault not cached" / synchronous-throw symptoms the fix addresses
  • All 68 tests pass on net8.0, net9.0, and net10.0 across multiple back-to-back local runs

🤖 Generated with Claude Code

@GeorgeTsiokos GeorgeTsiokos requested review from a team as code owners March 13, 2026 10:19
@GeorgeTsiokos GeorgeTsiokos force-pushed the fix/subscribe-async-silent-failure-1663 branch from 74b1b32 to 9ce96c6 Compare March 13, 2026 10:22
@WhitWaldo
Copy link
Copy Markdown
Contributor

Do note that I just merged a PR to support xUnit v3 so your unit/integration tests may need a refresh.

@GeorgeTsiokos GeorgeTsiokos marked this pull request as draft March 15, 2026 18:51
@GeorgeTsiokos GeorgeTsiokos force-pushed the fix/subscribe-async-silent-failure-1663 branch from 9ce96c6 to 4aba9a6 Compare March 15, 2026 20:23
- Throw DaprException wrapping RpcException on initial connection failure
- Add SubscriptionErrorHandler callback for runtime errors in background tasks
- Reset hasInitialized on any failure (initial or mid-stream) to allow retry
- Guard ErrorHandler invocation with try/catch to prevent unobserved exceptions
- Use CancellationToken.None for ContinueWith to ensure error handlers fire
- Include topic/pubsub names in error messages for multi-subscription debugging

Signed-off-by: George Tsiokos <george@tsiokos.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@GeorgeTsiokos GeorgeTsiokos force-pushed the fix/subscribe-async-silent-failure-1663 branch from 4aba9a6 to 5da307f Compare March 15, 2026 20:30
@GeorgeTsiokos GeorgeTsiokos marked this pull request as ready for review March 15, 2026 20:34
- Re-throw in HandleTaskCompletion when no ErrorHandler is set (preserves
  pre-existing UnobservedTaskException behavior)
- Clear stale clientStream on background failure to prevent retry reusing
  a dead stream
- Skip ErrorHandler invocation for OperationCanceledException (user
  cancellation is not an error)
- Fix inaccurate XML doc comments

Signed-off-by: George Tsiokos <george@tsiokos.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@GeorgeTsiokos

This comment was marked as outdated.

@GeorgeTsiokos
Copy link
Copy Markdown
Author

GeorgeTsiokos commented Mar 15, 2026

Out of scope for this PR:

  1. ProcessTopicChannelMessagesAsync swallows OperationCanceledException from the message handler — timeout-triggered cancellations are silently converted to the default response action with no logging.
  2. FetchDataFromSidecarAsync has a bare catch (Exception) (line 351) that silently swallows channel-write failures, which could mask backpressure or disposal races.
  3. No ILogger anywhere in PublishSubscribeReceiver — all the silent catches above exist because there's no logger to report to. Adding an ILogger parameter would let these be logged instead of swallowed.
  4. ContinueWith + TaskScheduler.Default — the continuation pattern is fragile; a failed error handler's exception becomes unobserved. Modern async/await with a top-level try/catch would be more robust.

GeorgeTsiokos and others added 4 commits March 15, 2026 17:06
- Change SubscriptionErrorHandler delegate from void to Task to support
  async error handling (logging sinks, alerting)
- Make HandleTaskCompletion async to await the error handler
- Update tests for async handler signature
- Fix PR comment to not call out issues in our own code as pre-existing

Signed-off-by: George Tsiokos <george@tsiokos.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
…nstance method

Upstream tests added in a573c3c and 4218d7f called HandleTaskCompletion as a static
method and expected AggregateException. The PR changed the method to an instance async
Task returning DaprException; update the three affected test call sites to match.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: George Tsiokos <george@tsiokos.com>
The initial fix for dapr#1663 addressed the synchronous subscribe path but left a
near-identical silent-failure hole in the background continuation path:
HandleTaskCompletion was an async Task wired via _ = ContinueWith(...), so when
no ErrorHandler was configured its "throw daprException" faulted an unobserved
inner Task that never surfaced — exactly the bug the PR set out to fix.

Changes:

- HandleTaskCompletion no longer throws. It caches the first fault per subscribe
  cycle in a new pendingBackgroundFault field, which the next SubscribeAsync
  call drains and rethrows. Callers that skip the ErrorHandler still observe
  the error reliably rather than losing it to unobserved-task-exception void.

- A new hasFaulted dedupe flag ensures that when multiple sibling continuations
  (FetchDataFromSidecar / ProcessAcknowledgementChannel / ProcessTopicChannel)
  all fault on a single sidecar outage, only the first is reported.

- ResetStreamStateAsync now acquires the same semaphore used by GetStreamAsync,
  nulls clientStream before resetting hasInitialized, and disposes the prior
  stream — closing the race where a concurrent SubscribeAsync could observe a
  stale clientStream and leaking the gRPC streaming call on reset.

- OperationCanceledException handling now inspects every inner exception (via
  AggregateException.InnerExceptions.All(...)) instead of just the first, so a
  fault wrapping both an OCE and a real RpcException still surfaces the real
  error.

- When the user-supplied ErrorHandler itself throws, both the original fault
  and the handler failure are combined into an AggregateException and cached,
  replacing the previous empty catch that silently swallowed handler bugs.

- DaprSubscriptionOptions.ErrorHandler and SubscriptionErrorHandler XML docs
  updated to describe the new cache-and-rethrow contract accurately.

Tests:

Five new tests drive the real SubscribeAsync -> ContinueWith -> HandleTaskCompletion
chain (instead of awaiting HandleTaskCompletion directly) to reproduce each
scenario end-to-end:

  - NextSubscribeRethrows — verifies the no-handler cache path
  - InvokesErrorHandlerExactlyOnce — verifies hasFaulted dedupe
  - NextSubscribeRecreatesStream — verifies reset + retry works end-to-end
  - BackgroundFetchCancelled_DoesNotCachePendingFault — verifies OCE short-circuit
  - ErrorHandlerThrows_CachesCombinedFaultForNextSubscribe — verifies handler-fault capture

I verified these five reproduce the bug: after temporarily reverting the source
fix and re-running them, they all fail (with the exact "background fault was
not cached" / synchronous-throw symptoms the fix addresses).

Two pre-existing unit tests were adjusted to match the new contract — they now
exercise HandleTaskCompletion followed by SubscribeAsync to confirm the fault
is surfaced via the cache-and-rethrow path instead of via a synchronous throw.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: George Tsiokos <george@tsiokos.com>
@GeorgeTsiokos GeorgeTsiokos force-pushed the fix/subscribe-async-silent-failure-1663 branch from 20c2da6 to ca41414 Compare April 9, 2026 02:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Messaging] SubscribeAsync silently fails with unobserved task exceptions when Dapr sidecar is unavailable

2 participants