diff --git a/src/LLL.DurableTask.EFCore/EFCoreOrchestrationService.cs b/src/LLL.DurableTask.EFCore/EFCoreOrchestrationService.cs index 5566c41..da3f6a0 100644 --- a/src/LLL.DurableTask.EFCore/EFCoreOrchestrationService.cs +++ b/src/LLL.DurableTask.EFCore/EFCoreOrchestrationService.cs @@ -7,7 +7,6 @@ using DurableTask.Core.History; using LLL.DurableTask.Core; using LLL.DurableTask.EFCore.Entities; -using LLL.DurableTask.EFCore.Extensions; using LLL.DurableTask.EFCore.Mappers; using LLL.DurableTask.EFCore.Polling; using Microsoft.EntityFrameworkCore; @@ -135,9 +134,7 @@ public async Task LockNextTaskOrchestrationWorkItemAs .Select(e => _options.DataConverter.Deserialize(e.Content)) .ToArray(); - var reopenedEvents = deserializedEvents.Reopen(_options.DataConverter); - - var runtimeState = new OrchestrationRuntimeState(reopenedEvents); + var runtimeState = new OrchestrationRuntimeState(deserializedEvents); var session = new EFCoreOrchestrationSession( _options, @@ -163,7 +160,7 @@ public async Task LockNextTaskOrchestrationWorkItemAs { InstanceId = instance.InstanceId, LockedUntilUtc = instance.LockedUntil, - OrchestrationRuntimeState = runtimeState, + OrchestrationRuntimeState = session.RuntimeState, NewMessages = messages, Session = session }; diff --git a/src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs b/src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs index 8c44aaf..a14cde2 100644 --- a/src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs +++ b/src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs @@ -4,7 +4,9 @@ using System.Threading; using System.Threading.Tasks; using DurableTask.Core; +using DurableTask.Core.History; using LLL.DurableTask.EFCore.Entities; +using LLL.DurableTask.EFCore.Extensions; using LLL.DurableTask.EFCore.Polling; using Microsoft.EntityFrameworkCore; @@ -87,12 +89,40 @@ public async Task> FetchNewMessagesAsync( .ToArray(); } - Messages.AddRange(newDbMessages); - var deserializedMessages = newDbMessages .Select(w => _options.DataConverter.Deserialize(w.Message)) .ToList(); + if (RuntimeState.ExecutionStartedEvent is not null) + { + if (RuntimeState.OrchestrationStatus is OrchestrationStatus.Completed + && deserializedMessages.Any(m => m.Event.EventType == EventType.EventRaised)) + { + // Reopen completed orchestrations after receiving an event raised + RuntimeState = new OrchestrationRuntimeState( + RuntimeState.Events.Reopen(_options.DataConverter) + ); + } + + var isRunning = RuntimeState.OrchestrationStatus is OrchestrationStatus.Running + or OrchestrationStatus.Suspended + or OrchestrationStatus.Pending; + + if (!isRunning) + { + // Discard all messages if not running + foreach (var message in newDbMessages) + { + dbContext.OrchestrationMessages.Attach(message); + dbContext.OrchestrationMessages.Remove(message); + } + newDbMessages = []; + deserializedMessages = []; + } + } + + Messages.AddRange(newDbMessages); + return deserializedMessages; } diff --git a/test/LLL.DurableTask.Tests/Storages/StorageTestBase.cs b/test/LLL.DurableTask.Tests/Storages/StorageTestBase.cs index 801c0d6..775d712 100644 --- a/test/LLL.DurableTask.Tests/Storages/StorageTestBase.cs +++ b/test/LLL.DurableTask.Tests/Storages/StorageTestBase.cs @@ -3,9 +3,11 @@ using System.Threading; using System.Threading.Tasks; using DurableTask.Core; +using DurableTask.Core.History; using DurableTask.Core.Query; using FluentAssertions; using LLL.DurableTask.Core; +using LLL.DurableTask.Core.Serializing; using LLL.DurableTask.EFCore.Polling; using LLL.DurableTask.Tests.Storage.Activities; using LLL.DurableTask.Tests.Storage.Orchestrations; @@ -159,6 +161,36 @@ public async Task TimerOrchestration_ShouldComplete() state.OrchestrationStatus.Should().Be(OrchestrationStatus.Completed); } + [Trait("Category", "Integration")] + [SkippableFact] + public async Task TimerOrchestration_ShouldTerminateProperly() + { + var taskHubClient = _host.Services.GetRequiredService(); + + var input = Guid.NewGuid(); + + var instance = await taskHubClient.CreateOrchestrationInstanceAsync( + TimerOrchestration.Name, + TimerOrchestration.Version, + input); + + await Task.Delay(TimeSpan.FromSeconds(1)); + + await taskHubClient.TerminateInstanceAsync(instance); + + var state = await taskHubClient.WaitForOrchestrationAsync(instance, FastWaitTimeout); + + state.Should().NotBeNull(); + state.OrchestrationStatus.Should().Be(OrchestrationStatus.Terminated); + + await Task.Delay(TimeSpan.FromSeconds(3)); + + var history = await taskHubClient.GetOrchestrationHistoryAsync(instance); + + var events = new TypelessJsonDataConverter().Deserialize(history); + events.Should().NotContain(x => x.EventType == EventType.TimerFired); + } + [Trait("Category", "Integration")] [SkippableTheory] [InlineData(ContinueAsNewEmptyOrchestration.Name, ContinueAsNewEmptyOrchestration.Version)]