Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/Temporalio/Worker/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,13 @@ public WorkflowInstance(WorkflowInstanceDetails details)
/// <inheritdoc />
public int CurrentHistorySize { get; private set; }

/// <inheritdoc />
public IReadOnlyCollection<Workflows.SuggestContinueAsNewReason> SuggestedContinueAsNewReasons { get; private set; } =
Array.Empty<Workflows.SuggestContinueAsNewReason>();

/// <inheritdoc />
public bool TargetWorkerDeploymentVersionChanged { get; private set; }

/// <inheritdoc />
public WorkflowUpdateInfo? CurrentUpdateInfo => CurrentUpdateInfoLocal.Value;

Expand Down Expand Up @@ -627,6 +634,11 @@ public WorkflowActivationCompletion Activate(WorkflowActivation act)
ContinueAsNewSuggested = act.ContinueAsNewSuggested;
CurrentHistoryLength = checked((int)act.HistoryLength);
CurrentHistorySize = checked((int)act.HistorySizeBytes);
SuggestedContinueAsNewReasons = act.SuggestContinueAsNewReasons
.Select(r => (Workflows.SuggestContinueAsNewReason)(int)r)
.Where(r => r != Workflows.SuggestContinueAsNewReason.Unspecified)
.ToList();
TargetWorkerDeploymentVersionChanged = act.TargetWorkerDeploymentVersionChanged;
if (act.DeploymentVersionForCurrentTask != null)
{
CurrentDeploymentVersion = WorkerDeploymentVersion.FromBridge(act.DeploymentVersionForCurrentTask);
Expand Down Expand Up @@ -1015,6 +1027,10 @@ private async Task RunTopLevelAsync(Func<Task> func)
cmd.VersioningIntent = (Bridge.Api.Common.VersioningIntent)(int)vi;
}
#pragma warning restore CS0618
if (e.Input.Options?.InitialVersioningBehavior is { } ivb)
{
cmd.InitialVersioningBehavior = (Api.Enums.V1.ContinueAsNewVersioningBehavior)(int)ivb;
}
AddCommand(new() { ContinueAsNewWorkflowExecution = cmd });
}
catch (Exception e) when (
Expand Down
9 changes: 9 additions & 0 deletions src/Temporalio/Workflows/ContinueAsNewOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ public class ContinueAsNewOptions : ICloneable
[Obsolete("Worker Build Id versioning is deprecated in favor of Worker Deployment versioning")]
public VersioningIntent VersioningIntent { get; set; } = VersioningIntent.Unspecified;

/// <summary>
/// Gets or sets the versioning behavior for the first task of the new workflow run.
/// For example, set to <see cref="Workflows.InitialVersioningBehavior.AutoUpgrade" /> to
/// upgrade to the latest version on continue-as-new instead of inheriting the pinned
/// version from the previous run.
/// </summary>
/// <remarks>WARNING: Worker deployment based versioning is currently experimental.</remarks>
public InitialVersioningBehavior? InitialVersioningBehavior { get; set; }

/// <summary>
/// Create a shallow copy of these options.
/// </summary>
Expand Down
10 changes: 10 additions & 0 deletions src/Temporalio/Workflows/IWorkflowContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ internal interface IWorkflowContext
/// </summary>
string CurrentBuildId { get; }

/// <summary>
/// Gets value for <see cref="Workflow.SuggestedContinueAsNewReasons" />.
/// </summary>
IReadOnlyCollection<SuggestContinueAsNewReason> SuggestedContinueAsNewReasons { get; }

/// <summary>
/// Gets a value indicating whether <see cref="Workflow.TargetWorkerDeploymentVersionChanged" /> is true.
/// </summary>
bool TargetWorkerDeploymentVersionChanged { get; }

/// <summary>
/// Gets value for <see cref="Workflow.CurrentDeploymentVersion" />.
/// </summary>
Expand Down
23 changes: 23 additions & 0 deletions src/Temporalio/Workflows/InitialVersioningBehavior.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace Temporalio.Workflows
{
/// <summary>
/// Specifies the versioning behavior for the first task of a new workflow run in a
/// continue-as-new chain.
/// </summary>
/// <remarks>WARNING: Worker deployment based versioning is currently experimental.</remarks>
public enum InitialVersioningBehavior
{
/// <summary>
/// Unspecified versioning behavior; inherits from the previous run.
/// </summary>
Unspecified = Temporalio.Api.Enums.V1.ContinueAsNewVersioningBehavior.Unspecified,

/// <summary>
/// Start the new run with AutoUpgrade behavior. Use the Target Version of the workflow's
/// task queue at start-time, as AutoUpgrade workflows do. After the first workflow task
/// completes, use whatever Versioning Behavior the workflow is annotated with in the
/// workflow code.
/// </summary>
AutoUpgrade = Temporalio.Api.Enums.V1.ContinueAsNewVersioningBehavior.AutoUpgrade,
}
}
29 changes: 29 additions & 0 deletions src/Temporalio/Workflows/SuggestContinueAsNewReason.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace Temporalio.Workflows
{
/// <summary>
/// Specifies why continue-as-new is suggested.
/// </summary>
/// <remarks>WARNING: May be removed or changed in the future.</remarks>
public enum SuggestContinueAsNewReason
{
/// <summary>
/// Unspecified reason.
/// </summary>
Unspecified = Temporalio.Api.Enums.V1.SuggestContinueAsNewReason.Unspecified,

/// <summary>
/// Workflow history size is getting too large.
/// </summary>
HistorySizeTooLarge = Temporalio.Api.Enums.V1.SuggestContinueAsNewReason.HistorySizeTooLarge,

/// <summary>
/// Workflow history event count is getting too large.
/// </summary>
TooManyHistoryEvents = Temporalio.Api.Enums.V1.SuggestContinueAsNewReason.TooManyHistoryEvents,

/// <summary>
/// Workflow's count of completed plus in-flight updates is too large.
/// </summary>
TooManyUpdates = Temporalio.Api.Enums.V1.SuggestContinueAsNewReason.TooManyUpdates,
}
}
21 changes: 21 additions & 0 deletions src/Temporalio/Workflows/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,27 @@ public static string CurrentDetails
/// </remarks>
public static int CurrentHistorySize => Context.CurrentHistorySize;

/// <summary>
/// Gets the reasons why continue-as-new is suggested.
/// </summary>
/// <remarks>WARNING: May be removed or changed in the future.</remarks>
public static IReadOnlyCollection<SuggestContinueAsNewReason> SuggestedContinueAsNewReasons =>
Context.SuggestedContinueAsNewReasons;

/// <summary>
/// Gets a value indicating whether the target worker deployment version has changed for
/// this workflow since the last workflow task.
/// </summary>
/// <remarks>
/// This is only relevant for workflows using the PINNED versioning behavior with worker
/// deployment versioning. When true, the workflow's target version has changed, and the
/// workflow may want to continue-as-new with
/// <see cref="InitialVersioningBehavior.AutoUpgrade" /> to move to the new version.
/// <para>WARNING: Worker deployment based versioning is currently experimental.</para>
/// </remarks>
public static bool TargetWorkerDeploymentVersionChanged =>
Context.TargetWorkerDeploymentVersionChanged;

/// <summary>
/// Gets the current workflow update handler for the caller if any.
/// </summary>
Expand Down
51 changes: 51 additions & 0 deletions tests/Temporalio.Tests/TestUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -287,4 +287,55 @@ public static async Task<Temporalio.Api.WorkflowService.V1.SetWorkerDeploymentCu
ConflictToken = conflictToken,
});
}

