Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions src/DurableTask.SqlServer/Scripts/logic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ IF TYPE_ID(N'__SchemaNamePlaceholder__.OrchestrationEvents') IS NULL
[PayloadID] uniqueidentifier NULL,
[ParentInstanceID] varchar(100) NULL,
[Version] varchar(100) NULL,
[TraceContext] varchar(800) NULL
[TraceContext] varchar(800) NULL,
[Tags] varchar(8000) NULL
)
GO

Expand Down Expand Up @@ -233,7 +234,8 @@ CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.CreateInstance
@InputText varchar(MAX) = NULL,
@StartTime datetime2 = NULL,
@DedupeStatuses varchar(MAX) = 'Pending,Running',
@TraceContext varchar(800) = NULL
@TraceContext varchar(800) = NULL,
@Tags varchar(8000) = NULL
AS
BEGIN
DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub()
Expand Down Expand Up @@ -302,7 +304,8 @@ BEGIN
[ExecutionID],
[RuntimeStatus],
[InputPayloadID],
[TraceContext])
[TraceContext],
[Tags])
VALUES (
@Name,
@Version,
Expand All @@ -311,7 +314,8 @@ BEGIN
@ExecutionID,
@RuntimeStatus,
@InputPayloadID,
@TraceContext
@TraceContext,
@Tags
)

INSERT INTO NewEvents (
Expand Down Expand Up @@ -348,10 +352,12 @@ BEGIN
DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub()
DECLARE @ParentInstanceID varchar(100)
DECLARE @Version varchar(100)
DECLARE @Tags varchar(8000)

SELECT
@ParentInstanceID = [ParentInstanceID],
@Version = [Version]
@Version = [Version],
@Tags = [Tags]
FROM Instances WHERE [InstanceID] = @InstanceID

SELECT
Expand All @@ -370,7 +376,8 @@ BEGIN
[PayloadID],
@ParentInstanceID as [ParentInstanceID],
@Version as [Version],
H.[TraceContext]
H.[TraceContext],
@Tags as [Tags]
FROM History H WITH (INDEX (PK_History))
LEFT OUTER JOIN Payloads P ON
P.[TaskHub] = @TaskHub AND
Expand Down Expand Up @@ -635,6 +642,7 @@ BEGIN
DECLARE @parentInstanceID varchar(100)
DECLARE @version varchar(100)
DECLARE @runtimeStatus varchar(30)
DECLARE @tags varchar(8000)
DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub()

BEGIN TRANSACTION
Expand All @@ -654,7 +662,8 @@ BEGIN
@instanceID = I.[InstanceID],
@parentInstanceID = I.[ParentInstanceID],
@runtimeStatus = I.[RuntimeStatus],
@version = I.[Version]
@version = I.[Version],
@tags = I.[Tags]
FROM
Instances I WITH (READPAST) INNER JOIN NewEvents E WITH (READPAST) ON
E.[TaskHub] = @TaskHub AND
Expand Down Expand Up @@ -684,7 +693,8 @@ BEGIN
DATEDIFF(SECOND, [Timestamp], @now) AS [WaitTime],
@parentInstanceID as [ParentInstanceID],
@version as [Version],
N.[TraceContext]
N.[TraceContext],
@tags as [Tags]
FROM NewEvents N
LEFT OUTER JOIN __SchemaNamePlaceholder__.[Payloads] P ON
P.[TaskHub] = @TaskHub AND
Expand Down Expand Up @@ -724,7 +734,8 @@ BEGIN
[PayloadID],
@parentInstanceID as [ParentInstanceID],
@version as [Version],
H.[TraceContext]
H.[TraceContext],
@tags as [Tags]
FROM History H WITH (INDEX (PK_History))
LEFT OUTER JOIN Payloads P ON
P.[TaskHub] = @TaskHub AND
Expand Down Expand Up @@ -907,7 +918,8 @@ BEGIN
[Version],
[ParentInstanceID],
[RuntimeStatus],
[TraceContext])
[TraceContext],
[Tags])
SELECT DISTINCT
@TaskHub,
E.[InstanceID],
Expand All @@ -916,7 +928,8 @@ BEGIN
E.[Version],
E.[ParentInstanceID],
'Pending',
E.[TraceContext]
E.[TraceContext],
E.[Tags]
FROM @NewOrchestrationEvents E
WHERE E.[EventType] IN ('ExecutionStarted')
AND NOT EXISTS (
Expand Down Expand Up @@ -1185,7 +1198,8 @@ BEGIN
P.[TaskHub] = @TaskHub AND
P.[InstanceID] = I.[InstanceID] AND
P.[PayloadID] = I.[OutputPayloadID]) ELSE NULL END AS [OutputText],
I.[TraceContext]
I.[TraceContext],
I.[Tags]
FROM Instances I
WHERE
I.[TaskHub] = @TaskHub AND
Expand Down Expand Up @@ -1231,7 +1245,8 @@ BEGIN
P.[TaskHub] = @TaskHub AND
P.[InstanceID] = I.[InstanceID] AND
P.[PayloadID] = I.[OutputPayloadID]) ELSE NULL END AS [OutputText],
I.[TraceContext]
I.[TraceContext],
I.[Tags]
FROM
Instances I
WHERE
Expand Down
24 changes: 24 additions & 0 deletions src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-- Copyright (c) Microsoft Corporation.
-- Licensed under the MIT License.

