TaskHandler Trait
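TaskHandler is the trait you implement to define a durable workflow. Zart calls `run` each time a worker claims your execution, so `run` may be invoked several times for one execution. Steps that already completed are not re-executed: their stored results are returned instead, and real work resumes at the first step that has no stored result.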
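The replay rule can be modelled with a small in-memory table of step results. The sketch below is not Zart's implementation: `StepLog` and `run_workflow` are hypothetical, results are held as `String`s in a `HashMap` instead of serialized database rows, and a worker crash is simulated by returning early.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the step results Zart persists per execution.
#[derive(Default)]
pub struct StepLog {
    results: HashMap<String, String>,
    /// How many times each step's closure actually ran (for illustration only).
    pub calls: HashMap<String, usize>,
}

impl StepLog {
    /// Return the stored result for `name` if there is one; otherwise run `f` once and store it.
    pub fn step<F: FnOnce() -> String>(&mut self, name: &str, f: F) -> String {
        if let Some(saved) = self.results.get(name) {
            return saved.clone(); // completed step: the closure is not called
        }
        *self.calls.entry(name.to_string()).or_insert(0) += 1;
        let value = f();
        self.results.insert(name.to_string(), value.clone());
        value
    }
}

/// A two-step workflow; `crash_after_first` simulates the worker dying between steps.
pub fn run_workflow(log: &mut StepLog, crash_after_first: bool) -> Option<String> {
    let customer = log.step("create-customer", || "cus_1".to_string());
    if crash_after_first {
        return None; // "send-email" never ran in this attempt
    }
    Some(log.step("send-email", || format!("welcome {customer}")))
}
```

Running `run_workflow` again after the simulated crash executes only `send-email`; `create-customer` is answered from the stored result, so its closure runs once in total.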
The Trait
```rust
#[async_trait]
pub trait TaskHandler: Send + Sync {
    type Data: serde::de::DeserializeOwned + Send + Sync;
    type Output: serde::Serialize + Send + Sync;

    async fn run(
        &self,
        ctx: &mut TaskContext<impl Scheduler>,
        data: Self::Data,
    ) -> Result<Self::Output, TaskError>;

    /// Retry the full execution this many times on task-level failure. Default: 0.
    fn max_retries(&self) -> usize {
        0
    }

    /// Wall-clock timeout for the entire execution. Default: None.
    fn timeout(&self) -> Option<Duration> {
        None
    }
}
```

Both associated types must implement serde traits (`DeserializeOwned` for `Data`, `Serialize` for `Output`) because Zart stores them in the database.
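With both defaults left in place, a handler gets exactly one attempt and no deadline. The std-only sketch below models the retry half of that contract, assuming `max_retries` counts retries after the first attempt (so `1 + max_retries` runs in total). `ToyTask`, `Flaky`, `AlwaysFails`, and `drive` are hypothetical; the driver runs attempts back to back, whereas in Zart each retry presumably goes back through the scheduler.

```rust
/// Hypothetical, synchronous cut-down of TaskHandler that keeps only the retry hook.
pub trait ToyTask {
    /// Mirrors the trait default: no task-level retries.
    fn max_retries(&self) -> usize {
        0
    }
    /// One execution attempt; `attempt` starts at 0.
    fn run(&mut self, attempt: usize) -> Result<&'static str, String>;
}

/// Fails `failures` times before succeeding; allows `retries` task-level retries.
pub struct Flaky {
    pub failures: usize,
    pub retries: usize,
}

impl ToyTask for Flaky {
    fn max_retries(&self) -> usize {
        self.retries
    }
    fn run(&mut self, attempt: usize) -> Result<&'static str, String> {
        if attempt < self.failures {
            Err(format!("attempt {attempt} failed"))
        } else {
            Ok("done")
        }
    }
}

/// Keeps the default `max_retries` of 0, so it gets a single attempt.
pub struct AlwaysFails;

impl ToyTask for AlwaysFails {
    fn run(&mut self, _attempt: usize) -> Result<&'static str, String> {
        Err("boom".to_string())
    }
}

/// Run up to `1 + max_retries()` attempts; return the final result and attempts used.
pub fn drive<T: ToyTask>(task: &mut T) -> (Result<&'static str, String>, usize) {
    let max_attempts = 1 + task.max_retries();
    let mut attempt = 0;
    loop {
        let result = task.run(attempt);
        attempt += 1;
        if result.is_ok() || attempt == max_attempts {
            return (result, attempt);
        }
    }
}
```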
TaskRegistry
Register all handlers before starting a worker. The string name is how you reference the workflow when scheduling.

```rust
let mut registry = TaskRegistry::new();
registry.register("onboarding", OnboardingTask);
registry.register("checkout", CheckoutTask);
```

Core Steps
`ctx.step()` and `ctx.step_with_retry()` are what you'll use in almost every workflow. Both follow the same rule: if a result is already stored in the database, the closure is not called.
ctx.step()
```rust
let customer_id: String = ctx.step("create-customer", || async {
    crm.create_customer(&data.email).await
}).await?;
```

- The step name must be unique within the execution.
- The step's result type must implement `Serialize + DeserializeOwned`.
- On failure, a plain step is retried by re-running the whole execution (see `max_retries`); use `ctx.step_with_retry()` to give a single step its own policy.
ctx.step_with_retry()
Apply a retry policy directly to the step, independent of the task-level `max_retries`. Use this for calls to external APIs or any operation that can transiently fail.

```rust
let order_id: String = ctx.step_with_retry(
    "place-order",
    RetryConfig::exponential(5, Duration::from_secs(1)),
    || async { payments_api.charge(&data.card).await },
).await?;
```

See RetryConfig below for all available policies.
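A step-level policy amounts to a bounded retry loop around one closure. This std-only sketch assumes the policy's count means retries after the first attempt (consistent with the RetryConfig examples on this page) and records the delays it would wait instead of sleeping, so it can be tested instantly. `retry_step` is a hypothetical helper, not part of Zart.

```rust
use std::time::Duration;

/// Hypothetical step-level retry loop. `delays[i]` is the wait before retry `i + 1`,
/// so `delays.len()` is the number of retries allowed after the first attempt.
/// Returns the final result plus the delays that were (notionally) waited.
pub fn retry_step<T, E>(
    delays: &[Duration],
    mut op: impl FnMut(usize) -> Result<T, E>,
) -> (Result<T, E>, Vec<Duration>) {
    let mut waited = Vec::new();
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return (Ok(value), waited),
            // Retries exhausted: surface the last error.
            Err(err) if attempt == delays.len() => return (Err(err), waited),
            Err(_) => {
                // A real worker would wait or reschedule here; the sketch only records it.
                waited.push(delays[attempt]);
                attempt += 1;
            }
        }
    }
}
```

With three delays configured, an operation that fails twice and then succeeds waits 1s and 2s; one that never succeeds makes four attempts and then returns its last error.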
Timing
Use these methods to pause a workflow for a fixed duration or until a specific time. The worker releases the task during the wait, so no database connection or thread is held while the workflow is paused.
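A pause that holds no thread or connection can be implemented by persisting a wake-up time and letting the worker return; whichever worker polls after that time picks the execution up again. The sketch models this with integer timestamps in seconds. `TimerQueue`, `sleep_until`, and `due` are hypothetical names, and whether Zart stores timers this way is an assumption.

```rust
use std::collections::BTreeMap;

/// Hypothetical timer table: wake-up time (seconds) -> execution ids due at that time.
#[derive(Default)]
pub struct TimerQueue {
    wake_at: BTreeMap<u64, Vec<String>>,
}

impl TimerQueue {
    /// Record the wake-up time; the worker can then drop the execution entirely.
    pub fn sleep_until(&mut self, execution_id: &str, at: u64) {
        self.wake_at.entry(at).or_default().push(execution_id.to_string());
    }

    /// Called by any polling worker: remove and return executions whose time has come.
    pub fn due(&mut self, now: u64) -> Vec<String> {
        // `split_off` keeps keys <= now in `self.wake_at` and returns keys > now.
        let later = self.wake_at.split_off(&(now + 1));
        let ready = std::mem::replace(&mut self.wake_at, later);
        ready.into_values().flatten().collect()
    }
}
```

In this model, `ctx.sleep(d)` would correspond to scheduling a wake-up at `now + d`.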
ctx.sleep()
```rust
// Pause for 30 minutes, then continue
ctx.sleep(Duration::from_secs(1800)).await?;
```

ctx.sleep_until()

```rust
// Resume at a specific timestamp
ctx.sleep_until(next_billing_date).await?;
```

Events
`wait_for_event` suspends the workflow until an external signal arrives: a human approval, a webhook callback, or a message from another system.
ctx.wait_for_event()
```rust
// Suspend until the "manager-approval" event is delivered, or fail after 24h
let decision: ApprovalPayload = ctx.wait_for_event(
    "manager-approval",
    Some(Duration::from_secs(86_400)),
).await?;
```

Deliver the event from anywhere using `DurableScheduler::offer_event()` or via the HTTP API:

```http
POST /api/v1/events/{execution_id}/manager-approval

{"approved": true, "reviewer": "bob@example.com"}
```

Returns `Err(TaskError::EventTimeout)` if the timeout elapses before the event arrives. See Wait for Event for full patterns.
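Because the waiting workflow and the event sender only meet in storage, the wait can be modelled as a keyed mailbox that a worker polls. This toy version uses integer timestamps in seconds and returns one of three outcomes: delivered, still pending, or timed out (the case where Zart returns `TaskError::EventTimeout`). `Mailbox`, `offer`, `poll`, and `Wait` are hypothetical; in particular, buffering an event that arrives before the wait begins is an assumption of the model, not documented Zart behaviour.

```rust
use std::collections::HashMap;

/// Outcome of checking for an event (hypothetical).
#[derive(Debug, PartialEq)]
pub enum Wait {
    Delivered(String),
    Pending,
    TimedOut,
}

/// Hypothetical event store keyed by (execution id, event name).
#[derive(Default)]
pub struct Mailbox {
    events: HashMap<(String, String), String>,
}

impl Mailbox {
    /// Delivery side, e.g. what an HTTP handler or `offer_event` might do.
    pub fn offer(&mut self, execution_id: &str, name: &str, payload: &str) {
        self.events
            .insert((execution_id.to_string(), name.to_string()), payload.to_string());
    }

    /// Waiting side: `started` is when the wait began; `timeout` is in seconds.
    pub fn poll(
        &mut self,
        execution_id: &str,
        name: &str,
        started: u64,
        timeout: Option<u64>,
        now: u64,
    ) -> Wait {
        let key = (execution_id.to_string(), name.to_string());
        // A stored payload is consumed so it is delivered exactly once.
        if let Some(payload) = self.events.remove(&key) {
            return Wait::Delivered(payload);
        }
        match timeout {
            Some(t) if now >= started + t => Wait::TimedOut,
            _ => Wait::Pending, // no timeout, or deadline not reached yet
        }
    }
}
```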
Parallel Execution
Fan out independent work across multiple concurrent sub-tasks. Each `schedule_step` call becomes a separate row in the scheduler, so sub-tasks can run on different workers simultaneously.
ctx.schedule_step() / ctx.wait_all()
```rust
// Schedule three steps to run in parallel
let h1 = ctx.schedule_step("send-email", || async { send_email().await });
let h2 = ctx.schedule_step("setup-billing", || async { setup_billing().await });
let h3 = ctx.schedule_step("provision", || async { provision_resources().await });

// Durably wait for all three; this survives restarts.
// wait_all takes a Vec of handles, so bind the results as a collection
// (one result per scheduled step).
let results = ctx.wait_all(vec![h1, h2, h3]).await?;
```

See Parallel Steps for error handling and dynamic fan-out patterns.
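Within a single process, the same fan-out/fan-in shape can be expressed with `std::thread::scope`: start every independent job, then join all of them before continuing. This is a swapped-in technique for illustration only. Zart instead writes each scheduled step to the scheduler so different workers can run it, and `wait_all` is durable across restarts, which threads are not. `fan_out` is a hypothetical helper.

```rust
use std::thread;

/// Run every job on its own scoped thread and return the results in input order.
/// Fan-out: spawn all jobs first. Fan-in: join them all before returning.
pub fn fan_out<'a, T: Send + 'a>(jobs: Vec<Box<dyn FnOnce() -> T + Send + 'a>>) -> Vec<T> {
    thread::scope(|s| {
        let handles: Vec<_> = jobs.into_iter().map(|job| s.spawn(job)).collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("job panicked"))
            .collect()
    })
}
```

Scoped threads let the jobs borrow local data (such as an account id) without cloning it into each closure.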
Metadata
ctx.execution_id()

Returns the current execution's ID string. Useful for building idempotency keys for external API calls that must not be duplicated.

```rust
let key = format!("charge-{}", ctx.execution_id());
stripe.charge_with_idempotency_key(&card, amount, &key).await?;
```

ctx.data() / ctx.set_data()
A mutable JSON blob persisted alongside the execution. Use it to accumulate state across steps when you need values that aren't step return values.

```rust
// Read current progress (default if not yet set)
let mut progress: Progress = serde_json::from_value(ctx.data().clone())
    .unwrap_or_default();

progress.items_processed += 1;
ctx.set_data(serde_json::to_value(&progress)?);
```

RetryConfig
Controls retry behaviour for `step_with_retry` and the task-level `max_retries`. Every policy is built from the same fields: number of retries, initial delay, and backoff multiplier; the constructors fill them in for common cases.

| Constructor | Delay between retries |
|---|---|
| `RetryConfig::none()` | No retries; fail immediately |
| `RetryConfig::fixed(n, delay)` | Constant `delay` between each attempt |
| `RetryConfig::exponential(n, initial)` | Doubles each time: `initial`, 2×, 4×, … |
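The table's delay rules can be written out as a small function that lists the wait before each retry. This is a sketch of the arithmetic only: `Policy` and `delays` are hypothetical names mirroring the three constructors, and the exponential case assumes a multiplier of exactly 2, as the table states (retry k waits initial × 2^(k−1)).

```rust
use std::time::Duration;

/// Hypothetical mirror of the three RetryConfig constructors.
pub enum Policy {
    NoRetries,
    Fixed { retries: u32, delay: Duration },
    Exponential { retries: u32, initial: Duration },
}

/// Wait before each retry, in order (retry 1 first).
pub fn delays(policy: &Policy) -> Vec<Duration> {
    match policy {
        Policy::NoRetries => Vec::new(),
        Policy::Fixed { retries, delay } => vec![*delay; *retries as usize],
        // Retry k (1-based) waits initial * 2^(k - 1): initial, 2x, 4x, ...
        // (overflows for very large retry counts; a real policy would cap the delay)
        Policy::Exponential { retries, initial } => {
            (0..*retries).map(|k| *initial * 2u32.pow(k)).collect()
        }
    }
}
```

For `exponential(5, Duration::from_secs(1))` this yields 1, 2, 4, 8 and 16 seconds, a worst case of 31 seconds of waiting before the step gives up.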
```rust
// No retries (for non-idempotent steps that must not run twice)
RetryConfig::none()

// 3 retries, 5 seconds apart
RetryConfig::fixed(3, Duration::from_secs(5))

// 5 retries: 1s, 2s, 4s, 8s, 16s
RetryConfig::exponential(5, Duration::from_secs(1))
```

Full Example
A five-step onboarding workflow combining steps, retries, and an event wait:
```rust
use zart::{TaskHandler, TaskContext, TaskError, RetryConfig, Scheduler};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Debug, Deserialize)]
pub struct OnboardingData {
    pub user_id: String,
    pub email: String,
}

#[derive(Debug, Serialize)]
pub struct OnboardingResult {
    pub customer_id: String,
}

// Payload of the "email-verified" event. Its fields depend on what your
// verification callback sends; none are needed here because it is ignored.
#[derive(Debug, Deserialize)]
pub struct VerificationPayload {}

pub struct OnboardingTask;

#[async_trait]
impl TaskHandler for OnboardingTask {
    type Data = OnboardingData;
    type Output = OnboardingResult;

    async fn run(
        &self,
        ctx: &mut TaskContext<impl Scheduler>,
        data: OnboardingData,
    ) -> Result<Self::Output, TaskError> {
        // send_welcome_email, create_stripe_customer, provision_aws_resources and
        // activate_account stand in for your own application functions.

        // Step 1: send welcome email
        ctx.step("send-welcome-email", || async {
            send_welcome_email(&data.email).await
        }).await?;

        // Step 2: create Stripe customer (retry 3 times with exponential backoff)
        let customer_id: String = ctx.step_with_retry(
            "create-stripe-customer",
            RetryConfig::exponential(3, Duration::from_secs(2)),
            || async { create_stripe_customer(&data.email).await },
        ).await?;

        // Step 3: provision cloud resources
        ctx.step("provision-resources", || async {
            provision_aws_resources(&data.user_id, &customer_id).await
        }).await?;

        // Step 4: wait up to 48h for email verification
        let _: VerificationPayload = ctx.wait_for_event(
            "email-verified",
            Some(Duration::from_secs(172_800)),
        ).await?;

        // Step 5: activate account
        ctx.step("activate-account", || async {
            activate_account(&data.user_id).await
        }).await?;

        Ok(OnboardingResult { customer_id })
    }

    fn max_retries(&self) -> usize {
        3
    }

    fn timeout(&self) -> Option<Duration> {
        Some(Duration::from_secs(180_000)) // 50h: covers the 48h event wait
    }
}
```
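The timeout comment encodes a budget: the execution's wall-clock limit has to exceed the longest planned wait plus time for the steps themselves. A small check of that arithmetic, as a sketch: `fits_budget` is a hypothetical helper, and the step-time allowances used with it are assumed figures, not something Zart computes.

```rust
use std::time::Duration;

const HOUR: u64 = 60 * 60;

/// Hypothetical sanity check: does `timeout` leave room for every planned wait
/// plus an allowance for the steps' own running time?
pub fn fits_budget(timeout: Duration, waits: &[Duration], step_allowance: Duration) -> bool {
    let planned: Duration = waits.iter().sum();
    planned + step_allowance <= timeout
}
```

With the values above, the 50h timeout leaves 2h of headroom beyond the 48h verification wait.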