public static async Task WaitForRoutingConfigPropagationAsync(
ITemporalClient client,
string deploymentName,
string expectedCurrentBuildId)
{
await AssertMore.EventuallyAsync(async () =>
{
var response = await client.WorkflowService.DescribeWorkerDeploymentAsync(
new()
{
Namespace = client.Options.Namespace,
DeploymentName = deploymentName,
});

var info = response.WorkerDeploymentInfo;
Assert.NotNull(info.RoutingConfig);
Assert.NotNull(info.RoutingConfig.CurrentDeploymentVersion);
Assert.Equal(expectedCurrentBuildId, info.RoutingConfig.CurrentDeploymentVersion.BuildId);
Assert.NotEqual(
Temporalio.Api.Enums.V1.RoutingConfigUpdateState.InProgress,
info.RoutingConfigUpdateState);
});
}

public static async Task WaitForWorkflowRunningOnVersionAsync(
ITemporalClient client,
string workflowId,
string expectedBuildId)
{
await AssertMore.EventuallyAsync(async () =>
{
var response = await client.WorkflowService.DescribeWorkflowExecutionAsync(
new()
{
Namespace = client.Options.Namespace,
Execution = new Temporalio.Api.Common.V1.WorkflowExecution
{
WorkflowId = workflowId,
},
});

var execInfo = response.WorkflowExecutionInfo;
Assert.Equal(
Temporalio.Api.Enums.V1.WorkflowExecutionStatus.Running,
execInfo.Status);
Assert.NotNull(execInfo.VersioningInfo);
Assert.NotNull(execInfo.VersioningInfo.DeploymentVersion);
Assert.Equal(expectedBuildId, execInfo.VersioningInfo.DeploymentVersion.BuildId);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -525,4 +525,94 @@ await worker.ExecuteAsync(async () =>
Temporalio.Api.Workflow.V1.VersioningOverride.OverrideOneofCase.Pinned));
});
}

