Skip to content

Commit 4001f09

Browse files
committed
Fix #668: Change work item filters from auto opt-in to explicit opt-in
Work item filters were auto-generated from the DurableTaskRegistry by default (introduced in PR #616). This caused a breaking behavioral change where orchestrations calling unregistered entity/activity types would hang indefinitely instead of failing with an error. The root cause: when filters are active, the DTS scheduler only dispatches work items to workers whose filters match. If an orchestration calls an unregistered entity type, no worker has a matching filter, so the work item sits in the base queue forever no worker reads it. Fix: Remove auto-generation of work item filters from the registry. By default, no filters are sent and the worker processes all work items (legacy pre-1.22.0 behavior). Users can explicitly opt-in to filtering by calling: builder.UseWorkItemFilters() // auto-generate from registry builder.UseWorkItemFilters(customFilters) // explicit filters Also includes: - New parameterless UseWorkItemFilters() overload for easy opt-in - Updated unit tests to reflect opt-in semantics - E2E regression test against real DTS scheduler
1 parent edf73ee commit 4001f09

7 files changed

Lines changed: 381 additions & 36 deletions

File tree

src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,10 @@ public static IDurableTaskWorkerBuilder UseOrchestrationFilter(this IDurableTask
143143
/// </summary>
144144
/// <param name="builder">The builder to set the builder target for.</param>
145145
/// <param name="workItemFilters">The instance of a <see cref="DurableTaskWorkerWorkItemFilters"/> to use.
146-
/// If <c>null</c>, the auto-generated default filters will be cleared.</param>
146+
/// If <c>null</c>, any previously configured filters will be cleared and filtering will be disabled.</param>
147147
/// <returns>The same <see cref="IDurableTaskWorkerBuilder"/> instance, allowing for method chaining.</returns>
148-
/// <remarks>Work item filters are auto-generated from the registry by default.
149-
/// Use this method with explicit filters to override the defaults, or with <c>null</c> to opt out of filtering entirely.</remarks>
148+
/// <remarks>By default, no work item filters are applied and the worker processes all work items.
149+
/// Use this method with explicit filters to enable filtering, or with <c>null</c> to disable filtering.</remarks>
150150
public static IDurableTaskWorkerBuilder UseWorkItemFilters(this IDurableTaskWorkerBuilder builder, DurableTaskWorkerWorkItemFilters? workItemFilters)
151151
{
152152
Check.NotNull(builder);
@@ -172,4 +172,43 @@ public static IDurableTaskWorkerBuilder UseWorkItemFilters(this IDurableTaskWork
172172

173173
return builder;
174174
}
175+
176+
/// <summary>
177+
/// Enables work item filtering by auto-generating filters from the <see cref="DurableTaskRegistry"/>.
178+
/// When enabled, the backend will only dispatch work items for registered orchestrations, activities,
179+
/// and entities to this worker.
180+
/// </summary>
181+
/// <param name="builder">The builder to set the builder target for.</param>
182+
/// <returns>The same <see cref="IDurableTaskWorkerBuilder"/> instance, allowing for method chaining.</returns>
183+
/// <remarks>
184+
/// <para>
185+
/// Work item filtering can improve efficiency in multi-worker deployments by ensuring each worker
186+
/// only receives work items it can handle. However, orchestrations that call unregistered types
187+
/// (e.g., an entity, activity, or sub-orchestrator not registered with any worker) will hang
188+
/// indefinitely instead of failing with an error.
189+
/// </para>
190+
/// <para>
191+
/// Only use this method when all task types referenced by orchestrations are guaranteed to be
192+
/// registered with at least one connected worker.
193+
/// </para>
194+
/// </remarks>
195+
public static IDurableTaskWorkerBuilder UseWorkItemFilters(this IDurableTaskWorkerBuilder builder)
196+
{
197+
Check.NotNull(builder);
198+
199+
builder.Services.AddOptions<DurableTaskWorkerWorkItemFilters>(builder.Name)
200+
.PostConfigure<IOptionsMonitor<DurableTaskRegistry>, IOptionsMonitor<DurableTaskWorkerOptions>>(
201+
(opts, registryMonitor, workerOptionsMonitor) =>
202+
{
203+
DurableTaskRegistry registry = registryMonitor.Get(builder.Name);
204+
DurableTaskWorkerOptions workerOptions = workerOptionsMonitor.Get(builder.Name);
205+
DurableTaskWorkerWorkItemFilters generated =
206+
DurableTaskWorkerWorkItemFilters.FromDurableTaskRegistry(registry, workerOptions);
207+
opts.Orchestrations = generated.Orchestrations;
208+
opts.Activities = generated.Activities;
209+
opts.Entities = generated.Entities;
210+
});
211+
212+
return builder;
213+
}
175214
}

src/Worker/Core/DependencyInjection/ServiceCollectionExtensions.cs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -86,21 +86,6 @@ static IServiceCollection ConfigureDurableOptions(IServiceCollection services, s
8686
}
8787
});
8888

