Parallel Steps — wait_all

Zart supports running multiple steps in parallel via ctx.schedule_step() and ctx.wait_all(). Each scheduled step becomes an independent task in the scheduler — it can run on a different worker node and is fully durable.

```rust
use zart::{TaskHandler, TaskContext, TaskError, Scheduler};
use async_trait::async_trait;

pub struct OnboardingTask;

#[async_trait]
impl TaskHandler for OnboardingTask {
    type Data = OnboardingData;
    type Output = ();

    async fn run(
        &self,
        ctx: &mut TaskContext<impl Scheduler>,
        data: OnboardingData,
    ) -> Result<(), TaskError> {
        // Schedule three parallel steps
        let h1 = ctx.schedule_step("send-email", || async {
            send_email(&data.email).await
        });
        let h2 = ctx.schedule_step("setup-billing", || async {
            setup_stripe_customer(&data.email).await
        });
        let h3 = ctx.schedule_step("provision-resources", || async {
            provision_aws(&data.user_id).await
        });

        // Durably wait for all three to complete
        let _results = ctx.wait_all(vec![h1, h2, h3]).await?;

        // This line only runs after all three succeed
        ctx.step("send-ready-notification", || async {
            notify_user_ready(&data.email).await
        }).await?;

        Ok(())
    }
}
```

Each ctx.schedule_step() call:

  1. Inserts a new child task into zart_executions with a reference back to the parent.
  2. Returns a StepHandle — a token identifying that scheduled task.
  3. Leaves the child task free to be picked up by any available worker, potentially on a different machine.
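The bookkeeping behind those steps can be sketched with a toy in-memory scheduler. This is a hypothetical model, not zart's implementation: the field names (`parent_id`, `status`) and the `MockScheduler` type are invented for illustration; only the `zart_executions` table name and the parent-reference idea come from the list above.

```rust
use std::collections::HashMap;

// Opaque token identifying a scheduled child task, like zart's StepHandle.
#[derive(Debug, Clone, PartialEq)]
struct StepHandle {
    execution_id: u64,
}

// Toy stand-in for a row in the zart_executions table.
#[derive(Debug)]
struct ExecutionRow {
    parent_id: Option<u64>,
    step_name: String,
    status: &'static str, // "queued" until some worker claims it
}

struct MockScheduler {
    next_id: u64,
    executions: HashMap<u64, ExecutionRow>,
}

impl MockScheduler {
    fn new() -> Self {
        Self { next_id: 1, executions: HashMap::new() }
    }

    // Analogous to ctx.schedule_step: durably record the child with a
    // reference back to the parent, return a handle, and do NOT run it.
    fn schedule_step(&mut self, parent_id: u64, name: &str) -> StepHandle {
        let id = self.next_id;
        self.next_id += 1;
        self.executions.insert(id, ExecutionRow {
            parent_id: Some(parent_id),
            step_name: name.to_string(),
            status: "queued",
        });
        StepHandle { execution_id: id }
    }
}

fn main() {
    let mut sched = MockScheduler::new();
    let parent = 42;
    let h1 = sched.schedule_step(parent, "send-email");
    let h2 = sched.schedule_step(parent, "setup-billing");
    assert_ne!(h1, h2); // each handle identifies a distinct child task
    assert_eq!(sched.executions[&h1.execution_id].parent_id, Some(parent));
    println!("scheduled {} child tasks", sched.executions.len());
}
```

The key property the sketch captures: scheduling is just a durable insert plus a handle, so any worker that polls the table can claim the child.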

ctx.wait_all(handles):

  1. Marks the parent execution as waiting in the database.
  2. Returns the parent to the queue — no worker thread is blocked.
  3. When all child tasks complete, the scheduler re-queues the parent.
  4. On resumption, wait_all returns the collected results.
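That suspend/resume cycle can be modeled as a small state machine. A simplification with invented names (`ParentState`, `begin_wait`, `on_child_done`): the real scheduler persists this state in the database, but the transitions mirror the four steps above, including returning results in handle order regardless of completion order.

```rust
#[derive(Debug, Clone, PartialEq)]
enum ParentState {
    // Waiting on these child ids; no worker thread is held while here.
    Waiting(Vec<u64>),
    // Collected child results, ordered to match the original handles.
    Resumed(Vec<String>),
}

// Called when the parent hits wait_all: record pending children and yield.
fn begin_wait(children: Vec<u64>) -> ParentState {
    ParentState::Waiting(children)
}

// Called by the scheduler each time a child finishes.
fn on_child_done(
    state: ParentState,
    child: u64,
    result: String,
    done: &mut Vec<(u64, String)>,
) -> ParentState {
    done.push((child, result));
    if let ParentState::Waiting(ref pending) = state {
        if pending.iter().all(|id| done.iter().any(|(d, _)| d == id)) {
            // All children complete: re-queue the parent, with results
            // reordered to match the handle order, not completion order.
            let mut results = Vec::new();
            for id in pending {
                let (_, r) = done.iter().find(|(d, _)| d == id).unwrap();
                results.push(r.clone());
            }
            return ParentState::Resumed(results);
        }
    }
    state
}

fn main() {
    let state = begin_wait(vec![1, 2]);
    let mut done = Vec::new();
    // Children may finish in any order; the second child finishes first.
    let state = on_child_done(state, 2, "billing-ok".into(), &mut done);
    assert!(matches!(state, ParentState::Waiting(_)));
    let state = on_child_done(state, 1, "email-ok".into(), &mut done);
    assert_eq!(
        state,
        ParentState::Resumed(vec!["email-ok".into(), "billing-ok".into()])
    );
}
```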

Each scheduled step returns its output through the wait_all result vector, in the same order as the handles:

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct EmailResult { message_id: String }

#[derive(Serialize, Deserialize)]
struct BillingResult { customer_id: String }

#[derive(Serialize, Deserialize)]
struct ProvisionResult { instance_id: String }

// ...
let h1 = ctx.schedule_step("send-email", || async {
    send_email(&data.email).await
});
let h2 = ctx.schedule_step("setup-billing", || async {
    setup_stripe(&data.email).await
});

let results = ctx.wait_all(vec![h1, h2]).await?;

let email_result: EmailResult = results[0].deserialize()?;
let billing_result: BillingResult = results[1].deserialize()?;
println!("Customer ID: {}", billing_result.customer_id);
```

If any child task fails (returns Err), wait_all returns that error. Other in-flight tasks are not cancelled — they run to completion (or failure), but their results are not returned to the parent.

```rust
let results = ctx.wait_all(vec![h1, h2, h3]).await;

match results {
    Ok(outputs) => { /* all succeeded */ }
    Err(e) => {
        // At least one child failed.
        // The parent can retry — already-completed children won't re-run.
        return Err(e);
    }
}
```
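The retry guarantee in that comment (completed children are not re-run) can be sketched with a memo table. The names here (`run_step`, `memo`, `run_count`) are hypothetical; the real library stores completed step results durably, but the skip-if-cached logic is the same idea.

```rust
use std::collections::HashMap;

// Toy memo of completed step results, keyed by step name. On a parent
// retry, a memoized step returns its stored result instead of executing
// again; only steps that never completed actually run.
fn run_step(
    memo: &mut HashMap<String, String>,
    name: &str,
    run_count: &mut u32,
    work: impl Fn() -> Result<String, String>,
) -> Result<String, String> {
    if let Some(cached) = memo.get(name) {
        return Ok(cached.clone()); // completed earlier; skip re-execution
    }
    *run_count += 1;
    let out = work()?; // a failure is NOT memoized, so it retries later
    memo.insert(name.to_string(), out.clone());
    Ok(out)
}

fn main() {
    let mut memo = HashMap::new();
    let mut runs = 0;

    // First attempt: step "a" succeeds, step "b" fails, parent returns Err.
    let _ = run_step(&mut memo, "a", &mut runs, || Ok("a-done".into()));
    let first = run_step(&mut memo, "b", &mut runs, || Err("boom".into()));
    assert!(first.is_err());

    // Parent retry: "a" is served from the memo; only "b" executes again.
    let _ = run_step(&mut memo, "a", &mut runs, || Ok("a-done".into()));
    let second = run_step(&mut memo, "b", &mut runs, || Ok("b-done".into()));
    assert_eq!(second, Ok("b-done".to_string()));
    assert_eq!(runs, 3); // "a" ran once, "b" ran twice
}
```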

Schedule a variable number of parallel tasks based on runtime data:

```rust
#[async_trait]
impl TaskHandler for ProcessRegionsTask {
    type Data = MultiRegionJob;
    type Output = Vec<RegionResult>;

    async fn run(
        &self,
        ctx: &mut TaskContext<impl Scheduler>,
        data: MultiRegionJob,
    ) -> Result<Self::Output, TaskError> {
        // Schedule one task per region dynamically
        let handles: Vec<_> = data.regions
            .iter()
            .map(|region| {
                let region = region.clone();
                ctx.schedule_step(
                    &format!("process-{region}"),
                    move || {
                        let r = region.clone();
                        async move { process_region(&r).await }
                    },
                )
            })
            .collect();

        let raw_results = ctx.wait_all(handles).await?;

        let results: Vec<RegionResult> = raw_results
            .into_iter()
            .map(|r| r.deserialize())
            .collect::<Result<_, _>>()?;

        Ok(results)
    }
}
```

Parallel steps can themselves schedule further parallel steps, creating a tree of concurrent work:

```rust
// Parent: fans out to two pipelines
ctx.schedule_step("pipeline-a", || async { run_pipeline_a().await });
ctx.schedule_step("pipeline-b", || async { run_pipeline_b().await });

// Inside run_pipeline_a — further parallelism
ctx.schedule_step("pipeline-a-step-1", || async { ... });
ctx.schedule_step("pipeline-a-step-2", || async { ... });
```

Each level is fully durable and can span multiple worker nodes.
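The resulting shape can be modeled as a tree where each node fans out to its children and aggregates their results. A toy, synchronous model (the `Pipeline` type and `run` function are invented for illustration, and the recursion here stands in for nested schedule_step/wait_all, not zart's actual execution):

```rust
// Toy model of nested fan-out: each node "schedules" its children and
// then aggregates their results, the wait_all analogue.
struct Pipeline {
    name: &'static str,
    children: Vec<Pipeline>,
}

fn run(node: &Pipeline) -> Vec<String> {
    if node.children.is_empty() {
        // Leaf: this is where the actual step work would happen.
        return vec![node.name.to_string()];
    }
    // Parent: fan out to all children, then collect their results.
    node.children.iter().flat_map(run).collect()
}

fn main() {
    let tree = Pipeline {
        name: "root",
        children: vec![
            Pipeline {
                name: "pipeline-a",
                children: vec![
                    Pipeline { name: "pipeline-a-step-1", children: vec![] },
                    Pipeline { name: "pipeline-a-step-2", children: vec![] },
                ],
            },
            Pipeline { name: "pipeline-b", children: vec![] },
        ],
    };
    let leaves = run(&tree);
    assert_eq!(leaves, vec!["pipeline-a-step-1", "pipeline-a-step-2", "pipeline-b"]);
}
```

In the real system each node of such a tree is a durable execution row, so any subtree can resume independently after a crash.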

|                | ctx.step()            | ctx.schedule_step() + wait_all   |
|----------------|-----------------------|----------------------------------|
| Execution      | Sequential, in-process | Parallel, potentially cross-worker |
| Durability     | Yes                   | Yes                              |
| Result storage | Yes                   | Yes                              |
| Best for       | Sequential work       | Independent, parallelizable work |
| Blocking       | Awaits immediately    | Awaits all at wait_all           |