Type-safe State Threading
Each step’s return type merges into ctx.state, giving full autocomplete
and compile-time checks across the entire pipeline.
Type-safe State Threading
Each step’s return type merges into ctx.state, giving full autocomplete
and compile-time checks across the entire pipeline.
Zero Dependencies
Runs in-process with no external infrastructure. Just async/await.
Retries & Backoff
Per-step retry policies with exponential backoff, delay caps, and conditional predicates.
Timeouts & Cancellation
Flow-level and step-level timeouts backed by AbortSignal with cooperative
cancellation.
Database Transactions
Wrap steps in db.transaction() — works with Drizzle, Knex, Prisma, or any
ORM.
Event Publishing
Emit domain events to named channels with automatic correlation IDs.
Parallel Execution
Run independent steps concurrently with configurable merge strategies.
Observability Hooks
Plug in logging, metrics, or tracing through the observer interface.
Durable Execution
Opt-in skip-ahead checkpointing — survive process crashes without re-running completed steps.
import { createFlow, ValidationError } from '@celom/prose';
const onboardUser = createFlow<{ email: string; name: string }>('onboard-user') .validate('checkEmail', (ctx) => { if (!ctx.input.email.includes('@')) throw ValidationError.single('email', 'Invalid email'); }) .step('createAccount', async (ctx) => { const user = await db.createUser(ctx.input); return { user }; }) .withRetry({ maxAttempts: 3, delayMs: 200, backoffMultiplier: 2 }) .step('sendWelcome', async (ctx) => { await mailer.send(ctx.state.user.email, 'Welcome!'); }) .event('users', (ctx) => ({ eventType: 'user.onboarded', userId: ctx.state.user.id, })) .build();
const result = await onboardUser.execute( { email: 'alice@example.com', name: 'Alice' }, { db, eventPublisher });npm install @celom/prose