Skip to content
Prose v0.3.3

Durability

Opt-in skip-ahead checkpointing. After every successful step, Prose persists a checkpoint. If the process crashes, calling execute() again with the same runId resumes from the next undone step. Same runId after completion replays the saved result without re-executing.

import { createFlow, MemoryDurabilityStore } from '@celom/prose';
const store = new MemoryDurabilityStore();
const processOrder = createFlow<{ orderId: string }>('process-order')
.step('chargePayment', async (ctx) => {
const receipt = await payments.charge({
amount: 100,
idempotencyKey: ctx.meta.idempotencyKey,
});
return { receipt };
})
.step('persistOrder', async (ctx) => {
await db.orders.insert({
id: ctx.input.orderId,
receiptId: ctx.state.receipt.id,
});
})
.build();
// First call — process dies after chargePayment, before persistOrder
await processOrder.execute(
{ orderId: 'ord_42' },
{ db },
{
durability: { store, runId: 'ord_42' },
}
);
// Second call (after restart) with same runId — chargePayment is skipped,
// persistOrder runs to completion.
await processOrder.execute(
{ orderId: 'ord_42' },
{ db },
{
durability: { store, runId: 'ord_42' },
}
);

After each successful step, Prose writes a checkpoint containing the original input, accumulated state, and the names of completed steps. On entry, execute() loads any existing checkpoint and behaves according to its status:

Stored statusexecute() behavior
no checkpointFresh run — every step executes.
running or failedResume — completed steps are skipped, state is loaded from the checkpoint, execution continues at the first undone step.
completedReplay — the saved result is returned without invoking any handler.

Checkpoints are persisted before observer notifications fire, so a crash in observer code can’t lose the completion record.

StoreUse case
MemoryDurabilityStoreBuilt-in. Tests, local dev, single-process scripts. Not production — state is lost when the process exits.
Custom adapterImplement DurabilityStore over your own persistence layer (SQLite, Postgres, Redis, S3).

Dedicated @celom/prose-store-sqlite and @celom/prose-store-postgres packages are on the roadmap. Until then, implement the three-method DurabilityStore interface yourself — see Writing a custom store.

The runId is the identity of a single run. Pass the same runId across processes and you get the same run.

  • Derive it from a business identifier — order ID, signup ID, message ID — so the second call after a crash is the obvious thing to write.
  • Don’t use a random UUID generated inside the request handler. The next request won’t know it.
await processOrder.execute(input, deps, {
durability: { store, runId: input.orderId },
});

A step may run twice. If the process dies between handler success and the checkpoint write, the next resume re-runs that step. Handlers must therefore be safe to invoke more than once for the same logical operation.

When durability is configured, Prose exposes ctx.meta.idempotencyKey — a stable per-step key of the form ${runId}:${stepName}. Pass it to any external API that supports idempotency.

.step('chargePayment', async (ctx) => {
const receipt = await stripe.paymentIntents.create(
{ amount: ctx.state.total, currency: 'usd' },
{ idempotencyKey: ctx.meta.idempotencyKey },
);
return { receipt };
})

For databases: use upserts keyed by ${runId}:${stepName} or by a business identifier the handler can derive deterministically. For queues: send with a deduplication ID. For things you can’t deduplicate (sending email, calling a webhook that has no idempotency support), accept that at-least-once is the contract you’re working with — or move that side effect into an outbox table that a separate worker drains.

There is no automatic resumer. Something must call execute() again with the same runId. Common patterns:

  • A web handler keyed by the business ID — every retry of the same logical request hits the same runId.
  • A cron / queue worker that scans the store for status === 'failed' checkpoints and re-invokes the flow.

Inside step handlers, ctx.meta.isResuming is true when this execution loaded an existing checkpoint. Use it to skip work that’s only needed on a fresh run (sending an initial acknowledgement email, for example).

.step('sendStartedEmail', async (ctx) => {
if (ctx.meta.isResuming) return;
await mailer.send(ctx.input.email, 'We started processing your order');
})

The checkpoint stores:

  • input — the original argument from the first execute() call, frozen.
  • state — accumulated state at the moment of the last successful step.
  • completedSteps — the names of finished (or condition-skipped) steps.
  • status'running', 'completed', or 'failed'.
  • breakValue — the return value when the flow short-circuited via breakIf.
  • failedStep{ name, error } when the run failed after exhausting retries.
  • createdAt / updatedAt.

