Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 6 additions & 38 deletions src/Hosting/Queue/src/BaseQueueHostedService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace ClickView.GoodStuff.Hosting.Queue;
using Queues.RabbitMq;

/// <summary>
/// A base class for all queue hosted services to inherit from
/// A base class for all queue-hosted services to inherit from
/// </summary>
/// <typeparam name="TOptions"></typeparam>
public abstract class BaseQueueHostedService<TOptions> : IHostedService, IAsyncDisposable
Expand All @@ -32,16 +32,16 @@ public abstract class BaseQueueHostedService<TOptions> : IHostedService, IAsyncD
/// Initialises a new instance of <see cref="BaseQueueHostedService{TOptions}"/>.
/// </summary>
/// <param name="options"></param>
/// <param name="queueClient"></param>
/// <param name="loggerFactory"></param>
protected BaseQueueHostedService(IOptions<TOptions> options, ILoggerFactory loggerFactory)
protected BaseQueueHostedService(IOptions<TOptions> options, IQueueClient queueClient, ILoggerFactory loggerFactory)
{
var type = GetType();

_name = type.Name;
_queueClient = queueClient;
Options = options.Value;
Logger = loggerFactory.CreateLogger(type);

_queueClient = new RabbitMqClient(CreateOptions(Options, loggerFactory));
}

/// <summary>
Expand Down Expand Up @@ -126,13 +126,10 @@ protected virtual async ValueTask DisposeAsyncCore()

_disposed = true;

// If we still have an open subscription, Dispose it
// If we still have an open subscription, dispose it
if (_subscriptionContext != null)
await _subscriptionContext.DisposeAsync();

// Then finally dispose the client
await _queueClient.DisposeAsync();

// Clean up everything else
_subscriptionLock.Dispose();
}
Expand Down Expand Up @@ -176,37 +173,8 @@ protected ValueTask AcknowledgeAsync(ulong deliveryTag, bool multiple = false, C
return ValueTask.CompletedTask;
}

private static RabbitMqClientOptions CreateOptions(BaseQueueHostedServiceOptions options, ILoggerFactory loggerFactory)
{
var host = options.Host;

if (string.IsNullOrWhiteSpace(host))
throw new ArgumentException($"{nameof(QueueHostedServiceOptions.Host)} is required");

var o = new RabbitMqClientOptions
{
Host = host,
Port = options.Port,
Username = options.Username,
Password = options.Password,
ConnectionTimeout = options.ConnectionTimeout,
EnableSsl = options.EnableSsl,
IgnoreSslErrors = options.IgnoreSslErrors,
LoggerFactory = loggerFactory
};

if (options.SslVersion != null)
o.SslVersion = options.SslVersion.Value;

if (options.Serializer != null)
o.Serializer = options.Serializer;

return o;
}

private void CheckDisposed()
{
if (_disposed)
throw new ObjectDisposedException(GetType().Name);
ObjectDisposedException.ThrowIf(_disposed, this);
}
}
50 changes: 1 addition & 49 deletions src/Hosting/Queue/src/BaseQueueHostedServiceOptions.cs
Original file line number Diff line number Diff line change
@@ -1,60 +1,12 @@
namespace ClickView.GoodStuff.Hosting.Queue;

using System.Security.Authentication;
using Queues.RabbitMq.Serialization;

/// <summary>
/// Base queue options which is shared amongst the various hosted queue workers
/// Base queue options which are shared amongst the various hosted queue workers
/// </summary>
public abstract class BaseQueueHostedServiceOptions
{
/// <summary>
/// The name of the queue to subscribe to.
/// </summary>
public string? QueueName { get; set; }

/// <summary>
/// The hostname or network address of the queue server in which to connect.
/// </summary>
public string? Host { get; set; }

/// <summary>
/// The port that the queue server is listening to.
/// </summary>
public ushort Port { get; set; } = 5672;

/// <summary>
/// The username of the user for the queue.
/// </summary>
public string? Username { get; set; }

/// <summary>
/// The password of the user for the queue.
/// </summary>
public string? Password { get; set; }

/// <summary>
/// Timeout for connection attempts
/// </summary>
public TimeSpan? ConnectionTimeout { get; set; }

/// <summary>
/// Set to true to enable SSL
/// </summary>
public bool EnableSsl { get; set; }

/// <summary>
/// Set to false to ignore SSL errors
/// </summary>
public bool IgnoreSslErrors { get; set; }

/// <summary>
/// The TLS protocol version. Set to None to let the OS decide
/// </summary>
public SslProtocols? SslVersion { get; set; }

/// <summary>
/// The serializer to use for deserializing messages from the Queue
/// </summary>
public IMessageSerializer? Serializer { get; set; }
}
4 changes: 3 additions & 1 deletion src/Hosting/Queue/src/BatchQueueHostedService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ public abstract class BatchQueueHostedService<TMessage, TOptions> : BaseQueueHos
/// Initialises a new instance of <see cref="QueueHostedService{TMessage,TOptions}"/>.
/// </summary>
/// <param name="options"></param>
/// <param name="queueClient"></param>
/// <param name="loggerFactory"></param>
protected BatchQueueHostedService(IOptions<TOptions> options, ILoggerFactory loggerFactory) : base(options, loggerFactory)
protected BatchQueueHostedService(IOptions<TOptions> options, IQueueClient queueClient,
ILoggerFactory loggerFactory) : base(options, queueClient, loggerFactory)
{
_currentBuffer = new List<TMessage>(Options.BatchSize);
}
Expand Down
4 changes: 3 additions & 1 deletion src/Hosting/Queue/src/QueueHostedService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ public abstract class QueueHostedService<TMessage, TOptions> : BaseQueueHostedSe
/// Initialises a new instance of <see cref="QueueHostedService{TMessage,TOptions}"/>.
/// </summary>
/// <param name="options"></param>
/// <param name="queueClient"></param>
/// <param name="loggerFactory"></param>
protected QueueHostedService(IOptions<TOptions> options, ILoggerFactory loggerFactory) : base(options, loggerFactory)
protected QueueHostedService(IOptions<TOptions> options, IQueueClient queueClient, ILoggerFactory loggerFactory) :
base(options, queueClient, loggerFactory)
{
}

