Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions LiteDB/Document/BsonValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ internal virtual int GetBytesCount(bool recalc)
case BsonType.Double: return 8;
case BsonType.Decimal: return 16;

case BsonType.String: return Encoding.UTF8.GetByteCount(this.AsString);
case BsonType.String: return StringEncoding.UTF8.GetByteCount(this.AsString);

case BsonType.Binary: return this.AsBinary.Length;
case BsonType.ObjectId: return 12;
Expand All @@ -674,7 +674,7 @@ protected int GetBytesCountElement(string key, BsonValue value)

return
1 + // element type
Encoding.UTF8.GetByteCount(key) + // CString
StringEncoding.UTF8.GetByteCount(key) + // CString
1 + // CString \0
value.GetBytesCount(true) +
(variant ? 5 : 0); // bytes.Length + 0x??
Expand Down
2 changes: 0 additions & 2 deletions LiteDB/Engine/Disk/DiskService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@ public int WriteAsync(IEnumerable<PageBuffer> pages)
count++;
}

_queue.Value.Run();

return count;
}

Expand Down
87 changes: 38 additions & 49 deletions LiteDB/Engine/Disk/DiskWriterQueue.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using static LiteDB.Constants;
Expand All @@ -18,10 +17,12 @@ internal class DiskWriterQueue : IDisposable

// async thread controls
private Task _task;
private bool _shouldClose = false;

private readonly ConcurrentQueue<PageBuffer> _queue = new ConcurrentQueue<PageBuffer>();

private int _running = 0;
private readonly object _queueSync = new object();
private readonly AsyncManualResetEvent _queueHasItems = new AsyncManualResetEvent();
private readonly ManualResetEventSlim _queueIsEmpty = new ManualResetEventSlim(true);

public DiskWriterQueue(Stream stream)
{
Expand All @@ -40,26 +41,15 @@ public DiskWriterQueue(Stream stream)
public void EnqueuePage(PageBuffer page)
{
ENSURE(page.Origin == FileOrigin.Log, "async writer must use only for Log file");

_queue.Enqueue(page);
}

/// <summary>
/// If queue contains pages and are not running, starts run queue again now
/// </summary>
public void Run()
{
lock (_queue)
lock (_queueSync)
{
if (_queue.Count == 0) return;
_queueIsEmpty.Reset();
_queue.Enqueue(page);
_queueHasItems.Set();

var oldValue = Interlocked.CompareExchange(ref _running, 1, 0);

if (oldValue == 0)
if (_task == null)
{
// Schedule a new thread to process the pages in the queue.
// https://blog.stephencleary.com/2013/08/startnew-is-dangerous.html
_task = Task.Run(ExecuteQueue);
_task = Task.Factory.StartNew(ExecuteQueue, TaskCreationOptions.LongRunning);
}
}
}
Expand All @@ -69,53 +59,47 @@ public void Run()
/// </summary>
public void Wait()
{
lock (_queue)
{
if (_task != null)
{
_task.Wait();
}

Run();
}

_queueIsEmpty.Wait();
ENSURE(_queue.Count == 0, "queue should be empty after wait() call");
}

/// <summary>
/// Execute all items in queue sync
/// </summary>
private void ExecuteQueue()
private async Task ExecuteQueue()
{
do
while (true)
{
if (_queue.TryDequeue(out var page))
{
WritePageToStream(page);
}

while (page == null)
else
{
_stream.FlushToDisk();
Volatile.Write(ref _running, 0);

if (!_queue.Any()) return;

// Another item was added to the queue after we detected it was empty.
var oldValue = Interlocked.CompareExchange(ref _running, 1, 0);

if (oldValue == 1)
lock (_queueSync)
{
// A new thread was already scheduled for execution, this thread can return.
return;
if (_queue.Count > 0) continue;
_queueIsEmpty.Set();
_queueHasItems.Reset();
if (_shouldClose) return;
}
TryFlushStream();

// This thread will continue to process the queue as a new thread was not scheduled.
_queue.TryDequeue(out page);
WritePageToStream(page);
await _queueHasItems.WaitAsync();
}
}
}

} while (true);
private void TryFlushStream()
{
try
{
_stream.FlushToDisk();
}
catch (IOException)
{
// Disk is probably full. This may be unrecoverable problem but until we have enough space in the buffer we may be ok.
}
}

