Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Garnet.sln
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ETag", "samples\ETag\ETag.c
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Garnet.fuzz", "test\Garnet.fuzz\Garnet.fuzz.csproj", "{7A42F7AA-EE93-49B1-8711-A1D6D948F5FC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Btree", "playground\BTree\Btree.csproj", "{CE12831B-2805-469E-8208-759DC4B4862C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Device.benchmark", "benchmark\Device.benchmark\Device.benchmark.csproj", "{5422F66F-327C-AABE-98B2-9AFC349745D0}"
EndProject
Global
Expand Down Expand Up @@ -360,6 +362,14 @@ Global
{7A42F7AA-EE93-49B1-8711-A1D6D948F5FC}.Release|Any CPU.Build.0 = Release|Any CPU
{7A42F7AA-EE93-49B1-8711-A1D6D948F5FC}.Release|x64.ActiveCfg = Release|Any CPU
{7A42F7AA-EE93-49B1-8711-A1D6D948F5FC}.Release|x64.Build.0 = Release|Any CPU
{CE12831B-2805-469E-8208-759DC4B4862C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CE12831B-2805-469E-8208-759DC4B4862C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CE12831B-2805-469E-8208-759DC4B4862C}.Debug|x64.ActiveCfg = Debug|Any CPU
{CE12831B-2805-469E-8208-759DC4B4862C}.Debug|x64.Build.0 = Debug|Any CPU
{CE12831B-2805-469E-8208-759DC4B4862C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CE12831B-2805-469E-8208-759DC4B4862C}.Release|Any CPU.Build.0 = Release|Any CPU
{CE12831B-2805-469E-8208-759DC4B4862C}.Release|x64.ActiveCfg = Release|Any CPU
{CE12831B-2805-469E-8208-759DC4B4862C}.Release|x64.Build.0 = Release|Any CPU
{5422F66F-327C-AABE-98B2-9AFC349745D0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5422F66F-327C-AABE-98B2-9AFC349745D0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5422F66F-327C-AABE-98B2-9AFC349745D0}.Debug|x64.ActiveCfg = Debug|Any CPU
Expand Down Expand Up @@ -406,6 +416,7 @@ Global
{4FBA1587-BAFC-49F8-803A-D1CF431A26F5} = {7068BB97-1958-4060-B5F1-859464592E56}
{7A42F7AA-EE93-49B1-8711-A1D6D948F5FC} = {9A03717A-4E0B-49CA-8579-A02A4C1D003F}
{5422F66F-327C-AABE-98B2-9AFC349745D0} = {346A5A53-51E4-4A75-B7E6-491D950382CE}
{CE12831B-2805-469E-8208-759DC4B4862C} = {69A71E2C-00E3-42F3-854E-BE157A24834E}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2C02C405-4798-41CA-AF98-61EDFEF6772E}
Expand Down
30 changes: 30 additions & 0 deletions benchmark/BDN.benchmark/Network/StreamOperations.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using BenchmarkDotNet.Attributes;
using Embedded.server;

namespace BDN.benchmark.Network
{
/// <summary>
/// Benchmark for BasicOperations
/// </summary>
[MemoryDiagnoser]
public class StreamOperations : NetworkBase
{
static ReadOnlySpan<byte> XADD => "*5\r\n$4\r\nXADD\r\n$8\r\nmystream\r\n$1\r\n*\r\n$5\r\nfield\r\n$5\r\nvalue\r\n"u8;
Request xadd;

public override void GlobalSetup()
{
base.GlobalSetup();
SetupOperation(ref xadd, XADD);
}

[Benchmark]
public async ValueTask InlineXAdd()
{
await Send(xadd);
}
}
}
1 change: 1 addition & 0 deletions benchmark/Resp.benchmark/OpType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public enum OpType
READ_TXN, WRITE_TXN, READWRITETX, WATCH_TXN, SAMPLEUPDATETX, SAMPLEDELETETX,
SCRIPTSET, SCRIPTGET, SCRIPTRETKEY,
PUBLISH, SPUBLISH,
XADD,
READONLY = 8888,
AUTH = 9999,
}
Expand Down
3 changes: 2 additions & 1 deletion benchmark/Resp.benchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ static void WaitForServer(Options opts)
}
break;
}
Console.WriteLine($"Successfully connected to Redis instance at {opts.Address}:{opts.Port}");
}

