Prose v0.3.2

Order Processing

This example demonstrates a realistic order processing pipeline that validates input, charges a payment, persists data in a transaction, sends a confirmation, and publishes domain events.

import { createFlow, ValidationError } from '@celom/prose';

// DatabaseClient, FlowEventPublisher, payments, and mailer are not defined in
// this example; they are assumed to come from the surrounding application.
interface OrderInput {
  orderId: string;
  userId: string;
  items: Array<{ sku: string; quantity: number; price: number }>;
}

interface OrderDeps {
  db: DatabaseClient;
  eventPublisher: FlowEventPublisher;
}

const processOrder = createFlow<OrderInput, OrderDeps>('process-order')
  // 1. Validate the order
  .validate('validateOrder', (ctx) => {
    if (ctx.input.items.length === 0)
      throw ValidationError.single('items', 'Order must have at least one item');
    const invalidItems = ctx.input.items.filter((i) => i.quantity <= 0 || i.price <= 0);
    if (invalidItems.length > 0)
      throw ValidationError.multiple(
        invalidItems.map((i) => ({ field: `item.${i.sku}`, message: 'Invalid quantity or price' }))
      );
  })
  // 2. Calculate the total (8% tax on the subtotal)
  .step('calculateTotal', (ctx) => {
    const subtotal = ctx.input.items.reduce(
      (sum, item) => sum + item.price * item.quantity, 0
    );
    const tax = subtotal * 0.08;
    return { subtotal, tax, total: subtotal + tax };
  })
  // 3. Charge payment (with retries for transient failures)
  .step('chargePayment', async (ctx) => {
    const receipt = await payments.charge({
      userId: ctx.input.userId,
      amount: ctx.state.total,
    });
    return { receipt };
  })
  .withRetry({
    maxAttempts: 3,
    delayMs: 500,
    backoffMultiplier: 2,
    // A declined card is not transient, so do not retry it
    shouldRetry: (err) => err.code !== 'CARD_DECLINED',
  })
  // 4. Persist in a database transaction
  .transaction('persistOrder', async (ctx, tx) => {
    const orderId = await tx.insert('orders', {
      id: ctx.input.orderId,
      userId: ctx.input.userId,
      total: ctx.state.total,
      receiptId: ctx.state.receipt.id,
      status: 'confirmed',
    });
    await Promise.all(
      ctx.input.items.map((item) =>
        tx.insert('order_items', { orderId, ...item })
      )
    );
    return { persistedOrderId: orderId };
  })
  // 5. Send confirmation email (no retries; not critical)
  .step('sendConfirmation', async (ctx) => {
    await mailer.send(ctx.input.userId, {
      template: 'order-confirmed',
      orderId: ctx.state.persistedOrderId,
      total: ctx.state.total,
    });
  })
  // 6. Publish domain events
  .event('orders', (ctx) => ({
    eventType: 'order.confirmed',
    orderId: ctx.state.persistedOrderId,
    userId: ctx.input.userId,
    total: ctx.state.total,
  }))
  // 7. Shape the output
  .map((input, state) => ({
    orderId: state.persistedOrderId,
    total: state.total,
    receiptId: state.receipt.id,
    status: 'confirmed' as const,
  }))
  .build();
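
The `withRetry` options in step 3 describe an exponential backoff. As a rough illustration of what those numbers imply, the sketch below computes the wait before each retry. `retryDelays` is a hypothetical helper written for this page, not part of `@celom/prose`, and it assumes that `maxAttempts` counts the first call and that each wait is the previous one multiplied by `backoffMultiplier`. The library's actual scheduling may differ.

```typescript
// Hypothetical helper, not part of @celom/prose.
interface BackoffOptions {
  maxAttempts: number;
  delayMs: number;
  backoffMultiplier: number;
}

function retryDelays(opts: BackoffOptions): number[] {
  const delays: number[] = [];
  // Assumption: maxAttempts includes the first call, leaving maxAttempts - 1 retries.
  for (let retry = 0; retry < opts.maxAttempts - 1; retry++) {
    delays.push(opts.delayMs * Math.pow(opts.backoffMultiplier, retry));
  }
  return delays;
}

const delays = retryDelays({ maxAttempts: 3, delayMs: 500, backoffMultiplier: 2 });
console.log(delays); // [ 500, 1000 ]
```

Under these assumptions a transient payment failure would be retried after roughly 500 ms and then 1000 ms, while a `CARD_DECLINED` error would fail immediately because `shouldRetry` returns false for it.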

import { PinoFlowObserver } from '@celom/prose';

// db, eventPublisher, and logger are assumed to be created elsewhere in the application.
const result = await processOrder.execute(
  {
    orderId: 'ord_abc123',
    userId: 'user_42',
    items: [
      { sku: 'WIDGET-A', quantity: 2, price: 29.99 },
      { sku: 'GADGET-B', quantity: 1, price: 49.99 },
    ],
  },
  { db, eventPublisher },
  {
    timeout: 30_000,
    observer: new PinoFlowObserver(logger),
  }
);
// result: { orderId: string; total: number; receiptId: string; status: 'confirmed' }
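
To make the numbers concrete, the following standalone sketch repeats the `calculateTotal` arithmetic for the sample items above, outside of any flow. It uses only plain TypeScript; the variable names mirror the step but are otherwise illustrative.

```typescript
const items = [
  { sku: 'WIDGET-A', quantity: 2, price: 29.99 },
  { sku: 'GADGET-B', quantity: 1, price: 49.99 },
];

// Same reduction as the calculateTotal step
const subtotal = items.reduce((sum, item) => sum + item.price * item.quantity, 0);
const tax = subtotal * 0.08;
const total = subtotal + tax;

console.log(subtotal.toFixed(2)); // "109.97"
console.log(tax.toFixed(2));      // "8.80"
console.log(total.toFixed(2));    // "118.77"
```

The values are formatted with `toFixed(2)` because binary floating point cannot represent amounts such as 29.99 exactly, so the raw `total` carries more than two decimal places.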
  • Validation: fail fast before doing any work
  • State threading: total, receipt, and persistedOrderId flow through steps with full type safety
  • Retries: payment charging retries transient errors but not card declines
  • Transactions: the order and its items are persisted atomically
  • Events: a domain event is published with an automatic correlationId
  • Output mapping: .map() shapes the result to exactly what the caller needs
  • Observability: the Pino observer provides structured logging for every step
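
The state threading point can be illustrated with a small sketch of how a builder might accumulate state types from one step to the next. `MiniFlow` is a hypothetical class written only for this illustration. It is not the Prose implementation, and it omits validation, retries, transactions, and async steps.

```typescript
// Hypothetical mini-builder for illustration; not how @celom/prose is implemented.
type Ctx<I, S> = { input: I; state: S };

class MiniFlow<I, S> {
  private constructor(
    private readonly steps: Array<(ctx: Ctx<I, any>) => object>
  ) {}

  static create<I>(): MiniFlow<I, {}> {
    return new MiniFlow<I, {}>([]);
  }

  // Each step sees the state accumulated so far (S) and contributes new keys (R).
  step<R extends object>(fn: (ctx: Ctx<I, S>) => R): MiniFlow<I, S & R> {
    return new MiniFlow<I, S & R>([...this.steps, fn]);
  }

  run(input: I): S {
    let state: any = {};
    for (const step of this.steps) {
      // Merge each step's return value into the running state
      state = { ...state, ...step({ input, state }) };
    }
    return state;
  }
}

const flow = MiniFlow.create<{ prices: number[] }>()
  .step((ctx) => ({ subtotal: ctx.input.prices.reduce((a, b) => a + b, 0) }))
  // ctx.state.subtotal is typed as number here because the previous step returned it
  .step((ctx) => ({ total: ctx.state.subtotal * 2 }));

const out = flow.run({ prices: [10, 20] });
console.log(out.subtotal, out.total); // 30 60
```

Because each `step` call returns a builder typed with `S & R`, a later step that reads a key no earlier step produced fails at compile time rather than at runtime.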