Order Processing
This example demonstrates a realistic order processing pipeline that validates input, charges a payment, persists data in a transaction, sends a confirmation, and publishes domain events.
The flow
    import { createFlow, ValidationError } from '@celom/prose';

    interface OrderInput {
      orderId: string;
      userId: string;
      items: Array<{ sku: string; quantity: number; price: number }>;
    }

    interface OrderDeps {
      db: DatabaseClient;
      eventPublisher: FlowEventPublisher;
    }

    const processOrder = createFlow<OrderInput, OrderDeps>('process-order')
      // 1. Validate the order
      .validate('validateOrder', (ctx) => {
        if (ctx.input.items.length === 0)
          throw ValidationError.single('items', 'Order must have at least one item');

        const invalidItems = ctx.input.items.filter((i) => i.quantity <= 0 || i.price <= 0);
        if (invalidItems.length > 0)
          throw ValidationError.multiple(
            invalidItems.map((i) => ({ field: `item.${i.sku}`, message: 'Invalid quantity or price' }))
          );
      })

      // 2. Calculate the total
      .step('calculateTotal', (ctx) => {
        const subtotal = ctx.input.items.reduce(
          (sum, item) => sum + item.price * item.quantity,
          0
        );
        const tax = subtotal * 0.08;
        return { subtotal, tax, total: subtotal + tax };
      })

      // 3. Charge payment (with retries for transient failures)
      .step('chargePayment', async (ctx) => {
        const receipt = await payments.charge({
          userId: ctx.input.userId,
          amount: ctx.state.total,
        });
        return { receipt };
      })
      .withRetry({
        maxAttempts: 3,
        delayMs: 500,
        backoffMultiplier: 2,
        shouldRetry: (err) => err.code !== 'CARD_DECLINED',
      })

      // 4. Persist in a database transaction
      .transaction('persistOrder', async (ctx, tx) => {
        const orderId = await tx.insert('orders', {
          id: ctx.input.orderId,
          userId: ctx.input.userId,
          total: ctx.state.total,
          receiptId: ctx.state.receipt.id,
          status: 'confirmed',
        });

        await Promise.all(
          ctx.input.items.map((item) =>
            tx.insert('order_items', { orderId, ...item })
          )
        );

        return { persistedOrderId: orderId };
      })

      // 5. Send confirmation email (no retry: not critical)
      .step('sendConfirmation', async (ctx) => {
        await mailer.send(ctx.input.userId, {
          template: 'order-confirmed',
          orderId: ctx.state.persistedOrderId,
          total: ctx.state.total,
        });
      })

      // 6. Publish domain events
      .event('orders', (ctx) => ({
        eventType: 'order.confirmed',
        orderId: ctx.state.persistedOrderId,
        userId: ctx.input.userId,
        total: ctx.state.total,
      }))

      // 7. Shape the output
      .map((input, state) => ({
        orderId: state.persistedOrderId,
        total: state.total,
        receiptId: state.receipt.id,
        status: 'confirmed' as const,
      }))
      .build();

Running the flow
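Before the flow can run, the `payments` and `mailer` objects it calls need to exist, and this page does not define them. For a local run, in-memory stand-ins along these lines could be used. Everything here is hypothetical and inferred only from how the flow calls the two objects: `PaymentError`, `declineFor`, the `rcpt_` id format, and the `sent` array are illustrative names, not part of `@celom/prose` or any payment SDK.

```typescript
// Hypothetical stand-ins for the `payments` and `mailer` objects used by the flow.
// Shapes are inferred from the calls in the flow, not taken from a real SDK.

interface ChargeRequest {
  userId: string;
  amount: number;
}

interface Receipt {
  id: string;
  amount: number;
}

// Error carrying a `code`, which is what the flow's shouldRetry inspects.
class PaymentError extends Error {
  readonly code: string;

  constructor(code: string, message: string) {
    super(message);
    this.name = 'PaymentError';
    this.code = code;
  }
}

// Users in this set are always declined (a test hook, assumed for illustration).
const declineFor = new Set<string>();
let receiptCounter = 0;

const payments = {
  async charge(req: ChargeRequest): Promise<Receipt> {
    if (declineFor.has(req.userId)) {
      throw new PaymentError('CARD_DECLINED', `Card declined for ${req.userId}`);
    }
    receiptCounter += 1;
    return { id: `rcpt_${receiptCounter}`, amount: req.amount };
  },
};

interface SentMail {
  to: string;
  template: string;
  data: Record<string, unknown>;
}

// Records outgoing mail instead of sending it.
const sent: SentMail[] = [];

const mailer = {
  async send(
    userId: string,
    message: { template: string } & Record<string, unknown>,
  ): Promise<void> {
    const { template, ...data } = message;
    sent.push({ to: userId, template, data });
  },
};
```

With stand-ins like these in scope, a declined card surfaces as an error whose `code` is `'CARD_DECLINED'`, which is the one case the flow's `shouldRetry` refuses to retry.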
    import { PinoFlowObserver } from '@celom/prose';

    const result = await processOrder.execute(
      {
        orderId: 'ord_abc123',
        userId: 'user_42',
        items: [
          { sku: 'WIDGET-A', quantity: 2, price: 29.99 },
          { sku: 'GADGET-B', quantity: 1, price: 49.99 },
        ],
      },
      { db, eventPublisher },
      {
        timeout: 30_000,
        observer: new PinoFlowObserver(logger),
      }
    );

    // result: { orderId: string; total: number; receiptId: string; status: 'confirmed' }

What this demonstrates
- Validation: fail fast before doing any work
- State threading: `total`, `receipt`, and `persistedOrderId` flow through steps with full type safety
- Retries: payment charging retries transient errors but not card declines
- Transactions: order and items are persisted atomically
- Events: domain event published with automatic `correlationId`
- Output mapping: `.map()` shapes the result to exactly what the caller needs
- Observability: Pino observer provides structured logging for every step
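
The retry settings on `chargePayment` (`maxAttempts: 3`, `delayMs: 500`, `backoffMultiplier: 2`) can be read as a wait schedule. The sketch below assumes a common exponential-backoff interpretation, in which the first attempt counts toward `maxAttempts` and each later wait is the previous one multiplied by `backoffMultiplier`. Whether Prose computes its delays exactly this way is an assumption, and `backoffDelays` and `isRetryable` are illustrative helpers rather than library functions.

```typescript
// Illustrative helpers, not part of @celom/prose.

interface RetryOptions {
  maxAttempts: number;       // total attempts, including the first one (assumed)
  delayMs: number;           // wait before the first retry
  backoffMultiplier: number; // factor applied to the wait after each retry
}

// Returns the waits between attempts under plain exponential backoff.
function backoffDelays(opts: RetryOptions): number[] {
  const delays: number[] = [];
  for (let retry = 0; retry < opts.maxAttempts - 1; retry++) {
    delays.push(opts.delayMs * Math.pow(opts.backoffMultiplier, retry));
  }
  return delays;
}

// Same predicate as the flow's shouldRetry: retry anything except a declined card.
function isRetryable(err: { code?: string }): boolean {
  return err.code !== 'CARD_DECLINED';
}

const schedule = backoffDelays({ maxAttempts: 3, delayMs: 500, backoffMultiplier: 2 });
console.log(schedule); // [ 500, 1000 ]
console.log(isRetryable({ code: 'ETIMEDOUT' }));     // true
console.log(isRetryable({ code: 'CARD_DECLINED' })); // false
```

Under these assumptions, a payment that keeps failing with a transient error adds at most 1.5 seconds of waiting before the step gives up, while a declined card fails on the first attempt with no wait at all.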