Fix SubscribeAsync silently failing when Dapr sidecar is unavailable#1744
Open
GeorgeTsiokos wants to merge 6 commits intodapr:masterfrom
Open
Fix SubscribeAsync silently failing when Dapr sidecar is unavailable#1744GeorgeTsiokos wants to merge 6 commits intodapr:masterfrom
GeorgeTsiokos wants to merge 6 commits intodapr:masterfrom
Conversation
74b1b32 to
9ce96c6
Compare
Contributor
|
Do note that I just merged a PR to support xUnit v3 so your unit/integration tests may need a refresh. |
9ce96c6 to
4aba9a6
Compare
- Throw DaprException wrapping RpcException on initial connection failure - Add SubscriptionErrorHandler callback for runtime errors in background tasks - Reset hasInitialized on any failure (initial or mid-stream) to allow retry - Guard ErrorHandler invocation with try/catch to prevent unobserved exceptions - Use CancellationToken.None for ContinueWith to ensure error handlers fire - Include topic/pubsub names in error messages for multi-subscription debugging Signed-off-by: George Tsiokos <george@tsiokos.com> Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
4aba9a6 to
5da307f
Compare
- Re-throw in HandleTaskCompletion when no ErrorHandler is set (preserves pre-existing UnobservedTaskException behavior) - Clear stale clientStream on background failure to prevent retry reusing a dead stream - Skip ErrorHandler invocation for OperationCanceledException (user cancellation is not an error) - Fix inaccurate XML doc comments Signed-off-by: George Tsiokos <george@tsiokos.com> Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This comment was marked as outdated.
This comment was marked as outdated.
Author
|
Out of scope for this PR:
|
- Change SubscriptionErrorHandler delegate from void to Task to support async error handling (logging sinks, alerting) - Make HandleTaskCompletion async to await the error handler - Update tests for async handler signature - Fix PR comment to not call out issues in our own code as pre-existing Signed-off-by: George Tsiokos <george@tsiokos.com> Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
…nstance method Upstream tests added in a573c3c and 4218d7f called HandleTaskCompletion as a static method and expected AggregateException. The PR changed the method to an instance async Task returning DaprException; update the three affected test call sites to match. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: George Tsiokos <george@tsiokos.com>
The initial fix for dapr#1663 addressed the synchronous subscribe path but left a near-identical silent-failure hole in the background continuation path: HandleTaskCompletion was an async Task wired via _ = ContinueWith(...), so when no ErrorHandler was configured its "throw daprException" faulted an unobserved inner Task that never surfaced — exactly the bug the PR set out to fix. Changes: - HandleTaskCompletion no longer throws. It caches the first fault per subscribe cycle in a new pendingBackgroundFault field, which the next SubscribeAsync call drains and rethrows. Callers that skip the ErrorHandler still observe the error reliably rather than losing it to unobserved-task-exception void. - A new hasFaulted dedupe flag ensures that when multiple sibling continuations (FetchDataFromSidecar / ProcessAcknowledgementChannel / ProcessTopicChannel) all fault on a single sidecar outage, only the first is reported. - ResetStreamStateAsync now acquires the same semaphore used by GetStreamAsync, nulls clientStream before resetting hasInitialized, and disposes the prior stream — closing the race where a concurrent SubscribeAsync could observe a stale clientStream and leaking the gRPC streaming call on reset. - OperationCanceledException handling now inspects every inner exception (via AggregateException.InnerExceptions.All(...)) instead of just the first, so a fault wrapping both an OCE and a real RpcException still surfaces the real error. - When the user-supplied ErrorHandler itself throws, both the original fault and the handler failure are combined into an AggregateException and cached, replacing the previous empty catch that silently swallowed handler bugs. - DaprSubscriptionOptions.ErrorHandler and SubscriptionErrorHandler XML docs updated to describe the new cache-and-rethrow contract accurately. Tests: Five new tests drive the real SubscribeAsync -> ContinueWith -> HandleTaskCompletion chain (instead of awaiting HandleTaskCompletion directly) to reproduce each scenario end-to-end: - NextSubscribeRethrows — verifies the no-handler cache path - InvokesErrorHandlerExactlyOnce — verifies hasFaulted dedupe - NextSubscribeRecreatesStream — verifies reset + retry works end-to-end - BackgroundFetchCancelled_DoesNotCachePendingFault — verifies OCE short-circuit - ErrorHandlerThrows_CachesCombinedFaultForNextSubscribe — verifies handler-fault capture I verified these five reproduce the bug: after temporarily reverting the source fix and re-running them, they all fail (with the exact "background fault was not cached" / synchronous-throw symptoms the fix addresses). Two pre-existing unit tests were adjusted to match the new contract — they now exercise HandleTaskCompletion followed by SubscribeAsync to confirm the fault is surfaced via the cache-and-rethrow path instead of via a synchronous throw. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: George Tsiokos <george@tsiokos.com>
20c2da6 to
ca41414
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Fixes #1663
PublishSubscribeReceiver.SubscribeAsyncused to silently fail when the Dapr sidecar was unavailable: the initial gRPC call could throw, and three background continuations could fault, with all of those errors disappearing into unobserved task exceptions. This PR closes both halves of that hole and makes the receiver safely re-subscribable.SubscribeAsynccatchesRpcExceptionfrom the gRPC stream setup and throws aDaprExceptionwith topic / pubsub context, then resetshasInitializedso the caller can retry once the sidecar is back.HandleTaskCompletionno longer throws (which would have faulted the unobserved innerTaskreturned by_ = ContinueWith(HandleTaskCompletion, ...)— the exact bug class this PR set out to fix). Instead it caches the first fault per subscribe cycle inpendingBackgroundFault; the next call toSubscribeAsyncdrains and rethrows it, so callers that don't configure anErrorHandlerstill observe the error.hasFaultedflag (CAS) dedupes across the three sibling continuations (FetchDataFromSidecarAsync,ProcessAcknowledgementChannelMessagesAsync,ProcessTopicChannelMessagesAsync), which previously could each fireHandleTaskCompletionfor the same logical failure.ResetStreamStateAsyncruns under the sameSemaphoreSlimasGetStreamAsync, nullsclientStreambefore resettinghasInitialized, and disposes the priorAsyncDuplexStreamingCall<,>to release the gRPC call. This closes the race where a concurrentSubscribeAsynccould observe a resethasInitializedwhile still seeing a stale stream reference.SubscriptionErrorHandler-thrown exceptions are surfaced: if a user-supplied error handler itself throws, the originalDaprExceptionand the handler exception are combined into anAggregateExceptionand cached for the nextSubscribeAsyncinstead of being silently swallowed by an emptycatch.OperationCanceledExceptionshort-circuit now inspects every entry intask.Exception.InnerExceptionsrather than only the first, so a fault wrapping both an OCE and a realRpcExceptionstill surfaces the real error rather than being suppressed as a clean cancel.ErrorHandlerdelegate is invoked safely:ErrorHandler?.Invoke()is guarded by try/catch, with the catch now routing through the cache-and-rethrow path described above instead of being a silent drop.CancellationToken.None: ensures error-handling continuations always execute even after the caller's token is cancelled, so faults aren't lost toOnlyOnFaultednot running.DaprSubscriptionOptions.ErrorHandlerandSubscriptionErrorHandlernow describe the cache-and-rethrow contract accurately (the prior comment claimed errors "surface as unobserved task exceptions", which was both inaccurate and the original bug).Test plan
Original test plan (unchanged, still passing):
HandleTaskCompletioninvokesErrorHandlerwithDaprExceptionwrapping the original exceptionHandleTaskCompletiondoes not throw synchronously when noErrorHandleris set (now: caches for next subscribe)HandleTaskCompletiondoes not propagate whenErrorHandleritself throws (now: caches combined fault)HandleTaskCompletiondoes not invokeErrorHandlerfor successful tasksSubscribeAsyncthrowsDaprExceptionwrappingRpcExceptionwhen sidecar unavailableSubscribeAsyncallows retry afterRpcExceptionfailure (hasInitializedreset)SubscribeAsyncresetshasInitializedfor non-RPC exceptions (e.g.ObjectDisposedException)TestContext.Current.CancellationToken)New end-to-end repro tests (drive the real
SubscribeAsync→ContinueWith→HandleTaskCompletionchain via a mockIAsyncStreamReader.MoveNextthat faults — not by awaitingHandleTaskCompletiondirectly):SubscribeAsync_WhenBackgroundFetchFaults_WithoutHandler_NextSubscribeRethrows— the no-handler cache-and-rethrow pathSubscribeAsync_WhenBackgroundFetchFaults_InvokesErrorHandlerExactlyOnce—hasFaulteddedupe across the three continuationsSubscribeAsync_AfterBackgroundFault_NextSubscribeRecreatesStream— reset + retry obtains a fresh stream end-to-end (verified viaMock.Verify(..., Times.Exactly(2)))SubscribeAsync_WhenBackgroundFetchCancelled_DoesNotCachePendingFault— cancellation short-circuit doesn't pollute the pending-fault slotSubscribeAsync_WhenErrorHandlerThrows_CachesCombinedFaultForNextSubscribe— handler-faulted path produces anAggregateExceptionwith both the originalDaprExceptionand the handler's exception🤖 Generated with Claude Code