Skip to content

Commit 29738bb

Browse files
committed
Expand worker capacibilities
1 parent 27dfdc8 commit 29738bb

6 files changed

Lines changed: 31 additions & 4 deletions

File tree

src/Extensions/AzureBlobPayloads/DependencyInjection/DurableTaskWorkerBuilderExtensions.AzureBlobPayloads.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using Microsoft.DurableTask.Worker.Grpc;
99
using Microsoft.Extensions.DependencyInjection;
1010
using Microsoft.Extensions.Options;
11+
using P = Microsoft.DurableTask.Protobuf;
1112

1213
namespace Microsoft.DurableTask;
1314

@@ -77,6 +78,8 @@ static IDurableTaskWorkerBuilder UseExternalizedPayloadsCore(IDurableTaskWorkerB
7778
throw new ArgumentException(
7879
"Channel or CallInvoker must be provided to use Azure Blob Payload Externalization feature");
7980
}
81+
82+
opt.Capabilities.Add(P.WorkerCapability.LargePayloads);
8083
});
8184

8285
return builder;

src/Grpc/orchestrator_service.proto

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -692,14 +692,14 @@ message AbandonEntityTaskResponse {
692692
}
693693

694694
message SkipGracefulOrchestrationTerminationsRequest {
695-
InstanceBatch instanceBatch = 1;
696-
google.protobuf.StringValue reason = 2;
695+
InstanceBatch instanceBatch = 1;
696+
google.protobuf.StringValue reason = 2;
697697
}
698698

699699
message SkipGracefulOrchestrationTerminationsResponse {
700700
// Those instances which could not be terminated because they had locked entities at the time of this termination call,
701701
// are already in a terminal state (completed, failed, terminated, etc.), are not orchestrations, or do not exist (i.e. have been purged)
702-
repeated string unterminatedInstanceIds = 1;
702+
repeated string unterminatedInstanceIds = 1;
703703
}
704704

705705
service TaskHubSidecarService {
@@ -797,6 +797,10 @@ enum WorkerCapability {
797797
// When set, the service may return work items without any history events as an optimization.
798798
// It is strongly recommended that all SDKs support this capability.
799799
WORKER_CAPABILITY_HISTORY_STREAMING = 1;
800+
801+
WORKER_CAPABILITY_SCHEDULED_TASKS = 2;
802+
803+
WORKER_CAPABILITY_LARGE_PAYLOADS = 3;
800804
}
801805

802806
message WorkItem {

src/ScheduledTasks/Extension/DurableTaskWorkerBuilderExtensions.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33
using Microsoft.DurableTask.Worker;
4+
using Microsoft.DurableTask.Worker.Grpc;
5+
using Microsoft.Extensions.DependencyInjection;
6+
using P = Microsoft.DurableTask.Protobuf;
47

58
namespace Microsoft.DurableTask.ScheduledTasks;
69

@@ -20,5 +23,13 @@ public static void UseScheduledTasks(this IDurableTaskWorkerBuilder builder)
2023
r.AddEntity<Schedule>();
2124
r.AddOrchestrator<ExecuteScheduleOperationOrchestrator>();
2225
});
26+
27+
// Register the feature for gRPC workers
28+
builder.Services
29+
.AddOptions<GrpcDurableTaskWorkerOptions>(builder.Name)
30+
.PostConfigure(opt =>
31+
{
32+
opt.Capabilities.Add(P.WorkerCapability.ScheduledTasks);
33+
});
2334
}
2435
}

src/ScheduledTasks/ScheduledTasks.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
<ProjectReference Include="../Abstractions/Abstractions.csproj" />
1212
<ProjectReference Include="../Client/Core/Client.csproj" />
1313
<ProjectReference Include="../Worker/Core/Worker.csproj" />
14+
<ProjectReference Include="../Worker/Grpc/Worker.Grpc.csproj" />
1415
</ItemGroup>
1516

1617
<ItemGroup>

src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ async ValueTask<OrchestrationRuntimeState> BuildRuntimeStateAsync(
254254
workerOptions.Concurrency.MaximumConcurrentOrchestrationWorkItems,
255255
MaxConcurrentEntityWorkItems =
256256
workerOptions.Concurrency.MaximumConcurrentEntityWorkItems,
257-
Capabilities = { P.WorkerCapability.HistoryStreaming },
257+
Capabilities = { this.worker.grpcOptions.Capabilities },
258258
},
259259
cancellationToken: cancellation);
260260
}

src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4+
using P = Microsoft.DurableTask.Protobuf;
5+
46
namespace Microsoft.DurableTask.Worker.Grpc;
57

68
/// <summary>
@@ -23,6 +25,12 @@ public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions
2325
/// </summary>
2426
public CallInvoker? CallInvoker { get; set; }
2527

28+
/// <summary>
29+
/// Gets the collection of capabilities enabled on this worker.
30+
/// Capabilities are announced to the backend on connection.
31+
/// </summary>
32+
public HashSet<P.WorkerCapability> Capabilities { get; } = new() { P.WorkerCapability.HistoryStreaming };
33+
2634
/// <summary>
2735
/// Gets the internal protocol options. These are used to control backend-dependent features.
2836
/// </summary>

0 commit comments

Comments
 (0)