Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ StartedActivityCancelEventRecorded -[#blue]-> TimedOut: ActivityTaskTimedOut
StartedActivityCancelEventRecorded -[#blue]-> Canceled: ActivityTaskCanceled
Canceled -[#blue]-> Canceled: ActivityTaskStarted
Canceled -[#blue]-> Canceled: ActivityTaskCompleted
Completed --> [*]
Failed --> [*]
TimedOut --> [*]
Completed --> [*]
@enduml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ CompletedCommandCreated -[#blue]-> CompletedCommandRecorded: WorkflowExecutionUp
CompletedImmediately -[#blue]-> CompletedImmediatelyAcceptCreated: CommandProtocolMessage
CompletedImmediatelyAcceptCreated -[#blue]-> CompletedImmediatelyCompleteCreated: CommandProtocolMessage
CompletedImmediatelyCompleteCreated -[#blue]-> CompletedCommandCreated: WorkflowExecutionUpdateAccepted
CompletedCommandRecorded --> [*]
Rejected --> [*]
CompletedCommandRecorded --> [*]
@enduml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Scheduled --> TimedOut: WorkflowTaskTimedOut
Started -[#blue]-> Completed: WorkflowTaskCompleted
Started -[#blue]-> Failed: WorkflowTaskFailed
Started -[#blue]-> TimedOut: WorkflowTaskTimedOut
TimedOut --> [*]
Completed --> [*]
Failed --> [*]
TimedOut --> [*]
@enduml
24 changes: 11 additions & 13 deletions crates/sdk-core/tests/integ_tests/worker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
CoreWfStarter, activity_functions::StdActivities, fake_grpc_server::fake_server,
get_integ_runtime_options, get_integ_server_options, get_integ_telem_options, mock_sdk_cfg,
},
shared_tests,
shared_tests::{self, is_oversize_grpc_event},
};
use assert_matches::assert_matches;
use futures_util::FutureExt;
Expand Down Expand Up @@ -35,14 +35,14 @@ use temporalio_common::{
common::v1::WorkerVersionStamp,
enums::v1::{
EventType,
WorkflowTaskFailedCause::{self, GrpcMessageTooLarge},
WorkflowTaskFailedCause::{self},
},
failure::v1::Failure as InnerFailure,
history::v1::{
ActivityTaskScheduledEventAttributes,
history_event::{
self,
Attributes::{self as EventAttributes, WorkflowTaskFailedEventAttributes},
Attributes::{self as EventAttributes},
},
},
workflowservice::v1::{
Expand Down Expand Up @@ -238,7 +238,6 @@ async fn oversize_grpc_message() {
#[workflow_methods(factory_only)]
impl OversizeGrpcMessageWf {
#[run]
#[allow(dead_code)]
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<Vec<u8>> {
if ctx.state(|wf| wf.has_run.load(Relaxed)) {
Ok(vec![])
Expand All @@ -258,15 +257,14 @@ async fn oversize_grpc_message() {
.await;
core.run_until_done().await.unwrap();

assert!(starter.get_history().await.events.iter().any(|e| {
e.event_type == EventType::WorkflowTaskFailed as i32
&& if let WorkflowTaskFailedEventAttributes(attr) = e.attributes.as_ref().unwrap() {
attr.cause == GrpcMessageTooLarge as i32
&& attr.failure.as_ref().unwrap().message == "GRPC Message too large"
} else {
false
}
}));
assert!(
starter
.get_history()
.await
.events
.iter()
.any(is_oversize_grpc_event)
);

// Verify the workflow task failure metric includes the GrpcMessageTooLarge reason
let tq = starter.get_task_queue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::common::{
};
use anyhow::anyhow;
use assert_matches::assert_matches;
use futures_util::future::join_all;
use std::{
sync::{
Arc,
Expand Down Expand Up @@ -1103,7 +1102,7 @@ async fn graceful_shutdown() {
},
)
});
join_all(act_futs).await;
temporalio_sdk::workflows::join_all(act_futs).await;
Ok(())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl CancelledWf {
#[run]
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
let mut reason = "".to_string();
let cancelled = tokio::select! {
let cancelled = temporalio_sdk::workflows::select! {
_ = ctx.timer(Duration::from_secs(500)) => false,
r = ctx.cancelled() => {
reason = r;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::common::{
};
use anyhow::anyhow;
use crossbeam_queue::SegQueue;
use futures_util::{FutureExt, future::join_all};
use futures_util::FutureExt;
use rstest::Context;
use std::{
collections::HashMap,
Expand Down Expand Up @@ -124,7 +124,7 @@ impl LocalActConcurrentWithTimerWf {
LocalActivityOptions::default(),
);
let timer = ctx.timer(Duration::from_secs(1));
let _ = tokio::join!(la, timer);
let _ = temporalio_sdk::workflows::join!(la, timer);
Ok(())
}
}
Expand Down Expand Up @@ -244,7 +244,7 @@ impl LocalActFanoutWf {
})
.collect();
ctx.timer(Duration::from_secs(1)).await;
join_all(las).await;
temporalio_sdk::workflows::join_all(las).await;
Ok(())
}
}
Expand Down Expand Up @@ -831,7 +831,7 @@ async fn timer_backoff_concurrent_with_non_timer_backoff() {
..Default::default()
},
);
let (r1, r2) = tokio::join!(r1, r2);
let (r1, r2) = temporalio_sdk::workflows::join!(r1, r2);
assert!(matches!(r1, Err(ActivityExecutionError::Failed(_))));
assert!(matches!(r2, Err(ActivityExecutionError::Failed(_))));
Ok(())
Expand Down Expand Up @@ -867,8 +867,8 @@ async fn repro_nondeterminism_with_timer_bug() {
impl ReproNondeterminismWithTimerBugWf {
#[run]
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
let t1 = ctx.timer(Duration::from_secs(30));
let r1 = ctx.start_local_activity(
let mut t1 = ctx.timer(Duration::from_secs(30));
let mut r1 = ctx.start_local_activity(
StdActivities::delay,
Duration::from_secs(2),
LocalActivityOptions {
Expand All @@ -883,9 +883,8 @@ async fn repro_nondeterminism_with_timer_bug() {
..Default::default()
},
);
tokio::pin!(t1);
tokio::select! {
_ = &mut t1 => {},
temporalio_sdk::workflows::select! {
_ = t1 => {},
_ = r1 => {
t1.cancel();
},
Expand Down Expand Up @@ -1022,7 +1021,7 @@ async fn la_resolve_same_time_as_other_cancel() {
impl LaResolveSameTimeAsOtherCancelWf {
#[run]
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
let normal_act = ctx.start_activity(
let mut normal_act = ctx.start_activity(
DelayWithCancellation::delay,
Duration::from_secs(9),
ActivityOptions {
Expand All @@ -1035,7 +1034,7 @@ async fn la_resolve_same_time_as_other_cancel() {
ctx.timer(Duration::from_millis(1)).await;

// Start LA and cancel the activity at the same time
let local_act = ctx.start_local_activity(
let mut local_act = ctx.start_local_activity(
DelayWithCancellation::delay,
Duration::from_millis(100),
LocalActivityOptions {
Expand All @@ -1044,8 +1043,7 @@ async fn la_resolve_same_time_as_other_cancel() {
);
normal_act.cancel();
// Race them, starting a timer if LA completes first
tokio::select! {
biased;
temporalio_sdk::workflows::select! {
_ = normal_act => {},
_ = local_act => {
ctx.timer(Duration::from_millis(1)).await;
Expand Down Expand Up @@ -1207,11 +1205,10 @@ async fn local_activity_with_heartbeat_only_causes_one_wakeup() {
#[run]
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<usize> {
let mut wakeup_counter = 0;
tokio::join!(
// Interestingly - needed because if the condition is polled first, we won't see
// that resolved is true.
// TODO [rust-sdk-branch] - See if we can fix this and know that we should re-poll.
biased;
// Interestingly LA munst come first because if the condition is polled first, we won't
// see that resolved is true.
// TODO [rust-sdk-branch] - See if we can fix this and know that we should re-poll.
temporalio_sdk::workflows::join!(
async {
ctx.start_local_activity(
StdActivities::delay,
Expand Down Expand Up @@ -2233,7 +2230,7 @@ async fn wft_failure_cancels_running_las() {
(),
Default::default(),
);
tokio::join!(
temporalio_sdk::workflows::join!(
async {
ctx.timer(Duration::from_secs(1)).await;
panic!("ahhh I'm failing wft")
Expand Down Expand Up @@ -2872,7 +2869,7 @@ struct TwoLaWfParallel;
impl TwoLaWfParallel {
#[run(name = DEFAULT_WORKFLOW_TYPE)]
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
let _ = tokio::join!(
let _ = temporalio_sdk::workflows::join!(
ctx.start_local_activity(StdActivities::default, (), LocalActivityOptions::default()),
ctx.start_local_activity(StdActivities::default, (), LocalActivityOptions::default())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl ParallelTimerWf {
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
let t1 = ctx.timer(Duration::from_secs(1));
let t2 = ctx.timer(Duration::from_secs(1));
let _ = tokio::join!(t1, t2);
let _ = temporalio_sdk::workflows::join!(t1, t2);
Ok(())
}
}
Expand Down
40 changes: 22 additions & 18 deletions crates/sdk-core/tests/shared_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,28 @@ pub(crate) async fn grpc_message_too_large() {
// Depending on the version of server, it may terminate the workflow, or simply be a task
// failure
assert!(
events.iter().any(|e| {
// Task failure
e.event_type == EventType::WorkflowTaskFailed as i32
&& if let WorkflowTaskFailedEventAttributes(attr) = e.attributes.as_ref().unwrap() {
attr.cause == GrpcMessageTooLarge as i32
&& attr.failure.as_ref().unwrap().message == "GRPC Message too large"
} else {
false
}
// Workflow terminated
||
e.event_type == EventType::WorkflowExecutionTerminated as i32
&& if let WorkflowExecutionTerminatedEventAttributes(attr) = e.attributes.as_ref().unwrap() {
attr.reason == "GrpcMessageTooLarge"
} else {
false
}
}),
events.iter().any(is_oversize_grpc_event),
"Expected workflow task failure or termination b/c grpc message too large: {events:?}",
);
}

pub(crate) fn is_oversize_grpc_event(
e: &temporalio_common::protos::temporal::api::history::v1::HistoryEvent,
) -> bool {
// Task failure
e.event_type == EventType::WorkflowTaskFailed as i32
&& if let WorkflowTaskFailedEventAttributes(attr) = e.attributes.as_ref().unwrap() {
attr.cause == GrpcMessageTooLarge as i32
&& attr.failure.as_ref().unwrap().message == "GRPC Message too large"
} else {
false
}
// Workflow terminated
||
e.event_type == EventType::WorkflowExecutionTerminated as i32
&& if let WorkflowExecutionTerminatedEventAttributes(attr) = e.attributes.as_ref().unwrap() {
attr.reason == "GrpcMessageTooLarge"
} else {
false
}
}
2 changes: 1 addition & 1 deletion crates/sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async-trait = "0.1"
anyhow = "1.0"
bon = { workspace = true }
derive_more = { workspace = true }
futures-util = { version = "0.3", default-features = false }
futures-util = { version = "0.3", default-features = false, features = ["async-await-macro"] }
gethostname = "1.0.2"
parking_lot = { version = "0.12", features = ["send_guard"] }
prost-types = { workspace = true }
Expand Down
14 changes: 10 additions & 4 deletions crates/sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,13 @@ Workflow code must be deterministic. This means:
- No threading or random number generation
- No access to system time (use `ctx.workflow_time()` instead)
- No global mutable state
- Future select! and join! should always used `biased` - we will provide first-class APIs in the
future for these purposes. All interactions with futures should be done deterministically.
- **Do not use `tokio` or `futures` concurrency primitives directly in workflow code.** Many of them
(e.g. `tokio::select!`, `tokio::spawn`, `futures::select!`) introduce nondeterministic behavior
that will break workflow replay. Instead, use the deterministic wrappers provided in
`temporalio_sdk::workflows`:
- `select!` — deterministic select (polls in declaration order)
- `join!` — deterministic join for a fixed number of futures
- `join_all` — deterministic join for a dynamic collection of futures

### Timers

Expand Down Expand Up @@ -260,12 +265,13 @@ Workflows and activities support cancellation. Note that in an activity, you mus
heartbeat with `ctx.record_heartbeat(...)` to receive cancellations.

```rust
use temporalio_sdk::workflows::select;

// In a workflow: wait for cancellation
let reason = ctx.cancelled().await;

// Race a timer against cancellation
tokio::select! {
biased;
select! {
_ = ctx.timer(Duration::from_secs(60)) => { /* timer fired */ }
reason = ctx.cancelled() => { /* workflow cancelled */ }
}
Expand Down
16 changes: 16 additions & 0 deletions crates/sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,22 @@ mod workflow_context;
mod workflow_future;
pub mod workflows;

#[macro_export]
#[doc(hidden)]
macro_rules! __temporal_select {
($($tokens:tt)*) => {
::futures_util::select_biased! { $($tokens)* }
};
}

#[macro_export]
#[doc(hidden)]
macro_rules! __temporal_join {
($($tokens:tt)*) => {
::futures_util::join!($($tokens)*)
};
}

use workflow_future::WorkflowFunction;

pub use temporalio_client::Namespace;
Expand Down
Loading
Loading