Prose v0.3.3

Order Processing with Durability

This example extends the order processing pipeline with durability. The same flow now survives a process crash mid-execution: completed steps are not re-run, and a re-execution returns the saved result instead of charging the card twice.

Two things change. The handlers pass ctx.meta.idempotencyKey to any external side effect that supports idempotency, and the caller passes a durability option to execute().

import {
  createFlow,
  ValidationError,
  MemoryDurabilityStore,
} from '@celom/prose';

const processOrder = createFlow<OrderInput, OrderDeps>('process-order')
  .validate('validateOrder', (ctx) => {
    if (ctx.input.items.length === 0)
      throw ValidationError.single(
        'items',
        'Order must have at least one item'
      );
  })
  .step('calculateTotal', (ctx) => {
    const subtotal = ctx.input.items.reduce(
      (sum, item) => sum + item.price * item.quantity,
      0
    );
    const tax = subtotal * 0.08;
    return { subtotal, tax, total: subtotal + tax };
  })
  // Stripe accepts an idempotency key. Pass ctx.meta.idempotencyKey so a
  // resumed run that re-invokes this step doesn't charge the card twice.
  .step('chargePayment', async (ctx) => {
    const receipt = await stripe.paymentIntents.create(
      {
        amount: Math.round(ctx.state.total * 100),
        currency: 'usd',
        customer: ctx.input.userId,
      },
      { idempotencyKey: ctx.meta.idempotencyKey }
    );
    return { receipt };
  })
  .withRetry({
    maxAttempts: 3,
    delayMs: 500,
    backoffMultiplier: 2,
    shouldRetry: (err) => err.code !== 'CARD_DECLINED',
  })
  .transaction('persistOrder', async (ctx, tx) => {
    // Upsert by orderId so a re-run after a crash mid-transaction
    // doesn't create a duplicate row.
    const persistedOrderId = await tx.upsert('orders', {
      id: ctx.input.orderId,
      userId: ctx.input.userId,
      total: ctx.state.total,
      receiptId: ctx.state.receipt.id,
      status: 'confirmed',
    });
    return { persistedOrderId };
  })
  // No retry on email: the queue itself handles redelivery. The
  // idempotencyKey on the queue message protects against duplicates.
  .step('sendConfirmation', async (ctx) => {
    await mailer.send(ctx.input.userId, {
      template: 'order-confirmed',
      orderId: ctx.state.persistedOrderId,
      idempotencyKey: ctx.meta.idempotencyKey,
    });
  })
  .event('orders', (ctx) => ({
    eventType: 'order.confirmed',
    orderId: ctx.state.persistedOrderId,
    userId: ctx.input.userId,
    total: ctx.state.total,
  }))
  .map((_, state) => ({
    orderId: state.persistedOrderId,
    total: state.total,
    receiptId: state.receipt.id,
    status: 'confirmed' as const,
  }))
  .build();

Use the orderId as the runId. Same business ID → same run.

const store = new MemoryDurabilityStore(); // replace with a persistent adapter in production

const result = await processOrder.execute(
  {
    orderId: 'ord_abc123',
    userId: 'user_42',
    items: [
      { sku: 'WIDGET-A', quantity: 2, price: 29.99 },
      { sku: 'GADGET-B', quantity: 1, price: 49.99 },
    ],
  },
  { db, eventPublisher },
  {
    durability: { store, runId: 'ord_abc123' },
  }
);

Now suppose the process is killed by the OOM killer after chargePayment succeeds but before persistOrder finishes. The customer has been charged, but the order is not in your database.

After chargePayment succeeds, the checkpoint in the store looks like this:

await store.load('ord_abc123');
// (amounts below are shown rounded to cents; calculateTotal does not round)
// {
//   flowName: 'process-order',
//   runId: 'ord_abc123',
//   input: { orderId: 'ord_abc123', userId: 'user_42', items: [...] },
//   state: { subtotal: 109.97, tax: 8.80, total: 118.77, receipt: { id: 'pi_...' } },
//   completedSteps: ['validateOrder', 'calculateTotal', 'chargePayment'],
//   status: 'running',
//   createdAt: ...,
//   updatedAt: ...,
// }

The process restarts. A janitor job (or the next webhook call from the upstream system) re-invokes the flow with the same runId:

const result = await processOrder.execute(
  {
    orderId: 'ord_abc123',
    userId: 'user_42',
    items: [...], // same items as the first call
  },
  { db, eventPublisher },
  {
    durability: { store, runId: 'ord_abc123' },
  },
);

On this second call:

  • validateOrder, calculateTotal, and chargePayment are skipped — they’re recorded as completed and their state is already loaded from the checkpoint. The card is not charged again.
  • persistOrder runs. Because it upserts by orderId, the re-run cannot create a duplicate row, whatever the first attempt managed to write before the crash.
  • sendConfirmation runs. The mailer sees idempotencyKey = 'ord_abc123:sendConfirmation' and deduplicates if the first attempt managed to enqueue a message before crashing.
  • event('orders', ...) publishes the confirmation event. Consumers must be idempotent — this event has at-least-once semantics under durability.
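
The resume behaviour in the list above can be modelled in a few lines. This is a toy sketch, not Prose's implementation: `runDurable`, `Checkpoint`, and the in-memory `checkpoints` map are hypothetical names invented for illustration, and the steps are synchronous to keep the example short. It shows only the core idea that a step recorded in completedSteps is skipped and its saved state is reused.

```typescript
// Toy model of skip-ahead checkpointing. All names are hypothetical;
// this is not how @celom/prose works internally.
type State = Record<string, unknown>;
type Step = { name: string; run: (state: State) => State };

interface Checkpoint {
  state: State;
  completedSteps: string[];
  status: 'running' | 'completed';
}

const checkpoints = new Map<string, Checkpoint>();

function runDurable(runId: string, steps: Step[]): State {
  // Load the existing checkpoint for this runId, or start a fresh one.
  const cp: Checkpoint =
    checkpoints.get(runId) ?? { state: {}, completedSteps: [], status: 'running' };
  checkpoints.set(runId, cp);

  for (const step of steps) {
    if (cp.completedSteps.includes(step.name)) continue; // finished in an earlier run
    const patch = step.run(cp.state); // may throw, standing in for a crash
    cp.state = { ...cp.state, ...patch };
    cp.completedSteps.push(step.name); // recorded only after the step succeeds
  }
  cp.status = 'completed';
  return cp.state;
}

// Simulate the scenario: the first attempt dies inside persistOrder.
let charges = 0;
let crashOnce = true;
const steps: Step[] = [
  { name: 'chargePayment', run: () => ({ receiptId: `pi_${++charges}` }) },
  {
    name: 'persistOrder',
    run: () => {
      if (crashOnce) {
        crashOnce = false;
        throw new Error('simulated OOM');
      }
      return { persistedOrderId: 'ord_abc123' };
    },
  },
];

try {
  runDurable('ord_abc123', steps);
} catch {
  // the process "restarts" here
}
const finalState = runDurable('ord_abc123', steps);
console.log(charges, finalState); // charges is 1: chargePayment ran only once
```

A real store would persist each checkpoint write rather than mutate an object in memory, so that the record survives the process that wrote it.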

After completion the checkpoint becomes:

await store.load('ord_abc123');
// {
//   ...
//   completedSteps: ['validateOrder', 'calculateTotal', 'chargePayment', 'persistOrder', 'sendConfirmation', 'orders'],
//   status: 'completed',
//   state: { ..., persistedOrderId: 'ord_abc123' },
// }

A third call to execute() with the same runId returns the saved result without invoking any handler. The card is not charged a third time and the customer doesn’t receive a third email.

When durability is configured, every step handler sees a stable per-step key:

Step name          ctx.meta.idempotencyKey
chargePayment      'ord_abc123:chargePayment'
persistOrder       'ord_abc123:persistOrder'
sendConfirmation   'ord_abc123:sendConfirmation'

The key is the same on every attempt of the same logical step — that’s the whole point. When the second execute() call re-runs persistOrder, it sees the same idempotencyKey it would have used the first time, so external systems can deduplicate.
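
To make the deduplication concrete, here is a minimal sketch of what a receiving system might do with such a key. `stepKey` and `DedupingMailer` are hypothetical and exist only for this illustration; the `runId:stepName` shape is inferred from the keys listed above, and a real mail queue or payment provider will have its own deduplication rules.

```typescript
// Hypothetical downstream service that deduplicates on an idempotency key.
function stepKey(runId: string, stepName: string): string {
  return `${runId}:${stepName}`;
}

class DedupingMailer {
  readonly delivered: string[] = []; // recipients actually enqueued
  private readonly seenKeys = new Set<string>();

  // Returns true if a message was enqueued, false if the key was seen before.
  send(to: string, idempotencyKey: string): boolean {
    if (this.seenKeys.has(idempotencyKey)) return false;
    this.seenKeys.add(idempotencyKey);
    this.delivered.push(to);
    return true;
  }
}

const mailer = new DedupingMailer();
const key = stepKey('ord_abc123', 'sendConfirmation');
const firstAttempt = mailer.send('user_42', key); // enqueued, then the process crashes
const resumedAttempt = mailer.send('user_42', key); // same key, so it is dropped
console.log(firstAttempt, resumedAttempt, mailer.delivered.length);
```

The second send is rejected only because the key is byte-for-byte identical across attempts, which is why the key must not include anything that changes between runs, such as a timestamp or attempt counter.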

When durability is not configured, ctx.meta.idempotencyKey is undefined. Handlers that need a key in both modes should fall back to something else (ctx.meta.correlationId for example).
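
One way to write that fallback is a small helper, sketched below. `MetaLike` and `dedupeKey` are hypothetical names, and `MetaLike` models only the two fields mentioned here; the actual ctx.meta type may differ.

```typescript
// Minimal shape of the two metadata fields named in the text (assumption:
// the real ctx.meta type in @celom/prose may carry more or differ in detail).
interface MetaLike {
  idempotencyKey?: string;
  correlationId?: string;
}

// Hypothetical helper: prefer the durable per-step key, fall back to the
// correlation id, and fail loudly if neither is present.
function dedupeKey(meta: MetaLike, stepName: string): string {
  if (meta.idempotencyKey !== undefined) return meta.idempotencyKey;
  if (meta.correlationId !== undefined) return `${meta.correlationId}:${stepName}`;
  throw new Error(`no idempotency key available for step "${stepName}"`);
}

const durableKey = dedupeKey(
  { idempotencyKey: 'ord_abc123:chargePayment', correlationId: 'corr_1' },
  'chargePayment'
);
const fallbackKey = dedupeKey({ correlationId: 'corr_1' }, 'chargePayment');
console.log(durableKey, fallbackKey);
```

A fallback key is only as stable as whatever generates it. If the correlation id changes between attempts, the external system will treat each attempt as a new request.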

Completed checkpoints stay in the store until you remove them. For a high-volume flow, delete after the saved result has been observed by the caller — or run a periodic prune for rows older than N days.

const result = await processOrder.execute(input, deps, {
  durability: { store, runId: orderId },
});
await store.delete(orderId);
return result;

If you want replay to work for a window (e.g. retry-safe webhooks), keep checkpoints around for the duration of that window before deleting.
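
A periodic prune with a retention window could look roughly like the sketch below. It runs against a plain in-memory map rather than a Prose store, because this page only shows `load` and `delete` on the store; `CheckpointRow` and `pruneCompleted` are hypothetical, and a persistent adapter would more likely do this with a single query.

```typescript
// Hypothetical row shape: only the fields the prune needs.
interface CheckpointRow {
  status: 'running' | 'completed';
  updatedAt: number; // epoch milliseconds
}

// Removes completed checkpoints older than the retention window and returns
// their run ids. Running checkpoints are kept because they may still resume.
function pruneCompleted(
  rows: Map<string, CheckpointRow>,
  retentionMs: number,
  now: number
): string[] {
  const removed: string[] = [];
  rows.forEach((row, runId) => {
    if (row.status === 'completed' && now - row.updatedAt > retentionMs) {
      removed.push(runId);
    }
  });
  removed.forEach((runId) => rows.delete(runId));
  return removed;
}

const DAY_MS = 24 * 60 * 60 * 1000;
const now = Date.UTC(2025, 0, 10);
const rows = new Map<string, CheckpointRow>([
  ['ord_old', { status: 'completed', updatedAt: now - 8 * DAY_MS }],
  ['ord_recent', { status: 'completed', updatedAt: now - 1 * DAY_MS }],
  ['ord_stuck', { status: 'running', updatedAt: now - 30 * DAY_MS }],
]);

const removed = pruneCompleted(rows, 7 * DAY_MS, now);
console.log(removed, rows.size);
```

With a 7-day window, only `ord_old` is removed. `ord_recent` is still inside the replay window, and `ord_stuck` never completed, so deleting it would discard the record that its charge already happened.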

  • Skip-ahead checkpointing — completed steps are not re-run on the second execute() call
  • Idempotency keys — ctx.meta.idempotencyKey propagated to Stripe and the mailer so re-runs don’t double-charge or double-send
  • Upsert by business ID — the transaction step is safe to re-run because it upserts, not inserts
  • At-least-once events — consumers of order.confirmed must be idempotent under durability
  • Replay on completion — a third call with the same runId returns the saved result without re-execution
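
For the at-least-once point, a consumer of order.confirmed can protect itself by remembering which orders it has already handled. The sketch below is one hypothetical way to do that: `RevenueProjection` is invented for illustration, the event shape is copied from the flow's event() step, and a real consumer would keep the processed set in durable storage rather than in memory.

```typescript
// Event shape as published by the flow's event('orders', ...) step.
interface OrderConfirmed {
  eventType: 'order.confirmed';
  orderId: string;
  userId: string;
  total: number;
}

// Hypothetical consumer that sums revenue per user and ignores redeliveries.
class RevenueProjection {
  readonly revenueByUser = new Map<string, number>();
  private readonly processed = new Set<string>();

  handle(event: OrderConfirmed): void {
    // The event carries no idempotency key, so dedupe on business identity.
    const dedupeId = `${event.eventType}:${event.orderId}`;
    if (this.processed.has(dedupeId)) return;
    this.processed.add(dedupeId);

    const current = this.revenueByUser.get(event.userId) ?? 0;
    this.revenueByUser.set(event.userId, current + event.total);
  }
}

const projection = new RevenueProjection();
const confirmed: OrderConfirmed = {
  eventType: 'order.confirmed',
  orderId: 'ord_abc123',
  userId: 'user_42',
  total: 118.77,
};

projection.handle(confirmed);
projection.handle(confirmed); // redelivery after a resumed run
console.log(projection.revenueByUser.get('user_42'));
```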