|
1 | 1 | use crate::common::{CoreWfStarter, build_fake_sdk, init_core_and_create_wf}; |
2 | | -use std::time::Duration; |
| 2 | +use futures_util::{StreamExt, stream::FuturesUnordered}; |
| 3 | +use std::{future::Future, pin::Pin, time::Duration}; |
3 | 4 | use temporalio_client::WorkflowStartOptions; |
4 | 5 | use temporalio_common::{ |
5 | 6 | prost_dur, |
@@ -300,3 +301,43 @@ async fn cancel_before_sent_to_server() { |
300 | 301 | worker.register_workflow::<CancelBeforeSentWf>(); |
301 | 302 | worker.run().await.unwrap(); |
302 | 303 | } |
| 304 | + |
| 305 | +#[workflow] |
| 306 | +#[derive(Default)] |
| 307 | +struct WaitConditionWakerWf { |
| 308 | + done: bool, |
| 309 | +} |
| 310 | + |
| 311 | +#[workflow_methods] |
| 312 | +impl WaitConditionWakerWf { |
| 313 | + #[run(name = DEFAULT_WORKFLOW_TYPE)] |
| 314 | + async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> { |
| 315 | + let mut futs: FuturesUnordered<Pin<Box<dyn Future<Output = ()>>>> = FuturesUnordered::new(); |
| 316 | + |
| 317 | + // Future 1: await timer, then set flag via state_mut |
| 318 | + let ctx1 = ctx.clone(); |
| 319 | + futs.push(Box::pin(async move { |
| 320 | + ctx1.timer(Duration::from_millis(500)).await; |
| 321 | + ctx1.state_mut(|s| s.done = true); |
| 322 | + })); |
| 323 | + |
| 324 | + // Future 2: wait_condition on the flag (waker-dependent inside FuturesUnordered) |
| 325 | + let ctx2 = ctx.clone(); |
| 326 | + futs.push(Box::pin(async move { |
| 327 | + ctx2.wait_condition(|s| s.done).await; |
| 328 | + })); |
| 329 | + |
| 330 | + // Drive both to completion |
| 331 | + while futs.next().await.is_some() {} |
| 332 | + Ok(()) |
| 333 | + } |
| 334 | +} |
| 335 | + |
| 336 | +#[tokio::test] |
| 337 | +async fn wait_condition_waker_in_futures_unordered() { |
| 338 | + let t = canned_histories::single_timer_wf_completes("1"); |
| 339 | + let mock_cfg = MockPollCfg::from_hist_builder(t); |
| 340 | + let mut worker = build_fake_sdk(mock_cfg); |
| 341 | + worker.register_workflow::<WaitConditionWakerWf>(); |
| 342 | + worker.run().await.unwrap(); |
| 343 | +} |
0 commit comments