private void WritePageToStream(PageBuffer page)
Expand All @@ -137,8 +121,13 @@ public void Dispose()
{
LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK");

_shouldClose = true;
_queueHasItems.Set(); // unblock the running loop in case there are no items

// run all items in queue before dispose
this.Wait();
_task?.Wait();
_task = null;
}
}
}
8 changes: 4 additions & 4 deletions LiteDB/Engine/Disk/Serializer/BufferReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public string ReadString(int count)
// if fits in current segment, use inner array - otherwise copy from multiples segments
if (_currentPosition + count <= _current.Count)
{
value = Encoding.UTF8.GetString(_current.Array, _current.Offset + _currentPosition, count);
value = StringEncoding.UTF8.GetString(_current.Array, _current.Offset + _currentPosition, count);

this.MoveForward(count);
}
Expand All @@ -165,7 +165,7 @@ public string ReadString(int count)

this.Read(buffer, 0, count);

value = Encoding.UTF8.GetString(buffer, 0, count);
value = StringEncoding.UTF8.GetString(buffer, 0, count);

BufferPool.Return(buffer);
}
Expand Down Expand Up @@ -204,7 +204,7 @@ public string ReadCString()

this.MoveForward(1); // +1 to '\0'

return Encoding.UTF8.GetString(mem.ToArray());
return StringEncoding.UTF8.GetString(mem.ToArray());
}
}
}
Expand All @@ -220,7 +220,7 @@ private bool TryReadCStringCurrentSegment(out string value)
{
if (_current[pos] == 0x00)
{
value = Encoding.UTF8.GetString(_current.Array, _current.Offset + _currentPosition, count);
value = StringEncoding.UTF8.GetString(_current.Array, _current.Offset + _currentPosition, count);
this.MoveForward(count + 1); // +1 means '\0'
return true;
}
Expand Down
12 changes: 6 additions & 6 deletions LiteDB/Engine/Disk/Serializer/BufferWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,13 @@ public void WriteCString(string value)
{
if (value.IndexOf('\0') > -1) throw LiteException.InvalidNullCharInString();

var bytesCount = Encoding.UTF8.GetByteCount(value);
var bytesCount = StringEncoding.UTF8.GetByteCount(value);
var available = _current.Count - _currentPosition; // avaiable in current segment

// can write direct in current segment (use < because need +1 \0)
if (bytesCount < available)
{
Encoding.UTF8.GetBytes(value, 0, value.Length, _current.Array, _current.Offset + _currentPosition);
StringEncoding.UTF8.GetBytes(value, 0, value.Length, _current.Array, _current.Offset + _currentPosition);

_current[_currentPosition + bytesCount] = 0x00;

Expand All @@ -168,7 +168,7 @@ public void WriteCString(string value)
{
var buffer = BufferPool.Rent(bytesCount);

Encoding.UTF8.GetBytes(value, 0, value.Length, buffer, 0);
StringEncoding.UTF8.GetBytes(value, 0, value.Length, buffer, 0);

this.Write(buffer, 0, bytesCount);

Expand All @@ -186,7 +186,7 @@ public void WriteCString(string value)
/// </summary>
public void WriteString(string value, bool specs)
{
var count = Encoding.UTF8.GetByteCount(value);
var count = StringEncoding.UTF8.GetByteCount(value);

if (specs)
{
Expand All @@ -195,7 +195,7 @@ public void WriteString(string value, bool specs)

if (count <= _current.Count - _currentPosition)
{
Encoding.UTF8.GetBytes(value, 0, value.Length, _current.Array, _current.Offset + _currentPosition);
StringEncoding.UTF8.GetBytes(value, 0, value.Length, _current.Array, _current.Offset + _currentPosition);

this.MoveForward(count);
}
Expand All @@ -204,7 +204,7 @@ public void WriteString(string value, bool specs)
// rent a buffer to be re-usable
var buffer = BufferPool.Rent(count);

Encoding.UTF8.GetBytes(value, 0, value.Length, buffer, 0);
StringEncoding.UTF8.GetBytes(value, 0, value.Length, buffer, 0);

this.Write(buffer, 0, count);

Expand Down
4 changes: 2 additions & 2 deletions LiteDB/Engine/Engine/Upgrade.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public static bool Upgrade(string filename, string password = null, Collation co
Collation = collation
};

var backup = FileHelper.GetSufixFile(filename, "-backup", true);
var backup = FileHelper.GetSuffixFile(filename, "-backup", true);

settings.Filename = FileHelper.GetSufixFile(filename, "-temp", true);
settings.Filename = FileHelper.GetSuffixFile(filename, "-temp", true);

var buffer = new byte[PAGE_SIZE * 2];
IFileReader reader;
Expand Down
2 changes: 1 addition & 1 deletion LiteDB/Engine/Pages/HeaderPage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace LiteDB.Engine
{
/// <summary>
/// Header page represent first page on datafile. Engine contains a single instance of HeaderPage and all changes
/// must be syncornized (using lock).
/// must be synchronized (using lock).
/// </summary>
internal class HeaderPage : BasePage
{
Expand Down
2 changes: 1 addition & 1 deletion LiteDB/Engine/Services/TransactionMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private int GetInitialSize()
{
var sum = 0;

// if there is no avaiable pages, reduce all open transactions
// if there is no available pages, reduce all open transactions
foreach (var trans in _transactions.Values)
{
//TODO: revisar estas contas, o reduce tem que fechar 1000
Expand Down
4 changes: 2 additions & 2 deletions LiteDB/Engine/Services/TransactionService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public void Commit()
}
}

// dispose all snapshosts
// dispose all snapshots
foreach (var snapshot in _snapshots.Values)
{
snapshot.Dispose();
Expand All @@ -291,7 +291,7 @@ public void Rollback()
this.ReturnNewPages();
}

// dispose all snaphosts
// dispose all snapshots
foreach (var snapshot in _snapshots.Values)
{
// but first, if writable, discard changes
Expand Down
4 changes: 2 additions & 2 deletions LiteDB/Engine/Structures/CollectionIndex.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ public static int GetLength(string name, string expr)
return
1 + // Slot
1 + // IndexType
Encoding.UTF8.GetByteCount(name) + 1 + // Name + \0
Encoding.UTF8.GetByteCount(expr) + 1 + // Expression + \0
StringEncoding.UTF8.GetByteCount(name) + 1 + // Name + \0
StringEncoding.UTF8.GetByteCount(expr) + 1 + // Expression + \0
1 + // Unique
PageAddress.SIZE + // Head
PageAddress.SIZE + // Tail
Expand Down
35 changes: 35 additions & 0 deletions LiteDB/Utils/AsyncManualResetEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System.Threading;
using System.Threading.Tasks;

namespace LiteDB
{
/// <summary>
/// Async implementation of ManualResetEvent
/// https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-1-asyncmanualresetevent/
/// </summary>
internal class AsyncManualResetEvent
{
private volatile TaskCompletionSource<bool> _tcs = new TaskCompletionSource<bool>();

public Task WaitAsync()
{
return _tcs.Task;
}

public void Set()
{
_tcs.TrySetResult(true);
}

public void Reset()
{
while (true)
{
var tcs = _tcs;
if (!tcs.Task.IsCompleted ||
Interlocked.CompareExchange(ref _tcs, new TaskCompletionSource<bool>(), tcs) == tcs)
return;
}
}
}
}
12 changes: 12 additions & 0 deletions LiteDB/Utils/Encoding.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System.Text;

namespace LiteDB
{
internal class StringEncoding
{
// Original Encoding.UTF8 will replace unpaired surrogate with U+FFFD, which is not suitable for database
// so, we need to use new UTF8Encoding(false, true) to make throw exception when unpaired surrogate is found
//public static System.Text.Encoding UTF8 = new UTF8Encoding(false, true);
public static Encoding UTF8 = new UTF8Encoding(false, true);
}
}
6 changes: 3 additions & 3 deletions LiteDB/Utils/FileHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ internal static class FileHelper
/// <summary>
/// Create a temp filename based on original filename - checks if file exists (if exists, append counter number)
/// </summary>
public static string GetSufixFile(string filename, string suffix = "-temp", bool checkIfExists = true)
public static string GetSuffixFile(string filename, string suffix = "-temp", bool checkIfExists = true)
{
var count = 0;
var temp = Path.Combine(Path.GetDirectoryName(filename),
Expand All @@ -37,12 +37,12 @@ public static string GetSufixFile(string filename, string suffix = "-temp", bool
/// <summary>
/// Get LOG file based on data file
/// </summary>
public static string GetLogFile(string filename) => GetSufixFile(filename, "-log", false);
public static string GetLogFile(string filename) => GetSuffixFile(filename, "-log", false);

/// <summary>
/// Get TEMP file based on data file
/// </summary>
public static string GetTempFile(string filename) => GetSufixFile(filename, "-tmp", false);
public static string GetTempFile(string filename) => GetSuffixFile(filename, "-tmp", false);

/// <summary>
/// Test if file are used by any process
Expand Down