-- PERSISTENT SCHEMA OBJECTS (tables, indexes, types, etc.)
--
-- The contents of this file must never be changed after
-- being published. Any schema changes must be done in
-- new schema-{major}.{minor}.{patch}.sql scripts.

-- Add a new Tags column to the Instances table (JSON blob of string key-value pairs).
IF NOT EXISTS (SELECT 1 FROM sys.columns WHERE object_id = OBJECT_ID('__SchemaNamePlaceholder__.Instances') AND name = 'Tags')
ALTER TABLE __SchemaNamePlaceholder__.Instances ADD [Tags] varchar(8000) NULL

-- Add a Tags column to the OrchestrationEvents table type so that merged tags
-- flow through sub-orchestration creation events. To change a type we must first
-- drop all stored procedures that reference it, then drop the type itself.
-- The type and sprocs will be recreated by logic.sql which executes afterwards.
IF OBJECT_ID('__SchemaNamePlaceholder__._AddOrchestrationEvents') IS NOT NULL
DROP PROCEDURE __SchemaNamePlaceholder__._AddOrchestrationEvents
IF OBJECT_ID('__SchemaNamePlaceholder__._CheckpointOrchestration') IS NOT NULL
DROP PROCEDURE __SchemaNamePlaceholder__._CheckpointOrchestration

IF TYPE_ID('__SchemaNamePlaceholder__.OrchestrationEvents') IS NOT NULL
DROP TYPE __SchemaNamePlaceholder__.OrchestrationEvents
8 changes: 6 additions & 2 deletions src/DurableTask.SqlServer/SqlOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,8 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync(
timerMessages,
continuedAsNewMessage,
currentWorkItem.EventPayloadMappings,
this.settings.SchemaName);
this.settings.SchemaName,
this.traceHelper);

