Skip to content

Commit 31e2c8e

Browse files
Fix LISTEN/NOTIFY connection self-healing to prevent worker death (#420)
When EnableLongPolling is enabled, the ListenForNotificationsAsync method only caught TaskCanceledException, causing all other exceptions (network timeouts, connection drops) to propagate up and kill the worker thread. This change adds self-healing reconnection logic that catches connection failures, reconnects after a 1-second backoff, and continues the loop. Fixes #410. Co-authored-by: Žygimantas A. <12134941+azygis@users.noreply.github.com>
1 parent 82d485e commit 31e2c8e

File tree

2 files changed

+78
-18
lines changed

2 files changed

+78
-18
lines changed

src/Hangfire.PostgreSql/PostgreSqlJobQueue.cs

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,10 @@ public IFetchedJob Dequeue(string[] queues, CancellationToken cancellationToken)
7575
cancelListenSource?.Cancel();
7676
listenTask?.Wait();
7777
}
78-
catch (AggregateException ex)
78+
catch (AggregateException)
7979
{
80-
if (ex.InnerException is not TaskCanceledException)
81-
{
82-
throw;
83-
}
84-
85-
// Otherwise do nothing, cancel exception is expected.
80+
// Swallow all exceptions from listen task cleanup.
81+
// The main dequeue operation already succeeded or failed independently.
8682
}
8783
finally
8884
{
@@ -282,30 +278,45 @@ private Task ListenForNotificationsAsync(CancellationToken cancellationToken)
282278
// CreateAnOpenConnection can return the same connection over and over if an existing connection
283279
// is passed in the constructor of PostgreSqlStorage. We must use a separate dedicated
284280
// connection to listen for notifications.
285-
NpgsqlConnection clonedConnection = connection.CloneWith(connection.ConnectionString);
281+
string connectionString = connection.ConnectionString;
282+
NpgsqlConnection clonedConnection = connection.CloneWith(connectionString);
286283

287284
return Task.Run(async () => {
285+
NpgsqlConnection currentConnection = clonedConnection;
288286
try
289287
{
290-
if (clonedConnection.State != ConnectionState.Open)
291-
{
292-
await clonedConnection.OpenAsync(cancellationToken); // Open so that Dapper doesn't auto-close.
293-
}
294-
295288
while (!cancellationToken.IsCancellationRequested)
296289
{
297-
await clonedConnection.ExecuteAsync($"LISTEN {JobNotificationChannel}");
298-
await clonedConnection.WaitAsync(cancellationToken);
299-
JobQueueNotification.Set();
290+
try
291+
{
292+
if (currentConnection.State != ConnectionState.Open)
293+
{
294+
await currentConnection.OpenAsync(cancellationToken);
295+
}
296+
297+
await currentConnection.ExecuteAsync($"LISTEN {JobNotificationChannel}");
298+
await currentConnection.WaitAsync(cancellationToken);
299+
JobQueueNotification.Set();
300+
}
301+
catch (OperationCanceledException)
302+
{
303+
throw;
304+
}
305+
catch (Exception ex) when (ex.IsCatchableExceptionType())
306+
{
307+
currentConnection?.Dispose();
308+
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
309+
currentConnection = new NpgsqlConnection(connectionString);
310+
}
300311
}
301312
}
302-
catch (TaskCanceledException)
313+
catch (OperationCanceledException)
303314
{
304315
// Do nothing, cancellation requested so just end.
305316
}
306317
finally
307318
{
308-
_storage.ReleaseConnection(clonedConnection);
319+
currentConnection?.Dispose();
309320
}
310321

311322
}, cancellationToken);

tests/Hangfire.PostgreSql.Tests/PostgreSqlJobQueueFacts.cs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using Hangfire.PostgreSql.Tests.Utils;
99
using Hangfire.PostgreSql.Utils;
1010
using Hangfire.Storage;
11+
using Npgsql;
1112
using Xunit;
1213

1314
namespace Hangfire.PostgreSql.Tests
@@ -538,6 +539,54 @@ private void Enqueue_AddsAJobToTheQueue(bool useNativeDatabaseTransactions)
538539
});
539540
}
540541

542+
[Fact]
543+
[CleanDatabase]
544+
public void Dequeue_ShouldSelfHeal_WhenListenConnectionFails()
545+
{
546+
UseConnection((_, storage) => {
547+
storage.Options.QueuePollInterval = TimeSpan.FromMilliseconds(500);
548+
PostgreSqlJobQueue queue = CreateJobQueue(storage, false, true);
549+
Exception thrownException = null;
550+
IFetchedJob job = null;
551+
552+
CancellationTokenSource cts = new(TimeSpan.FromSeconds(10));
553+
554+
Task dequeueTask = Task.Run(() => {
555+
try
556+
{
557+
job = queue.Dequeue(new[] { "default" }, cts.Token);
558+
}
559+
catch (Exception ex) when (ex is not OperationCanceledException)
560+
{
561+
thrownException = ex;
562+
}
563+
});
564+
565+
Thread.Sleep(1000);
566+
567+
using (NpgsqlConnection adminConnection = ConnectionUtils.CreateMasterConnection())
568+
{
569+
adminConnection.Execute(@"
570+
SELECT pg_terminate_backend(pid)
571+
FROM pg_stat_activity
572+
WHERE query LIKE '%LISTEN%'
573+
AND pid <> pg_backend_pid()");
574+
}
575+
576+
Thread.Sleep(500);
577+
578+
using (NpgsqlConnection enqueueConnection = ConnectionUtils.CreateConnection())
579+
{
580+
queue.Enqueue(enqueueConnection, "default", "1");
581+
}
582+
583+
dequeueTask.Wait(TimeSpan.FromSeconds(5));
584+
585+
Assert.Null(thrownException);
586+
Assert.NotNull(job);
587+
});
588+
}
589+
541590
private static CancellationToken CreateTimingOutCancellationToken()
542591
{
543592
CancellationTokenSource source = new CancellationTokenSource(TimeSpan.FromSeconds(10));

0 commit comments

Comments
 (0)