From c5a50af53852a62c89c0a8659f3a59aaa493650c Mon Sep 17 00:00:00 2001 From: Wei Date: Wed, 16 Jul 2025 15:46:16 +1000 Subject: [PATCH 1/7] Pass on consumerDispatchConcurrency to CreateChannelOptions object --- src/Queues/RabbitMq/src/RabbitMqClient.cs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/Queues/RabbitMq/src/RabbitMqClient.cs b/src/Queues/RabbitMq/src/RabbitMqClient.cs index 881c7a6e..a7dd42c6 100644 --- a/src/Queues/RabbitMq/src/RabbitMqClient.cs +++ b/src/Queues/RabbitMq/src/RabbitMqClient.cs @@ -46,7 +46,8 @@ public async Task EnqueueAsync(string exchange, TData data, EnqueueOption var message = MessageWrapper.New(data); var bytes = _options.Serializer.Serialize(message); - await using var channel = await GetChannelAsync(options.EnablePublisherConfirms, cancellationToken); + await using var channel = await GetChannelAsync(options.EnablePublisherConfirms, + cancellationToken : cancellationToken); var properties = new BasicProperties { @@ -79,7 +80,7 @@ public async Task SubscribeAsync(string queue, // We don't want to dispose the channel here (unless an exception is thrown, see below). // The returned SubscriptionContext is the object that should be disposed (which disposes the channel) - var channel = await GetChannelAsync(false, cancellationToken); + var channel = await GetChannelAsync(false, options.PrefetchCount, cancellationToken); try { @@ -190,13 +191,16 @@ private async ValueTask ConnectSlowAsync(CancellationToken cancella } } - private async Task GetChannelAsync(bool enablePublisherConfirms, CancellationToken cancellationToken) + private async Task GetChannelAsync(bool enablePublisherConfirms, + ushort consumerDispatchConcurrency = 1, + CancellationToken cancellationToken = default) { var connection = await GetConnectionAsync(cancellationToken); var options = new CreateChannelOptions( publisherConfirmationsEnabled: enablePublisherConfirms, - publisherConfirmationTrackingEnabled: enablePublisherConfirms + publisherConfirmationTrackingEnabled: enablePublisherConfirms, + consumerDispatchConcurrency : consumerDispatchConcurrency ); return await connection.CreateChannelAsync(options, cancellationToken); From f22c86f0fe1e50fe8436267779a374ee7821f563 Mon Sep 17 00:00:00 2001 From: Wei Date: Wed, 16 Jul 2025 15:48:57 +1000 Subject: [PATCH 2/7] Pass on ConcurrentTaskCount to RabbitMqClientOptions.ConsumerDispatchConcurrency --- src/Hosting/Queue/src/BaseQueueHostedService.cs | 13 +++++++++---- src/Hosting/Queue/src/QueueHostedService.cs | 3 ++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/Hosting/Queue/src/BaseQueueHostedService.cs b/src/Hosting/Queue/src/BaseQueueHostedService.cs index ef8ce5dd..345ef84d 100644 --- a/src/Hosting/Queue/src/BaseQueueHostedService.cs +++ b/src/Hosting/Queue/src/BaseQueueHostedService.cs @@ -32,8 +32,11 @@ public abstract class BaseQueueHostedService : IHostedService, IAsyncD /// Initialises a new instance of . /// /// + /// /// - protected BaseQueueHostedService(IOptions options, ILoggerFactory loggerFactory) + protected BaseQueueHostedService(IOptions options, + ILoggerFactory loggerFactory, + ushort consumerDispatchConcurrency = 1) { var type = GetType(); @@ -41,7 +44,7 @@ protected BaseQueueHostedService(IOptions options, ILoggerFactory logg Options = options.Value; Logger = loggerFactory.CreateLogger(type); - _queueClient = new RabbitMqClient(CreateOptions(Options, loggerFactory)); + _queueClient = new RabbitMqClient(CreateOptions(Options, consumerDispatchConcurrency, loggerFactory)); } /// @@ -176,7 +179,8 @@ protected ValueTask AcknowledgeAsync(ulong deliveryTag, bool multiple = false, C return ValueTask.CompletedTask; } - private static RabbitMqClientOptions CreateOptions(BaseQueueHostedServiceOptions options, ILoggerFactory loggerFactory) + private static RabbitMqClientOptions CreateOptions(BaseQueueHostedServiceOptions options, + ushort consumerDispatchConcurrency, ILoggerFactory loggerFactory) { var host = options.Host; @@ -192,7 +196,8 @@ private static RabbitMqClientOptions CreateOptions(BaseQueueHostedServiceOptions ConnectionTimeout = options.ConnectionTimeout, EnableSsl = options.EnableSsl, IgnoreSslErrors = options.IgnoreSslErrors, - LoggerFactory = loggerFactory + LoggerFactory = loggerFactory, + ConsumerDispatchConcurrency = consumerDispatchConcurrency }; if (options.SslVersion != null) diff --git a/src/Hosting/Queue/src/QueueHostedService.cs b/src/Hosting/Queue/src/QueueHostedService.cs index e0ecf5be..ba05c56c 100644 --- a/src/Hosting/Queue/src/QueueHostedService.cs +++ b/src/Hosting/Queue/src/QueueHostedService.cs @@ -18,7 +18,8 @@ public abstract class QueueHostedService : BaseQueueHostedSe /// /// /// - protected QueueHostedService(IOptions options, ILoggerFactory loggerFactory) : base(options, loggerFactory) + protected QueueHostedService(IOptions options, ILoggerFactory loggerFactory) + : base(options, loggerFactory, options.Value.ConcurrentTaskCount) { } From 19db6e822ef40d3b7532effd351192b92d3c4364 Mon Sep 17 00:00:00 2001 From: Wei Date: Wed, 16 Jul 2025 15:51:11 +1000 Subject: [PATCH 3/7] Consolidate some nugets version conflicts --- .../IdGen/src/ClickView.GoodStuff.IdGenerators.IdGen.csproj | 2 +- ...w.GoodStuff.IdGenerators.Serialization.SystemTextJson.csproj | 2 +- .../MySql/src/ClickView.GoodStuff.Repositories.MySql.csproj | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/IdGenerators/IdGen/src/ClickView.GoodStuff.IdGenerators.IdGen.csproj b/src/IdGenerators/IdGen/src/ClickView.GoodStuff.IdGenerators.IdGen.csproj index e93d87ea..c3789d68 100644 --- a/src/IdGenerators/IdGen/src/ClickView.GoodStuff.IdGenerators.IdGen.csproj +++ b/src/IdGenerators/IdGen/src/ClickView.GoodStuff.IdGenerators.IdGen.csproj @@ -11,7 +11,7 @@ - + diff --git a/src/IdGenerators/Serialization/SystemTextJson/src/ClickView.GoodStuff.IdGenerators.Serialization.SystemTextJson.csproj b/src/IdGenerators/Serialization/SystemTextJson/src/ClickView.GoodStuff.IdGenerators.Serialization.SystemTextJson.csproj index f486eafa..0edb6546 100644 --- a/src/IdGenerators/Serialization/SystemTextJson/src/ClickView.GoodStuff.IdGenerators.Serialization.SystemTextJson.csproj +++ b/src/IdGenerators/Serialization/SystemTextJson/src/ClickView.GoodStuff.IdGenerators.Serialization.SystemTextJson.csproj @@ -6,7 +6,7 @@ - + diff --git a/src/Repositories/MySql/src/ClickView.GoodStuff.Repositories.MySql.csproj b/src/Repositories/MySql/src/ClickView.GoodStuff.Repositories.MySql.csproj index c8627bdb..d75dfcb6 100644 --- a/src/Repositories/MySql/src/ClickView.GoodStuff.Repositories.MySql.csproj +++ b/src/Repositories/MySql/src/ClickView.GoodStuff.Repositories.MySql.csproj @@ -7,7 +7,7 @@ - + From f6d803560ad0b660aedc70ddb702cefa8af50a42 Mon Sep 17 00:00:00 2001 From: Wei Date: Thu, 17 Jul 2025 10:16:07 +1000 Subject: [PATCH 4/7] Move ConcurrentTaskCount to base class --- src/Hosting/Queue/src/BaseQueueHostedService.cs | 11 ++++------- .../Queue/src/BaseQueueHostedServiceOptions.cs | 5 +++++ src/Hosting/Queue/src/QueueHostedService.cs | 3 +-- src/Hosting/Queue/src/QueueHostedServiceOptions.cs | 4 ---- src/Queues/RabbitMq/src/RabbitMqClient.cs | 5 ++--- 5 files changed, 12 insertions(+), 16 deletions(-) diff --git a/src/Hosting/Queue/src/BaseQueueHostedService.cs b/src/Hosting/Queue/src/BaseQueueHostedService.cs index 345ef84d..7e06ff89 100644 --- a/src/Hosting/Queue/src/BaseQueueHostedService.cs +++ b/src/Hosting/Queue/src/BaseQueueHostedService.cs @@ -34,9 +34,7 @@ public abstract class BaseQueueHostedService : IHostedService, IAsyncD /// /// /// - protected BaseQueueHostedService(IOptions options, - ILoggerFactory loggerFactory, - ushort consumerDispatchConcurrency = 1) + protected BaseQueueHostedService(IOptions options, ILoggerFactory loggerFactory) { var type = GetType(); @@ -44,7 +42,7 @@ protected BaseQueueHostedService(IOptions options, Options = options.Value; Logger = loggerFactory.CreateLogger(type); - _queueClient = new RabbitMqClient(CreateOptions(Options, consumerDispatchConcurrency, loggerFactory)); + _queueClient = new RabbitMqClient(CreateOptions(Options, loggerFactory)); } /// @@ -179,8 +177,7 @@ protected ValueTask AcknowledgeAsync(ulong deliveryTag, bool multiple = false, C return ValueTask.CompletedTask; } - private static RabbitMqClientOptions CreateOptions(BaseQueueHostedServiceOptions options, - ushort consumerDispatchConcurrency, ILoggerFactory loggerFactory) + private static RabbitMqClientOptions CreateOptions(BaseQueueHostedServiceOptions options, ILoggerFactory loggerFactory) { var host = options.Host; @@ -197,7 +194,7 @@ private static RabbitMqClientOptions CreateOptions(BaseQueueHostedServiceOptions EnableSsl = options.EnableSsl, IgnoreSslErrors = options.IgnoreSslErrors, LoggerFactory = loggerFactory, - ConsumerDispatchConcurrency = consumerDispatchConcurrency + ConsumerDispatchConcurrency = options.ConcurrentTaskCount }; if (options.SslVersion != null) diff --git a/src/Hosting/Queue/src/BaseQueueHostedServiceOptions.cs b/src/Hosting/Queue/src/BaseQueueHostedServiceOptions.cs index 0da76947..e0928fc8 100644 --- a/src/Hosting/Queue/src/BaseQueueHostedServiceOptions.cs +++ b/src/Hosting/Queue/src/BaseQueueHostedServiceOptions.cs @@ -57,4 +57,9 @@ public abstract class BaseQueueHostedServiceOptions /// The serializer to use for deserializing messages from the Queue /// public IMessageSerializer? Serializer { get; set; } + + /// + /// The number of tasks to run concurrently. + /// + public ushort ConcurrentTaskCount { get; set; } = 1; } diff --git a/src/Hosting/Queue/src/QueueHostedService.cs b/src/Hosting/Queue/src/QueueHostedService.cs index ba05c56c..e0ecf5be 100644 --- a/src/Hosting/Queue/src/QueueHostedService.cs +++ b/src/Hosting/Queue/src/QueueHostedService.cs @@ -18,8 +18,7 @@ public abstract class QueueHostedService : BaseQueueHostedSe /// /// /// - protected QueueHostedService(IOptions options, ILoggerFactory loggerFactory) - : base(options, loggerFactory, options.Value.ConcurrentTaskCount) + protected QueueHostedService(IOptions options, ILoggerFactory loggerFactory) : base(options, loggerFactory) { } diff --git a/src/Hosting/Queue/src/QueueHostedServiceOptions.cs b/src/Hosting/Queue/src/QueueHostedServiceOptions.cs index 8e05ac10..63d5743c 100644 --- a/src/Hosting/Queue/src/QueueHostedServiceOptions.cs +++ b/src/Hosting/Queue/src/QueueHostedServiceOptions.cs @@ -5,8 +5,4 @@ namespace ClickView.GoodStuff.Hosting.Queue; /// public abstract class QueueHostedServiceOptions : BaseQueueHostedServiceOptions { - /// - /// The number of tasks to run concurrently. - /// - public ushort ConcurrentTaskCount { get; set; } = 1; } diff --git a/src/Queues/RabbitMq/src/RabbitMqClient.cs b/src/Queues/RabbitMq/src/RabbitMqClient.cs index a7dd42c6..575e9517 100644 --- a/src/Queues/RabbitMq/src/RabbitMqClient.cs +++ b/src/Queues/RabbitMq/src/RabbitMqClient.cs @@ -46,8 +46,7 @@ public async Task EnqueueAsync(string exchange, TData data, EnqueueOption var message = MessageWrapper.New(data); var bytes = _options.Serializer.Serialize(message); - await using var channel = await GetChannelAsync(options.EnablePublisherConfirms, - cancellationToken : cancellationToken); + await using var channel = await GetChannelAsync(options.EnablePublisherConfirms, cancellationToken); var properties = new BasicProperties { @@ -219,7 +218,7 @@ private static ConnectionFactory CreateConnectionFactory(RabbitMqClientOptions o HostName = options.Host, Port = options.Port, AutomaticRecoveryEnabled = true, - ConsumerDispatchConcurrency = options.ConsumerDispatchConcurrency + ConsumerDispatchConcurrency = 1 //options.ConsumerDispatchConcurrency }; // Username From 3fceb79bf50162ba5b04013f86eae86c04042296 Mon Sep 17 00:00:00 2001 From: Wei Date: Thu, 17 Jul 2025 10:17:20 +1000 Subject: [PATCH 5/7] Fix compile --- src/Queues/RabbitMq/src/RabbitMqClient.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Queues/RabbitMq/src/RabbitMqClient.cs b/src/Queues/RabbitMq/src/RabbitMqClient.cs index 575e9517..1ef5dca0 100644 --- a/src/Queues/RabbitMq/src/RabbitMqClient.cs +++ b/src/Queues/RabbitMq/src/RabbitMqClient.cs @@ -46,7 +46,9 @@ public async Task EnqueueAsync(string exchange, TData data, EnqueueOption var message = MessageWrapper.New(data); var bytes = _options.Serializer.Serialize(message); - await using var channel = await GetChannelAsync(options.EnablePublisherConfirms, cancellationToken); + await using var channel = await GetChannelAsync( + options.EnablePublisherConfirms, + cancellationToken : cancellationToken); var properties = new BasicProperties { From 40a816611f49eb47df351cddda3c32dedb6651f4 Mon Sep 17 00:00:00 2001 From: Wei Date: Thu, 17 Jul 2025 10:19:32 +1000 Subject: [PATCH 6/7] Revert debug change --- src/Queues/RabbitMq/src/RabbitMqClient.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Queues/RabbitMq/src/RabbitMqClient.cs b/src/Queues/RabbitMq/src/RabbitMqClient.cs index 1ef5dca0..5f87a658 100644 --- a/src/Queues/RabbitMq/src/RabbitMqClient.cs +++ b/src/Queues/RabbitMq/src/RabbitMqClient.cs @@ -220,7 +220,7 @@ private static ConnectionFactory CreateConnectionFactory(RabbitMqClientOptions o HostName = options.Host, Port = options.Port, AutomaticRecoveryEnabled = true, - ConsumerDispatchConcurrency = 1 //options.ConsumerDispatchConcurrency + ConsumerDispatchConcurrency = options.ConsumerDispatchConcurrency }; // Username From 4f56dac3312e2ceb54a206844dfc12aee8d92a83 Mon Sep 17 00:00:00 2001 From: Wei Date: Thu, 17 Jul 2025 10:20:42 +1000 Subject: [PATCH 7/7] Remove XML comment --- src/Hosting/Queue/src/BaseQueueHostedService.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Hosting/Queue/src/BaseQueueHostedService.cs b/src/Hosting/Queue/src/BaseQueueHostedService.cs index 7e06ff89..507d80f0 100644 --- a/src/Hosting/Queue/src/BaseQueueHostedService.cs +++ b/src/Hosting/Queue/src/BaseQueueHostedService.cs @@ -32,7 +32,6 @@ public abstract class BaseQueueHostedService : IHostedService, IAsyncD /// Initialises a new instance of . /// /// - /// /// protected BaseQueueHostedService(IOptions options, ILoggerFactory loggerFactory) {