Skip to content
Prose v0.3.2

Core Concepts

A flow is a named sequence of steps that processes an input and produces an output. Think of it as a typed pipeline for business operations — order processing, user onboarding, payment handling, etc.

import { createFlow } from '@celom/prose';
const flow = createFlow<{ email: string }>('onboard-user');

The generic parameter defines the input type. The string argument is the flow’s name, used in error messages, logging, and observability.

Steps are the building blocks of a flow. Each step receives a context object and can return new state to merge.

const flow = createFlow<{ email: string }>('onboard-user')
.step('createUser', async (ctx) => {
const user = await db.create({ email: ctx.input.email });
return { user };
})
.step('sendWelcome', async (ctx) => {
await mailer.send(ctx.state.user.email, 'Welcome!');
return { welcomed: true };
})
.build();

Prose provides several step types:

MethodPurpose
.step()Regular step — runs always, returns state to merge
.validate()Validation step — runs before processing, never retried
.stepIf()Conditional step — runs only when condition is true
.transaction()Transaction step — wraps handler in deps.db.transaction(), passing the ORM’s native transaction client as tx
.parallel()Parallel step — runs multiple handlers concurrently

The core design of Prose is type-safe state threading. Each step’s return value is shallow-merged into the accumulated state, and TypeScript tracks the types through the entire chain.

createFlow<{ orderId: string }>('process-order')
.step('fetch', async (ctx) => {
// ctx.state is {} (empty)
const order = await db.getOrder(ctx.input.orderId);
return { order }; // state becomes { order: Order }
})
.step('charge', async (ctx) => {
// ctx.state is { order: Order } — fully typed!
const receipt = await payments.charge(ctx.state.order.total);
return { receipt }; // state becomes { order: Order; receipt: Receipt }
})
.step('notify', async (ctx) => {
// ctx.state is { order: Order; receipt: Receipt }
await notify(ctx.state.order.email, ctx.state.receipt.id);
// returning nothing is fine — state unchanged
})
.build();

Steps that return void or undefined don’t modify the state. The state type only grows — it’s never narrowed.

Every step handler receives a FlowContext with four properties:

interface FlowContext<TInput, TDeps, TState> {
readonly input: Readonly<TInput>; // original input, never changes
state: TState; // accumulated state from prior steps
deps: TDeps; // injected dependencies
meta: FlowMeta; // runtime metadata
signal: AbortSignal; // combined abort signal
}

The original input passed to .execute(). It’s Readonly — steps cannot mutate it.

The accumulated state from all prior steps. Each step’s return value is shallow-merged into this object.

Dependencies injected via the second argument to .execute(). Prose defines a base contract (BaseFlowDependencies) with optional db?: DatabaseClient and eventPublisher?: FlowEventPublisher properties — provide them when your flow uses .transaction() or .event() steps. You extend this with your own dependencies (API clients, repositories, etc). See Types and Database Transactions for details.

Runtime metadata about the current execution:

interface FlowMeta {
flowName: string; // the flow's name
startedAt: Date; // when the flow started
currentStep?: string; // current step name
correlationId?: string; // custom, passed via execute options
}

An AbortSignal that combines the flow timeout, step timeout, and any external signal. Pass it to fetch, database queries, or check ctx.signal.aborted for cooperative cancellation.

Flows are constructed using a fluent builder pattern. Chain methods to add steps, then call .build() to create an executable flow definition.

const flow = createFlow<Input, Deps>('name')
.step(...)
.step(...)
.withRetry(...)
.event(...)
.map(...)
.build();

The builder is immutable — each method returns a new builder instance. This makes it safe to branch and reuse partial builders.

By default, .execute() returns the full accumulated state. Use .map() to transform it into a custom output shape:

const flow = createFlow<{ id: string }>('get-user')
.step('fetch', async (ctx) => {
const user = await db.getUser(ctx.input.id);
return { user };
})
.map((input, state) => ({
id: state.user.id,
displayName: state.user.name,
}))
.build();
// result is { id: string; displayName: string }
const result = await flow.execute({ id: 'u_1' }, {});

Extract reusable step sequences as functions and compose them with .pipe():

function withAuth(builder) {
return builder
.step('validateToken', async (ctx) => {
const session = await auth.verify(ctx.input.token);
return { session };
});
}
const flow = createFlow<{ token: string }>('protected')
.pipe(withAuth)
.step('doWork', (ctx) => {
// ctx.state.session is typed
})
.build();

See the Sub-flows guide for more details.