FlowBuilder
FlowBuilder is the main API for defining flows. It provides a fluent interface: each method returns a new builder with updated types.
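To make "returns a new builder with updated types" concrete, here is a minimal sketch of the general pattern. It is not FlowBuilder's implementation: `MiniBuilder`, `Handler`, and `run` are hypothetical names, and only the idea that each call yields a fresh builder whose state type has grown is taken from the description above.

```typescript
// Hypothetical sketch; MiniBuilder is not part of the FlowBuilder API.
type Handler<I, S, R> = (ctx: { input: I; state: S }) => R | Promise<R>;

class MiniBuilder<I, S = {}> {
  constructor(
    private readonly steps: ReadonlyArray<{ name: string; handler: Handler<I, any, any> }> = [],
  ) {}

  // Returns a NEW builder; the state type grows from S to S & R.
  step<R extends object>(name: string, handler: Handler<I, S, R>): MiniBuilder<I, S & R> {
    return new MiniBuilder<I, S & R>([...this.steps, { name, handler }]);
  }

  // Runs the steps in order, merging each returned object into state.
  async run(input: I): Promise<S> {
    let state: any = {};
    for (const { handler } of this.steps) {
      state = { ...state, ...(await handler({ input, state })) };
    }
    return state as S;
  }
}

const base = new MiniBuilder<{ orderId: string }>();
const withOrder = base.step('fetchOrder', (ctx) => ({ order: { id: ctx.input.orderId } }));
// The type checker now knows ctx.state.order exists in later steps.
const withCount = withOrder.step('countChars', (ctx) => ({ idLength: ctx.state.order.id.length }));
```

Because `base` is never mutated, it can be reused as the starting point for several different flows.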
.step(name, handler)
Add a regular step. The handler receives FlowContext and can return an object to merge into state.

    .step('fetchOrder', async (ctx) => {
      const order = await db.getOrder(ctx.input.orderId);
      return { order };
    })

.validate(name, handler)
Add a validation step. Runs before processing and is never retried. Throw ValidationError to fail fast.

    .validate('checkInput', (ctx) => {
      if (ctx.input.amount <= 0) throw ValidationError.single('amount', 'Must be positive');
    })

.stepIf(name, condition, handler)
Add a conditional step. The handler only runs when condition(ctx) returns true. Skipped steps don’t affect state.

    .stepIf('sendSms', (ctx) => ctx.input.phone != null, async (ctx) => {
      await sms.send(ctx.input.phone, 'Hello!');
    })

.transaction(name, handler)
Wrap a step in db.transaction(). The handler receives (ctx, tx), where tx is the transaction client.

    .transaction('persist', async (ctx, tx) => {
      const id = await tx.insert('users', ctx.input);
      return { userId: id };
    })

Requires a db dependency conforming to the DatabaseClient interface.
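The DatabaseClient interface is not spelled out on this page. The sketch below shows one plausible shape and how a transaction step could delegate to it. Everything in it is an assumption for illustration: the `TxClient` and `DatabaseClient` shapes, the in-memory `MemoryDb`, and the `runTransactionStep` helper are hypothetical.

```typescript
// Assumed shapes: the real DatabaseClient interface may differ.
interface TxClient {
  insert(table: string, row: object): Promise<number>;
}
interface DatabaseClient {
  transaction<T>(fn: (tx: TxClient) => Promise<T>): Promise<T>;
}

// In-memory stand-in: writes are staged and committed only if fn resolves.
class MemoryDb implements DatabaseClient {
  readonly tables = new Map<string, object[]>();

  async transaction<T>(fn: (tx: TxClient) => Promise<T>): Promise<T> {
    const staged: Array<[string, object]> = [];
    const tx: TxClient = {
      insert: async (table, row) => {
        staged.push([table, row]);
        // The id is the row's 1-based position once the staged rows are committed.
        return (this.tables.get(table) ?? []).length + staged.filter(([t]) => t === table).length;
      },
    };
    const result = await fn(tx); // a throw here skips the commit below
    for (const [table, row] of staged) {
      this.tables.set(table, [...(this.tables.get(table) ?? []), row]);
    }
    return result;
  }
}

// How a transaction step could hand (ctx, tx) to its handler.
function runTransactionStep<C, R>(
  db: DatabaseClient,
  ctx: C,
  handler: (ctx: C, tx: TxClient) => Promise<R>,
): Promise<R> {
  return db.transaction((tx) => handler(ctx, tx));
}
```

With a client like this, a handler that throws leaves the tables untouched, which is the behavior one would expect from a real transaction client.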
.parallel(name, strategy, ...handlers)
Run multiple handlers concurrently and merge results into state.

    .parallel('fetchAll', 'deep',
      async (ctx) => ({ users: await fetchUsers() }),
      async (ctx) => ({ posts: await fetchPosts() }),
    )

Merge strategies: 'shallow', 'error-on-conflict', 'deep'
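The strategy names suggest how overlapping keys are handled. As a rough sketch under that assumption (the library's exact conflict rules are not specified here, and `mergeResults` and `deepMerge` are hypothetical helpers), the three strategies could behave like this:

```typescript
// Illustrative only: the library's real merge logic may differ in edge cases.
type Strategy = 'shallow' | 'error-on-conflict' | 'deep';
type Obj = Record<string, unknown>;

const isPlainObject = (v: unknown): v is Obj =>
  typeof v === 'object' && v !== null && !Array.isArray(v);

const hasOwn = (obj: Obj, key: string): boolean =>
  Object.prototype.hasOwnProperty.call(obj, key);

// Recursively merges nested plain objects; other values from b replace those in a.
function deepMerge(a: Obj, b: Obj): Obj {
  const out: Obj = { ...a };
  for (const key of Object.keys(b)) {
    const prev = out[key];
    const next = b[key];
    out[key] = isPlainObject(prev) && isPlainObject(next) ? deepMerge(prev, next) : next;
  }
  return out;
}

function mergeResults(strategy: Strategy, results: Obj[]): Obj {
  return results.reduce<Obj>((acc, result) => {
    if (strategy === 'deep') return deepMerge(acc, result);
    if (strategy === 'error-on-conflict') {
      for (const key of Object.keys(result)) {
        if (hasOwn(acc, key)) throw new Error(`Conflicting key "${key}" in parallel results`);
      }
    }
    return { ...acc, ...result }; // shallow: assumed that later handlers win
  }, {});
}

const first = { user: { name: 'Ada' }, count: 1 };
const second = { user: { role: 'admin' } };

const shallow = mergeResults('shallow', [first, second]); // user comes entirely from second
const deep = mergeResults('deep', [first, second]);       // user keeps name and gains role
```

In this sketch, 'error-on-conflict' is the strictest choice: it throws as soon as two handlers return the same top-level key.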
Events
.event(channel, builder)
Publish a single event to a named channel.

    .event('orders', (ctx) => ({
      eventType: 'order.created',
      orderId: ctx.state.orderId,
    }))

.events(channel, builders)
Publish multiple events to a channel in one step.

    .events('notifications', [
      (ctx) => ({ eventType: 'email.send', to: ctx.input.email }),
      (ctx) => ({ eventType: 'sms.send', to: ctx.input.phone }),
    ])

Flow control
.breakIf(condition, returnValue?)
Short-circuit the flow. If condition(ctx) returns true, all remaining steps and .map() are skipped.

    .breakIf(
      (ctx) => ctx.state.existing != null,
      (ctx) => ({ user: ctx.state.existing, created: false })
    )

.withRetry(options)
Add a retry policy to the most recently added step. See the Retries guide for full options.

    .withRetry({
      maxAttempts: 3,
      delayMs: 200,
      backoffMultiplier: 2,
    })

Composition
.pipe(fn)
Apply a builder transformation function. Used for reusable sub-flows.

    .pipe(withAuth).pipe(withAuditLog)

See the Sub-flows guide for details.
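A sub-flow in this style is just a function that takes a builder and returns a builder. The sketch below illustrates that idea with a deliberately tiny, untyped stand-in. `SketchBuilder` and the step names inside `withAuth` and `withAuditLog` are hypothetical; the real builder also tracks input and state types.

```typescript
// Hypothetical minimal builder: just enough to show how pipe() composes.
class SketchBuilder {
  constructor(readonly stepNames: readonly string[] = []) {}

  step(name: string): SketchBuilder {
    return new SketchBuilder([...this.stepNames, name]);
  }

  // pipe() hands the current builder to fn and returns whatever fn builds.
  pipe(fn: (builder: SketchBuilder) => SketchBuilder): SketchBuilder {
    return fn(this);
  }
}

// Reusable sub-flows are plain functions from builder to builder.
const withAuth = (b: SketchBuilder) => b.step('loadSession').step('checkPermissions');
const withAuditLog = (b: SketchBuilder) => b.step('writeAuditLog');

const piped = new SketchBuilder()
  .step('parseInput')
  .pipe(withAuth)
  .pipe(withAuditLog);
```

Since each sub-flow only appends to the builder it is given, the same function can be piped into any number of flows without copying step definitions.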
.map(mapper)
Transform the final accumulated state into a custom output shape. The mapper receives (input, state).

    .map((input, state) => ({
      id: state.user.id,
      displayName: state.user.name,
    }))

.build()
Create an executable FlowDefinition. Returns an object with an .execute() method.

    const flow = createFlow<Input>('name')
      .step(...)
      .build();

    const result = await flow.execute(input, deps, options?);
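To see how steps, .breakIf(), and .map() could interact at execution time, here is a small sketch of the semantics described on this page. It is not the library's executor: `executeSketch`, `Op`, and the sample data are hypothetical, and it assumes that on a break the result is returnValue(ctx) when one is provided.

```typescript
// Sketch of the described execution semantics; all names here are hypothetical.
type State = Record<string, unknown>;
type Ctx = { input: State; state: State };
type Op =
  | { kind: 'step'; run: (ctx: Ctx) => State | Promise<State> }
  | { kind: 'breakIf'; when: (ctx: Ctx) => boolean; value?: (ctx: Ctx) => unknown };

async function executeSketch(
  ops: Op[],
  input: State,
  mapper: (input: State, state: State) => unknown,
): Promise<unknown> {
  const ctx: Ctx = { input, state: {} };
  for (const op of ops) {
    if (op.kind === 'breakIf') {
      // Assumption: on a break, returnValue(ctx) is the result and the mapper is skipped.
      if (op.when(ctx)) return op.value ? op.value(ctx) : ctx.state;
      continue;
    }
    // Regular step: merge the returned object into the accumulated state.
    ctx.state = { ...ctx.state, ...(await op.run(ctx)) };
  }
  return mapper(ctx.input, ctx.state);
}

// Sample data standing in for a user lookup.
const users = new Map([['ada@example.com', { id: 1, name: 'Ada' }]]);

const ops: Op[] = [
  { kind: 'step', run: (ctx) => ({ existing: users.get(String(ctx.input.email)) ?? null }) },
  {
    kind: 'breakIf',
    when: (ctx) => ctx.state.existing != null,
    value: (ctx) => ({ user: ctx.state.existing, created: false }),
  },
  { kind: 'step', run: async (ctx) => ({ user: { id: 2, name: String(ctx.input.name) } }) },
];

const toOutput = (_input: State, state: State) => ({
  id: (state.user as { id: number }).id,
  created: true,
});
```

Calling `executeSketch(ops, { email: 'ada@example.com' }, toOutput)` takes the break path and never reaches the mapper, while an unknown email runs every step and returns the mapped output.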