command.Parameters.AddTaskEventsParameter(
"@NewTaskEvents",
Expand All @@ -388,6 +389,7 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync(
currentWorkItem.EventPayloadMappings,
this.settings.SchemaName);


try
{
await SqlUtils.ExecuteNonQueryAsync(command, this.traceHelper, instance.InstanceId);
Expand Down Expand Up @@ -522,6 +524,8 @@ public override async Task CreateTaskOrchestrationAsync(TaskMessage creationMess
command.Parameters.Add("@StartTime", SqlDbType.DateTime2).Value = startEvent.ScheduledStartTime;
command.Parameters.Add("@TraceContext", SqlDbType.VarChar, size: 800).Value = SqlUtils.GetTraceContext(startEvent);

command.Parameters.AddTagsParameter(startEvent.Tags);

if (dedupeStatuses?.Length > 0)
{
command.Parameters.Add("@DedupeStatuses", SqlDbType.VarChar).Value = string.Join(",", dedupeStatuses);
Expand All @@ -543,7 +547,7 @@ public override async Task SendTaskOrchestrationMessageAsync(TaskMessage message
using SqlConnection connection = await this.GetAndOpenConnectionAsync();
using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}._AddOrchestrationEvents");

command.Parameters.AddOrchestrationEventsParameter("@NewOrchestrationEvents", message, this.settings.SchemaName);
command.Parameters.AddOrchestrationEventsParameter("@NewOrchestrationEvents", message, this.settings.SchemaName, this.traceHelper);

string instanceId = message.OrchestrationInstance.InstanceId;
await SqlUtils.ExecuteNonQueryAsync(command, this.traceHelper, instanceId);
Expand Down
24 changes: 15 additions & 9 deletions src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ static class OrchestrationEventSqlType
new SqlMetaData("ParentInstanceID", SqlDbType.VarChar, 100),
new SqlMetaData("Version", SqlDbType.VarChar, 100),
new SqlMetaData("TraceContext", SqlDbType.VarChar, 800),
new SqlMetaData("Tags", SqlDbType.VarChar, 8000),
};

static class ColumnOrdinals
Expand All @@ -50,6 +51,7 @@ static class ColumnOrdinals
public const int ParentInstanceID = 11;
public const int Version = 12;
public const int TraceContext = 13;
public const int Tags = 14;
}

public static SqlParameter AddOrchestrationEventsParameter(
Expand All @@ -59,7 +61,8 @@ public static SqlParameter AddOrchestrationEventsParameter(
IList<TaskMessage> timerMessages,
TaskMessage continuedAsNewMessage,
EventPayloadMap eventPayloadMap,
string schemaName)
string schemaName,
LogHelper logHelper)
{
SqlParameter param = commandParameters.Add(paramName, SqlDbType.Structured);
param.TypeName = $"{schemaName}.OrchestrationEvents";
Expand All @@ -70,25 +73,27 @@ public static SqlParameter AddOrchestrationEventsParameter(
messages = messages.Append(continuedAsNewMessage);
}

param.Value = ToOrchestrationMessageParameter(messages, eventPayloadMap);
param.Value = ToOrchestrationMessageParameter(messages, eventPayloadMap, logHelper);
return param;
}

public static SqlParameter AddOrchestrationEventsParameter(
this SqlParameterCollection commandParameters,
string paramName,
TaskMessage message,
string schemaName)
string schemaName,
LogHelper logHelper)
{
SqlParameter param = commandParameters.Add(paramName, SqlDbType.Structured);
param.TypeName = $"{schemaName}.OrchestrationEvents";
param.Value = ToOrchestrationMessageParameter(message);
param.Value = ToOrchestrationMessageParameter(message, logHelper);
return param;
}

static IEnumerable<SqlDataRecord>? ToOrchestrationMessageParameter(
this IEnumerable<TaskMessage> messages,
EventPayloadMap eventPayloadMap)
EventPayloadMap eventPayloadMap,
LogHelper logHelper)
{
if (!messages.Any())
{
Expand All @@ -105,18 +110,18 @@ IEnumerable<SqlDataRecord> GetOrchestrationMessageRecords()
var record = new SqlDataRecord(OrchestrationEventSchema);
foreach (TaskMessage msg in messages)
{
yield return PopulateOrchestrationMessage(msg, record, eventPayloadMap);
yield return PopulateOrchestrationMessage(msg, record, eventPayloadMap, logHelper);
}
}
}

static IEnumerable<SqlDataRecord> ToOrchestrationMessageParameter(TaskMessage msg)
static IEnumerable<SqlDataRecord> ToOrchestrationMessageParameter(TaskMessage msg, LogHelper logHelper)
{
var record = new SqlDataRecord(OrchestrationEventSchema);
yield return PopulateOrchestrationMessage(msg, record, eventPayloadMap: null);
yield return PopulateOrchestrationMessage(msg, record, eventPayloadMap: null, logHelper);
}

static SqlDataRecord PopulateOrchestrationMessage(TaskMessage msg, SqlDataRecord record, EventPayloadMap? eventPayloadMap)
static SqlDataRecord PopulateOrchestrationMessage(TaskMessage msg, SqlDataRecord record, EventPayloadMap? eventPayloadMap, LogHelper logHelper)
{
string instanceId = msg.OrchestrationInstance.InstanceId;

Expand Down Expand Up @@ -152,6 +157,7 @@ static SqlDataRecord PopulateOrchestrationMessage(TaskMessage msg, SqlDataRecord
record.SetSqlString(ColumnOrdinals.ParentInstanceID, SqlUtils.GetParentInstanceId(msg.Event));
record.SetSqlString(ColumnOrdinals.Version, SqlUtils.GetVersion(msg.Event));
record.SetSqlString(ColumnOrdinals.TraceContext, SqlUtils.GetTraceContext(msg.Event));
record.SetSqlString(ColumnOrdinals.Tags, SqlUtils.GetTagsJson(msg.Event, logHelper));

return record;
}
Expand Down
Loading
Loading