Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions src/LLL.DurableTask.Core/Serializing/HistoryEventConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class HistoryEventConverter : JsonConverter
//{ EventType.ExecutionFailed, typeof(ExecutionFailedEvent) },
{ EventType.ExecutionStarted, typeof(ExecutionStartedEvent) },
{ EventType.ExecutionTerminated, typeof(ExecutionTerminatedEvent) },
{ EventType.ExecutionRewound, typeof(ExecutionRewoundEvent) },
{ EventType.GenericEvent, typeof(GenericEvent) },
{ EventType.HistoryState, typeof(HistoryStateEvent) },
{ EventType.OrchestratorCompleted, typeof(OrchestratorCompletedEvent) },
Expand All @@ -43,15 +44,24 @@ public override object ReadJson(JsonReader reader, Type objectType, object exist
{
var jObject = JObject.Load(reader);

var eventTypeToken = jObject.GetValue("EventType", StringComparison.OrdinalIgnoreCase);
var eventType = jObject.GetValue("EventType", StringComparison.OrdinalIgnoreCase)
?.ToObject<EventType>()
?? throw new Exception("Expected EventType field in HistoryEvent");

if (eventTypeToken is null)
throw new Exception("Expected EventType field in HistoryEvent");

var eventType = eventTypeToken.ToObject<EventType>();
var eventId = jObject.GetValue("EventId", StringComparison.OrdinalIgnoreCase)
?.ToObject<int>()
?? throw new Exception("Expected EventId field in HistoryEvent");

var type = _typesMap[eventType];

if (type == typeof(ExecutionRewoundEvent))
{
// Handles multiple constructors present in ExecutionRewoundEvent
var @event = new ExecutionRewoundEvent(eventId);
serializer.Populate(jObject.CreateReader(), @event);
return @event;
}

return jObject.ToObject(type, serializer);
}

Expand Down
1 change: 1 addition & 0 deletions src/LLL.DurableTask.EFCore/EFCoreOrchestrationOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ public class EFCoreOrchestrationOptions
public TimeSpan ActivtyLockTimeout { get; set; } = TimeSpan.FromMinutes(1);
public TimeSpan FetchNewMessagesPollingTimeout { get; set; } = TimeSpan.FromSeconds(10);
public int DelayInSecondsAfterFailure { get; set; } = 5;
public bool UseDTFxRewind { get; set; } = true;
}
23 changes: 20 additions & 3 deletions src/LLL.DurableTask.EFCore/EFCoreOrchestrationServiceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using DurableTask.Core.Tracing;
using LLL.DurableTask.Core;
using LLL.DurableTask.EFCore.Extensions;
using LLL.DurableTask.EFCore.Mappers;
Expand Down Expand Up @@ -240,9 +241,25 @@ public async Task<OrchestrationQueryResult> GetOrchestrationWithQueryAsync(Orche

public async Task RewindTaskOrchestrationAsync(string instanceId, string reason)
{
using var dbContext = _dbContextFactory.CreateDbContext();
await RewindInstanceAsync(dbContext, instanceId, reason, true, FindLastErrorOrCompletionRewindPoint);
await dbContext.SaveChangesAsync();
if (_options.UseDTFxRewind)
{
var taskMessage = new TaskMessage
{
OrchestrationInstance = new OrchestrationInstance { InstanceId = instanceId },
Event = new ExecutionRewoundEvent(-1, reason)
{
// Set a dummy trace context to avoid an exception in DTFx
ParentTraceContext = new DistributedTraceContext($"{instanceId}")
}
};
await SendTaskOrchestrationMessageAsync(taskMessage);
}
else
{
using var dbContext = _dbContextFactory.CreateDbContext();
await RewindInstanceAsync(dbContext, instanceId, reason, true, FindLastErrorOrCompletionRewindPoint);
await dbContext.SaveChangesAsync();
}
}

private async Task RewindInstanceAsync(OrchestrationDbContext dbContext, string instanceId, string reason, bool rewindParents, Func<IList<HistoryEvent>, HistoryEvent> findRewindPoint)
Expand Down
78 changes: 39 additions & 39 deletions src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,54 +70,38 @@ public async Task<IList<TaskMessage>> FetchNewMessagesAsync(
.OrderBy(w => w.AvailableAt)
.ThenBy(w => w.SequenceNumber)
.AsNoTracking()
.ToArrayAsync(cancellationToken);

var messagesToDiscard = newDbMessages
.Where(m => m.ExecutionId is not null && m.ExecutionId != Instance.LastExecutionId)
.ToArray();

if (messagesToDiscard.Length > 0)
{
foreach (var message in messagesToDiscard)
{
dbContext.OrchestrationMessages.Attach(message);
dbContext.OrchestrationMessages.Remove(message);
}

newDbMessages = newDbMessages
.Except(messagesToDiscard)
.ToArray();
}
.ToListAsync(cancellationToken);

var deserializedMessages = newDbMessages
.Select(w => _options.DataConverter.Deserialize<TaskMessage>(w.Message))
.ToList();

if (RuntimeState.ExecutionStartedEvent is not null)
if (RuntimeState.ExecutionStartedEvent is not null
&& RuntimeState.OrchestrationStatus is OrchestrationStatus.Completed
&& deserializedMessages.Any(m => m.Event.EventType == EventType.EventRaised))
{
if (RuntimeState.OrchestrationStatus is OrchestrationStatus.Completed
&& deserializedMessages.Any(m => m.Event.EventType == EventType.EventRaised))
{
// Reopen completed orchestrations after receiving an event raised
RuntimeState = new OrchestrationRuntimeState(
RuntimeState.Events.Reopen(_options.DataConverter)
);
}
// Reopen completed orchestrations after receiving an event raised
RuntimeState = new OrchestrationRuntimeState(
RuntimeState.Events.Reopen(_options.DataConverter)
);
}

var isRunning = RuntimeState.ExecutionStartedEvent is null
|| RuntimeState.OrchestrationStatus is OrchestrationStatus.Running
or OrchestrationStatus.Suspended
or OrchestrationStatus.Pending;

var isRunning = RuntimeState.OrchestrationStatus is OrchestrationStatus.Running
or OrchestrationStatus.Suspended
or OrchestrationStatus.Pending;
for (var i = newDbMessages.Count - 1; i >= 0; i--)
{
var dbMessage = newDbMessages[i];
var deserializedMessage = deserializedMessages[i];

if (!isRunning)
if (ShouldDropNewMessage(isRunning, dbMessage, deserializedMessage))
{
// Discard all messages if not running
foreach (var message in newDbMessages)
{
dbContext.OrchestrationMessages.Attach(message);
dbContext.OrchestrationMessages.Remove(message);
}
newDbMessages = [];
deserializedMessages = [];
dbContext.OrchestrationMessages.Attach(dbMessage);
dbContext.OrchestrationMessages.Remove(dbMessage);
newDbMessages.RemoveAt(i);
deserializedMessages.RemoveAt(i);
}
}

Expand All @@ -126,6 +110,22 @@ or OrchestrationStatus.Suspended
return deserializedMessages;
}

private bool ShouldDropNewMessage(
bool isRunning,
OrchestrationMessage dbMessage,
TaskMessage taskMessage)
{
// Drop messages to previous executions
if (dbMessage.ExecutionId is not null && dbMessage.ExecutionId != Instance.LastExecutionId)
return true;

// When not running, drop anything that is not execution rewound
if (!isRunning && taskMessage.Event.EventType != EventType.ExecutionRewound)
return true;

return false;
}

public void ClearMessages()
{
Messages.Clear();
Expand Down