diff --git a/crates/sdk-core/machine_coverage/ActivityMachine_Coverage.puml b/crates/sdk-core/machine_coverage/ActivityMachine_Coverage.puml index a5b5ed085..b69458913 100644 --- a/crates/sdk-core/machine_coverage/ActivityMachine_Coverage.puml +++ b/crates/sdk-core/machine_coverage/ActivityMachine_Coverage.puml @@ -26,7 +26,7 @@ StartedActivityCancelEventRecorded -[#blue]-> TimedOut: ActivityTaskTimedOut StartedActivityCancelEventRecorded -[#blue]-> Canceled: ActivityTaskCanceled Canceled -[#blue]-> Canceled: ActivityTaskStarted Canceled -[#blue]-> Canceled: ActivityTaskCompleted -Completed --> [*] Failed --> [*] TimedOut --> [*] +Completed --> [*] @enduml \ No newline at end of file diff --git a/crates/sdk-core/machine_coverage/UpdateMachine_Coverage.puml b/crates/sdk-core/machine_coverage/UpdateMachine_Coverage.puml index 2d9f28548..dfd4e1bd2 100644 --- a/crates/sdk-core/machine_coverage/UpdateMachine_Coverage.puml +++ b/crates/sdk-core/machine_coverage/UpdateMachine_Coverage.puml @@ -14,6 +14,6 @@ CompletedCommandCreated -[#blue]-> CompletedCommandRecorded: WorkflowExecutionUp CompletedImmediately -[#blue]-> CompletedImmediatelyAcceptCreated: CommandProtocolMessage CompletedImmediatelyAcceptCreated -[#blue]-> CompletedImmediatelyCompleteCreated: CommandProtocolMessage CompletedImmediatelyCompleteCreated -[#blue]-> CompletedCommandCreated: WorkflowExecutionUpdateAccepted -CompletedCommandRecorded --> [*] Rejected --> [*] +CompletedCommandRecorded --> [*] @enduml \ No newline at end of file diff --git a/crates/sdk-core/machine_coverage/WorkflowTaskMachine_Coverage.puml b/crates/sdk-core/machine_coverage/WorkflowTaskMachine_Coverage.puml index d6fd502d6..e65467351 100644 --- a/crates/sdk-core/machine_coverage/WorkflowTaskMachine_Coverage.puml +++ b/crates/sdk-core/machine_coverage/WorkflowTaskMachine_Coverage.puml @@ -5,7 +5,7 @@ Scheduled --> TimedOut: WorkflowTaskTimedOut Started -[#blue]-> Completed: WorkflowTaskCompleted Started -[#blue]-> Failed: WorkflowTaskFailed Started -[#blue]-> TimedOut: WorkflowTaskTimedOut +TimedOut --> [*] Completed --> [*] Failed --> [*] -TimedOut --> [*] @enduml \ No newline at end of file diff --git a/crates/sdk-core/tests/integ_tests/worker_tests.rs b/crates/sdk-core/tests/integ_tests/worker_tests.rs index 7b44182ac..60def06ed 100644 --- a/crates/sdk-core/tests/integ_tests/worker_tests.rs +++ b/crates/sdk-core/tests/integ_tests/worker_tests.rs @@ -3,7 +3,7 @@ use crate::{ CoreWfStarter, activity_functions::StdActivities, fake_grpc_server::fake_server, get_integ_runtime_options, get_integ_server_options, get_integ_telem_options, mock_sdk_cfg, }, - shared_tests, + shared_tests::{self, is_oversize_grpc_event}, }; use assert_matches::assert_matches; use futures_util::FutureExt; @@ -35,14 +35,14 @@ use temporalio_common::{ common::v1::WorkerVersionStamp, enums::v1::{ EventType, - WorkflowTaskFailedCause::{self, GrpcMessageTooLarge}, + WorkflowTaskFailedCause::{self}, }, failure::v1::Failure as InnerFailure, history::v1::{ ActivityTaskScheduledEventAttributes, history_event::{ self, - Attributes::{self as EventAttributes, WorkflowTaskFailedEventAttributes}, + Attributes::{self as EventAttributes}, }, }, workflowservice::v1::{ @@ -238,7 +238,6 @@ async fn oversize_grpc_message() { #[workflow_methods(factory_only)] impl OversizeGrpcMessageWf { #[run] - #[allow(dead_code)] async fn run(ctx: &mut WorkflowContext) -> WorkflowResult> { if ctx.state(|wf| wf.has_run.load(Relaxed)) { Ok(vec![]) @@ -258,15 +257,14 @@ async fn oversize_grpc_message() { .await; core.run_until_done().await.unwrap(); - assert!(starter.get_history().await.events.iter().any(|e| { - e.event_type == EventType::WorkflowTaskFailed as i32 - && if let WorkflowTaskFailedEventAttributes(attr) = e.attributes.as_ref().unwrap() { - attr.cause == GrpcMessageTooLarge as i32 - && attr.failure.as_ref().unwrap().message == "GRPC Message too large" - } else { - false - } - })); + assert!( + starter + .get_history() + .await + .events + .iter() + .any(is_oversize_grpc_event) + ); // Verify the workflow task failure metric includes the GrpcMessageTooLarge reason let tq = starter.get_task_queue(); diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/activities.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/activities.rs index dbb08a137..eb52a1d14 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/activities.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/activities.rs @@ -5,7 +5,6 @@ use crate::common::{ }; use anyhow::anyhow; use assert_matches::assert_matches; -use futures_util::future::join_all; use std::{ sync::{ Arc, @@ -1103,7 +1102,7 @@ async fn graceful_shutdown() { }, ) }); - join_all(act_futs).await; + temporalio_sdk::workflows::join_all(act_futs).await; Ok(()) } } diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/cancel_wf.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/cancel_wf.rs index 8ebbe8d1a..6ceeadf41 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/cancel_wf.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/cancel_wf.rs @@ -24,7 +24,7 @@ impl CancelledWf { #[run] async fn run(ctx: &mut WorkflowContext) -> WorkflowResult<()> { let mut reason = "".to_string(); - let cancelled = tokio::select! { + let cancelled = temporalio_sdk::workflows::select! { _ = ctx.timer(Duration::from_secs(500)) => false, r = ctx.cancelled() => { reason = r; diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs index b59ddfa36..2dff951a0 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs @@ -6,7 +6,7 @@ use crate::common::{ }; use anyhow::anyhow; use crossbeam_queue::SegQueue; -use futures_util::{FutureExt, future::join_all}; +use futures_util::FutureExt; use rstest::Context; use std::{ collections::HashMap, @@ -124,7 +124,7 @@ impl LocalActConcurrentWithTimerWf { LocalActivityOptions::default(), ); let timer = ctx.timer(Duration::from_secs(1)); - let _ = tokio::join!(la, timer); + let _ = temporalio_sdk::workflows::join!(la, timer); Ok(()) } } @@ -244,7 +244,7 @@ impl LocalActFanoutWf { }) .collect(); ctx.timer(Duration::from_secs(1)).await; - join_all(las).await; + temporalio_sdk::workflows::join_all(las).await; Ok(()) } } @@ -831,7 +831,7 @@ async fn timer_backoff_concurrent_with_non_timer_backoff() { ..Default::default() }, ); - let (r1, r2) = tokio::join!(r1, r2); + let (r1, r2) = temporalio_sdk::workflows::join!(r1, r2); assert!(matches!(r1, Err(ActivityExecutionError::Failed(_)))); assert!(matches!(r2, Err(ActivityExecutionError::Failed(_)))); Ok(()) @@ -867,8 +867,8 @@ async fn repro_nondeterminism_with_timer_bug() { impl ReproNondeterminismWithTimerBugWf { #[run] async fn run(ctx: &mut WorkflowContext) -> WorkflowResult<()> { - let t1 = ctx.timer(Duration::from_secs(30)); - let r1 = ctx.start_local_activity( + let mut t1 = ctx.timer(Duration::from_secs(30)); + let mut r1 = ctx.start_local_activity( StdActivities::delay, Duration::from_secs(2), LocalActivityOptions { @@ -883,9 +883,8 @@ async fn repro_nondeterminism_with_timer_bug() { ..Default::default() }, ); - tokio::pin!(t1); - tokio::select! { - _ = &mut t1 => {}, + temporalio_sdk::workflows::select! { + _ = t1 => {}, _ = r1 => { t1.cancel(); }, @@ -1022,7 +1021,7 @@ async fn la_resolve_same_time_as_other_cancel() { impl LaResolveSameTimeAsOtherCancelWf { #[run] async fn run(ctx: &mut WorkflowContext) -> WorkflowResult<()> { - let normal_act = ctx.start_activity( + let mut normal_act = ctx.start_activity( DelayWithCancellation::delay, Duration::from_secs(9), ActivityOptions { @@ -1035,7 +1034,7 @@ async fn la_resolve_same_time_as_other_cancel() { ctx.timer(Duration::from_millis(1)).await; // Start LA and cancel the activity at the same time - let local_act = ctx.start_local_activity( + let mut local_act = ctx.start_local_activity( DelayWithCancellation::delay, Duration::from_millis(100), LocalActivityOptions { @@ -1044,8 +1043,7 @@ async fn la_resolve_same_time_as_other_cancel() { ); normal_act.cancel(); // Race them, starting a timer if LA completes first - tokio::select! { - biased; + temporalio_sdk::workflows::select! { _ = normal_act => {}, _ = local_act => { ctx.timer(Duration::from_millis(1)).await; @@ -1207,11 +1205,10 @@ async fn local_activity_with_heartbeat_only_causes_one_wakeup() { #[run] async fn run(ctx: &mut WorkflowContext) -> WorkflowResult { let mut wakeup_counter = 0; - tokio::join!( - // Interestingly - needed because if the condition is polled first, we won't see - // that resolved is true. - // TODO [rust-sdk-branch] - See if we can fix this and know that we should re-poll. - biased; + // Interestingly LA munst come first because if the condition is polled first, we won't + // see that resolved is true. + // TODO [rust-sdk-branch] - See if we can fix this and know that we should re-poll. + temporalio_sdk::workflows::join!( async { ctx.start_local_activity( StdActivities::delay, @@ -2233,7 +2230,7 @@ async fn wft_failure_cancels_running_las() { (), Default::default(), ); - tokio::join!( + temporalio_sdk::workflows::join!( async { ctx.timer(Duration::from_secs(1)).await; panic!("ahhh I'm failing wft") @@ -2872,7 +2869,7 @@ struct TwoLaWfParallel; impl TwoLaWfParallel { #[run(name = DEFAULT_WORKFLOW_TYPE)] async fn run(ctx: &mut WorkflowContext) -> WorkflowResult<()> { - let _ = tokio::join!( + let _ = temporalio_sdk::workflows::join!( ctx.start_local_activity(StdActivities::default, (), LocalActivityOptions::default()), ctx.start_local_activity(StdActivities::default, (), LocalActivityOptions::default()) ); diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/timers.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/timers.rs index 5b96b3368..bbad6956c 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/timers.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/timers.rs @@ -142,7 +142,7 @@ impl ParallelTimerWf { async fn run(ctx: &mut WorkflowContext) -> WorkflowResult<()> { let t1 = ctx.timer(Duration::from_secs(1)); let t2 = ctx.timer(Duration::from_secs(1)); - let _ = tokio::join!(t1, t2); + let _ = temporalio_sdk::workflows::join!(t1, t2); Ok(()) } } diff --git a/crates/sdk-core/tests/shared_tests/mod.rs b/crates/sdk-core/tests/shared_tests/mod.rs index 6770eaf2f..5b294038a 100644 --- a/crates/sdk-core/tests/shared_tests/mod.rs +++ b/crates/sdk-core/tests/shared_tests/mod.rs @@ -67,24 +67,28 @@ pub(crate) async fn grpc_message_too_large() { // Depending on the version of server, it may terminate the workflow, or simply be a task // failure assert!( - events.iter().any(|e| { - // Task failure - e.event_type == EventType::WorkflowTaskFailed as i32 - && if let WorkflowTaskFailedEventAttributes(attr) = e.attributes.as_ref().unwrap() { - attr.cause == GrpcMessageTooLarge as i32 - && attr.failure.as_ref().unwrap().message == "GRPC Message too large" - } else { - false - } - // Workflow terminated - || - e.event_type == EventType::WorkflowExecutionTerminated as i32 - && if let WorkflowExecutionTerminatedEventAttributes(attr) = e.attributes.as_ref().unwrap() { - attr.reason == "GrpcMessageTooLarge" - } else { - false - } - }), + events.iter().any(is_oversize_grpc_event), "Expected workflow task failure or termination b/c grpc message too large: {events:?}", ); } + +pub(crate) fn is_oversize_grpc_event( + e: &temporalio_common::protos::temporal::api::history::v1::HistoryEvent, +) -> bool { + // Task failure + e.event_type == EventType::WorkflowTaskFailed as i32 + && if let WorkflowTaskFailedEventAttributes(attr) = e.attributes.as_ref().unwrap() { + attr.cause == GrpcMessageTooLarge as i32 + && attr.failure.as_ref().unwrap().message == "GRPC Message too large" + } else { + false + } + // Workflow terminated + || + e.event_type == EventType::WorkflowExecutionTerminated as i32 + && if let WorkflowExecutionTerminatedEventAttributes(attr) = e.attributes.as_ref().unwrap() { + attr.reason == "GrpcMessageTooLarge" + } else { + false + } +} diff --git a/crates/sdk/Cargo.toml b/crates/sdk/Cargo.toml index 7a5bcb476..5c46d04d6 100644 --- a/crates/sdk/Cargo.toml +++ b/crates/sdk/Cargo.toml @@ -16,7 +16,7 @@ async-trait = "0.1" anyhow = "1.0" bon = { workspace = true } derive_more = { workspace = true } -futures-util = { version = "0.3", default-features = false } +futures-util = { version = "0.3", default-features = false, features = ["async-await-macro"] } gethostname = "1.0.2" parking_lot = { version = "0.12", features = ["send_guard"] } prost-types = { workspace = true } diff --git a/crates/sdk/README.md b/crates/sdk/README.md index c3486895d..9cfd0920d 100644 --- a/crates/sdk/README.md +++ b/crates/sdk/README.md @@ -163,8 +163,13 @@ Workflow code must be deterministic. This means: - No threading or random number generation - No access to system time (use `ctx.workflow_time()` instead) - No global mutable state -- Future select! and join! should always used `biased` - we will provide first-class APIs in the - future for these purposes. All interactions with futures should be done deterministically. +- **Do not use `tokio` or `futures` concurrency primitives directly in workflow code.** Many of them + (e.g. `tokio::select!`, `tokio::spawn`, `futures::select!`) introduce nondeterministic behavior + that will break workflow replay. Instead, use the deterministic wrappers provided in + `temporalio_sdk::workflows`: + - `select!` — deterministic select (polls in declaration order) + - `join!` — deterministic join for a fixed number of futures + - `join_all` — deterministic join for a dynamic collection of futures ### Timers @@ -260,12 +265,13 @@ Workflows and activities support cancellation. Note that in an activity, you mus heartbeat with `ctx.record_heartbeat(...)` to receive cancellations. ```rust +use temporalio_sdk::workflows::select; + // In a workflow: wait for cancellation let reason = ctx.cancelled().await; // Race a timer against cancellation -tokio::select! { - biased; +select! { _ = ctx.timer(Duration::from_secs(60)) => { /* timer fired */ } reason = ctx.cancelled() => { /* workflow cancelled */ } } diff --git a/crates/sdk/src/lib.rs b/crates/sdk/src/lib.rs index b085e05b3..939bcd134 100644 --- a/crates/sdk/src/lib.rs +++ b/crates/sdk/src/lib.rs @@ -77,6 +77,22 @@ mod workflow_context; mod workflow_future; pub mod workflows; +#[macro_export] +#[doc(hidden)] +macro_rules! __temporal_select { + ($($tokens:tt)*) => { + ::futures_util::select_biased! { $($tokens)* } + }; +} + +#[macro_export] +#[doc(hidden)] +macro_rules! __temporal_join { + ($($tokens:tt)*) => { + ::futures_util::join!($($tokens)*) + }; +} + use workflow_future::WorkflowFunction; pub use temporalio_client::Namespace; diff --git a/crates/sdk/src/workflow_context.rs b/crates/sdk/src/workflow_context.rs index 18cd0e056..8abd0556b 100644 --- a/crates/sdk/src/workflow_context.rs +++ b/crates/sdk/src/workflow_context.rs @@ -11,7 +11,11 @@ use crate::{ SupportsCancelReason, TimerResult, UnblockEvent, Unblockable, workflow_context::options::IntoWorkflowCommand, }; -use futures_util::{FutureExt, future::Shared, task::Context}; +use futures_util::{ + FutureExt, + future::{FusedFuture, Shared}, + task::Context, +}; use std::{ cell::{Ref, RefCell}, collections::HashMap, @@ -592,24 +596,20 @@ impl SyncWorkflowContext { } /// A future that resolves if/when the workflow is cancelled, with the user provided cause - pub async fn cancelled(&self) -> String { - if let Some(s) = self.base.inner.am_cancelled.borrow().as_ref() { - return s.clone(); + pub fn cancelled(&self) -> impl FusedFuture + '_ { + let am_cancelled = self.base.inner.am_cancelled.clone(); + async move { + if let Some(s) = am_cancelled.borrow().as_ref() { + return s.clone(); + } + am_cancelled + .clone() + .changed() + .await + .expect("Cancelled send half not dropped"); + am_cancelled.borrow().as_ref().cloned().unwrap_or_default() } - self.base - .inner - .am_cancelled - .clone() - .changed() - .await - .expect("Cancelled send half not dropped"); - self.base - .inner - .am_cancelled - .borrow() - .as_ref() - .cloned() - .unwrap_or_default() + .fuse() } /// Request to create a timer @@ -739,7 +739,7 @@ impl SyncWorkflowContext { &self, target: NamespacedWorkflowExecution, reason: String, - ) -> impl Future { + ) -> impl FusedFuture { let seq = self .base .inner @@ -908,8 +908,8 @@ impl WorkflowContext { } /// A future that resolves if/when the workflow is cancelled, with the user provided cause - pub async fn cancelled(&self) -> String { - self.sync.cancelled().await + pub fn cancelled(&self) -> impl FusedFuture + '_ { + self.sync.cancelled() } /// Request to create a timer @@ -987,7 +987,7 @@ impl WorkflowContext { &self, target: NamespacedWorkflowExecution, reason: String, - ) -> impl Future { + ) -> impl FusedFuture { self.sync.cancel_external(target, reason) } @@ -1104,7 +1104,7 @@ pub(crate) struct WorkflowContextSharedData { /// A Future that can be cancelled. /// Used in the prototype SDK for cancelling operations like timers and activities. -pub trait CancellableFuture: Future { +pub trait CancellableFuture: Future + FusedFuture { /// Cancel this Future fn cancel(&self); } @@ -1149,8 +1149,6 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.result_rx.poll_unpin(cx).map(|x| { - // SAFETY: Because we can only enter this section once the future has resolved, we - // know it will never be polled again, therefore consuming the option is OK. let od = self .other_dat .take() @@ -1159,6 +1157,14 @@ where }) } } +impl FusedFuture for WFCommandFut +where + T: Unblockable, +{ + fn is_terminated(&self) -> bool { + self.other_dat.is_none() + } +} struct CancellableWFCommandFut { cmd_fut: WFCommandFut, @@ -1201,6 +1207,14 @@ where self.cmd_fut.poll_unpin(cx) } } +impl FusedFuture for CancellableWFCommandFut +where + T: Unblockable, +{ + fn is_terminated(&self) -> bool { + self.cmd_fut.is_terminated() + } +} impl CancellableFuture for CancellableWFCommandFut where @@ -1231,6 +1245,7 @@ struct LATimerBackoffFut { next_attempt: u32, next_sched_time: Option, did_cancel: AtomicBool, + terminated: bool, } impl LATimerBackoffFut { pub(crate) fn new( @@ -1254,6 +1269,7 @@ impl LATimerBackoffFut { next_attempt: 1, next_sched_time: None, did_cancel: AtomicBool::new(false), + terminated: false, } } } @@ -1281,6 +1297,7 @@ impl Future for LATimerBackoffFut { )); Poll::Pending } else { + self.terminated = true; Poll::Ready(ActivityResolution { status: Some( activity_resolution::Status::Cancelled(Default::default()), @@ -1299,6 +1316,7 @@ impl Future for LATimerBackoffFut { // return cancel status. This can happen if cancel comes after the LA says it wants // to back off but before we have scheduled the timer. if self.did_cancel.load(Ordering::Acquire) { + self.terminated = true; return Poll::Ready(ActivityResolution { status: Some(activity_resolution::Status::Cancelled(Default::default())), }); @@ -1315,9 +1333,17 @@ impl Future for LATimerBackoffFut { self.next_sched_time.clone_from(&b.original_schedule_time); return Poll::Pending; } + if poll_res.is_ready() { + self.terminated = true; + } poll_res } } +impl FusedFuture for LATimerBackoffFut { + fn is_terminated(&self) -> bool { + self.terminated + } +} impl CancellableFuture for LATimerBackoffFut { fn cancel(&self) { self.did_cancel.store(true, Ordering::Release); @@ -1341,6 +1367,7 @@ enum ActivityFut { payload_converter: PayloadConverter, _phantom: PhantomData, }, + Terminated, } impl ActivityFut { @@ -1370,7 +1397,8 @@ where type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.get_mut() { + let this = self.get_mut(); + let poll = match this { ActivityFut::Errored { error, .. } => { Poll::Ready(Err(error.take().expect("polled after completion"))) } @@ -1413,7 +1441,22 @@ where } }), }, + ActivityFut::Terminated => panic!("polled after termination"), + }; + if poll.is_ready() { + *this = ActivityFut::Terminated; } + poll + } +} + +impl FusedFuture for ActivityFut +where + F: Future + Unpin, + Output: TemporalDeserializable + 'static, +{ + fn is_terminated(&self) -> bool { + matches!(self, ActivityFut::Terminated) } } @@ -1423,9 +1466,8 @@ where Output: TemporalDeserializable + 'static, { fn cancel(&self) { - match self { - ActivityFut::Errored { .. } => {} - ActivityFut::Running { inner, .. } => inner.cancel(), + if let ActivityFut::Running { inner, .. } = self { + inner.cancel() } } } diff --git a/crates/sdk/src/workflows.rs b/crates/sdk/src/workflows.rs index 653c9a7d1..b70bba3c3 100644 --- a/crates/sdk/src/workflows.rs +++ b/crates/sdk/src/workflows.rs @@ -43,6 +43,74 @@ //! } //! ``` +/// Deterministic `select!` for use in Temporal workflows. +/// +/// Polls branches in declaration order (top to bottom), ensuring deterministic +/// behavior across workflow replays. Delegates to [`futures_util::select_biased!`]. +/// +/// All workflow futures (timers, activities, child workflows, etc.) implement +/// `FusedFuture`, so they can be stored in variables and passed to `select!` +/// without needing `.fuse()`. +/// +/// # Example +/// +/// ```ignore +/// use temporalio_sdk::workflows::select; +/// use temporalio_sdk::WorkflowContext; +/// use std::time::Duration; +/// +/// # async fn hidden(ctx: &mut WorkflowContext<()>) { +/// select! { +/// _ = ctx.timer(Duration::from_secs(60)) => { /* timer fired */ } +/// reason = ctx.cancelled() => { /* cancelled */ } +/// }; +/// # } +/// ``` +#[doc(inline)] +pub use crate::__temporal_select as select; + +/// Deterministic `join!` for use in Temporal workflows. +/// +/// Polls all futures concurrently to completion in declaration order, +/// ensuring deterministic behavior across workflow replays. Delegates +/// to [`futures_util::join!`]. +/// +/// # Example +/// +/// ```ignore +/// use temporalio_sdk::workflows::join; +/// +/// # async fn hidden() { +/// let future_a = async { 1 }; +/// let future_b = async { 2 }; +/// let (a, b) = join!(future_a, future_b); +/// # } +/// ``` +#[doc(inline)] +pub use crate::__temporal_join as join; + +/// Deterministic `join_all` for use in Temporal workflows. +/// +/// Polls a collection of futures concurrently to completion in declaration order, +/// returning a `Vec` of their results. Delegates to [`futures_util::future::join_all`]. +/// +/// # Example +/// +/// ```ignore +/// use temporalio_sdk::workflows::join_all; +/// use temporalio_sdk::WorkflowContext; +/// use std::time::Duration; +/// +/// # async fn hidden(ctx: &mut WorkflowContext<()>) { +/// let timers = vec![ +/// ctx.timer(Duration::from_secs(1)), +/// ctx.timer(Duration::from_secs(2)), +/// ]; +/// let results = join_all(timers).await; +/// # } +/// ``` +pub use futures_util::future::join_all; + use crate::{ BaseWorkflowContext, SyncWorkflowContext, WorkflowContext, WorkflowContextView, WorkflowTermination,