89-
// Auto-generate work item filters from the registry by default.
90-
// Users can override these by calling UseWorkItemFilters(customFilters) on the builder.
91-
services.AddOptions<DurableTaskWorkerWorkItemFilters>(name)
92-
.Configure<IOptionsMonitor<DurableTaskRegistry>, IOptionsMonitor<DurableTaskWorkerOptions>>(
93-
(opts, registryMonitor, workerOptionsMonitor) =>
94-
{
95-
DurableTaskRegistry registry = registryMonitor.Get(name);
96-
DurableTaskWorkerOptions workerOptions = workerOptionsMonitor.Get(name);
97-
DurableTaskWorkerWorkItemFilters generated =
98-
DurableTaskWorkerWorkItemFilters.FromDurableTaskRegistry(registry, workerOptions);
99-
opts.Orchestrations = generated.Orchestrations;
100-
opts.Activities = generated.Activities;
101-
opts.Entities = generated.Entities;
102-
});
103-
10489
return services;
10590
}
10691

src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ namespace Microsoft.DurableTask.Worker;
66
/// <summary>
77
/// A class that represents work item filters for a Durable Task Worker. These filters are passed to the backend
88
/// and only work items matching the filters will be processed by the worker. If no filters are provided,
9-
/// the worker will process all work items. By default, these are auto-generated from the registered orchestrations,
10-
/// activities, and entities in the <see cref="DurableTaskRegistry"/>. To opt-out of filters, provide a <c>null</c>
11-
/// value to the <see cref="DurableTaskWorkerBuilderExtensions.UseWorkItemFilters"/> method when configuring the worker.
9+
/// the worker will process all work items. To opt-in to work item filtering, call
10+
/// <see cref="DurableTaskWorkerBuilderExtensions.UseWorkItemFilters"/> on the worker builder with either
11+
/// explicit filters or auto-generated filters from the <see cref="DurableTaskRegistry"/>.
1212
/// </summary>
1313
public class DurableTaskWorkerWorkItemFilters
1414
{
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
// End-to-end regression test for https://github.com/microsoft/durabletask-dotnet/issues/668
5+
//
6+
// This test validates that calling an unregistered entity type from an orchestration
7+
// correctly fails with an EntityOperationFailedException instead of hanging indefinitely.
8+
//
9+
// Prerequisites:
10+
// Set the DTS_CONNECTION_STRING environment variable to a valid DTS connection string.
11+
// Example: Endpoint=https://myscheduler.eastasia.durabletask.io;Authentication=DefaultAzure;TaskHub=myHub
12+
//
13+
// Usage:
14+
// dotnet run --project test/ManualE2ETests/WorkItemFilterRegression
15+
16+
using Microsoft.DurableTask;
17+
using Microsoft.DurableTask.Client;
18+
using Microsoft.DurableTask.Client.AzureManaged;
19+
using Microsoft.DurableTask.Entities;
20+
using Microsoft.DurableTask.Worker;
21+
using Microsoft.DurableTask.Worker.AzureManaged;
22+
using Microsoft.Extensions.DependencyInjection;
23+
using Microsoft.Extensions.Hosting;
24+
using Microsoft.Extensions.Logging;
25+
26+
string connectionString = Environment.GetEnvironmentVariable("DTS_CONNECTION_STRING")
27+
?? throw new InvalidOperationException(
28+
"DTS_CONNECTION_STRING environment variable is not set. "
29+
+ "Example: Endpoint=https://myscheduler.eastasia.durabletask.io;Authentication=DefaultAzure;TaskHub=myHub");
30+
31+
Console.WriteLine("=== Work Item Filter Regression Test (Issue #668) ===");
32+
Console.WriteLine($"Connection: {MaskConnectionString(connectionString)}");
33+
Console.WriteLine();
34+
35+
HostApplicationBuilder builder = Host.CreateApplicationBuilder(args);
36+
37+
builder.Logging.SetMinimumLevel(LogLevel.Warning);
38+
39+
// Register worker with only one entity type ("Counter"), but the orchestration
40+
// will attempt to call an entity type ("UnregisteredEntity") that is NOT registered.
41+
builder.Services.AddDurableTaskWorker(workerBuilder =>
42+
{
43+
workerBuilder.AddTasks(registry =>
44+
{
45+
registry.AddOrchestrator<CallUnregisteredEntityOrchestrator>();
46+
registry.AddOrchestrator<CallRegisteredEntityOrchestrator>();
47+
registry.AddActivity<NoOpActivity>();
48+
registry.AddEntity<Counter>();
49+
});
50+
workerBuilder.UseDurableTaskScheduler(connectionString);
51+
});
52+
53+
builder.Services.AddDurableTaskClient(clientBuilder =>
54+
{
55+
clientBuilder.UseDurableTaskScheduler(connectionString);
56+
});
57+
58+
using IHost host = builder.Build();
59+
await host.StartAsync();
60+
61+
await using DurableTaskClient client = host.Services.GetRequiredService<DurableTaskClient>();
62+
63+
int passed = 0;
64+
int failed = 0;
65+
66+
// Test 1: Calling an UNREGISTERED entity should fail with an error, not hang.
67+
await RunTestAsync("CallEntityAsync targeting unregistered entity fails with error", async () =>
68+
{
69+
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
70+
nameof(CallUnregisteredEntityOrchestrator));
71+
72+
OrchestrationMetadata? result = await client.WaitForInstanceCompletionAsync(
73+
instanceId, getInputsAndOutputs: true, cancellation: new CancellationTokenSource(TimeSpan.FromSeconds(30)).Token);
74+
75+
if (result == null)
76+
{
77+
throw new Exception("Orchestration result was null.");
78+
}
79+
80+
if (result.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
81+
{
82+
string? output = result.ReadOutputAs<string>();
83+
Console.WriteLine($" Orchestration completed with output: {output}");
84+
85+
// The orchestration is designed to catch the exception and return a success message.
86+
if (output?.Contains("EntityOperationFailedException") == true
87+
|| output?.Contains("EntityTaskNotFound") == true)
88+
{
89+
Console.WriteLine(" PASS: Orchestration correctly caught the entity error.");
90+
}
91+
else
92+
{
93+
throw new Exception($"Unexpected output: {output}");
94+
}
95+
}
96+
else if (result.RuntimeStatus == OrchestrationRuntimeStatus.Failed)
97+
{
98+
// Also acceptable — the error propagated as a failure.
99+
Console.WriteLine($" Orchestration failed (expected). FailureDetails: {result.FailureDetails?.ErrorMessage}");
100+
}
101+
else
102+
{
103+
throw new Exception($"Unexpected status: {result.RuntimeStatus}");
104+
}
105+
});
106+
107+
// Test 2: Calling a REGISTERED entity should succeed normally.
108+
await RunTestAsync("CallEntityAsync targeting registered entity succeeds", async () =>
109+
{
110+
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
111+
nameof(CallRegisteredEntityOrchestrator));
112+
113+
OrchestrationMetadata? result = await client.WaitForInstanceCompletionAsync(
114+
instanceId, getInputsAndOutputs: true, cancellation: new CancellationTokenSource(TimeSpan.FromSeconds(30)).Token);
115+
116+
if (result == null)
117+
{
118+
throw new Exception("Orchestration result was null.");
119+
}
120+
121+
if (result.RuntimeStatus != OrchestrationRuntimeStatus.Completed)
122+
{
123+
throw new Exception($"Expected Completed but got {result.RuntimeStatus}. FailureDetails: {result.FailureDetails?.ErrorMessage}");
124+
}
125+
126+
string? output = result.ReadOutputAs<string>();
127+
Console.WriteLine($" Orchestration completed with output: {output}");
128+
});
129+
130+
// Test 3: Calling an unregistered activity should fail with an error, not hang.
131+
await RunTestAsync("CallActivityAsync targeting unregistered activity fails with error", async () =>
132+
{
133+
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
134+
"CallUnregisteredActivityOrchestrator");
135+
136+
OrchestrationMetadata? result = await client.WaitForInstanceCompletionAsync(
137+
instanceId, getInputsAndOutputs: true, cancellation: new CancellationTokenSource(TimeSpan.FromSeconds(30)).Token);
138+
139+
if (result == null)
140+
{
141+
throw new Exception("Orchestration result was null.");
142+
}
143+
144+
// This orchestration is NOT registered, so the work item should be dispatched to the worker,
145+
// which will fail with OrchestratorTaskNotFound. This is expected behavior.
146+
if (result.RuntimeStatus == OrchestrationRuntimeStatus.Failed)
147+
{
148+
Console.WriteLine($" Orchestration failed as expected: {result.FailureDetails?.ErrorType}");
149+
}
150+
else
151+
{
152+
throw new Exception($"Expected Failed but got {result.RuntimeStatus}");
153+
}
154+
});
155+
156+
Console.WriteLine();
157+
Console.WriteLine($"=== Results: {passed} passed, {failed} failed ===");
158+
Console.WriteLine();
159+
160+
await host.StopAsync();
161+
Environment.Exit(failed > 0 ? 1 : 0);
162+
163+
async Task RunTestAsync(string testName, Func<Task> testAction)
164+
{
165+
Console.WriteLine($"[TEST] {testName}");
166+
try
167+
{
168+
await testAction();
169+
Console.WriteLine($" RESULT: PASS");
170+
passed++;
171+
}
172+
catch (OperationCanceledException)
173+
{
174+
Console.WriteLine($" RESULT: FAIL - Timed out (orchestration hung, regression detected!)");
175+
failed++;
176+
}
177+
catch (Exception ex)
178+
{
179+
Console.WriteLine($" RESULT: FAIL - {ex.Message}");
180+
failed++;
181+
}
182+
183+
Console.WriteLine();
184+
}
185+
186+
static string MaskConnectionString(string cs)
187+
{
188+
// Mask the endpoint for security, show just enough for identification
189+
int endpointIdx = cs.IndexOf("Endpoint=", StringComparison.OrdinalIgnoreCase);
190+
if (endpointIdx >= 0)
191+
{
192+
int semicolonIdx = cs.IndexOf(';', endpointIdx + 9);
193+
string endpoint = semicolonIdx >= 0 ? cs.Substring(endpointIdx + 9, semicolonIdx - endpointIdx - 9) : cs.Substring(endpointIdx + 9);
194+
return $"Endpoint={endpoint};...";
195+
}
196+
197+
return "***";
198+
}
199+
200+
// ===== Orchestrators and entities =====
201+
202+
/// <summary>
203+
/// Orchestrator that calls an entity type that is NOT registered with the worker.
204+
/// Before the fix (issue #668), this would hang indefinitely.
205+
/// After the fix, this should fail with EntityOperationFailedException.
206+
/// </summary>
207+
sealed class CallUnregisteredEntityOrchestrator : TaskOrchestrator<object?, string>
208+
{
209+
public override async Task<string> RunAsync(TaskOrchestrationContext context, object? input)
210+
{
211+
try
212+
{
213+
EntityInstanceId unregistered = new("UnregisteredEntity", "key1");
214+
await context.Entities.CallEntityAsync<string>(unregistered, "get");
215+
return "ERROR: CallEntityAsync did not throw for unregistered entity";
216+
}
217+
catch (EntityOperationFailedException ex)
218+
{
219+
return $"OK: Got EntityOperationFailedException - {ex.FailureDetails.ErrorType}: {ex.FailureDetails.ErrorMessage}";
220+
}
221+
catch (Exception ex)
222+
{
223+
return $"OK: Got exception - {ex.GetType().Name}: {ex.Message}";
224+
}
225+
}
226+
}
227+
228+
/// <summary>
229+
/// Orchestrator that calls a registered entity type. This should succeed.
230+
/// </summary>
231+
sealed class CallRegisteredEntityOrchestrator : TaskOrchestrator<object?, string>
232+
{
233+
public override async Task<string> RunAsync(TaskOrchestrationContext context, object? input)
234+
{
235+
EntityInstanceId counter = new(nameof(Counter), Guid.NewGuid().ToString("N"));
236+
await context.Entities.CallEntityAsync(counter, "add", 5);
237+
int result = await context.Entities.CallEntityAsync<int>(counter, "get");
238+
return $"Counter value: {result}";
239+
}
240+
}
241+
242+
/// <summary>
243+
/// A simple counter entity used for testing registered entity calls.
244+
/// </summary>
245+
sealed class Counter : TaskEntity<int>
246+
{
247+
public int Add(int value)
248+
{
249+
this.State += value;
250+
return this.State;
251+
}
252+
253+
public int Get() => this.State;
254+
}
255+
256+
/// <summary>
257+
/// A no-op activity for testing.
258+
/// </summary>
259+
sealed class NoOpActivity : TaskActivity<string?, string>
260+
{
261+
public override Task<string> RunAsync(TaskActivityContext context, string? input)
262+
{
263+
return Task.FromResult("done");
264+
}
265+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net8.0</TargetFramework>
6+
<Nullable>enable</Nullable>
7+
<ImplicitUsings>enable</ImplicitUsings>
8+
</PropertyGroup>
9+
10+
<ItemGroup>
11+
<PackageReference Include="Azure.Identity" />
12+
<PackageReference Include="Microsoft.Extensions.Hosting" />
13+
</ItemGroup>
14+
15+
<ItemGroup>
16+
<ProjectReference Include="..\..\..\src\Client\AzureManaged\Client.AzureManaged.csproj" />
17+
<ProjectReference Include="..\..\..\src\Worker\AzureManaged\Worker.AzureManaged.csproj" />
18+
</ItemGroup>
19+
20+
</Project>

0 commit comments

Comments
 (0)