Skip to content

Commit ca41414

Browse files
GeorgeTsiokosclaude
andcommitted
Fix background-task silent failures in PublishSubscribeReceiver
The initial fix for #1663 addressed the synchronous subscribe path but left a near-identical silent-failure hole in the background continuation path: HandleTaskCompletion was an async Task wired via _ = ContinueWith(...), so when no ErrorHandler was configured its "throw daprException" faulted an unobserved inner Task that never surfaced — exactly the bug the PR set out to fix. Changes: - HandleTaskCompletion no longer throws. It caches the first fault per subscribe cycle in a new pendingBackgroundFault field, which the next SubscribeAsync call drains and rethrows. Callers that skip the ErrorHandler still observe the error reliably rather than losing it to unobserved-task-exception void. - A new hasFaulted dedupe flag ensures that when multiple sibling continuations (FetchDataFromSidecar / ProcessAcknowledgementChannel / ProcessTopicChannel) all fault on a single sidecar outage, only the first is reported. - ResetStreamStateAsync now acquires the same semaphore used by GetStreamAsync, nulls clientStream before resetting hasInitialized, and disposes the prior stream — closing the race where a concurrent SubscribeAsync could observe a stale clientStream and leaking the gRPC streaming call on reset. - OperationCanceledException handling now inspects every inner exception (via AggregateException.InnerExceptions.All(...)) instead of just the first, so a fault wrapping both an OCE and a real RpcException still surfaces the real error. - When the user-supplied ErrorHandler itself throws, both the original fault and the handler failure are combined into an AggregateException and cached, replacing the previous empty catch that silently swallowed handler bugs. - DaprSubscriptionOptions.ErrorHandler and SubscriptionErrorHandler XML docs updated to describe the new cache-and-rethrow contract accurately. Tests: Five new tests drive the real SubscribeAsync -> ContinueWith -> HandleTaskCompletion chain (instead of awaiting HandleTaskCompletion directly) to reproduce each scenario end-to-end: - NextSubscribeRethrows — verifies the no-handler cache path - InvokesErrorHandlerExactlyOnce — verifies hasFaulted dedupe - NextSubscribeRecreatesStream — verifies reset + retry works end-to-end - BackgroundFetchCancelled_DoesNotCachePendingFault — verifies OCE short-circuit - ErrorHandlerThrows_CachesCombinedFaultForNextSubscribe — verifies handler-fault capture I verified these five reproduce the bug: after temporarily reverting the source fix and re-running them, they all fail (with the exact "background fault was not cached" / synchronous-throw symptoms the fix addresses). Two pre-existing unit tests were adjusted to match the new contract — they now exercise HandleTaskCompletion followed by SubscribeAsync to confirm the fault is surfaced via the cache-and-rethrow path instead of via a synchronous throw. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: George Tsiokos <george@tsiokos.com>
1 parent 0092cda commit ca41414

4 files changed

Lines changed: 379 additions & 25 deletions

File tree

src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ public sealed record DaprSubscriptionOptions(MessageHandlingPolicy MessageHandli
4444
/// <summary>
4545
/// An optional callback invoked when errors occur in background tasks during an active subscription.
4646
/// Errors during the initial subscription attempt are thrown directly to the caller.
47-
/// If not set, background task errors surface as unobserved task exceptions (default .NET behavior).
47+
/// If not set, the first background fault per subscribe cycle is cached on the receiver and rethrown
48+
/// on the next call to <c>SubscribeAsync</c>, so the caller can observe it and decide whether to retry.
49+
/// If the handler itself throws, both the original fault and the handler failure are cached and
50+
/// surfaced together as an <see cref="AggregateException"/> on the next subscribe attempt.
4851
/// </summary>
4952
public SubscriptionErrorHandler? ErrorHandler { get; init; }
5053
}

src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs

Lines changed: 86 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,17 @@ internal sealed class PublishSubscribeReceiver : IAsyncDisposable
7474
/// </summary>
7575
private int hasInitialized;
7676
/// <summary>
77+
/// Dedupes <see cref="HandleTaskCompletion"/> across the three background continuations so a single
78+
/// sidecar failure surfaces exactly once per subscribe cycle. Reset at the start of each new cycle.
79+
/// </summary>
80+
private int hasFaulted;
81+
/// <summary>
82+
/// Stores the first unhandled background fault per subscribe cycle. Rethrown on the next call to
83+
/// <see cref="SubscribeAsync"/> so a caller that did not configure an <see cref="DaprSubscriptionOptions.ErrorHandler"/>
84+
/// still observes the error rather than losing it as an unobserved task exception.
85+
/// </summary>
86+
private Exception? pendingBackgroundFault;
87+
/// <summary>
7788
/// Flag that ensures the instance is only disposed a single time.
7889
/// </summary>
7990
private bool isDisposed;
@@ -82,6 +93,9 @@ internal sealed class PublishSubscribeReceiver : IAsyncDisposable
8293
internal Task TopicMessagesChannelCompletion => topicMessagesChannel.Reader.Completion;
8394
// Internal property for testing purposes
8495
internal Task AcknowledgementsChannelCompletion => acknowledgementsChannel.Reader.Completion;
96+
// Internal property for testing purposes — exposes whether a background fault has been cached
97+
// and is waiting to be surfaced on the next SubscribeAsync call.
98+
internal bool HasPendingBackgroundFault => Volatile.Read(ref pendingBackgroundFault) is not null;
8599