static void RunBasicCommandsBenchmark(Options opts)
Expand All @@ -220,7 +221,7 @@ static void RunBasicCommandsBenchmark(Options opts)
int keyLen = opts.KeyLength;
int valueLen = opts.ValueLength;

if (opts.Op == OpType.PUBLISH || opts.Op == OpType.SPUBLISH || opts.Op == OpType.ZADD || opts.Op == OpType.ZREM || opts.Op == OpType.ZADDREM || opts.Op == OpType.PING || opts.Op == OpType.GEOADD || opts.Op == OpType.GEOADDREM || opts.Op == OpType.SETEX || opts.Op == OpType.ZCARD || opts.Op == OpType.ZADDCARD)
if (opts.Op == OpType.PUBLISH || opts.Op == OpType.SPUBLISH || opts.Op == OpType.ZADD || opts.Op == OpType.ZREM || opts.Op == OpType.ZADDREM || opts.Op == OpType.PING || opts.Op == OpType.GEOADD || opts.Op == OpType.GEOADDREM || opts.Op == OpType.SETEX || opts.Op == OpType.ZCARD || opts.Op == OpType.ZADDCARD || opts.Op == OpType.XADD)
opts.SkipLoad = true;

//if we have scripts ops we need to load them in memory
Expand Down
5 changes: 5 additions & 0 deletions benchmark/Resp.benchmark/ReqGen.cs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,11 @@ public static (int, int) OnResponse(byte* buf, int bytesRead, int opType)
for (int i = 0; i < bytesRead; i++)
if (buf[i] == '*') count++;
break;
case OpType.XADD:
// XADD returns a bulk string with the stream ID
for (int i = 0; i < bytesRead; i++)
if (buf[i] == '$') count++;
break;
default:
break;
}
Expand Down
2 changes: 2 additions & 0 deletions benchmark/Resp.benchmark/ReqGenLoadBuffers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ private bool GenerateBatch(int i, int start, int end, OpType opType)
OpType.SCRIPTRETKEY => System.Text.Encoding.ASCII.GetBytes($"*4\r\n$7\r\nEVALSHA\r\n{BenchUtils.sha1RetKeyScript}\r\n$1\r\n1\r\n"),
OpType.PUBLISH => System.Text.Encoding.ASCII.GetBytes($"*3\r\n$7\r\nPUBLISH\r\n"),
OpType.SPUBLISH => System.Text.Encoding.ASCII.GetBytes($"*3\r\n$8\r\nSPUBLISH\r\n"),
OpType.XADD => System.Text.Encoding.ASCII.GetBytes($"*5\r\n$4\r\nXADD\r\n"),
_ => null
};

Expand Down Expand Up @@ -178,6 +179,7 @@ private bool GenerateBatch(int i, int start, int end, OpType opType)
case OpType.SCRIPTRETKEY:
case OpType.PUBLISH:
case OpType.SPUBLISH:
case OpType.XADD:
writeSuccess = GenerateSingleKeyValueOp(i, opHeader, start, end, opType);
return writeSuccess;
default:
Expand Down
15 changes: 15 additions & 0 deletions benchmark/Resp.benchmark/ReqGenUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ private bool WriteOp(ref byte* curr, byte* vend, OpType opType)
case OpType.SCRIPTRETKEY:
case OpType.PUBLISH:
case OpType.SPUBLISH:
case OpType.XADD:
if (!WriteKey(ref curr, vend, out keyData))
return false;
break;
Expand Down Expand Up @@ -307,6 +308,11 @@ private bool WriteOp(ref byte* curr, byte* vend, OpType opType)
if (!WriteStringBytes(ref curr, vend, valueBuffer))
return false;
break;
case OpType.XADD:
// Auto-generate ID with *
if (!WriteStringBytes(ref curr, vend, System.Text.Encoding.ASCII.GetBytes("*")))
return false;
break;
default:
break;
}
Expand All @@ -319,6 +325,15 @@ private bool WriteOp(ref byte* curr, byte* vend, OpType opType)
if (!WriteInteger(n, ref curr, vend))
return false;
break;
case OpType.XADD:
// Write field name
if (!WriteStringBytes(ref curr, vend, System.Text.Encoding.ASCII.GetBytes("field")))
return false;
// Write field value
RandomString();
if (!WriteStringBytes(ref curr, vend, valueBuffer))
return false;
break;
default:
break;
}
Expand Down
3 changes: 1 addition & 2 deletions libs/common/RespMemoryWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace Garnet.common
ref SpanByteAndMemory output;
public readonly bool resp3;

