From 33f85d53048c8a746df79cbfc999aedfa85464c3 Mon Sep 17 00:00:00 2001 From: Lubomir Tetak Date: Thu, 25 Jan 2024 11:19:11 +0100 Subject: [PATCH 1/3] Simplified DiskWriterQueue with blocking concurrency --- LiteDB/Engine/Disk/DiskService.cs | 2 - LiteDB/Engine/Disk/DiskWriterQueue.cs | 73 ++++++++------------------- 2 files changed, 22 insertions(+), 53 deletions(-) diff --git a/LiteDB/Engine/Disk/DiskService.cs b/LiteDB/Engine/Disk/DiskService.cs index ad2bc8344..de6d0b503 100644 --- a/LiteDB/Engine/Disk/DiskService.cs +++ b/LiteDB/Engine/Disk/DiskService.cs @@ -186,8 +186,6 @@ public int WriteAsync(IEnumerable pages) count++; } - _queue.Value.Run(); - return count; } diff --git a/LiteDB/Engine/Disk/DiskWriterQueue.cs b/LiteDB/Engine/Disk/DiskWriterQueue.cs index 0592cc6b9..88f0c1653 100644 --- a/LiteDB/Engine/Disk/DiskWriterQueue.cs +++ b/LiteDB/Engine/Disk/DiskWriterQueue.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Concurrent; using System.IO; -using System.Linq; using System.Threading; using System.Threading.Tasks; using static LiteDB.Constants; @@ -18,14 +17,17 @@ internal class DiskWriterQueue : IDisposable // async thread controls private Task _task; + private bool _shouldClose = false; private readonly ConcurrentQueue _queue = new ConcurrentQueue(); - - private int _running = 0; + private readonly object _queueSync = new object(); + private readonly ManualResetEventSlim _queueHasItems = new ManualResetEventSlim(false); + private readonly ManualResetEventSlim _queueIsEmpty = new ManualResetEventSlim(true); public DiskWriterQueue(Stream stream) { _stream = stream; + _task = Task.Run(ExecuteQueue); } /// @@ -40,27 +42,11 @@ public DiskWriterQueue(Stream stream) public void EnqueuePage(PageBuffer page) { ENSURE(page.Origin == FileOrigin.Log, "async writer must use only for Log file"); - - _queue.Enqueue(page); - } - - /// - /// If queue contains pages and are not running, starts run queue again now - /// - public void Run() - { - lock (_queue) + lock (_queueSync) { - if (_queue.Count == 0) return; - - var oldValue = Interlocked.CompareExchange(ref _running, 1, 0); - - if (oldValue == 0) - { - // Schedule a new thread to process the pages in the queue. - // https://blog.stephencleary.com/2013/08/startnew-is-dangerous.html - _task = Task.Run(ExecuteQueue); - } + _queueIsEmpty.Reset(); + _queue.Enqueue(page); + _queueHasItems.Set(); } } @@ -69,16 +55,7 @@ public void Run() /// public void Wait() { - lock (_queue) - { - if (_task != null) - { - _task.Wait(); - } - - Run(); - } - + _queueIsEmpty.Wait(); ENSURE(_queue.Count == 0, "queue should be empty after wait() call"); } @@ -87,35 +64,25 @@ public void Wait() /// private void ExecuteQueue() { - do + while (true) { if (_queue.TryDequeue(out var page)) { WritePageToStream(page); } - - while (page == null) + else { _stream.FlushToDisk(); - Volatile.Write(ref _running, 0); - - if (!_queue.Any()) return; - - // Another item was added to the queue after we detected it was empty. - var oldValue = Interlocked.CompareExchange(ref _running, 1, 0); - - if (oldValue == 1) + lock (_queueSync) { - // A new thread was already scheduled for execution, this thread can return. - return; + if (_queue.Count > 0) continue; + _queueIsEmpty.Set(); } - // This thread will continue to process the queue as a new thread was not scheduled. - _queue.TryDequeue(out page); - WritePageToStream(page); + _queueHasItems.Wait(); + if (_shouldClose) return; } - - } while (true); + } } private void WritePageToStream(PageBuffer page) @@ -137,8 +104,12 @@ public void Dispose() { LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK"); + _shouldClose = true; + // run all items in queue before dispose this.Wait(); + _task?.Wait(); + _task = null; } } } \ No newline at end of file From f21cd8405eda7c7b59b018c9a541b0c3605eb5da Mon Sep 17 00:00:00 2001 From: Lubomir Tetak Date: Fri, 26 Jan 2024 10:06:08 +0100 Subject: [PATCH 2/3] Async DiskWriterQueue implementation --- LiteDB/Engine/Disk/DiskWriterQueue.cs | 18 +++++++++----- LiteDB/Utils/AsyncManualResetEvent.cs | 35 +++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 6 deletions(-) create mode 100644 LiteDB/Utils/AsyncManualResetEvent.cs diff --git a/LiteDB/Engine/Disk/DiskWriterQueue.cs b/LiteDB/Engine/Disk/DiskWriterQueue.cs index 88f0c1653..2b2e67041 100644 --- a/LiteDB/Engine/Disk/DiskWriterQueue.cs +++ b/LiteDB/Engine/Disk/DiskWriterQueue.cs @@ -21,13 +21,12 @@ internal class DiskWriterQueue : IDisposable private readonly ConcurrentQueue _queue = new ConcurrentQueue(); private readonly object _queueSync = new object(); - private readonly ManualResetEventSlim _queueHasItems = new ManualResetEventSlim(false); + private readonly AsyncManualResetEvent _queueHasItems = new AsyncManualResetEvent(); private readonly ManualResetEventSlim _queueIsEmpty = new ManualResetEventSlim(true); public DiskWriterQueue(Stream stream) { _stream = stream; - _task = Task.Run(ExecuteQueue); } /// @@ -47,6 +46,11 @@ public void EnqueuePage(PageBuffer page) _queueIsEmpty.Reset(); _queue.Enqueue(page); _queueHasItems.Set(); + + if (_task == null) + { + _task = Task.Factory.StartNew(ExecuteQueue, TaskCreationOptions.LongRunning); + } } } @@ -62,7 +66,7 @@ public void Wait() /// /// Execute all items in queue sync /// - private void ExecuteQueue() + private async Task ExecuteQueue() { while (true) { @@ -72,15 +76,16 @@ private void ExecuteQueue() } else { - _stream.FlushToDisk(); lock (_queueSync) { if (_queue.Count > 0) continue; _queueIsEmpty.Set(); + _queueHasItems.Reset(); + if (_shouldClose) return; } + _stream.FlushToDisk(); - _queueHasItems.Wait(); - if (_shouldClose) return; + await _queueHasItems.WaitAsync(); } } } @@ -105,6 +110,7 @@ public void Dispose() LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK"); _shouldClose = true; + _queueHasItems.Set(); // unblock the running loop in case there are no items // run all items in queue before dispose this.Wait(); diff --git a/LiteDB/Utils/AsyncManualResetEvent.cs b/LiteDB/Utils/AsyncManualResetEvent.cs new file mode 100644 index 000000000..0cbaf3421 --- /dev/null +++ b/LiteDB/Utils/AsyncManualResetEvent.cs @@ -0,0 +1,35 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace LiteDB +{ + /// + /// Async implementation of ManualResetEvent + /// https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-1-asyncmanualresetevent/ + /// + internal class AsyncManualResetEvent + { + private volatile TaskCompletionSource _tcs = new TaskCompletionSource(); + + public Task WaitAsync() + { + return _tcs.Task; + } + + public void Set() + { + _tcs.TrySetResult(true); + } + + public void Reset() + { + while (true) + { + var tcs = _tcs; + if (!tcs.Task.IsCompleted || + Interlocked.CompareExchange(ref _tcs, new TaskCompletionSource(), tcs) == tcs) + return; + } + } + } +} \ No newline at end of file From fab1a407a1764be7c2675319d3ece97dbaee222c Mon Sep 17 00:00:00 2001 From: Lubomir Tetak Date: Thu, 25 Jan 2024 11:19:11 +0100 Subject: [PATCH 3/3] DiskWriterQueue resiliency --- LiteDB/Engine/Disk/DiskWriterQueue.cs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/LiteDB/Engine/Disk/DiskWriterQueue.cs b/LiteDB/Engine/Disk/DiskWriterQueue.cs index 2b2e67041..3f7826ddd 100644 --- a/LiteDB/Engine/Disk/DiskWriterQueue.cs +++ b/LiteDB/Engine/Disk/DiskWriterQueue.cs @@ -83,13 +83,25 @@ private async Task ExecuteQueue() _queueHasItems.Reset(); if (_shouldClose) return; } - _stream.FlushToDisk(); + TryFlushStream(); await _queueHasItems.WaitAsync(); } } } + private void TryFlushStream() + { + try + { + _stream.FlushToDisk(); + } + catch (IOException) + { + // Disk is probably full. This may be unrecoverable problem but until we have enough space in the buffer we may be ok. + } + } + private void WritePageToStream(PageBuffer page) { if (page == null) return;