From 895c3f4611d53ef450bf2162cd602a126c501a24 Mon Sep 17 00:00:00 2001 From: Sailesh Bellamkonda <4201725+SaileshBellamkonda@users.noreply.github.com> Date: Fri, 28 Mar 2025 03:26:21 +0530 Subject: [PATCH 1/2] Fix for orchestration with timer keeps running after termination --- .../EFCoreOrchestrationSession.cs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs b/src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs index 8c44aaf..77d6516 100644 --- a/src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs +++ b/src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs @@ -87,6 +87,20 @@ public async Task> FetchNewMessagesAsync( .ToArray(); } + var isRunning = RuntimeState.ExecutionStartedEvent == null + || RuntimeState.OrchestrationStatus is OrchestrationStatus.Running + or OrchestrationStatus.Suspended + or OrchestrationStatus.Pending; + if (!isRunning) + { + foreach (var message in newDbMessages) + { + dbContext.OrchestrationMessages.Attach(message); + dbContext.OrchestrationMessages.Remove(message); + } + newDbMessages = []; + } + Messages.AddRange(newDbMessages); var deserializedMessages = newDbMessages From 844c2e6d0cd821098182ac1d571e5c0bde093fa4 Mon Sep 17 00:00:00 2001 From: Lucas Lara Date: Mon, 28 Apr 2025 13:00:16 +0200 Subject: [PATCH 2/2] Add tests and fix reopen of orchestrations --- .../EFCoreOrchestrationService.cs | 7 +--- .../EFCoreOrchestrationSession.cs | 42 +++++++++++++------ .../Storages/StorageTestBase.cs | 32 ++++++++++++++ 3 files changed, 63 insertions(+), 18 deletions(-) diff --git a/src/LLL.DurableTask.EFCore/EFCoreOrchestrationService.cs b/src/LLL.DurableTask.EFCore/EFCoreOrchestrationService.cs index 5566c41..da3f6a0 100644 --- a/src/LLL.DurableTask.EFCore/EFCoreOrchestrationService.cs +++ b/src/LLL.DurableTask.EFCore/EFCoreOrchestrationService.cs @@ -7,7 +7,6 @@ using DurableTask.Core.History; using LLL.DurableTask.Core; using LLL.DurableTask.EFCore.Entities; -using LLL.DurableTask.EFCore.Extensions; using LLL.DurableTask.EFCore.Mappers; using LLL.DurableTask.EFCore.Polling; using Microsoft.EntityFrameworkCore; @@ -135,9 +134,7 @@ public async Task LockNextTaskOrchestrationWorkItemAs .Select(e => _options.DataConverter.Deserialize(e.Content)) .ToArray(); - var reopenedEvents = deserializedEvents.Reopen(_options.DataConverter); - - var runtimeState = new OrchestrationRuntimeState(reopenedEvents); + var runtimeState = new OrchestrationRuntimeState(deserializedEvents); var session = new EFCoreOrchestrationSession( _options, @@ -163,7 +160,7 @@ public async Task LockNextTaskOrchestrationWorkItemAs { InstanceId = instance.InstanceId, LockedUntilUtc = instance.LockedUntil, - OrchestrationRuntimeState = runtimeState, + OrchestrationRuntimeState = session.RuntimeState, NewMessages = messages, Session = session }; diff --git a/src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs b/src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs index 77d6516..a14cde2 100644 --- a/src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs +++ b/src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs @@ -4,7 +4,9 @@ using System.Threading; using System.Threading.Tasks; using DurableTask.Core; +using DurableTask.Core.History; using LLL.DurableTask.EFCore.Entities; +using LLL.DurableTask.EFCore.Extensions; using LLL.DurableTask.EFCore.Polling; using Microsoft.EntityFrameworkCore; @@ -87,26 +89,40 @@ public async Task> FetchNewMessagesAsync( .ToArray(); } - var isRunning = RuntimeState.ExecutionStartedEvent == null - || RuntimeState.OrchestrationStatus is OrchestrationStatus.Running - or OrchestrationStatus.Suspended - or OrchestrationStatus.Pending; - if (!isRunning) + var deserializedMessages = newDbMessages + .Select(w => _options.DataConverter.Deserialize(w.Message)) + .ToList(); + + if (RuntimeState.ExecutionStartedEvent is not null) { - foreach (var message in newDbMessages) + if (RuntimeState.OrchestrationStatus is OrchestrationStatus.Completed + && deserializedMessages.Any(m => m.Event.EventType == EventType.EventRaised)) { - dbContext.OrchestrationMessages.Attach(message); - dbContext.OrchestrationMessages.Remove(message); + // Reopen completed orchestrations after receiving an event raised + RuntimeState = new OrchestrationRuntimeState( + RuntimeState.Events.Reopen(_options.DataConverter) + ); + } + + var isRunning = RuntimeState.OrchestrationStatus is OrchestrationStatus.Running + or OrchestrationStatus.Suspended + or OrchestrationStatus.Pending; + + if (!isRunning) + { + // Discard all messages if not running + foreach (var message in newDbMessages) + { + dbContext.OrchestrationMessages.Attach(message); + dbContext.OrchestrationMessages.Remove(message); + } + newDbMessages = []; + deserializedMessages = []; } - newDbMessages = []; } Messages.AddRange(newDbMessages); - var deserializedMessages = newDbMessages - .Select(w => _options.DataConverter.Deserialize(w.Message)) - .ToList(); - return deserializedMessages; } diff --git a/test/LLL.DurableTask.Tests/Storages/StorageTestBase.cs b/test/LLL.DurableTask.Tests/Storages/StorageTestBase.cs index 801c0d6..775d712 100644 --- a/test/LLL.DurableTask.Tests/Storages/StorageTestBase.cs +++ b/test/LLL.DurableTask.Tests/Storages/StorageTestBase.cs @@ -3,9 +3,11 @@ using System.Threading; using System.Threading.Tasks; using DurableTask.Core; +using DurableTask.Core.History; using DurableTask.Core.Query; using FluentAssertions; using LLL.DurableTask.Core; +using LLL.DurableTask.Core.Serializing; using LLL.DurableTask.EFCore.Polling; using LLL.DurableTask.Tests.Storage.Activities; using LLL.DurableTask.Tests.Storage.Orchestrations; @@ -159,6 +161,36 @@ public async Task TimerOrchestration_ShouldComplete() state.OrchestrationStatus.Should().Be(OrchestrationStatus.Completed); } + [Trait("Category", "Integration")] + [SkippableFact] + public async Task TimerOrchestration_ShouldTerminateProperly() + { + var taskHubClient = _host.Services.GetRequiredService(); + + var input = Guid.NewGuid(); + + var instance = await taskHubClient.CreateOrchestrationInstanceAsync( + TimerOrchestration.Name, + TimerOrchestration.Version, + input); + + await Task.Delay(TimeSpan.FromSeconds(1)); + + await taskHubClient.TerminateInstanceAsync(instance); + + var state = await taskHubClient.WaitForOrchestrationAsync(instance, FastWaitTimeout); + + state.Should().NotBeNull(); + state.OrchestrationStatus.Should().Be(OrchestrationStatus.Terminated); + + await Task.Delay(TimeSpan.FromSeconds(3)); + + var history = await taskHubClient.GetOrchestrationHistoryAsync(instance); + + var events = new TypelessJsonDataConverter().Deserialize(history); + events.Should().NotContain(x => x.EventType == EventType.TimerFired); + } + [Trait("Category", "Integration")] [SkippableTheory] [InlineData(ContinueAsNewEmptyOrchestration.Name, ContinueAsNewEmptyOrchestration.Version)]