Execution Management
DurableScheduler is the high-level entry point for interacting with durable executions. It wraps the underlying Scheduler and provides execution-aware operations.
Construction
Section titled “Construction”let durable = DurableScheduler::new(scheduler.clone());The scheduler is typically an Arc<PostgresScheduler>. The DurableScheduler borrows it via Arc so it can be shared between your web server (scheduling) and the worker (executing).
Starting an Execution
Section titled “Starting an Execution”Start a new durable execution with raw JSON data.
async fn start( &self, execution_id: &str, task_name: &str, data: serde_json::Value,) -> Result<ScheduleResult, SchedulerError>Use this when the input is already a serde_json::Value or when you’re building tooling on top of Zart.
start_for (Recommended)
Section titled “start_for (Recommended)”Start a new durable execution with typed input data, inferred from the handler.
async fn start_for<H: DurableExecution>( &self, execution_id: &str, task_name: &str, input: &H::Data,) -> Result<ScheduleResult, SchedulerError>The execution_id serves as an idempotency key. If an execution with that ID already exists (and is not terminal), returns SchedulerError::ExecutionAlreadyExists.
durable .start_for::<OnboardingTask>("signup-user-42", "onboarding", &OnboardingData { email: "alice@example.com".into(), }) .await?;Checking Status
Section titled “Checking Status”status
Section titled “status”Get the current status of a durable execution without blocking.
async fn status( &self, execution_id: &str,) -> Result<DurableExecutionStatus, SchedulerError>let record = durable.status("signup-user-42").await?;println!("Status: {:?}", record.status);Block until the execution completes, fails, or the timeout is exceeded. Returns the full execution record.
async fn wait( &self, execution_id: &str, timeout: Duration, poll_interval: Option<Duration>,) -> Result<DurableExecutionStatus, SchedulerError>let record = durable .wait("signup-user-42", Duration::from_secs(60), None) .await?;The poll_interval controls how often the status is checked. Pass None for the default.
wait_completion (Recommended)
Section titled “wait_completion (Recommended)”Block until completion and automatically deserialize the result to your output type. This is the recommended way to wait for results — no manual serde_json::from_value needed.
async fn wait_completion<T: DeserializeOwned>( &self, execution_id: &str, timeout: Duration, poll_interval: Option<Duration>,) -> Result<T, SchedulerError>let output: OnboardingOutput = durable .wait_completion("signup-user-42", Duration::from_secs(60), None) .await?;Returns Err(SchedulerError::Deserialization(_)) if the result can’t be deserialized to T, or if the execution completed without a result.
wait_completion_with_timeout
Section titled “wait_completion_with_timeout”Same as wait_completion but caps the wait at 30 seconds.
let output: OnboardingOutput = durable .wait_completion_with_timeout("signup-user-42", Duration::from_secs(30)) .await?;wait_for
Section titled “wait_for”Wait for completion and automatically deserialize the result to the handler’s output type. Use when you started an execution earlier and now want the typed result.
async fn wait_for<H: DurableExecution>( &self, execution_id: &str, timeout: Duration,) -> Result<H::Output, SchedulerError>let output = durable .wait_for::<OnboardingTask>("signup-user-42", Duration::from_secs(60)) .await?;// output: OnboardingOutput — inferred from OnboardingTask::Outputstart_and_wait_for
Section titled “start_and_wait_for”Start an execution and wait for completion in a single call. Types are inferred from the handler.
let output = durable .start_and_wait_for::<OnboardingTask>("signup-user-42", "onboarding", &input, Duration::from_secs(60)) .await?;// output: OnboardingOutput — inferred from OnboardingTask::OutputCancelling
Section titled “Cancelling”async fn cancel( &self, execution_id: &str,) -> Result<bool, SchedulerError>Returns true if the execution was successfully cancelled, false if it was already terminal (completed, failed, or previously cancelled).
Delivering Events
Section titled “Delivering Events”offer_event
Section titled “offer_event”Deliver an event to a waiting durable execution.
async fn offer_event( &self, execution_id: &str, event_name: &str, payload: serde_json::Value,) -> Result<(), SchedulerError>durable .offer_event( &execution_id, "manager-approval", serde_json::to_value(&decision)?, ) .await?;The event is deserialized into the type expected by the corresponding zart::wait_for_event call inside the execution.
Execution Status
Section titled “Execution Status”DurableExecutionStatus contains:
| Field | Type | Description |
|---|---|---|
name | String | Registered task name |
execution_id | String | Unique execution identifier |
payload | serde_json::Value | Original input |
status | ExecutionStatus | Current state |
result | Option<serde_json::Value> | Output if completed, error info if failed |
scheduled_at | DateTime<Utc> | When the execution was first scheduled |
completed_at | Option<DateTime<Utc>> | When the execution reached a terminal state |
steps | Vec<StepStatusInfo> | Per-step status and results |
ExecutionStatus variants:
| Variant | Meaning |
|---|---|
Pending | Scheduled but not yet picked up by a worker |
Running | Actively being processed |
Waiting | Durably paused (sleep or event wait) |
Completed | Finished successfully |
Failed | Exhausted retries or unhandled error |
Cancelled | Stopped via cancel() |
Listing Executions
Section titled “Listing Executions”async fn list_executions( &self, status_filter: Option<ExecutionStatus>, task_name: Option<String>, limit: u32, offset: u32,) -> Result<Vec<DurableExecutionStatus>, SchedulerError>Query executions with optional filtering by status and task name, with pagination.
let all = durable .list_executions(None, Some("onboarding".into()), 100, 0) .await?;Next Steps
Section titled “Next Steps”- Durable Execution — the concept and lifecycle
- Free Functions —
sleep,wait_for_event,schedule,wait - Error Types —
SchedulerError,TaskError,ExecutionFailure