Order Processing with Durability
This example extends the order processing pipeline with durability. The same flow now survives a process crash mid-execution: completed steps are not re-run, and a re-execution returns the saved result instead of charging the card twice.
The same flow, with durability
Two things change. The handlers pass ctx.meta.idempotencyKey to any external side effect that supports idempotency, and the caller passes a durability option to execute().
import {
  createFlow,
  ValidationError,
  MemoryDurabilityStore,
} from '@celom/prose';

const processOrder = createFlow<OrderInput, OrderDeps>('process-order')
  .validate('validateOrder', (ctx) => {
    if (ctx.input.items.length === 0)
      throw ValidationError.single(
        'items',
        'Order must have at least one item'
      );
  })

  .step('calculateTotal', (ctx) => {
    const subtotal = ctx.input.items.reduce(
      (sum, item) => sum + item.price * item.quantity,
      0
    );
    const tax = subtotal * 0.08;
    return { subtotal, tax, total: subtotal + tax };
  })

  // Stripe accepts an idempotency key. Pass ctx.meta.idempotencyKey so a
  // resumed run that re-invokes this step doesn't charge the card twice.
  .step('chargePayment', async (ctx) => {
    const receipt = await stripe.paymentIntents.create(
      {
        amount: Math.round(ctx.state.total * 100),
        currency: 'usd',
        customer: ctx.input.userId,
      },
      { idempotencyKey: ctx.meta.idempotencyKey }
    );
    return { receipt };
  })
  .withRetry({
    maxAttempts: 3,
    delayMs: 500,
    backoffMultiplier: 2,
    shouldRetry: (err) => err.code !== 'CARD_DECLINED',
  })

  .transaction('persistOrder', async (ctx, tx) => {
    // Upsert by orderId so a re-run after a crash mid-transaction
    // doesn't create a duplicate row.
    const persistedOrderId = await tx.upsert('orders', {
      id: ctx.input.orderId,
      userId: ctx.input.userId,
      total: ctx.state.total,
      receiptId: ctx.state.receipt.id,
      status: 'confirmed',
    });
    return { persistedOrderId };
  })

  // No retry on email: the queue itself handles redelivery. The
  // idempotencyKey on the queue message protects against duplicates.
  .step('sendConfirmation', async (ctx) => {
    await mailer.send(ctx.input.userId, {
      template: 'order-confirmed',
      orderId: ctx.state.persistedOrderId,
      idempotencyKey: ctx.meta.idempotencyKey,
    });
  })

  .event('orders', (ctx) => ({
    eventType: 'order.confirmed',
    orderId: ctx.state.persistedOrderId,
    userId: ctx.input.userId,
    total: ctx.state.total,
  }))

  .map((_, state) => ({
    orderId: state.persistedOrderId,
    total: state.total,
    receiptId: state.receipt.id,
    status: 'confirmed' as const,
  }))
  .build();

Running with crash recovery
Use the orderId as the runId. Same business ID → same run.
const store = new MemoryDurabilityStore(); // replace with a persistent adapter in production
const result = await processOrder.execute(
  {
    orderId: 'ord_abc123',
    userId: 'user_42',
    items: [
      { sku: 'WIDGET-A', quantity: 2, price: 29.99 },
      { sku: 'GADGET-B', quantity: 1, price: 49.99 },
    ],
  },
  { db, eventPublisher },
  {
    durability: { store, runId: 'ord_abc123' },
  }
);

A concrete crash scenario
The process is killed by an OOM after chargePayment succeeds but before persistOrder finishes. The customer was charged but the order is not in your database.
After chargePayment succeeds, the checkpoint in the store looks like this:
await store.load('ord_abc123');
// {
//   flowName: 'process-order',
//   runId: 'ord_abc123',
//   input: { orderId: 'ord_abc123', userId: 'user_42', items: [...] },
//   state: { subtotal: 109.97, tax: 8.80, total: 118.77, receipt: { id: 'pi_...' } },
//   completedSteps: ['validateOrder', 'calculateTotal', 'chargePayment'],
//   status: 'running',
//   createdAt: ...,
//   updatedAt: ...,
// }

Process restarts. A janitor (or the next webhook call from the upstream system) re-invokes the flow with the same runId:
const result = await processOrder.execute(
  {
    orderId: 'ord_abc123',
    userId: 'user_42',
    items: [...],
  },
  { db, eventPublisher },
  {
    durability: { store, runId: 'ord_abc123' },
  },
);

On this second call:
- validateOrder, calculateTotal, and chargePayment are skipped. They are recorded as completed and their state is already loaded from the checkpoint, so the card is not charged again.
- persistOrder runs. Because it upserts by orderId, the re-run does not create a duplicate row even if the first attempt got partway through the transaction.
- sendConfirmation runs. The mailer sees idempotencyKey = 'ord_abc123:sendConfirmation' and deduplicates if the first attempt managed to enqueue a message before crashing.
- event('orders', ...) publishes the confirmation event. Consumers must be idempotent, because this event has at-least-once semantics under durability.
After completion the checkpoint becomes:
await store.load('ord_abc123');
// {
//   ...
//   completedSteps: ['validateOrder', 'calculateTotal', 'chargePayment', 'persistOrder', 'sendConfirmation', 'orders'],
//   status: 'completed',
//   state: { ..., persistedOrderId: 'ord_abc123' },
// }

A third call to execute() with the same runId returns the saved result without invoking any handler. The card is not charged again and the customer does not receive a second email.
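The skip-ahead behaviour in this scenario can be modelled in a few lines. The sketch below is a toy runner, not the @celom/prose implementation: `Checkpoint`, `Step`, and `runWithCheckpoints` are hypothetical names, handlers are synchronous for brevity (real handlers would usually be async), and a `Map` stands in for the durability store.

```typescript
// Toy model of skip-ahead checkpointing. All names are hypothetical;
// this is not how @celom/prose is implemented internally.
type State = Record<string, unknown>;

interface Checkpoint {
  runId: string;
  state: State;
  completedSteps: string[];
  status: 'running' | 'completed';
}

interface Step {
  name: string;
  run: (state: State) => State; // returns a patch merged into the state
}

function runWithCheckpoints(
  steps: Step[],
  runId: string,
  store: Map<string, Checkpoint>,
): State {
  // Resume from a copy of the saved checkpoint, or start fresh.
  const saved = store.get(runId);
  const checkpoint: Checkpoint = saved
    ? { ...saved, state: { ...saved.state }, completedSteps: saved.completedSteps.slice() }
    : { runId, state: {}, completedSteps: [], status: 'running' };

  // A completed run returns the saved state without invoking any handler.
  if (checkpoint.status === 'completed') return checkpoint.state;

  // Persist a snapshot so later in-memory mutation cannot leak into the store.
  const save = (): void => {
    store.set(runId, {
      ...checkpoint,
      state: { ...checkpoint.state },
      completedSteps: checkpoint.completedSteps.slice(),
    });
  };

  for (const step of steps) {
    if (checkpoint.completedSteps.indexOf(step.name) !== -1) continue; // skip ahead
    const patch = step.run(checkpoint.state);
    checkpoint.state = { ...checkpoint.state, ...patch };
    checkpoint.completedSteps.push(step.name);
    save(); // checkpoint after every completed step
  }

  checkpoint.status = 'completed';
  save();
  return checkpoint.state;
}

// Simulate the crash: persistOrder throws once, after chargePayment succeeded.
let charges = 0;
let crashOnce = true;
const demoSteps: Step[] = [
  { name: 'chargePayment', run: () => { charges += 1; return { receiptId: 'pi_test' }; } },
  {
    name: 'persistOrder',
    run: () => {
      if (crashOnce) { crashOnce = false; throw new Error('simulated crash'); }
      return { persistedOrderId: 'ord_abc123' };
    },
  },
];

const demoStore = new Map<string, Checkpoint>();
try {
  runWithCheckpoints(demoSteps, 'ord_abc123', demoStore); // first attempt dies mid-run
} catch {
  // the "process" is gone; only the checkpoint survives
}
const resumed = runWithCheckpoints(demoSteps, 'ord_abc123', demoStore); // resumes at persistOrder
const replayed = runWithCheckpoints(demoSteps, 'ord_abc123', demoStore); // returns saved state
```

In this model `charges` stays at 1 across all three calls, which mirrors the guarantee described above. The model checkpoints only after a step returns, so a step that crashes partway through is re-invoked on resume; that is why the external side effects still need idempotency keys.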
What ctx.meta.idempotencyKey looks like
When durability is configured, every step handler sees a stable per-step key:
| Step name | ctx.meta.idempotencyKey |
|---|---|
| chargePayment | 'ord_abc123:chargePayment' |
| persistOrder | 'ord_abc123:persistOrder' |
| sendConfirmation | 'ord_abc123:sendConfirmation' |
The key is the same on every attempt of the same logical step — that’s the whole point. When the second execute() call re-runs persistOrder, it sees the same idempotencyKey it would have used the first time, so external systems can deduplicate.
When durability is not configured, ctx.meta.idempotencyKey is undefined. Handlers that need a key in both modes should fall back to something else (ctx.meta.correlationId for example).
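One way to write that fallback is a small helper that every handler calls instead of reading the key directly. This is a sketch under assumptions: `StepMeta` models only the two metadata fields mentioned above, `resolveIdempotencyKey` is a hypothetical name, and the `correlationId:stepName` shape simply mirrors the `runId:stepName` pattern in the table.

```typescript
// Hypothetical subset of ctx.meta; only the two fields discussed above.
interface StepMeta {
  idempotencyKey?: string;
  correlationId?: string;
}

// Prefer the durable per-step key; otherwise derive one from the correlation ID.
function resolveIdempotencyKey(meta: StepMeta, stepName: string): string {
  if (meta.idempotencyKey !== undefined) return meta.idempotencyKey;
  if (meta.correlationId !== undefined) return `${meta.correlationId}:${stepName}`;
  // Failing loudly is safer than sending an external request with no key.
  throw new Error(`No idempotency key available for step "${stepName}"`);
}

const durableKey = resolveIdempotencyKey(
  { idempotencyKey: 'ord_abc123:chargePayment', correlationId: 'req_1' },
  'chargePayment',
);
const fallbackKey = resolveIdempotencyKey({ correlationId: 'req_1' }, 'chargePayment');
```

The fallback only deduplicates if the caller reuses the same correlationId when it retries; a correlation ID generated fresh on each request gives no protection.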
Cleanup
Completed checkpoints stay in the store until you remove them. For a high-volume flow, delete after the saved result has been observed by the caller, or run a periodic prune for rows older than N days.
const result = await processOrder.execute(input, deps, {
  durability: { store, runId: orderId },
});

await store.delete(orderId);
return result;

If you want replay to work for a window (e.g. retry-safe webhooks), keep checkpoints around for the duration of that window before deleting.
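The periodic prune could look roughly like the sketch below. It assumes checkpoints can be enumerated and carry a status and an updatedAt timestamp; the `StoredCheckpoint` shape, the `Date` type for updatedAt, and `pruneCompleted` are all hypothetical, and a `Map` stands in for whatever persistent adapter is in use.

```typescript
// Hypothetical checkpoint record with only the fields pruning needs.
interface StoredCheckpoint {
  runId: string;
  status: 'running' | 'completed';
  updatedAt: Date;
}

// Delete completed checkpoints older than maxAgeDays; return the removed run IDs.
function pruneCompleted(
  checkpoints: Map<string, StoredCheckpoint>,
  maxAgeDays: number,
  now: Date = new Date(),
): string[] {
  const cutoff = now.getTime() - maxAgeDays * 24 * 60 * 60 * 1000;
  const removed: string[] = [];
  for (const [runId, cp] of Array.from(checkpoints.entries())) {
    // Never prune a run that is still in progress: it may need to resume.
    if (cp.status === 'completed' && cp.updatedAt.getTime() < cutoff) {
      checkpoints.delete(runId);
      removed.push(runId);
    }
  }
  return removed;
}

const pruneDemo = new Map<string, StoredCheckpoint>([
  ['ord_old', { runId: 'ord_old', status: 'completed', updatedAt: new Date('2024-05-01T00:00:00Z') }],
  ['ord_recent', { runId: 'ord_recent', status: 'completed', updatedAt: new Date('2024-06-25T00:00:00Z') }],
  ['ord_stuck', { runId: 'ord_stuck', status: 'running', updatedAt: new Date('2024-05-01T00:00:00Z') }],
]);
const prunedIds = pruneCompleted(pruneDemo, 30, new Date('2024-06-30T00:00:00Z'));
```

Here only `ord_old` is removed: `ord_recent` is inside the 30-day window, and `ord_stuck` is kept because it never completed. Set maxAgeDays to at least the replay window you want to support.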
What this demonstrates
- Skip-ahead checkpointing: completed steps are not re-run on the second execute() call
- Idempotency keys: ctx.meta.idempotencyKey propagated to Stripe and the mailer so re-runs don’t double-charge or double-send
- Upsert by business ID: the transaction step is safe to re-run because it upserts, not inserts
- At-least-once events: consumers of order.confirmed must be idempotent under durability
- Replay on completion: a third call with the same runId returns the saved result without re-execution
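
On the consumer side, the at-least-once point can be handled by deduplicating on a business key. The sketch below is one possible approach, not part of the library: `OrderConfirmedConsumer` is a hypothetical class, the event shape follows the payload built in the event('orders', ...) step, and the in-memory `Set` would need to be a persistent store (for example a unique constraint in a database) in production.

```typescript
// Event shape taken from the payload built in the event('orders', ...) step.
interface OrderConfirmedEvent {
  eventType: 'order.confirmed';
  orderId: string;
  userId: string;
  total: number;
}

// Hypothetical consumer that ignores redelivered events.
class OrderConfirmedConsumer {
  private readonly seen = new Set<string>(); // in-memory only; assumption for the sketch
  public readonly handled: OrderConfirmedEvent[] = [];

  // Returns true if the event was processed, false if it was a duplicate.
  handle(event: OrderConfirmedEvent): boolean {
    const dedupeKey = `${event.eventType}:${event.orderId}`;
    if (this.seen.has(dedupeKey)) return false;
    this.seen.add(dedupeKey);
    this.handled.push(event); // stand-in for the real side effect
    return true;
  }
}

const consumer = new OrderConfirmedConsumer();
const confirmed: OrderConfirmedEvent = {
  eventType: 'order.confirmed',
  orderId: 'ord_abc123',
  userId: 'user_42',
  total: 118.77,
};
const firstDelivery = consumer.handle(confirmed);
const secondDelivery = consumer.handle(confirmed); // redelivery after a resumed run
```

Keying on eventType plus orderId works here because each order is confirmed at most once; an event that can legitimately repeat for the same order would need a finer-grained key.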