public unsafe RespMemoryWriter(byte respVersion, ref SpanByteAndMemory output)
public RespMemoryWriter(byte respVersion, ref SpanByteAndMemory output)
{
this.output = ref output;
ptrHandle = default;
Expand Down Expand Up @@ -519,7 +519,6 @@ private void ReallocateOutput(int extraLenHint = 0, bool lowerMinimum = false)
if (ptrHandle.Pointer != default)
{
ptrHandle.Dispose();
output.Memory.Dispose();
}
else
{
Expand Down
10 changes: 10 additions & 0 deletions libs/common/RespReadUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,16 @@ public static bool TryReadInt64WithLengthHeader(out long number, ref byte* ptr,
return true;
}

/// <summary>
/// Tries to read a Ulong from the given ASCII-encoded RESP string.
/// Note: this does not check for any length headers and is simply an accessor to TryReadUlong.
/// </summary>
/// <param name="number">If parsing was successful, contains the parsed ulong value.</param>
/// <param name="ptr">The starting position in the RESP string. Will be advanced if parsing is successful.</param>
/// <param name="end">The current end of the RESP string.</param>
/// <returns>True if a ulong was successfully parsed.</returns>
public static bool ReadUlong(out ulong number, ref byte* ptr, byte* end) => TryReadUInt64(ref ptr, end, out number, out _);

/// <summary>
/// Read long with length header
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions libs/host/Configuration/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,10 @@ public IEnumerable<string> LuaAllowedFunctions
[Option("expired-key-deletion-scan-freq", Required = false, HelpText = "Frequency of background scan for expired key deletion, in seconds")]
public int ExpiredKeyDeletionScanFrequencySecs { get; set; }

[OptionValidation]
[Option("streams", Required = false, HelpText = "Enable streams on server.")]
public bool? EnableStreams { get; set; }

[IntRangeValidation(0, int.MaxValue, includeMin: true, isRequired: false)]
[Option("cluster-replication-reestablishment-timeout")]
public int ClusterReplicationReestablishmentTimeout { get; set; }
Expand Down Expand Up @@ -893,6 +897,7 @@ public GarnetServerOptions GetServerOptions(ILogger logger = null)
UnixSocketPermission = unixSocketPermissions,
MaxDatabases = MaxDatabases,
ExpiredKeyDeletionScanFrequencySecs = ExpiredKeyDeletionScanFrequencySecs,
EnableStreams = EnableStreams.GetValueOrDefault(),
ClusterReplicationReestablishmentTimeout = ClusterReplicationReestablishmentTimeout,
ClusterReplicaResumeWithData = ClusterReplicaResumeWithData,
};
Expand Down
8 changes: 4 additions & 4 deletions libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,10 @@ private TsavoriteKV<StoreFunctions, StoreAllocator> CreateStore(int dbId, IClust
clusterFactory.CreateCheckpointManager(opts.DeviceFactoryCreator, defaultNamingScheme, isMainStore: true, logger) :
new GarnetCheckpointManager(opts.DeviceFactoryCreator, defaultNamingScheme, removeOutdated: true);

var store = new TsavoriteKV<StoreFunctions, StoreAllocator>(kvSettings
, Tsavorite.core.StoreFunctions.Create(new SpanByteComparer(),
() => new GarnetObjectSerializer(customCommandManager))
, (allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions));
var store = new TsavoriteKV<StoreFunctions, StoreAllocator>(kvSettings: kvSettings,
storeFunctions: Tsavorite.core.StoreFunctions.Create(new SpanByteComparer(),
valueSerializerCreator: () => new GarnetObjectSerializer(customCommandManager)),
allocatorFactory: (allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions));

if (heapMemorySize > 0 || readCacheHeapMemorySize > 0)
sizeTracker = new CacheSizeTracker(store, heapMemorySize, readCacheHeapMemorySize, this.loggerFactory);
Expand Down
3 changes: 3 additions & 0 deletions libs/host/defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,9 @@
/* Max number of logical databases allowed in a single Garnet server instance */
"MaxDatabases": 16,

/* Enable use of streams inside Garnet */
"EnableStreams": false,

/* Frequency of background scan for expired key deletion, in seconds */
"ExpiredKeyDeletionScanFrequencySecs": -1,

Expand Down
Loading
Loading