86100
/// <summary>
87101
/// Constructs a new instance of a <see cref="PublishSubscribeReceiver"/> instance.
@@ -113,12 +127,23 @@ internal PublishSubscribeReceiver(string pubSubName, string topicName, DaprSubsc
113127
/// <returns>A task representing the asynchronous subscribe operation.</returns>
114128
internal async Task SubscribeAsync(CancellationToken cancellationToken = default)
115129
{
130+
// Surface any unhandled background fault from a prior subscribe cycle so the caller
131+
// can observe it explicitly before re-subscribing.
132+
var carried = Interlocked.Exchange(ref pendingBackgroundFault, null);
133+
if (carried is not null)
134+
{
135+
throw carried;
136+
}
137+
116138
//Prevents the receiver from performing the subscribe operation more than once (as the multiple initialization messages would cancel the stream).
117139
if (Interlocked.Exchange(ref hasInitialized, 1) == 1)
118140
{
119141
return;
120142
}
121143

144+
// Reset the per-cycle fault dedupe flag.
145+
Interlocked.Exchange(ref hasFaulted, 0);
146+
122147
AsyncDuplexStreamingCall<P.SubscribeTopicEventsRequestAlpha1, P.SubscribeTopicEventsResponseAlpha1> stream;
123148
try
124149
{
@@ -168,42 +193,93 @@ internal async Task WriteAcknowledgementToChannelAsync(TopicAcknowledgement ackn
168193
}
169194

170195
/// <summary>
171-
/// Handles faulted background tasks by resetting the initialization flag and invoking the configured error handler, if any.
196+
/// Handles faulted background tasks by resetting the subscription state and either invoking the
197+
/// configured error handler or caching the fault for the next <see cref="SubscribeAsync"/> call.
172198
/// </summary>
199+
/// <remarks>
200+
/// This method is the terminal continuation for <see cref="FetchDataFromSidecarAsync"/>,
201+
/// <see cref="ProcessAcknowledgementChannelMessagesAsync"/>, and <see cref="ProcessTopicChannelMessagesAsync"/>.
202+
/// It never re-throws: doing so would fault an unobserved inner <see cref="Task"/> (the original bug
203+
/// this class was meant to fix). Instead, unhandled faults are stored in
204+
/// <see cref="pendingBackgroundFault"/> and surfaced on the caller's next subscribe attempt.
205+
/// </remarks>
173206
internal async Task HandleTaskCompletion(Task task, object? state)
174207
{
175208
if (task.Exception is null)
176209
{
177210
return;
178211
}
179212

180-
// Reset initialization flag so a future SubscribeAsync call can re-establish the subscription
181-
Interlocked.Exchange(ref hasInitialized, 0);
182-
clientStream = null;
213+
// Dedupe: the three sibling continuations typically all fault when the sidecar dies.
214+
// Only the first one is reported; the rest observe the dedupe flag and exit.
215+
if (Interlocked.CompareExchange(ref hasFaulted, 1, 0) != 0)
216+
{
217+
return;
218+
}
183219

184-
var innerException = task.Exception.InnerException ?? task.Exception;
220+
var innerExceptions = task.Exception.InnerExceptions;
185221

186-
if (innerException is OperationCanceledException)
222+
// If every inner exception is a cancellation, treat the fault as a clean cancel:
223+
// reset the subscription state but do not cache or surface an error.
224+
if (innerExceptions.All(e => e is OperationCanceledException))
187225
{
226+
await ResetStreamStateAsync().ConfigureAwait(false);
188227
return;
189228
}
190229

230+
// Prefer a non-cancellation inner exception for the user-facing message.
231+
var innerException = innerExceptions.FirstOrDefault(e => e is not OperationCanceledException)
232+
?? task.Exception;
233+
234+
await ResetStreamStateAsync().ConfigureAwait(false);
235+
191236
var daprException = new DaprException(
192237
$"An error occurred during an active subscription to topic '{topicName}' on pubsub '{pubSubName}'.",
193238
innerException);
194239

195240
if (options.ErrorHandler is null)
196241
{
197-
throw daprException;
242+
// No handler configured: cache so the next SubscribeAsync surfaces it.
243+
Interlocked.CompareExchange(ref pendingBackgroundFault, daprException, null);
244+
return;
245+
}
246+
247+
try
248+
{
249+
await options.ErrorHandler.Invoke(daprException).ConfigureAwait(false);
198250
}
251+
catch (Exception handlerEx)
252+
{
253+
// User-supplied handler threw. Cache a combined fault so the next SubscribeAsync
254+
// surfaces both the original error and the handler failure — never silently drop either.
255+
var combined = new AggregateException(
256+
$"The SubscriptionErrorHandler for topic '{topicName}' on pubsub '{pubSubName}' threw while handling a prior fault.",
257+
daprException, handlerEx);
258+
Interlocked.CompareExchange(ref pendingBackgroundFault, combined, null);
259+
}
260+
}
199261

262+
/// <summary>
263+
/// Atomically tears down the active stream and resets the initialization flag.
264+
/// </summary>
265+
/// <remarks>
266+
/// Serialized under <see cref="semaphore"/> (the same lock used by <see cref="GetStreamAsync"/>)
267+
/// so a concurrent <see cref="SubscribeAsync"/> cannot observe <see cref="hasInitialized"/> reset
268+
/// while <see cref="clientStream"/> still holds the stale reference.
269+
/// </remarks>
270+
private async Task ResetStreamStateAsync()
271+
{
272+
await semaphore.WaitAsync().ConfigureAwait(false);
200273
try
201274
{
202-
await options.ErrorHandler.Invoke(daprException);
275+
var old = clientStream;
276+
clientStream = null;
277+
Interlocked.Exchange(ref hasInitialized, 0);
278+
old?.Dispose();
203279
}
204-
catch (Exception)
280+
finally
205281
{
206-
// No logger available; prevent a faulty error handler from becoming an unobserved task exception
282+
semaphore.Release();
207283
}
208284
}
209285

src/Dapr.Messaging/PublishSubscribe/SubscriptionErrorHandler.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ namespace Dapr.Messaging.PublishSubscribe;
1919
/// <param name="exception">The <see cref="DaprException"/> wrapping the original error.</param>
2020
/// <remarks>
2121
/// This handler is invoked on a thread pool thread. Implementations should be thread-safe.
22-
/// If the returned task faults, the exception will be suppressed.
22+
/// If the returned task faults, the handler's exception is combined with the original fault and
23+
/// surfaced as an <see cref="AggregateException"/> on the next call to <c>SubscribeAsync</c>.
2324
/// </remarks>
2425
public delegate Task SubscriptionErrorHandler(DaprException exception);

0 commit comments

Comments
 (0)