diff --git a/src/Hosting/Queue/src/BaseQueueHostedService.cs b/src/Hosting/Queue/src/BaseQueueHostedService.cs index ef8ce5d..971edde 100644 --- a/src/Hosting/Queue/src/BaseQueueHostedService.cs +++ b/src/Hosting/Queue/src/BaseQueueHostedService.cs @@ -6,7 +6,7 @@ namespace ClickView.GoodStuff.Hosting.Queue; using Queues.RabbitMq; /// -/// A base class for all queue hosted services to inherit from +/// A base class for all queue-hosted services to inherit from /// /// public abstract class BaseQueueHostedService : IHostedService, IAsyncDisposable @@ -32,16 +32,16 @@ public abstract class BaseQueueHostedService : IHostedService, IAsyncD /// Initialises a new instance of . /// /// + /// /// - protected BaseQueueHostedService(IOptions options, ILoggerFactory loggerFactory) + protected BaseQueueHostedService(IOptions options, IQueueClient queueClient, ILoggerFactory loggerFactory) { var type = GetType(); _name = type.Name; + _queueClient = queueClient; Options = options.Value; Logger = loggerFactory.CreateLogger(type); - - _queueClient = new RabbitMqClient(CreateOptions(Options, loggerFactory)); } /// @@ -126,13 +126,10 @@ protected virtual async ValueTask DisposeAsyncCore() _disposed = true; - // If we still have an open subscription, Dispose it + // If we still have an open subscription, dispose it if (_subscriptionContext != null) await _subscriptionContext.DisposeAsync(); - // Then finally dispose the client - await _queueClient.DisposeAsync(); - // Clean up everything else _subscriptionLock.Dispose(); } @@ -176,37 +173,8 @@ protected ValueTask AcknowledgeAsync(ulong deliveryTag, bool multiple = false, C return ValueTask.CompletedTask; } - private static RabbitMqClientOptions CreateOptions(BaseQueueHostedServiceOptions options, ILoggerFactory loggerFactory) - { - var host = options.Host; - - if (string.IsNullOrWhiteSpace(host)) - throw new ArgumentException($"{nameof(QueueHostedServiceOptions.Host)} is required"); - - var o = new RabbitMqClientOptions - { - Host = host, - Port = options.Port, - Username = options.Username, - Password = options.Password, - ConnectionTimeout = options.ConnectionTimeout, - EnableSsl = options.EnableSsl, - IgnoreSslErrors = options.IgnoreSslErrors, - LoggerFactory = loggerFactory - }; - - if (options.SslVersion != null) - o.SslVersion = options.SslVersion.Value; - - if (options.Serializer != null) - o.Serializer = options.Serializer; - - return o; - } - private void CheckDisposed() { - if (_disposed) - throw new ObjectDisposedException(GetType().Name); + ObjectDisposedException.ThrowIf(_disposed, this); } } diff --git a/src/Hosting/Queue/src/BaseQueueHostedServiceOptions.cs b/src/Hosting/Queue/src/BaseQueueHostedServiceOptions.cs index 0da7694..2a05c86 100644 --- a/src/Hosting/Queue/src/BaseQueueHostedServiceOptions.cs +++ b/src/Hosting/Queue/src/BaseQueueHostedServiceOptions.cs @@ -1,10 +1,7 @@ namespace ClickView.GoodStuff.Hosting.Queue; -using System.Security.Authentication; -using Queues.RabbitMq.Serialization; - /// -/// Base queue options which is shared amongst the various hosted queue workers +/// Base queue options which are shared amongst the various hosted queue workers /// public abstract class BaseQueueHostedServiceOptions { @@ -12,49 +9,4 @@ public abstract class BaseQueueHostedServiceOptions /// The name of the queue to subscribe to. /// public string? QueueName { get; set; } - - /// - /// The hostname or network address of the queue server in which to connect. - /// - public string? Host { get; set; } - - /// - /// The port that the queue server is listening to. - /// - public ushort Port { get; set; } = 5672; - - /// - /// The username of the user for the queue. - /// - public string? Username { get; set; } - - /// - /// The password of the user for the queue. - /// - public string? Password { get; set; } - - /// - /// Timeout for connection attempts - /// - public TimeSpan? ConnectionTimeout { get; set; } - - /// - /// Set to true to enable SSL - /// - public bool EnableSsl { get; set; } - - /// - /// Set to false to ignore SSL errors - /// - public bool IgnoreSslErrors { get; set; } - - /// - /// The TLS protocol version. Set to None to let the OS decide - /// - public SslProtocols? SslVersion { get; set; } - - /// - /// The serializer to use for deserializing messages from the Queue - /// - public IMessageSerializer? Serializer { get; set; } } diff --git a/src/Hosting/Queue/src/BatchQueueHostedService.cs b/src/Hosting/Queue/src/BatchQueueHostedService.cs index 1f47b3c..60a6c85 100644 --- a/src/Hosting/Queue/src/BatchQueueHostedService.cs +++ b/src/Hosting/Queue/src/BatchQueueHostedService.cs @@ -29,8 +29,10 @@ public abstract class BatchQueueHostedService : BaseQueueHos /// Initialises a new instance of . /// /// + /// /// - protected BatchQueueHostedService(IOptions options, ILoggerFactory loggerFactory) : base(options, loggerFactory) + protected BatchQueueHostedService(IOptions options, IQueueClient queueClient, + ILoggerFactory loggerFactory) : base(options, queueClient, loggerFactory) { _currentBuffer = new List(Options.BatchSize); } diff --git a/src/Hosting/Queue/src/QueueHostedService.cs b/src/Hosting/Queue/src/QueueHostedService.cs index 92fc817..3786db5 100644 --- a/src/Hosting/Queue/src/QueueHostedService.cs +++ b/src/Hosting/Queue/src/QueueHostedService.cs @@ -17,8 +17,10 @@ public abstract class QueueHostedService : BaseQueueHostedSe /// Initialises a new instance of . /// /// + /// /// - protected QueueHostedService(IOptions options, ILoggerFactory loggerFactory) : base(options, loggerFactory) + protected QueueHostedService(IOptions options, IQueueClient queueClient, ILoggerFactory loggerFactory) : + base(options, queueClient, loggerFactory) { } diff --git a/src/Hosting/Queue/src/ServiceCollectionExtensions.cs b/src/Hosting/Queue/src/ServiceCollectionExtensions.cs index 25c91d2..389fbad 100644 --- a/src/Hosting/Queue/src/ServiceCollectionExtensions.cs +++ b/src/Hosting/Queue/src/ServiceCollectionExtensions.cs @@ -8,62 +8,61 @@ /// public static class ServiceCollectionExtensions { - /// - /// Add a registration for the given type. - /// /// - /// - /// - /// - /// - public static void AddQueueHostedService(this IServiceCollection services, - IConfiguration config) - where THostedService : QueueHostedService - where TMessage : class, new() - where TOptions : QueueHostedServiceOptions + extension(IServiceCollection services) { - ArgumentNullException.ThrowIfNull(services); - ArgumentNullException.ThrowIfNull(config); + /// + /// Add a registration for the given type. + /// + /// + /// + /// + /// + public void AddQueueHostedService(IConfiguration config) + where THostedService : QueueHostedService + where TMessage : class, new() + where TOptions : QueueHostedServiceOptions + { + ArgumentNullException.ThrowIfNull(services); + ArgumentNullException.ThrowIfNull(config); - services.Configure(config); - services.AddQueueHostedService(); - } + services.Configure(config); + services.AddQueueHostedService(); + } - /// - /// Add a registration for the given type. - /// - /// - /// - /// - /// - /// - public static void AddQueueHostedService(this IServiceCollection services, - Action configure) - where THostedService : QueueHostedService - where TMessage : class, new() - where TOptions : QueueHostedServiceOptions - { - ArgumentNullException.ThrowIfNull(services); - ArgumentNullException.ThrowIfNull(configure); + /// + /// Add a registration for the given type. + /// + /// + /// + /// + /// + public void AddQueueHostedService(Action configure) + where THostedService : QueueHostedService + where TMessage : class, new() + where TOptions : QueueHostedServiceOptions + { + ArgumentNullException.ThrowIfNull(services); + ArgumentNullException.ThrowIfNull(configure); - services.Configure(configure); - services.AddQueueHostedService(); - } + services.Configure(configure); + services.AddQueueHostedService(); + } - /// - /// Add a registration for the given type. - /// - /// - /// - /// - /// - public static void AddQueueHostedService(this IServiceCollection services) - where THostedService : QueueHostedService - where TMessage : class, new() - where TOptions : QueueHostedServiceOptions - { - ArgumentNullException.ThrowIfNull(services); + /// + /// Add a registration for the given type. + /// + /// + /// + /// + public void AddQueueHostedService() + where THostedService : QueueHostedService + where TMessage : class, new() + where TOptions : QueueHostedServiceOptions + { + ArgumentNullException.ThrowIfNull(services); - services.AddHostedService(); + services.AddHostedService(); + } } }