Parallel Steps — wait_all
Zart supports running multiple steps in parallel via ctx.schedule_step() and ctx.wait_all(). Each scheduled step becomes an independent task in the scheduler — it can run on a different worker node and is fully durable.
Basic Usage
```rust
use zart::{TaskHandler, TaskContext, TaskError, Scheduler};
use async_trait::async_trait;

pub struct OnboardingTask;

#[async_trait]
impl TaskHandler for OnboardingTask {
    type Data = OnboardingData;
    type Output = ();

    async fn run(
        &self,
        ctx: &mut TaskContext<impl Scheduler>,
        data: OnboardingData,
    ) -> Result<(), TaskError> {
        // Schedule three parallel steps
        let h1 = ctx.schedule_step("send-email", || async {
            send_email(&data.email).await
        });
        let h2 = ctx.schedule_step("setup-billing", || async {
            setup_stripe_customer(&data.email).await
        });
        let h3 = ctx.schedule_step("provision-resources", || async {
            provision_aws(&data.user_id).await
        });

        // Durably wait for all three to complete
        let _results = ctx.wait_all(vec![h1, h2, h3]).await?;

        // This line only runs after all three succeed
        ctx.step("send-ready-notification", || async {
            notify_user_ready(&data.email).await
        }).await?;

        Ok(())
    }
}
```
How It Works
Each ctx.schedule_step() call:
- Inserts a new child task into zart_executions with a reference back to the parent.
- Returns a StepHandle — a token identifying that scheduled task.
- The child task can be picked up by any available worker, potentially running on a different machine.
ctx.wait_all(handles):
- Marks the parent execution as waiting in the database.
- Returns the parent to the queue — no worker thread is blocked.
- When all child tasks complete, the scheduler re-queues the parent.
- On resumption, wait_all returns the collected results.
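Zart's scheduler internals aren't shown in this section, but the shape of the behavior can be sketched with an in-memory analogy: std threads stand in for worker nodes, a channel stands in for the completion queue, and `in_handle_order` is a hypothetical helper (not part of Zart) that restores handle order. Children finish in arbitrary order; the parent resumes only once every result has arrived.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Restore handle order after children report completion in arbitrary order,
// mirroring how wait_all returns results in the order of the handles.
fn in_handle_order(mut results: Vec<(u64, String)>) -> Vec<String> {
    results.sort_by_key(|&(idx, _)| idx);
    results.into_iter().map(|(_, out)| out).collect()
}

fn main() {
    let (tx, rx) = mpsc::channel();

    // Three "child tasks" on separate threads, deliberately finishing
    // out of order; the channel stands in for the scheduler noticing
    // each completion.
    for (idx, delay_ms) in [(0u64, 30u64), (1, 10), (2, 20)] {
        let tx = tx.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(delay_ms));
            tx.send((idx, format!("child-{idx} done"))).unwrap();
        });
    }
    drop(tx); // the channel closes once every child has reported

    // "wait_all": the parent resumes only after all children complete
    let results = in_handle_order(rx.iter().collect());
    assert_eq!(results, ["child-0 done", "child-1 done", "child-2 done"]);
    println!("parent resumed with {} results", results.len());
}
```

The real system differs in one important way: the parent is parked in the database rather than on a blocked thread, so no worker is tied up while children run.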
Getting Results
Each scheduled step returns its output through the wait_all result vector, in the same order as the handles:
```rust
#[derive(Serialize, Deserialize)]
struct EmailResult { message_id: String }

#[derive(Serialize, Deserialize)]
struct BillingResult { customer_id: String }

#[derive(Serialize, Deserialize)]
struct ProvisionResult { instance_id: String }

// ...
let h1 = ctx.schedule_step("send-email", || async {
    send_email(&data.email).await
});
let h2 = ctx.schedule_step("setup-billing", || async {
    setup_stripe(&data.email).await
});

let results = ctx.wait_all(vec![h1, h2]).await?;
let email_result: EmailResult = results[0].deserialize()?;
let billing_result: BillingResult = results[1].deserialize()?;

println!("Customer ID: {}", billing_result.customer_id);
```
Error Handling
If any child task fails (returns Err), wait_all returns that error. Other in-flight tasks are not cancelled — they run to completion (or failure), but their results are not returned to the parent.
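What the parent observes can be modeled with plain Results; `join_results` below is a hypothetical stand-in for wait_all's aggregation, not part of Zart, and it only models what the parent sees, not the fact that the other children keep running.

```rust
// Model of wait_all's error semantics: the parent gets every output only if
// every child succeeded; otherwise it gets the first error and none of the
// sibling results.
fn join_results<T, E>(children: Vec<Result<T, E>>) -> Result<Vec<T>, E> {
    // collect() on an iterator of Results short-circuits at the first Err
    children.into_iter().collect()
}

fn main() {
    let all_ok: Result<Vec<i32>, String> =
        join_results(vec![Ok(1), Ok(2), Ok(3)]);
    assert_eq!(all_ok, Ok(vec![1, 2, 3]));

    let one_failed: Result<Vec<i32>, String> =
        join_results(vec![Ok(1), Err("billing failed".to_string()), Ok(3)]);
    // The Ok(1) and Ok(3) results are not returned to the parent.
    assert_eq!(one_failed, Err("billing failed".to_string()));
}
```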
```rust
let results = ctx.wait_all(vec![h1, h2, h3]).await;
match results {
    Ok(outputs) => { /* all succeeded */ }
    Err(e) => {
        // At least one child failed
        // The parent can retry — already-completed children won't re-run
        return Err(e);
    }
}
```
Dynamic Fan-Out
Schedule a variable number of parallel tasks based on runtime data:
```rust
#[async_trait]
impl TaskHandler for ProcessRegionsTask {
    type Data = MultiRegionJob;
    type Output = Vec<RegionResult>;

    async fn run(
        &self,
        ctx: &mut TaskContext<impl Scheduler>,
        data: MultiRegionJob,
    ) -> Result<Self::Output, TaskError> {
        // Schedule one task per region dynamically
        let handles: Vec<_> = data.regions
            .iter()
            .map(|region| {
                let region = region.clone();
                ctx.schedule_step(
                    &format!("process-{region}"),
                    move || {
                        let r = region.clone();
                        async move { process_region(&r).await }
                    },
                )
            })
            .collect();

        let raw_results = ctx.wait_all(handles).await?;
        let results: Vec<RegionResult> = raw_results
            .into_iter()
            .map(|r| r.deserialize())
            .collect::<Result<_, _>>()?;

        Ok(results)
    }
}
```
Nesting Parallel Steps
Parallel steps can themselves schedule further parallel steps, creating a tree of concurrent work:
```rust
// Parent: fans out to two pipelines
ctx.schedule_step("pipeline-a", || async { run_pipeline_a().await });
ctx.schedule_step("pipeline-b", || async { run_pipeline_b().await });

// Inside run_pipeline_a — further parallelism
ctx.schedule_step("pipeline-a-step-1", || async { ... });
ctx.schedule_step("pipeline-a-step-2", || async { ... });
```
Each level is fully durable and can span multiple worker nodes.
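The resulting tree shape can be sketched without Zart at all: `run_tree` below is a hypothetical recursive fan-out where each node spawns a thread per child and joins them all, the thread-level equivalent of calling schedule_step and wait_all at every level.

```rust
use std::thread;

// Each "pipeline node" fans out to two children and waits for both before
// returning -- a stand-in for schedule_step + wait_all at every tree level.
fn run_tree(name: &str, depth: u32) -> Vec<String> {
    if depth == 0 {
        return vec![format!("{name}: leaf done")];
    }
    let handles: Vec<_> = (1..=2)
        .map(|i| {
            let child = format!("{name}-step-{i}");
            thread::spawn(move || run_tree(&child, depth - 1))
        })
        .collect();

    // "wait_all" for this level: join every child before continuing
    handles
        .into_iter()
        .flat_map(|h| h.join().unwrap())
        .collect()
}

fn main() {
    let results = run_tree("pipeline-a", 2);
    // two levels of two-way fan-out: 2 * 2 = 4 leaves
    assert_eq!(results.len(), 4);
}
```

Unlike this thread sketch, each level in Zart is independently durable, so a crash mid-tree resumes from recorded child results rather than re-running the whole tree.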
Comparison to ctx.step()
| | ctx.step() | ctx.schedule_step() + wait_all |
|---|---|---|
| Execution | Sequential, in-process | Parallel, potentially cross-worker |
| Durability | Yes | Yes |
| Result storage | Yes | Yes |
| Best for | Sequential work | Independent, parallelizable work |
| Blocking | Awaits immediately | Awaits all at wait_all |
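The "Blocking" row is the key behavioral difference, and it can be modeled without Zart: `sequential` and `schedule_then_wait` below are hypothetical stand-ins (using plain function pointers and threads, not the real API) showing that one pattern finishes each step before starting the next, while the other starts everything first and waits once.

```rust
use std::thread;

// ctx.step() analogy: each step is awaited immediately, so steps run
// strictly one after another.
fn sequential(steps: Vec<fn() -> i32>) -> Vec<i32> {
    steps.into_iter().map(|s| s()).collect()
}

// schedule_step + wait_all analogy: all steps are started up front, then a
// single join point collects every result, preserving handle order.
fn schedule_then_wait(steps: Vec<fn() -> i32>) -> Vec<i32> {
    let handles: Vec<_> = steps.into_iter().map(thread::spawn).collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let steps: Vec<fn() -> i32> = vec![|| 1, || 2, || 3];
    // Same results in the same order; only the execution overlap differs.
    assert_eq!(sequential(steps.clone()), vec![1, 2, 3]);
    assert_eq!(schedule_then_wait(steps), vec![1, 2, 3]);
}
```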