Type-safe State Threading
Each step’s return type merges into ctx.state, giving full autocomplete and compile-time checks across the entire pipeline.
Type-safe State Threading
Each step’s return type merges into ctx.state, giving full autocomplete and compile-time checks across the entire pipeline.
Zero Dependencies
Runs in-process with no external infrastructure. Just async/await.
Retries & Backoff
Per-step retry policies with exponential backoff, delay caps, and conditional predicates.
Timeouts & Cancellation
Flow-level and step-level timeouts backed by AbortSignal with cooperative cancellation.
Database Transactions
Wrap steps in db.transaction() — works with Drizzle, Knex, Prisma, or any ORM.
Event Publishing
Emit domain events to named channels with automatic correlation IDs.
Parallel Execution
Run independent steps concurrently with configurable merge strategies.
Observability Hooks
Plug in logging, metrics, or tracing through the observer interface.
import { createFlow, ValidationError } from '@celom/prose';
const onboardUser = createFlow<{ email: string; name: string }>('onboard-user') .validate('checkEmail', (ctx) => { if (!ctx.input.email.includes('@')) throw ValidationError.single('email', 'Invalid email'); }) .step('createAccount', async (ctx) => { const user = await db.createUser(ctx.input); return { user }; }) .withRetry({ maxAttempts: 3, delayMs: 200, backoffMultiplier: 2 }) .step('sendWelcome', async (ctx) => { await mailer.send(ctx.state.user.email, 'Welcome!'); }) .event('users', (ctx) => ({ eventType: 'user.onboarded', userId: ctx.state.user.id, })) .build();
const result = await onboardUser.execute( { email: 'alice@example.com', name: 'Alice' }, { db, eventPublisher });npm install @celom/prose