Skip to content

Zart is in active development — breaking API changes may occur despite our best efforts to keep contracts stable.

Execution Management

DurableScheduler is the high-level entry point for interacting with durable executions. It wraps the underlying Scheduler and provides execution-aware operations.

let durable = DurableScheduler::new(scheduler.clone());

The scheduler is typically an Arc<PostgresScheduler>. The DurableScheduler borrows it via Arc so it can be shared between your web server (scheduling) and the worker (executing).

Start a new durable execution with raw JSON data.

async fn start(
&self,
execution_id: &str,
task_name: &str,
data: serde_json::Value,
) -> Result<ScheduleResult, SchedulerError>

Use this when the input is already a serde_json::Value or when you’re building tooling on top of Zart.

Start a new durable execution with typed input data, inferred from the handler.

async fn start_for<H: DurableExecution>(
&self,
execution_id: &str,
task_name: &str,
input: &H::Data,
) -> Result<ScheduleResult, SchedulerError>

The execution_id serves as an idempotency key. If an execution with that ID already exists (and is not terminal), returns SchedulerError::ExecutionAlreadyExists.

durable
.start_for::<OnboardingTask>("signup-user-42", "onboarding", &OnboardingData {
email: "alice@example.com".into(),
})
.await?;

Get the current status of a durable execution without blocking.

async fn status(
&self,
execution_id: &str,
) -> Result<DurableExecutionStatus, SchedulerError>
let record = durable.status("signup-user-42").await?;
println!("Status: {:?}", record.status);

Block until the execution completes, fails, or the timeout is exceeded. Returns the full execution record.

async fn wait(
&self,
execution_id: &str,
timeout: Duration,
poll_interval: Option<Duration>,
) -> Result<DurableExecutionStatus, SchedulerError>
let record = durable
.wait("signup-user-42", Duration::from_secs(60), None)
.await?;

The poll_interval controls how often the status is checked. Pass None for the default.

Block until completion and automatically deserialize the result to your output type. This is the recommended way to wait for results — no manual serde_json::from_value needed.

async fn wait_completion<T: DeserializeOwned>(
&self,
execution_id: &str,
timeout: Duration,
poll_interval: Option<Duration>,
) -> Result<T, SchedulerError>
let output: OnboardingOutput = durable
.wait_completion("signup-user-42", Duration::from_secs(60), None)
.await?;

Returns Err(SchedulerError::Deserialization(_)) if the result can’t be deserialized to T, or if the execution completed without a result.

Same as wait_completion but caps the wait at 30 seconds.

let output: OnboardingOutput = durable
.wait_completion_with_timeout("signup-user-42", Duration::from_secs(30))
.await?;

Wait for completion and automatically deserialize the result to the handler’s output type. Use when you started an execution earlier and now want the typed result.

async fn wait_for<H: DurableExecution>(
&self,
execution_id: &str,
timeout: Duration,
) -> Result<H::Output, SchedulerError>
let output = durable
.wait_for::<OnboardingTask>("signup-user-42", Duration::from_secs(60))
.await?;
// output: OnboardingOutput — inferred from OnboardingTask::Output

Start an execution and wait for completion in a single call. Types are inferred from the handler.

let output = durable
.start_and_wait_for::<OnboardingTask>("signup-user-42", "onboarding", &input, Duration::from_secs(60))
.await?;
// output: OnboardingOutput — inferred from OnboardingTask::Output
async fn cancel(
&self,
execution_id: &str,
) -> Result<bool, SchedulerError>

Returns true if the execution was successfully cancelled, false if it was already terminal (completed, failed, or previously cancelled).

Deliver an event to a waiting durable execution.

async fn offer_event(
&self,
execution_id: &str,
event_name: &str,
payload: serde_json::Value,
) -> Result<(), SchedulerError>
durable
.offer_event(
&execution_id,
"manager-approval",
serde_json::to_value(&decision)?,
)
.await?;

The event is deserialized into the type expected by the corresponding zart::wait_for_event call inside the execution.

DurableExecutionStatus contains:

FieldTypeDescription
nameStringRegistered task name
execution_idStringUnique execution identifier
payloadserde_json::ValueOriginal input
statusExecutionStatusCurrent state
resultOption<serde_json::Value>Output if completed, error info if failed
scheduled_atDateTime<Utc>When the execution was first scheduled
completed_atOption<DateTime<Utc>>When the execution reached a terminal state
stepsVec<StepStatusInfo>Per-step status and results

ExecutionStatus variants:

VariantMeaning
PendingScheduled but not yet picked up by a worker
RunningActively being processed
WaitingDurably paused (sleep or event wait)
CompletedFinished successfully
FailedExhausted retries or unhandled error
CancelledStopped via cancel()
async fn list_executions(
&self,
status_filter: Option<ExecutionStatus>,
task_name: Option<String>,
limit: u32,
offset: u32,
) -> Result<Vec<DurableExecutionStatus>, SchedulerError>

Query executions with optional filtering by status and task name, with pagination.

let all = durable
.list_executions(None, Some("onboarding".into()), 100, 0)
.await?;