Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Microsoft.DurableTask.Worker.Grpc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using P = Microsoft.DurableTask.Protobuf;

namespace Microsoft.DurableTask;

Expand Down Expand Up @@ -77,6 +78,8 @@ static IDurableTaskWorkerBuilder UseExternalizedPayloadsCore(IDurableTaskWorkerB
throw new ArgumentException(
"Channel or CallInvoker must be provided to use Azure Blob Payload Externalization feature");
}

opt.Capabilities.Add(P.WorkerCapability.LargePayloads);
});

return builder;
Expand Down
16 changes: 13 additions & 3 deletions src/Grpc/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -692,14 +692,14 @@ message AbandonEntityTaskResponse {
}

message SkipGracefulOrchestrationTerminationsRequest {
InstanceBatch instanceBatch = 1;
google.protobuf.StringValue reason = 2;
InstanceBatch instanceBatch = 1;
google.protobuf.StringValue reason = 2;
}

message SkipGracefulOrchestrationTerminationsResponse {
// Those instances which could not be terminated because they had locked entities at the time of this termination call,
// are already in a terminal state (completed, failed, terminated, etc.), are not orchestrations, or do not exist (i.e. have been purged)
repeated string unterminatedInstanceIds = 1;
repeated string unterminatedInstanceIds = 1;
}

service TaskHubSidecarService {
Expand Down Expand Up @@ -797,6 +797,16 @@ enum WorkerCapability {
// When set, the service may return work items without any history events as an optimization.
// It is strongly recommended that all SDKs support this capability.
WORKER_CAPABILITY_HISTORY_STREAMING = 1;

// Indicates that the worker supports scheduled tasks.
// The service may send schedule-triggered orchestration work items,
// and the worker must handle them, including the scheduledTime field.
WORKER_CAPABILITY_SCHEDULED_TASKS = 2;

// Signals that the worker can handle large payloads stored externally (e.g., Blob Storage).
// Work items may contain URI references instead of inline data, and the worker must fetch them.
// This avoids message size limits and reduces network overhead.
WORKER_CAPABILITY_LARGE_PAYLOADS = 3;
}

message WorkItem {
Expand Down
4 changes: 2 additions & 2 deletions src/Grpc/versions.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# The following files were downloaded from branch main at 2025-11-04 23:19:51 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/8c0d166673593700cfa9d0b123cd55e025b2846e/protos/orchestrator_service.proto
# The following files were downloaded from branch main at 2025-11-14 16:36:47 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/9f762f1301b91e3e7c736b9c5a29c2e09f2a850e/protos/orchestrator_service.proto
11 changes: 11 additions & 0 deletions src/ScheduledTasks/Extension/DurableTaskWorkerBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using Microsoft.DurableTask.Worker;
using Microsoft.DurableTask.Worker.Grpc;
using Microsoft.Extensions.DependencyInjection;
using P = Microsoft.DurableTask.Protobuf;

namespace Microsoft.DurableTask.ScheduledTasks;

Expand All @@ -20,5 +23,13 @@ public static void UseScheduledTasks(this IDurableTaskWorkerBuilder builder)
r.AddEntity<Schedule>();
r.AddOrchestrator<ExecuteScheduleOperationOrchestrator>();
});

// Register the capability for gRPC workers
builder.Services
.AddOptions<GrpcDurableTaskWorkerOptions>(builder.Name)
.PostConfigure(opt =>
{
opt.Capabilities.Add(P.WorkerCapability.ScheduledTasks);
});
}
}
1 change: 1 addition & 0 deletions src/ScheduledTasks/ScheduledTasks.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
<ProjectReference Include="../Abstractions/Abstractions.csproj" />
<ProjectReference Include="../Client/Core/Client.csproj" />
<ProjectReference Include="../Worker/Core/Worker.csproj" />
<ProjectReference Include="../Worker/Grpc/Worker.Grpc.csproj" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ async ValueTask<OrchestrationRuntimeState> BuildRuntimeStateAsync(
workerOptions.Concurrency.MaximumConcurrentOrchestrationWorkItems,
MaxConcurrentEntityWorkItems =
workerOptions.Concurrency.MaximumConcurrentEntityWorkItems,
Capabilities = { P.WorkerCapability.HistoryStreaming },
Capabilities = { this.worker.grpcOptions.Capabilities },
},
cancellationToken: cancellation);
}
Expand Down
8 changes: 8 additions & 0 deletions src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using P = Microsoft.DurableTask.Protobuf;

namespace Microsoft.DurableTask.Worker.Grpc;

/// <summary>
Expand All @@ -23,6 +25,12 @@ public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions
/// </summary>
public CallInvoker? CallInvoker { get; set; }

/// <summary>
/// Gets the collection of capabilities enabled on this worker.
/// Capabilities are announced to the backend on connection.
/// </summary>
public HashSet<P.WorkerCapability> Capabilities { get; } = new() { P.WorkerCapability.HistoryStreaming };

/// <summary>
/// Gets the internal protocol options. These are used to control backend-dependent features.
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions test/Worker/Grpc.Tests/Worker.Grpc.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,9 @@
<ProjectReference Include="$(SrcRoot)Worker/Grpc/Worker.Grpc.csproj" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net6.0'">
<ProjectReference Include="$(SrcRoot)ScheduledTasks/ScheduledTasks.csproj" />
<ProjectReference Include="$(SrcRoot)Extensions/AzureBlobPayloads/AzureBlobPayloads.csproj" />
</ItemGroup>

</Project>
Loading
Loading