[Workflow("CanVersionUpgradeWorkflow", VersioningBehavior = VersioningBehavior.Pinned)]
public class CanVersionUpgradeWorkflowV1
{
[WorkflowRun]
public async Task<string> RunAsync(int attempt)
{
if (attempt > 0)
{
return "v1.0";
}

// Sleep in a loop to trigger new WFTs that refresh the flag
while (!Workflow.TargetWorkerDeploymentVersionChanged)
{
await Workflow.DelayAsync(TimeSpan.FromMilliseconds(10));
}

// Continue-as-new with AUTO_UPGRADE to move to the new version
throw Workflow.CreateContinueAsNewException(
(CanVersionUpgradeWorkflowV1 wf) => wf.RunAsync(attempt + 1),
new ContinueAsNewOptions
{
InitialVersioningBehavior = InitialVersioningBehavior.AutoUpgrade,
});
}
}

[Workflow("CanVersionUpgradeWorkflow", VersioningBehavior = VersioningBehavior.Pinned)]
public class CanVersionUpgradeWorkflowV2
{
[WorkflowRun]
public async Task<string> RunAsync(int attempt)
{
return "v2.0";
}
}

[Fact]
public async Task ContinueAsNew_WithVersionUpgrade_MovesToNewVersion()
{
var deploymentName = $"deployment-can-{Guid.NewGuid()}";
var v1 = new WorkerDeploymentVersion(deploymentName, "1.0");
var v2 = new WorkerDeploymentVersion(deploymentName, "2.0");
var taskQueue = $"tq-{Guid.NewGuid()}";

using var worker1 = new TemporalWorker(
Client,
new TemporalWorkerOptions(taskQueue)
{
DeploymentOptions = new(v1, true),
}.AddWorkflow<CanVersionUpgradeWorkflowV1>());

using var worker2 = new TemporalWorker(
Client,
new TemporalWorkerOptions(taskQueue)
{
DeploymentOptions = new(v2, true),
}.AddWorkflow<CanVersionUpgradeWorkflowV2>());

var testTask = ExecuteTest();
await Task.WhenAll(
worker1.ExecuteAsync(() => testTask),
worker2.ExecuteAsync(() => testTask));

async Task ExecuteTest()
{
// Wait for v1 to be visible and set as current
var describe1 = await TestUtils.WaitUntilWorkerDeploymentVisibleAsync(Client, v1);
await TestUtils.SetCurrentDeploymentVersionAsync(Client, describe1.ConflictToken, v1);
await TestUtils.WaitForRoutingConfigPropagationAsync(Client, deploymentName, v1.BuildId);

// Start workflow on v1
var handle = await Client.StartWorkflowAsync(
(CanVersionUpgradeWorkflowV1 wf) => wf.RunAsync(0),
new(id: $"can-upgrade-{Guid.NewGuid()}", taskQueue: taskQueue));

await TestUtils.WaitForWorkflowRunningOnVersionAsync(Client, handle.Id, v1.BuildId);

// Wait for v2 to be visible and set as current (this changes the target version)
var describe2 = await TestUtils.WaitUntilWorkerDeploymentVisibleAsync(Client, v2);
await TestUtils.SetCurrentDeploymentVersionAsync(Client, describe2.ConflictToken, v2);
await TestUtils.WaitForRoutingConfigPropagationAsync(Client, deploymentName, v2.BuildId);

// The v1 workflow should detect the version change, CAN with AUTO_UPGRADE,
// and the new run should execute on v2
var result = await handle.GetResultAsync();
Assert.Equal("v2.0", result);
}
}
}
2 changes: 1 addition & 1 deletion tests/Temporalio.Tests/WorkflowEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public async Task InitializeAsync()
{
DevServerOptions = new()
{
DownloadVersion = "v1.6.1-server-1.31.0-151.0",
DownloadVersion = "v1.6.2-server-1.31.0-151.6",
ExtraArgs = new List<string>
{
// Disable search attribute cache
Expand Down
Loading