Skip to content

Zart is in active development — breaking API changes may occur despite our best efforts to keep contracts stable.

Scheduler-Only

This example demonstrates using zart-scheduler as a standalone crate — without the full Zart durable execution engine. It’s perfect for lightweight scheduling needs where you don’t need step checkpointing, retries, or complex workflow state.

Features demonstrated: PostgresTaskScheduler, ScheduledTask trait, task chaining via OnComplete, schedule_now, schedule_at, parallel task execution.

NeedUse zart-schedulerUse full zart
Lightweight fire-and-forget tasks
Task chaining (A → B → C)
Cron-like / recurring schedules
Step-level checkpointing
Automatic retries with backoff
Complex workflows with branching
Admin UI / execution visibility
struct SendWelcomeEmail;
#[async_trait]
impl ScheduledTask for SendWelcomeEmail {
async fn execute(
&self,
instance: &TaskInstance,
) -> Result<Box<dyn CompletionHandler>, SchedulerTaskError> {
let input: WelcomeEmailInput = serde_json::from_value(instance.data.clone())?;
println!(" [send-welcome-email] Sending welcome email to {}", input.email);
sleep(Duration::from_millis(300)).await;
// Chain: schedule cleanup task on completion
let cleanup_id = format!("cleanup-{}", Uuid::new_v4());
let schedule_next = vec![zart_scheduler::ScheduleAtParams {
task_id: cleanup_id,
task_name: "onboarding-cleanup".to_string(),
execution_time: Utc::now(),
data: json!({ "user_id": input.user_id }),
recurrence: None,
metadata: Value::Null,
}];
Ok(Box::new(OnComplete {
result: Some(json!({ "status": "sent" })),
schedule_next,
}))
}
}
struct OnboardingCleanup;
#[async_trait]
impl ScheduledTask for OnboardingCleanup {
async fn execute(
&self,
instance: &TaskInstance,
) -> Result<Box<dyn CompletionHandler>, SchedulerTaskError> {
let input: CleanupInput = serde_json::from_value(instance.data.clone())?;
println!(" [onboarding-cleanup] Running cleanup for user {}", input.user_id);
sleep(Duration::from_millis(200)).await;
// Chain: schedule report generation
let report_id = format!("report-{}", Uuid::new_v4());
let schedule_next = vec![zart_scheduler::ScheduleAtParams {
task_id: report_id,
task_name: "generate-report".to_string(),
execution_time: Utc::now(),
data: json!({ "user_id": input.user_id }),
recurrence: None,
metadata: Value::Null,
}];
Ok(Box::new(OnComplete {
result: Some(json!({ "status": "cleaned" })),
schedule_next,
}))
}
}
struct GenerateReport;
#[async_trait]
impl ScheduledTask for GenerateReport {
async fn execute(
&self,
instance: &TaskInstance,
) -> Result<Box<dyn CompletionHandler>, SchedulerTaskError> {
let input: ReportInput = serde_json::from_value(instance.data.clone())?;
println!(" [generate-report] Generating report for user {}", input.user_id);
sleep(Duration::from_millis(150)).await;
// No more chaining — return OnComplete::done()
Ok(OnComplete::done())
}
}
let pool = sqlx::PgPool::connect(&db_url).await?;
let scheduler = Arc::new(PostgresTaskScheduler::new(pool));
// Register task handlers
let mut registry = TaskRegistry::new();
registry.register("send-welcome-email", SendWelcomeEmail);
registry.register("onboarding-cleanup", OnboardingCleanup);
registry.register("generate-report", GenerateReport);
// Start worker
let config = WorkerConfig {
poll_interval: Duration::from_millis(200),
max_tasks_per_poll: 10,
max_concurrent_tasks: 4,
..Default::default()
};
let worker = Arc::new(Worker::new(scheduler.clone(), Arc::new(registry), config, vec![]));
tokio::spawn(async move { worker.run().await });
let chain_id = format!("chain-{}", Uuid::new_v4());
scheduler
.schedule_now(&chain_id, "send-welcome-email", welcome_data)
.await?;
// -> sends email -> schedules cleanup -> cleanup runs -> schedules report -> report runs
let future_time = Utc::now() + chrono::Duration::seconds(3);
scheduler
.schedule_at(ScheduleAtParams {
task_id: greeting_id,
task_name: "scheduled-greeting".to_string(),
execution_time: future_time,
data: json!({ "name": "Paulo" }),
recurrence: None,
metadata: Value::Null,
})
.await?;
for i in 0..3 {
scheduler
.schedule_now(&format!("parallel-{i}"), "scheduled-greeting", json!({}))
.await?;
}
// All 3 run concurrently (up to max_concurrent_tasks)
Terminal window
just example-scheduler-only
=== Zart Scheduler-Only Example ===
--- Demo1: Task Chaining ---
Scheduling send-welcome-email...
[send-welcome-email] Sending welcome email to alice@example.com
[send-welcome-email] Email sent, scheduled onboarding-cleanup
[onboarding-cleanup] Running cleanup for user user-42
[onboarding-cleanup] Cleanup done, scheduled generate-report
[generate-report] Generating 'onboarding-complete' report
[generate-report] Report generated
--- Demo2: Scheduled Future Task ---
Scheduled greeting for 2024-01-15T10:30:03+00:00 (in 3 seconds)
[scheduled-greeting] Hello, Paulo!
--- Demo3: Independent Parallel Tasks ---
Scheduled 3 parallel greeting tasks
[scheduled-greeting] Hello, User-parallel-0!
[scheduled-greeting] Hello, User-parallel-1!
[scheduled-greeting] Hello, User-parallel-2!
=== All demos completed ===

ScheduledTask trait — Implement execute() to define what happens when the task runs. Return OnComplete to signal completion.

Task chaining via OnComplete::schedule_next — The schedule_next field lets you schedule follow-up tasks atomically as part of completing the current task.

schedule_now vs schedule_atschedule_now runs as soon as a worker picks it up. schedule_at waits until the specified time.

No step checkpointing — Unlike DurableExecution, if the worker crashes mid-task, the task will be picked up again by another worker (or the same worker after restart), but any partial work in the task will be re-done.

Lightweightzart-scheduler has minimal dependencies and no concept of “steps”, “runs”, or “replays”. It’s a straightforward task queue with PostgreSQL persistence.