TaskHandler Trait

TaskHandler is the trait you implement to define a durable workflow. Zart calls run each time a worker claims your execution. Completed steps are skipped automatically — only the next pending step executes.

#[async_trait]
pub trait TaskHandler: Send + Sync {
    type Data: serde::de::DeserializeOwned + Send + Sync;
    type Output: serde::Serialize + Send + Sync;

    async fn run(
        &self,
        ctx: &mut TaskContext<impl Scheduler>,
        data: Self::Data,
    ) -> Result<Self::Output, TaskError>;

    /// Retry the full execution this many times on task-level failure. Default: 0.
    fn max_retries(&self) -> usize { 0 }

    /// Wall-clock timeout for the entire execution. Default: None.
    fn timeout(&self) -> Option<Duration> { None }
}

Both Data and Output must implement serde traits — Zart serializes them to the database.

Register all handlers before starting a worker. The string name is how you reference the workflow when scheduling.

let mut registry = TaskRegistry::new();
registry.register("onboarding", OnboardingTask);
registry.register("checkout", CheckoutTask);
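
To make the role of the string name concrete, here is a minimal, std-only sketch of a name-to-handler lookup. It is not Zart's TaskRegistry: `Handler`, `Registry`, and `dispatch` are hypothetical names used only for illustration, and the real registry stores TaskHandler implementations rather than plain closures.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a handler: takes a payload, returns a result string.
type Handler = Box<dyn Fn(&str) -> String + Send + Sync>;

/// Hypothetical registry keyed by workflow name (not Zart's TaskRegistry).
#[derive(Default)]
struct Registry {
    handlers: HashMap<String, Handler>,
}

impl Registry {
    /// Store the handler under `name`; a later registration with the same name replaces it.
    fn register(&mut self, name: &str, handler: Handler) {
        self.handlers.insert(name.to_string(), handler);
    }

    /// Look up the handler by name and run it; None means nothing is registered under `name`.
    fn dispatch(&self, name: &str, payload: &str) -> Option<String> {
        self.handlers.get(name).map(|handler| handler(payload))
    }
}
```

In this sketch, scheduling under a name that was never registered yields `None`, which is one reason to register every handler before a worker starts claiming executions.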

ctx.step() and ctx.step_with_retry() are what you’ll use in almost every workflow. Both follow the same rule: if a result is already stored in the database, the closure is not called.

let customer_id: String = ctx.step("create-customer", || async {
    crm.create_customer(&data.email).await
}).await?;

  • The step name must be unique within the execution.
  • The closure return type must implement Serialize + DeserializeOwned.
  • On failure, the step can be retried by re-running the execution (see max_retries).
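
The skip-if-stored rule can be sketched with an in-memory map standing in for the database. This is only an illustration of the behaviour described above, not Zart's implementation: `StepStore` is a hypothetical type, it is synchronous, and it stores plain strings instead of serialized values.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the step results Zart keeps in the database.
#[derive(Default)]
struct StepStore {
    results: HashMap<String, String>,
}

impl StepStore {
    /// Return the stored result for `name` if one exists; otherwise run `f`,
    /// store its result on success, and return it. Errors are not stored,
    /// so a failed step runs again on the next replay.
    fn step<F>(&mut self, name: &str, f: F) -> Result<String, String>
    where
        F: FnOnce() -> Result<String, String>,
    {
        if let Some(done) = self.results.get(name) {
            // Already completed in an earlier run: the closure is never called.
            return Ok(done.clone());
        }
        let value = f()?;
        self.results.insert(name.to_string(), value.clone());
        Ok(value)
    }
}
```

Calling `step` twice with the same name runs the closure only the first time. This is also why step names must be unique within an execution: two different steps sharing a name would read each other's stored result.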

ctx.step_with_retry() applies a retry policy directly to the step, independent of the task-level max_retries. Use it for calls to external APIs or any other operation that can fail transiently.

let order_id: String = ctx.step_with_retry(
    "place-order",
    RetryConfig::exponential(5, Duration::from_secs(1)),
    || async { payments_api.charge(&data.card).await },
).await?;

See RetryConfig below for all available policies.

Use ctx.sleep() and ctx.sleep_until() to pause a workflow for a fixed duration or until a specific time. The worker releases the task during the wait, so it holds neither a database connection nor a thread.

// Pause for 30 minutes, then continue
ctx.sleep(Duration::from_secs(1800)).await?;
// Resume at a specific timestamp
ctx.sleep_until(next_billing_date).await?;

wait_for_event suspends the workflow until an external signal arrives — a human approval, a webhook callback, or a message from another system.

// Suspend until "manager-approval" event is delivered, or fail after 24h
let decision: ApprovalPayload = ctx.wait_for_event(
    "manager-approval",
    Some(Duration::from_secs(86_400)),
).await?;

Deliver the event from anywhere using DurableScheduler::offer_event() or via the HTTP API:

POST /api/v1/events/{execution_id}/manager-approval
{"approved": true, "reviewer": "bob@example.com"}

wait_for_event returns Err(TaskError::EventTimeout) if the timeout elapses before the event arrives. See Wait for Event for full patterns.
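
The return contract (payload on delivery, timeout error otherwise) can be illustrated in-process with a standard-library channel. This is a rough analogy only: unlike Zart, it blocks a thread and is not durable. `WaitError` and this free-standing `wait_for_event` function are hypothetical, and treating `None` as "wait indefinitely" is an assumption, not something this page states.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical error type mirroring the timeout case described above.
#[derive(Debug, PartialEq)]
enum WaitError {
    EventTimeout,
    SenderDropped,
}

/// Block until a payload arrives on `rx`, or give up after `timeout`.
/// Assumption: `None` means wait with no deadline.
fn wait_for_event(
    rx: &mpsc::Receiver<String>,
    timeout: Option<Duration>,
) -> Result<String, WaitError> {
    match timeout {
        Some(limit) => rx.recv_timeout(limit).map_err(|err| match err {
            RecvTimeoutError::Timeout => WaitError::EventTimeout,
            RecvTimeoutError::Disconnected => WaitError::SenderDropped,
        }),
        None => rx.recv().map_err(|_| WaitError::SenderDropped),
    }
}
```

A caller would typically match on the error to decide whether a timeout should fail the workflow or fall through to a default path such as an automatic rejection.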

Fan out independent work across multiple concurrent sub-tasks. Each schedule_step call becomes a separate row in the scheduler — sub-tasks can run on different workers simultaneously.

// Schedule three steps to run in parallel
let h1 = ctx.schedule_step("send-email", || async { send_email().await });
let h2 = ctx.schedule_step("setup-billing", || async { setup_billing().await });
let h3 = ctx.schedule_step("provision", || async { provision_resources().await });
// Durably wait for all three — survives restarts
let [email_ok, billing_ok, provision_ok] = ctx.wait_all(vec![h1, h2, h3]).await?;

See Parallel Steps for error handling and dynamic fan-out patterns.
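
As a rough in-process analogy for the fan-out and wait-for-all shape, the sketch below starts every job on its own thread and then collects the results in submission order. It uses only the standard library and is not how Zart schedules sub-tasks (there is no persistence and no cross-worker distribution); `Job` and `run_all` are hypothetical names.

```rust
use std::thread;

/// A boxed job that can be moved to another thread (hypothetical helper type).
type Job<T, E> = Box<dyn FnOnce() -> Result<T, E> + Send>;

/// Run every job concurrently and return the results in submission order.
/// If any job fails, the first error in submission order is returned.
fn run_all<T, E>(jobs: Vec<Job<T, E>>) -> Result<Vec<T>, E>
where
    T: Send + 'static,
    E: Send + 'static,
{
    // Start every job before waiting on any of them.
    let handles: Vec<thread::JoinHandle<Result<T, E>>> =
        jobs.into_iter().map(|job| thread::spawn(job)).collect();

    // Join in submission order so results line up with the input.
    let results: Vec<Result<T, E>> = handles
        .into_iter()
        .map(|handle| handle.join().expect("job panicked"))
        .collect();

    // Collapse into a single Result; the first Err in submission order wins.
    results.into_iter().collect()
}
```

The sketch waits for every job to finish before reporting an error, so no work is left running in the background. Whether Zart's wait_all behaves the same way on failure is not stated here.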

ctx.execution_id() returns the current execution’s ID string. It is useful for building idempotency keys for external API calls that must not be duplicated.

let key = format!("charge-{}", ctx.execution_id());
stripe.charge_with_idempotency_key(&card, amount, &key).await?;
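
Why a key derived from the execution ID helps can be shown with a toy gateway that deduplicates by key. `Gateway` and `charge_key` are hypothetical and exist only for this sketch; a real payment provider applies its own idempotency rules.

```rust
use std::collections::HashMap;

/// Hypothetical payment gateway that deduplicates charges by idempotency key.
#[derive(Default)]
struct Gateway {
    charges: HashMap<String, u64>, // key -> amount charged
}

impl Gateway {
    /// Charge `amount` once per key. A repeated key returns the original amount
    /// without charging again; the bool reports whether a new charge was made.
    fn charge(&mut self, key: &str, amount: u64) -> (u64, bool) {
        if let Some(&existing) = self.charges.get(key) {
            return (existing, false);
        }
        self.charges.insert(key.to_string(), amount);
        (amount, true)
    }
}

/// Build the key the same way as the snippet above: stable for a given execution.
fn charge_key(execution_id: &str) -> String {
    format!("charge-{execution_id}")
}
```

Because the execution ID does not change when an execution is retried, a re-run produces the same key and the gateway recognises the second request as a duplicate.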

ctx.data() and ctx.set_data() expose a mutable JSON blob persisted alongside the execution. Use it to accumulate state across steps when you need values that aren’t step return values.

// Read current progress (default if not yet set)
let mut progress: Progress = serde_json::from_value(ctx.data().clone())
    .unwrap_or_default();
progress.items_processed += 1;
ctx.set_data(serde_json::to_value(&progress)?);

RetryConfig controls retry behaviour for step_with_retry and the task-level max_retries. All three constructors share the same fields: number of attempts, initial delay, and backoff multiplier.

Constructor                             Delay between retries
RetryConfig::none()                     No retries: fail immediately
RetryConfig::fixed(n, delay)            Constant delay between each attempt
RetryConfig::exponential(n, initial)    Doubles each time: initial, 2× initial, 4× initial, …

// No retries (non-idempotent steps that must not be duplicated)
RetryConfig::none()
// 3 retries, 5 seconds apart
RetryConfig::fixed(3, Duration::from_secs(5))
// 5 retries: 1s, 2s, 4s, 8s, 16s
RetryConfig::exponential(5, Duration::from_secs(1))
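
The delay schedules implied by the fixed and exponential policies can be worked out with a few lines of standard-library code. `fixed_delays` and `exponential_delays` are hypothetical helpers for checking the arithmetic, not part of RetryConfig, and they assume a multiplier of exactly 2 with no jitter or cap.

```rust
use std::time::Duration;

/// Delays for a fixed policy: `retries` waits of `delay` each.
fn fixed_delays(retries: u32, delay: Duration) -> Vec<Duration> {
    (0..retries).map(|_| delay).collect()
}

/// Delays for a doubling policy: initial, 2x initial, 4x initial, ...
/// Saturating arithmetic avoids overflow for large retry counts.
fn exponential_delays(retries: u32, initial: Duration) -> Vec<Duration> {
    (0..retries)
        .map(|i| initial.saturating_mul(2u32.saturating_pow(i)))
        .collect()
}
```

With 5 retries and a 1-second initial delay this gives 1s, 2s, 4s, 8s, 16s, matching the comment above, for a total wait of 31 seconds before the step finally fails.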

A five-step onboarding workflow combining steps, retries, and an event wait:

use zart::{TaskHandler, TaskContext, TaskError, RetryConfig, Scheduler};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Debug, Deserialize)]
pub struct OnboardingData {
    pub user_id: String,
    pub email: String,
}

#[derive(Debug, Serialize)]
pub struct OnboardingResult {
    pub customer_id: String,
}

pub struct OnboardingTask;

#[async_trait]
impl TaskHandler for OnboardingTask {
    type Data = OnboardingData;
    type Output = OnboardingResult;

    async fn run(
        &self,
        ctx: &mut TaskContext<impl Scheduler>,
        data: OnboardingData,
    ) -> Result<Self::Output, TaskError> {
        // Step 1: send welcome email
        ctx.step("send-welcome-email", || async {
            send_welcome_email(&data.email).await
        }).await?;

        // Step 2: create Stripe customer (retry 3× with exponential backoff)
        let customer_id: String = ctx.step_with_retry(
            "create-stripe-customer",
            RetryConfig::exponential(3, Duration::from_secs(2)),
            || async { create_stripe_customer(&data.email).await },
        ).await?;

        // Step 3: provision cloud resources
        ctx.step("provision-resources", || async {
            provision_aws_resources(&data.user_id, &customer_id).await
        }).await?;

        // Step 4: wait up to 48h for email verification
        let _: VerificationPayload = ctx.wait_for_event(
            "email-verified",
            Some(Duration::from_secs(172_800)),
        ).await?;

        // Step 5: activate account
        ctx.step("activate-account", || async {
            activate_account(&data.user_id).await
        }).await?;

        Ok(OnboardingResult { customer_id })
    }

    fn max_retries(&self) -> usize { 3 }

    fn timeout(&self) -> Option<Duration> {
        Some(Duration::from_secs(180_000)) // 50h, covers the 48h event wait
    }
}