Expand Down
99 changes: 49 additions & 50 deletions src/Hosting/Queue/src/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,62 +8,61 @@
/// </summary>
public static class ServiceCollectionExtensions
{
/// <summary>
/// Add a <see cref="QueueHostedService{TMessage,TOptions}"/> registration for the given type.
/// </summary>
/// <param name="services"></param>
/// <param name="config"></param>
/// <typeparam name="THostedService"></typeparam>
/// <typeparam name="TMessage"></typeparam>
/// <typeparam name="TOptions"></typeparam>
public static void AddQueueHostedService<THostedService, TMessage, TOptions>(this IServiceCollection services,
IConfiguration config)
where THostedService : QueueHostedService<TMessage, TOptions>
where TMessage : class, new()
where TOptions : QueueHostedServiceOptions
extension(IServiceCollection services)
{
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(config);
/// <summary>
/// Add a <see cref="QueueHostedService{TMessage,TOptions}"/> registration for the given type.
/// </summary>
/// <param name="config"></param>
/// <typeparam name="THostedService"></typeparam>
/// <typeparam name="TMessage"></typeparam>
/// <typeparam name="TOptions"></typeparam>
public void AddQueueHostedService<THostedService, TMessage, TOptions>(IConfiguration config)
where THostedService : QueueHostedService<TMessage, TOptions>
where TMessage : class, new()
where TOptions : QueueHostedServiceOptions
{
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(config);

services.Configure<TOptions>(config);
services.AddQueueHostedService<THostedService, TMessage, TOptions>();
}
services.Configure<TOptions>(config);
services.AddQueueHostedService<THostedService, TMessage, TOptions>();
}

/// <summary>
/// Add a <see cref="QueueHostedService{TMessage,TOptions}"/> registration for the given type.
/// </summary>
/// <param name="services"></param>
/// <param name="configure"></param>
/// <typeparam name="THostedService"></typeparam>
/// <typeparam name="TMessage"></typeparam>
/// <typeparam name="TOptions"></typeparam>
public static void AddQueueHostedService<THostedService, TMessage, TOptions>(this IServiceCollection services,
Action<TOptions> configure)
where THostedService : QueueHostedService<TMessage, TOptions>
where TMessage : class, new()
where TOptions : QueueHostedServiceOptions
{
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(configure);
/// <summary>
/// Add a <see cref="QueueHostedService{TMessage,TOptions}"/> registration for the given type.
/// </summary>
/// <param name="configure"></param>
/// <typeparam name="THostedService"></typeparam>
/// <typeparam name="TMessage"></typeparam>
/// <typeparam name="TOptions"></typeparam>
public void AddQueueHostedService<THostedService, TMessage, TOptions>(Action<TOptions> configure)
where THostedService : QueueHostedService<TMessage, TOptions>
where TMessage : class, new()
where TOptions : QueueHostedServiceOptions
{
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(configure);

services.Configure(configure);
services.AddQueueHostedService<THostedService, TMessage, TOptions>();
}
services.Configure(configure);
services.AddQueueHostedService<THostedService, TMessage, TOptions>();
}

/// <summary>
/// Add a <see cref="QueueHostedService{TMessage,TOptions}"/> registration for the given type.
/// </summary>
/// <param name="services"></param>
/// <typeparam name="THostedService"></typeparam>
/// <typeparam name="TMessage"></typeparam>
/// <typeparam name="TOptions"></typeparam>
public static void AddQueueHostedService<THostedService, TMessage, TOptions>(this IServiceCollection services)
where THostedService : QueueHostedService<TMessage, TOptions>
where TMessage : class, new()
where TOptions : QueueHostedServiceOptions
{
ArgumentNullException.ThrowIfNull(services);
/// <summary>
/// Add a <see cref="QueueHostedService{TMessage,TOptions}"/> registration for the given type.
/// </summary>
/// <typeparam name="THostedService"></typeparam>
/// <typeparam name="TMessage"></typeparam>
/// <typeparam name="TOptions"></typeparam>
public void AddQueueHostedService<THostedService, TMessage, TOptions>()
where THostedService : QueueHostedService<TMessage, TOptions>
where TMessage : class, new()
where TOptions : QueueHostedServiceOptions
{
ArgumentNullException.ThrowIfNull(services);

services.AddHostedService<THostedService>();
services.AddHostedService<THostedService>();
}
}
}
Loading