Transactions
Why Transaction Participation Matters
In a typical durable execution system, the framework opens its own database transactions for every state change. This is fine in isolation, but creates atomicity gaps when your business logic also writes to the database:

    Your code:  INSERT INTO users (id, email) ...   ← tx 1 (yours)
    Framework:  INSERT INTO zart_executions ...     ← tx 2 (framework)
    Framework:  INSERT INTO zart_tasks ...          ← tx 3 (framework)

If tx 1 commits but tx 2 fails, you have a user row with no corresponding execution. If tx 2 commits but your app crashes before tx 1, you have a phantom execution.
Zart solves this with two opt-in mechanisms that let your code share transactions with the framework.
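The gap is easier to see in a toy model. The sketch below is not Zart code: `Db`, `Tx`, and both functions are hypothetical, std-only stand-ins, and the `fail` flag merely simulates a crash or failed commit. It contrasts two independent commits with a single shared one.

```rust
use std::collections::BTreeSet;

/// Toy in-memory "database": the set of committed row keys.
#[derive(Default, Debug)]
struct Db {
    rows: BTreeSet<String>,
}

/// Toy transaction: writes are staged and only reach `Db` on commit.
struct Tx<'a> {
    db: &'a mut Db,
    staged: Vec<String>,
}

impl Db {
    fn begin(&mut self) -> Tx<'_> {
        Tx { db: self, staged: Vec::new() }
    }
}

impl<'a> Tx<'a> {
    fn insert(&mut self, row: &str) {
        self.staged.push(row.to_string());
    }

    /// Publishes all staged rows. `fail` simulates a crash or error, in which
    /// case nothing staged in this transaction becomes visible.
    fn commit(self, fail: bool) -> Result<(), String> {
        if fail {
            return Err("commit failed".to_string());
        }
        let Tx { db, staged } = self;
        db.rows.extend(staged);
        Ok(())
    }
}

/// Separate transactions: the user write and the execution write commit independently.
fn separate_transactions(db: &mut Db, framework_fails: bool) {
    let mut tx1 = db.begin();
    tx1.insert("users/42");
    tx1.commit(false).unwrap();

    let mut tx2 = db.begin();
    tx2.insert("zart_executions/onboard-42");
    let _ = tx2.commit(framework_fails);
}

/// Shared transaction: both writes are staged together and commit or vanish together.
fn shared_transaction(db: &mut Db, commit_fails: bool) {
    let mut tx = db.begin();
    tx.insert("users/42");
    tx.insert("zart_executions/onboard-42");
    let _ = tx.commit(commit_fails);
}
```

With `framework_fails = true`, `separate_transactions` leaves `users/42` committed without its execution row. `shared_transaction` with a failing commit leaves the database empty.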
Scenario 1: Transactional Scheduling
Use start_in_tx to start a durable execution within your own transaction. The execution record, initial run row, and root body task are all inserted in the same transaction as your business writes.
Basic Usage

    use zart::prelude::*;
    use zart::DurableScheduler;

    async fn register_user(
        pool: &sqlx::PgPool,
        sched: &DurableScheduler,
        user_id: Uuid,
        email: &str,
    ) -> Result<(), Error> {
        let mut tx = pool.begin().await?;

        // Your business write.
        sqlx::query(
            "INSERT INTO users (id, email) VALUES ($1, $2)",
        )
        .bind(user_id)
        .bind(email)
        .execute(&mut *tx)
        .await?;

        // Durable execution, in the same transaction.
        sched.start_in_tx(
            &mut tx,
            &format!("onboard-{user_id}"),
            "onboarding",
            serde_json::json!({ "user_id": user_id }),
        ).await?;

        // Both commit atomically, or neither exists.
        tx.commit().await?;
        Ok(())
    }

If the transaction rolls back, neither the user row nor the execution record will exist.
Typed Variant: start_for_in_tx
start_for_in_tx infers the input type from the handler, just like start_for:

    sched.start_for_in_tx::<OnboardingHandler>(
        &mut tx,
        &format!("onboard-{user_id}"),
        "onboarding",
        &OnboardingInput { user_id },
    ).await?;

Why No start_and_wait_for_in_tx?
The transaction must commit before the worker can poll the task. Waiting inside the transaction would deadlock: the task row is not visible to the worker until the commit, and the commit would not happen until the wait returned. The correct pattern is:

    sched.start_for_in_tx::<H>(&mut tx, id, task, &input).await?;
    tx.commit().await?;  // worker can now poll the task
    let result = sched.wait_for::<H>(id, timeout).await?;

Edge Cases
| Scenario | Behavior |
|---|---|
| Existing execution ID | Returns NotSupported — use start() instead for resets |
| Transaction rollback | No execution record or body task exists |
| Crash after commit | Normal recovery — body task is in scheduled, worker picks it up |
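The commit-before-wait ordering and the rollback row above can be illustrated with a toy visibility model. This is a sketch under stated assumptions, not Zart's implementation: `TaskStore` and this free-standing `wait_for` are hypothetical, and the only assumption is that a worker can see a task only after the transaction that scheduled it has committed.

```rust
/// Toy task queue with transaction-style visibility: tasks staged inside an
/// open transaction are invisible to the worker until commit.
#[derive(Default)]
struct TaskStore {
    committed: Vec<String>,
    staged: Vec<String>,
}

impl TaskStore {
    /// Stage a task inside the caller's (still open) transaction.
    fn schedule_in_tx(&mut self, task: &str) {
        self.staged.push(task.to_string());
    }

    /// Commit: every staged task becomes visible to the worker.
    fn commit(&mut self) {
        self.committed.append(&mut self.staged);
    }

    /// Rollback: staged tasks are discarded and never become visible.
    fn rollback(&mut self) {
        self.staged.clear();
    }

    /// Worker poll: returns the oldest committed task, if any.
    fn poll(&mut self) -> Option<String> {
        if self.committed.is_empty() {
            None
        } else {
            Some(self.committed.remove(0))
        }
    }
}

/// Bounded wait standing in for a "wait for result" call: returns the task
/// the worker picked up, or None after `max_polls` unsuccessful attempts.
fn wait_for(store: &mut TaskStore, max_polls: u32) -> Option<String> {
    for _ in 0..max_polls {
        if let Some(task) = store.poll() {
            return Some(task);
        }
    }
    None
}
```

Waiting before `commit()` can only give up, because the worker never sees the staged task. Waiting after `commit()` finds it, and a rolled-back task is never found at all.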
Scenario 2: Transactional Step Completion
When a step performs database writes, those writes normally commit independently of the framework’s step-completion writes. A crash between the two creates a partial state: the business write exists but the step is still marked scheduled, so the retry executes the step, and its write, a second time.
zart::trx solves this by registering a transaction that the framework uses for its own completion writes.
How It Works

    Step run():
      1. zart::trx(&pool)                                    → begin tx, store in task-local
      2. sqlx::query(...).execute(&mut **tx)                 → your DB writes
      3. return Ok(result)
              ↓
    Framework (after run returns):
      4. complete_step_and_schedule_body_in_tx(tx, result)   → framework writes
      5. tx.commit()                                         → everything commits atomically

If run() returns Err, the framework rolls back the transaction instead.
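The five steps above can be sketched as a small std-only model. Everything in it is hypothetical: `ToyTx` and `run_step` are illustrative names, and the transaction is passed to the step as an argument rather than through task-local storage, which keeps the sketch short but differs from how the flow above hands the transaction over.

```rust
/// Toy stand-in for a database transaction: writes are buffered until commit.
#[derive(Default)]
struct ToyTx {
    writes: Vec<String>,
}

/// Hypothetical framework driver. Runs the step, then either appends its own
/// completion write and commits everything, or discards the buffered writes.
/// Returns the rows that became durable.
fn run_step<F>(step: F) -> Vec<String>
where
    F: FnOnce(&mut ToyTx) -> Result<i64, String>,
{
    // 1. Begin the transaction the step and the framework will share.
    let mut tx = ToyTx::default();

    // 2-3. The step performs its writes and returns.
    match step(&mut tx) {
        Ok(result) => {
            // 4. Framework completion write goes into the same transaction.
            tx.writes.push(format!("step completed with {result}"));
            // 5. Commit: the step's writes and the completion land together.
            tx.writes
        }
        // Rollback: none of the step's writes survive.
        Err(_) => Vec::new(),
    }
}
```

A step that returns `Ok` ends up with its own write followed by the completion record. A step that returns `Err` leaves nothing behind, even though it wrote to the transaction before failing.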

    use std::borrow::Cow;

    use uuid::Uuid;
    use zart::prelude::*;
    use zart::trx;

    struct DeductBalanceStep {
        // Pool passed to trx() in run(); the step must carry it as a field.
        pool: sqlx::PgPool,
        account_id: Uuid,
        amount: i64,
    }

    #[async_trait::async_trait]
    impl ZartStep for DeductBalanceStep {
        type Output = i64;
        type Error = PaymentError;

        fn step_name(&self) -> Cow<'static, str> {
            Cow::Borrowed("deduct-balance")
        }

        async fn run(&self) -> Result<Self::Output, Self::Error> {
            // Begin the transaction and register it with the framework.
            let mut tx = trx(&self.pool).await
                .map_err(|e| PaymentError::Framework { reason: e.to_string() })?;

            let balance: (i64,) = sqlx::query_as(
                "UPDATE accounts SET balance = balance - $1 WHERE id = $2 RETURNING balance",
            )
            .bind(self.amount)
            .bind(self.account_id)
            .fetch_one(&mut **tx) // ← use the registered transaction
            .await
            .map_err(|e| PaymentError::Db { reason: e.to_string() })?;

            // Framework commits tx after this returns Ok.
            // Framework rolls back tx if this returns Err.
            Ok(balance.0)
        }
    }

Contract
| Rule | Why |
|---|---|
| Call zart::trx at most once per step | Multiple transactions can’t be atomically committed together |
| Do not call tx.commit() yourself | The framework owns the lifecycle after trx() returns |
| Do not call tx.rollback() yourself | On error, the framework rolls back automatically |
| Keep the time between trx() and return short | Holding a transaction across HTTP calls risks long-lived locks |
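One way to picture the "at most once per step" rule is a single slot that holds the step's transaction. The sketch below is a guess at the shape of such a slot, not Zart's implementation: `STEP_TX`, `register`, and `take_registered` are hypothetical names, a thread-local stands in for task-local storage, and rejecting a second registration is an assumption, since the table does not say what actually happens on a second call.

```rust
use std::cell::RefCell;

thread_local! {
    // One slot per thread stands in for per-task storage. It holds the label
    // of the transaction registered by the running step, if any.
    static STEP_TX: RefCell<Option<String>> = RefCell::new(None);
}

/// Hypothetical registration: succeeds once, then refuses until the slot is
/// emptied. (Assumed behavior, chosen only to illustrate the rule.)
fn register(label: &str) -> Result<(), String> {
    STEP_TX.with(|slot| {
        let mut slot = slot.borrow_mut();
        if let Some(existing) = slot.as_ref() {
            return Err(format!("transaction {existing} is already registered"));
        }
        *slot = Some(label.to_string());
        Ok(())
    })
}

/// Hypothetical framework side: takes ownership of the registered transaction
/// after the step returns, leaving the slot empty for the next step.
fn take_registered() -> Option<String> {
    STEP_TX.with(|slot| slot.borrow_mut().take())
}
```

Because there is only one slot, there is only one transaction for the framework to commit or roll back. Taking it out of the slot also mirrors the ownership rule: once registered, the transaction is the framework's to finish.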
Deref Ergonomics
ZartTrx implements Deref and DerefMut targeting sqlx::Transaction. Use &mut **tx to pass it to sqlx: the first dereference reaches the sqlx::Transaction, the second reaches the underlying connection, which sqlx accepts as an executor.

    sqlx::query("UPDATE ...").execute(&mut **tx).await?;
    sqlx::query_as("SELECT ...").fetch_one(&mut **tx).await?;

Error Behavior
| run() returns | zart::trx called | Framework action |
|---|---|---|
| Ok(value) | Yes | Completes step inside tx, then commits |
| Err(e) | Yes | Rolls back tx, proceeds with retry logic |
| Ok(value) | No | Opens its own transaction (default path) |
| Err(e) | No | Opens its own transaction for metadata (default path) |
The key insight: rollback prevents partial state, but the logical failure still propagates normally. A declined payment is still a declined payment — it just doesn’t leave behind a half-written balance update.
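The table can also be read as a small decision function. The sketch below only restates the four rows; `FrameworkAction` and `framework_action` are illustrative names, not part of Zart's API.

```rust
/// What the framework does after run() returns, one variant per table row.
#[derive(Debug, PartialEq)]
enum FrameworkAction {
    /// Ok + trx called: complete the step inside the step's tx, then commit.
    CompleteInStepTxThenCommit,
    /// Err + trx called: roll back the step's tx, then apply retry logic.
    RollbackStepTxThenRetry,
    /// Ok + no trx: the framework opens its own transaction (default path).
    OwnTransaction,
    /// Err + no trx: the framework opens its own transaction for metadata.
    OwnTransactionForMetadata,
}

/// Maps (did run() succeed, was trx called) to the framework's action.
fn framework_action(run_ok: bool, trx_called: bool) -> FrameworkAction {
    match (run_ok, trx_called) {
        (true, true) => FrameworkAction::CompleteInStepTxThenCommit,
        (false, true) => FrameworkAction::RollbackStepTxThenRetry,
        (true, false) => FrameworkAction::OwnTransaction,
        (false, false) => FrameworkAction::OwnTransactionForMetadata,
    }
}
```

Calling trx changes which transaction carries the completion write and whether the step's own writes are undone on failure. It does not change whether a failure is reported: both `Err` rows still hand the error to the normal retry path.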
Full Example
The transactions example demonstrates both features end-to-end:
- Scenario 1: Creates a user row and starts an onboarding execution atomically via start_in_tx.
- Scenario 2: The onboarding handler deducts a welcome bonus using zart::trx, ensuring the balance update and step completion commit together.
Run it with:

    just example-transactions

When to Use What
| Scenario | Use |
|---|---|
| Starting an execution alongside business writes | start_in_tx / start_for_in_tx |
| Step does DB writes that must be atomic with completion | zart::trx |
| Step makes HTTP calls or external API requests | Don’t use zart::trx — the default path is correct |
| Step is pure computation | Don’t use zart::trx — the default path is correct |
| Retrying an existing execution | start() (non-transactional) |
Both features are opt-in. Existing code using start(), start_for(), and #[zart_step] works identically without changes.