What does not get persisted:

  • deps — you pass these on every call. They are not part of the run identity.
  • In-flight retry attempts inside a step — the checkpoint only updates after retries succeed.
  • Observer notifications — observers are local to each execute() call.
  • The input argument to the second execute() call on a resume — the saved input is authoritative. Pass any input you want; Prose will use the checkpoint’s copy.
FeatureBehavior under durability
.parallel(...)Atomic checkpoint — all handlers in the block re-run if any one fails. The parallel block as a whole is what’s marked complete, not the individual handlers.
.withRetry(...)Retries happen within the step. The checkpoint is written once, after retries succeed.
.breakIf(...)The break value is persisted in the checkpoint. Replay returns it without invoking any handler.
.stepIf(...) / .step({ condition })The decision to skip is recorded. Resume does not re-evaluate the condition.
.transaction(...)The transaction runs again if it failed before the checkpoint write. Database-level idempotency (upsert, unique constraints) is your responsibility.
.event(...)At-least-once: an event may be re-published if the crash falls between publisher.publish() and the checkpoint write. Consumers must be idempotent.
.validate(...)Re-runs on resume only if it threw on the previous attempt. Successful validations are recorded as completed.
  • No long sleeps that survive process death. There is no await sleep(7.days). If the process dies during a wait, the wait is gone.
  • No automatic resumer. Something outside Prose has to call execute() again. A cron, a queue retry, the next request — whatever fits your shape.
  • No distributed worker claim or lease. Two workers calling execute() with the same runId at the same time will both run the flow. If that matters to you, build a claim mechanism in your custom store or upstream of it.
  • No workflow versioning. Changing the shape of a flow while a checkpoint for it exists is undefined behavior. If you reorder steps, rename them, or change handler logic in a way that depends on state shape, drop the old checkpoint (store.delete(runId)) before deploying.

Make the runId equal to the business ID. Every request for the same order hits the same run.

app.post('/orders/:id/process', async (req, res) => {
const result = await processOrder.execute(
{ orderId: req.params.id },
{ db, payments, mailer },
{ durability: { store, runId: req.params.id } }
);
res.json(result);
});

If the previous attempt crashed mid-flow, this call resumes from the next undone step. If it completed, this call returns the saved result without re-charging or re-emailing.

For runs that don’t have an obvious caller to retry them, scan the store from a cron.

// Custom store extension — exposing a scan() is up to the adapter.
async function retryFailedRuns(store: ScanningStore) {
const failed = await store.scanByStatus('failed');
for (const cp of failed) {
try {
await processOrder.execute({} as never, deps, {
durability: { store, runId: cp.runId },
});
} catch (err) {
logger.error({ runId: cp.runId, err }, 'janitor: retry failed');
}
}
}

The first-call input is ignored on resume, so the janitor doesn’t need to recover it.

Prose does not delete completed checkpoints. The store grows monotonically until you do something about it.

const result = await processOrder.execute(input, deps, {
durability: { store, runId },
});
await store.delete(runId);

Or implement a TTL in your custom store. Or run a periodic prune job over rows older than N days. There’s no one-size-fits-all answer here — it depends on whether you want to replay completed runs, how long you need an audit trail for, etc.

Implement the DurabilityStore interface — three methods:

interface DurabilityStore {
load(runId: string): Promise<FlowCheckpoint | null>;
save(checkpoint: FlowCheckpoint): Promise<void>;
delete(runId: string): Promise<void>;
}

MemoryDurabilityStore is around 45 lines and serves as a reference implementation. It clones on both read and write so that the executor never mutates stored state — a good invariant to uphold in adapters that don’t serialize to bytes.

A canonical contract test suite lives at packages/prose/src/lib/__tests__/store-conformance.ts. It is not re-exported from the package entry point and is not included in the published dist/. Adapter authors can either:

  • Copy or vendor the file into your own adapter package (recommended for external adapters).
  • Import it directly from source if your adapter lives inside this monorepo.

Implementation notes:

  • save() must be atomic — partial writes defeat the purpose of checkpointing.
  • load() should return null (not throw) for unknown runIds.
  • delete() of an unknown runId is a no-op, not an error.
  • Cloning on read is encouraged for non-serializing stores so that callers can’t accidentally mutate stored state.