Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions src/LLL.DurableTask.EFCore/EFCoreOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using DurableTask.Core.History;
using LLL.DurableTask.Core;
using LLL.DurableTask.EFCore.Entities;
using LLL.DurableTask.EFCore.Extensions;
using LLL.DurableTask.EFCore.Mappers;
using LLL.DurableTask.EFCore.Polling;
using Microsoft.EntityFrameworkCore;
Expand Down Expand Up @@ -135,9 +134,7 @@ public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAs
.Select(e => _options.DataConverter.Deserialize<HistoryEvent>(e.Content))
.ToArray();

var reopenedEvents = deserializedEvents.Reopen(_options.DataConverter);

var runtimeState = new OrchestrationRuntimeState(reopenedEvents);
var runtimeState = new OrchestrationRuntimeState(deserializedEvents);

var session = new EFCoreOrchestrationSession(
_options,
Expand All @@ -163,7 +160,7 @@ public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAs
{
InstanceId = instance.InstanceId,
LockedUntilUtc = instance.LockedUntil,
OrchestrationRuntimeState = runtimeState,
OrchestrationRuntimeState = session.RuntimeState,
NewMessages = messages,
Session = session
};
Expand Down
34 changes: 32 additions & 2 deletions src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.History;
using LLL.DurableTask.EFCore.Entities;
using LLL.DurableTask.EFCore.Extensions;
using LLL.DurableTask.EFCore.Polling;
using Microsoft.EntityFrameworkCore;

Expand Down Expand Up @@ -87,12 +89,40 @@ public async Task<IList<TaskMessage>> FetchNewMessagesAsync(
.ToArray();
}

Messages.AddRange(newDbMessages);

var deserializedMessages = newDbMessages
.Select(w => _options.DataConverter.Deserialize<TaskMessage>(w.Message))
.ToList();

if (RuntimeState.ExecutionStartedEvent is not null)
{
if (RuntimeState.OrchestrationStatus is OrchestrationStatus.Completed
&& deserializedMessages.Any(m => m.Event.EventType == EventType.EventRaised))
{
// Reopen completed orchestrations after receiving an event raised
RuntimeState = new OrchestrationRuntimeState(
RuntimeState.Events.Reopen(_options.DataConverter)
);
}

var isRunning = RuntimeState.OrchestrationStatus is OrchestrationStatus.Running
or OrchestrationStatus.Suspended
or OrchestrationStatus.Pending;

if (!isRunning)
{
// Discard all messages if not running
foreach (var message in newDbMessages)
{
dbContext.OrchestrationMessages.Attach(message);
dbContext.OrchestrationMessages.Remove(message);
}
newDbMessages = [];
deserializedMessages = [];
}
}

Messages.AddRange(newDbMessages);

return deserializedMessages;
}

Expand Down
32 changes: 32 additions & 0 deletions test/LLL.DurableTask.Tests/Storages/StorageTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using FluentAssertions;
using LLL.DurableTask.Core;
using LLL.DurableTask.Core.Serializing;
using LLL.DurableTask.EFCore.Polling;
using LLL.DurableTask.Tests.Storage.Activities;
using LLL.DurableTask.Tests.Storage.Orchestrations;
Expand Down Expand Up @@ -159,6 +161,36 @@ public async Task TimerOrchestration_ShouldComplete()
state.OrchestrationStatus.Should().Be(OrchestrationStatus.Completed);
}

[Trait("Category", "Integration")]
[SkippableFact]
public async Task TimerOrchestration_ShouldTerminateProperly()
{
var taskHubClient = _host.Services.GetRequiredService<TaskHubClient>();

var input = Guid.NewGuid();

var instance = await taskHubClient.CreateOrchestrationInstanceAsync(
TimerOrchestration.Name,
TimerOrchestration.Version,
input);

await Task.Delay(TimeSpan.FromSeconds(1));

await taskHubClient.TerminateInstanceAsync(instance);

var state = await taskHubClient.WaitForOrchestrationAsync(instance, FastWaitTimeout);

state.Should().NotBeNull();
state.OrchestrationStatus.Should().Be(OrchestrationStatus.Terminated);

await Task.Delay(TimeSpan.FromSeconds(3));

var history = await taskHubClient.GetOrchestrationHistoryAsync(instance);

var events = new TypelessJsonDataConverter().Deserialize<HistoryEvent[]>(history);
events.Should().NotContain(x => x.EventType == EventType.TimerFired);
}

[Trait("Category", "Integration")]
[SkippableTheory]
[InlineData(ContinueAsNewEmptyOrchestration.Name, ContinueAsNewEmptyOrchestration.Version)]
Expand Down