Persistence

Save workflow state and resume later. Completed steps return their cached results without re-executing.

Preferred: Call runWithState to run the workflow and capture its resume state in one call, then persist that state. To restore, load it with store.loadResumeState(id) or toResumeState(await store.load(id)) and pass it to workflow.run(fn, { resumeState }).

import { createWorkflow, toResumeState } from 'awaitly/workflow';
import { postgres } from 'awaitly-postgres';
const store = postgres('postgresql://localhost/mydb');
const workflow = createWorkflow('workflow', { fetchUser, fetchPosts });
// Run and persist
const { result, resumeState } = await workflow.runWithState(async ({ step, deps }) => {
  const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });
  const posts = await step('fetchPosts', () => deps.fetchPosts(user.id), { key: `posts:${user.id}` });
  return { user, posts };
});
await store.save('wf-123', resumeState);
// Later: restore and resume (two explicit flows)
const loaded = await store.loadResumeState('wf-123');
if (loaded) {
  await workflow.run(async ({ step, deps }) => { /* same fn */ }, { resumeState: loaded });
}
// Or: const loaded = await store.load('wf-123'); const resumeState = toResumeState(loaded);
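
To make the "cached results without re-executing" behaviour concrete, here is a minimal sketch of how a keyed step cache could work. It is not awaitly's implementation: `createStepRunner`, `StepCache`, and `demo` are hypothetical names used only for illustration, and the real library tracks more than a plain key/value map.

```typescript
// Hypothetical sketch, NOT awaitly's implementation.
// A step runner that returns cached results for keys it has already seen.
type StepCache = Map<string, unknown>;

function createStepRunner(cache: StepCache) {
  return async function step<T>(key: string, fn: () => Promise<T> | T): Promise<T> {
    if (cache.has(key)) {
      // Completed in an earlier run: return the stored value without calling fn.
      return cache.get(key) as T;
    }
    const value = await fn();
    cache.set(key, value);
    return value;
  };
}

// Count how often the underlying operation really runs.
let calls = 0;
const fetchUser = async (id: string) => {
  calls += 1;
  return { id, displayName: 'Ada' };
};

async function demo(): Promise<number> {
  const cache: StepCache = new Map();

  const firstRun = createStepRunner(cache);
  await firstRun('user:1', () => fetchUser('1'));

  // "Resume": a new runner that starts from the saved cache.
  const resumedRun = createStepRunner(cache);
  await resumedRun('user:1', () => fetchUser('1'));

  return calls; // fetchUser ran once; the resumed run hit the cache
}
```

The stable `key` option in the example above plays the same role as the map key here: it is what lets a later run recognise work that has already been done.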

Collector pattern: when you need custom event handling (e.g. logging) alongside state capture, create a collector with createResumeStateCollector, pass its handleEvent to onEvent, and call collector.getResumeState() after the run. Serialize with serializeResumeState(state) before persisting, and use deserializeResumeState(parsed) when loading JSON. Restore with workflow.run(fn, { resumeState }). Do not pass a WorkflowSnapshot to resumeState.
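
The serialization step matters because a resume state holds its steps in a Map, and plain JSON.stringify silently drops Map entries. The sketch below shows the problem and one possible fix. `toJsonSafe`, `fromJsonSafe`, `Entry`, and `ResumeStateLike` are hypothetical stand-ins; awaitly's own serializeResumeState and deserializeResumeState may use a different wire format.

```typescript
// Hypothetical stand-ins for illustration only; the library's own
// serializeResumeState / deserializeResumeState may encode state differently.
type Entry = { value: unknown };
type ResumeStateLike = { steps: Map<string, Entry> };

// Convert the Map into an array of [key, value] pairs, which JSON can represent.
function toJsonSafe(state: ResumeStateLike): { steps: Array<[string, Entry]> } {
  return { steps: Array.from(state.steps.entries()) };
}

// Rebuild the Map from the parsed pairs.
function fromJsonSafe(parsed: { steps: Array<[string, Entry]> }): ResumeStateLike {
  return { steps: new Map(parsed.steps) };
}

const state: ResumeStateLike = {
  steps: new Map([['user:1', { value: { id: '1' } }]]),
};

// A Map has no enumerable own properties, so its entries are lost here.
const naive = JSON.stringify(state); // '{"steps":{}}'

// Going through the pair representation keeps every entry.
const safe = JSON.stringify(toJsonSafe(state));
const restored = fromJsonSafe(JSON.parse(safe));
```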

For production, use one of the official persistence adapters:

// PostgreSQL
import { postgres } from 'awaitly-postgres';
const store = postgres('postgresql://localhost/mydb');
// MongoDB
import { mongo } from 'awaitly-mongo';
const store = mongo('mongodb://localhost:27017/mydb');
// libSQL / SQLite
import { libsql } from 'awaitly-libsql';
const store = libsql('file:./workflow.db');

Then run and persist with runWithState, or use the collector when you need custom onEvent:

// One-line style: runWithState + store
const { result, resumeState } = await workflow.runWithState(myWorkflowFn);
await store.save('wf-123', resumeState);
const loaded = await store.loadResumeState('wf-123');
if (loaded) await workflow.run(myWorkflowFn, { resumeState: loaded });

Collected state from createResumeStateCollector().getResumeState() is a ResumeState: { steps: Map<string, ResumeStateEntry> }. Because steps is a Map, pass it through serializeResumeState before writing it as JSON. Some adapters or run configs also support WorkflowSnapshot for restore. The snapshot is a plain JSON-serializable object:

interface WorkflowSnapshot {
  formatVersion: 1;
  steps: Record<string, StepResult>;
  execution: {
    status: 'running' | 'completed' | 'failed';
    lastUpdated: string;
    completedAt?: string;
    currentStepId?: string;
  };
  metadata?: {
    workflowId?: string;
    definitionHash?: string;
    input?: JSONValue;
    [key: string]: JSONValue | undefined;
  };
  warnings?: Array<{
    type: 'lossy_value';
    stepId: string;
    path: string;
    reason: 'non-json' | 'circular' | 'encode-failed';
  }>;
}
// StepResult is a discriminated union
type StepResult =
  | { ok: true; value: JSONValue }
  | { ok: false; error: JSONValue; cause: SerializedCause; meta?: { origin: 'result' | 'throw' } };

A WorkflowSnapshot works with JSON.stringify() and JSON.parse() directly; no special serialization is needed.
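
As a quick illustration, the sketch below round-trips a snapshot-shaped object through JSON and then narrows a step result on its `ok` discriminant. The types are trimmed local copies so the example stands alone: `SnapshotLike` and `describeStep` are hypothetical names, `JSONValue` is simplified, and `cause` is typed loosely rather than as SerializedCause.

```typescript
// Trimmed local copies of the snapshot types so the sketch is self-contained.
// JSONValue is simplified and `cause` is loosely typed; both are assumptions.
type JSONValue = string | number | boolean | null | JSONValue[] | { [key: string]: JSONValue };
type StepResult =
  | { ok: true; value: JSONValue }
  | { ok: false; error: JSONValue; cause: JSONValue };

interface SnapshotLike {
  formatVersion: 1;
  steps: Record<string, StepResult>;
  execution: { status: 'running' | 'completed' | 'failed'; lastUpdated: string };
}

const original: SnapshotLike = {
  formatVersion: 1,
  steps: {
    'user:1': { ok: true, value: { id: '1' } },
    'posts:1': { ok: false, error: 'TIMEOUT', cause: null },
  },
  execution: { status: 'failed', lastUpdated: '2024-01-01T00:00:00.000Z' },
};

// Plain JSON round trip: the object holds no Maps, Dates, or class instances.
const roundTripped: SnapshotLike = JSON.parse(JSON.stringify(original));

// Narrow the discriminated union on `ok` before reading `value` or `error`.
function describeStep(id: string, snap: SnapshotLike): string {
  const stepResult = snap.steps[id];
  if (!stepResult) return `${id}: not run`;
  return stepResult.ok
    ? `${id}: ok ${JSON.stringify(stepResult.value)}`
    : `${id}: failed ${JSON.stringify(stepResult.error)}`;
}
```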

Use the collector’s handleEvent in onEvent and save after the run, or save inside onEvent on each step_complete for incremental checkpoints:

const collector = createResumeStateCollector();
const workflow = createWorkflow('workflow', { fetchUser }, {
  onEvent: (event) => {
    collector.handleEvent(event);
    if (event.type === 'step_complete') {
      // Optional: save after each step for fine-grained recovery.
      // The promise is not awaited here, so surface save failures explicitly.
      void store.save('wf-123', collector.getResumeState()).catch(console.error);
    }
  },
});
await workflow.run(myWorkflowFn);
await store.save('wf-123', collector.getResumeState());
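
One caveat with per-step checkpoints is that saves started from a synchronous callback can finish out of order, so an older state may overwrite a newer one. A possible mitigation, sketched below, is to chain the writes. `createSaveQueue`, `slowStore`, and `written` are hypothetical names and not part of awaitly; the fake store simply records the order in which writes complete.

```typescript
// Hypothetical helper: chains saves so they reach the store in call order,
// even when they are enqueued from a synchronous event callback.
function createSaveQueue<T>(save: (state: T) => Promise<void>) {
  let tail: Promise<void> = Promise.resolve();
  return {
    enqueue(state: T): void {
      tail = tail
        .then(() => save(state))
        .catch((err) => {
          // Log and continue so one failed checkpoint does not block later ones.
          console.error('checkpoint save failed', err);
        });
    },
    // Await this after the run to make sure every queued checkpoint was written.
    flush(): Promise<void> {
      return tail;
    },
  };
}

// Fake store for the demo: earlier writes take longer than later ones.
const written: number[] = [];
const slowStore = (n: number) =>
  new Promise<void>((resolve) => {
    setTimeout(() => {
      written.push(n);
      resolve();
    }, 30 - n * 10);
  });

const queue = createSaveQueue(slowStore);
queue.enqueue(1);
queue.enqueue(2);
queue.enqueue(3);
```

In an onEvent handler you would call `queue.enqueue(collector.getResumeState())` on each step_complete and `await queue.flush()` once the run finishes.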

Resume from ResumeState (recommended): Use store.loadResumeState(id) or toResumeState(await store.load(id)), then workflow.run(fn, { resumeState }). Do not pass a WorkflowSnapshot to resumeState; it has a different shape.

Resume from snapshot: If your store returns a WorkflowSnapshot (e.g. from durable execution or legacy data), use workflow.run(fn, { snapshot: loadedSnapshot }). The two options are not interchangeable: resumeState takes a ResumeState, and snapshot takes a WorkflowSnapshot.

// Flow 1: Resume from ResumeState (from runWithState or createResumeStateCollector)
const resumeState = await store.loadResumeState('wf-123');
if (resumeState) await workflow.run(fn, { resumeState });
// Flow 2: Resume from WorkflowSnapshot (e.g. durable / snapshot format)
const snapshot = await store.load('wf-123');
if (snapshot && isWorkflowSnapshot(snapshot)) {
  await workflow.run(fn, { snapshot });
}
// Alternative: supply resume state at creation time instead of per run
const resumedWorkflow = createWorkflow('workflow', deps, { resumeState: savedState });
await resumedWorkflow.run(fn);

Check if an object is a valid snapshot:

import {
  looksLikeWorkflowSnapshot,
  validateSnapshot,
  assertValidSnapshot,
} from 'awaitly/persistence';
// Quick check
if (looksLikeWorkflowSnapshot(obj)) {
  // Probably a snapshot
}
// Full validation with errors
const result = validateSnapshot(obj);
if (result.valid) {
  const snapshot = result.snapshot;
} else {
  console.error(result.errors);
}
// Throwing helper
const snapshot = assertValidSnapshot(obj); // throws SnapshotFormatError

For incremental persistence:

import { mergeSnapshots } from 'awaitly/persistence';
const merged = mergeSnapshots(baseSnapshot, deltaSnapshot);
// delta.steps overwrites base.steps
// execution from delta
// metadata shallow merged
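
The three merge rules in the comments above can be sketched as follows. This is a hypothetical re-implementation for illustration only: `mergeSnapshotsSketch` and `SnapshotLike` are made-up names with loosely typed fields, and real code should call the library's mergeSnapshots.

```typescript
// Hypothetical re-implementation of the merge rules listed above.
// Prefer the library's mergeSnapshots in real code.
interface SnapshotLike {
  steps: Record<string, unknown>;
  execution: { status: string; lastUpdated: string };
  metadata?: Record<string, unknown>;
}

function mergeSnapshotsSketch(base: SnapshotLike, delta: SnapshotLike): SnapshotLike {
  return {
    // delta.steps overwrites base.steps, one step id at a time
    steps: { ...base.steps, ...delta.steps },
    // execution is taken wholesale from delta
    execution: delta.execution,
    // metadata is shallow merged; delta wins on conflicting keys
    metadata: { ...base.metadata, ...delta.metadata },
  };
}

const baseSnap: SnapshotLike = {
  steps: { 'user:1': { ok: true, value: 'old' } },
  execution: { status: 'running', lastUpdated: '2024-01-01T00:00:00.000Z' },
  metadata: { workflowId: 'wf-123', attempt: 1 },
};

const deltaSnap: SnapshotLike = {
  steps: { 'user:1': { ok: true, value: 'new' }, 'posts:1': { ok: true, value: [] } },
  execution: { status: 'completed', lastUpdated: '2024-01-01T00:05:00.000Z' },
  metadata: { attempt: 2 },
};

// mergedSnap has both steps, delta's execution block, workflowId from base, attempt from delta.
const mergedSnap = mergeSnapshotsSketch(baseSnap, deltaSnap);
```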

From loaded resume state, check step completion via the steps Map:

// loadResumeState returns a ResumeState, whose steps field is a Map
const savedState = await store.loadResumeState('wf-123');
if (savedState?.steps?.get('user:1')) {
  console.log('User step already completed');
}

For event streams, use the isStepComplete type guard from awaitly/workflow to narrow event types.
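
For readers unfamiliar with type guards, the sketch below shows the general technique with a local stand-in. Only the 'step_complete' type string comes from the examples on this page; the event shapes, the `stepKey` and `durationMs` fields, and `isStepCompleteLike` are hypothetical and do not describe awaitly's actual event types.

```typescript
// Hypothetical event shapes; only the 'step_complete' type string is taken from this page.
type WorkflowEventLike =
  | { type: 'step_start'; stepKey: string }
  | { type: 'step_complete'; stepKey: string; durationMs: number };

// Local stand-in for a guard such as isStepComplete: the `ev is ...` return type
// tells the compiler which union member survives when the function returns true.
function isStepCompleteLike(
  ev: WorkflowEventLike,
): ev is Extract<WorkflowEventLike, { type: 'step_complete' }> {
  return ev.type === 'step_complete';
}

const events: WorkflowEventLike[] = [
  { type: 'step_start', stepKey: 'user:1' },
  { type: 'step_complete', stepKey: 'user:1', durationMs: 12 },
  { type: 'step_start', stepKey: 'posts:1' },
];

// After filtering with the guard, durationMs is accessible without a cast.
const completed = events.filter(isStepCompleteLike);
const completedKeys = completed.map((ev) => ev.stepKey);
const totalMs = completed.reduce((sum, ev) => sum + ev.durationMs, 0);
```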

Store adapters persist workflow state (e.g. ResumeState or WorkflowSnapshot). A typical interface:

interface SnapshotStore {
  save(id: string, state: ResumeState | WorkflowSnapshot): Promise<void>;
  load(id: string): Promise<ResumeState | WorkflowSnapshot | null>;
  delete(id: string): Promise<void>;
  list(options?: { prefix?: string; limit?: number }): Promise<Array<{ id: string; updatedAt: string }>>;
  close(): Promise<void>;
}

import { postgres } from 'awaitly-postgres';
// One-liner
const store = postgres('postgresql://localhost/mydb');
// With options
const store = postgres({
  url: 'postgresql://localhost/mydb',
  table: 'my_workflow_snapshots',
  prefix: 'orders:',
});

Learn more about PostgreSQL persistence →

import { mongo } from 'awaitly-mongo';
// One-liner
const store = mongo('mongodb://localhost:27017/mydb');
// With options
const store = mongo({
  url: 'mongodb://localhost:27017',
  database: 'myapp',
  collection: 'my_workflow_snapshots',
  prefix: 'orders:',
});

Learn more about MongoDB persistence →

import { libsql } from 'awaitly-libsql';
// Local SQLite
const store = libsql('file:./workflow.db');
// Remote Turso
const store = libsql({
  url: process.env.TURSO_URL!,
  authToken: process.env.TURSO_AUTH_TOKEN,
});

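To build a custom adapter, implement the SnapshotStore interface:

// Assumes `redis` is an already-connected Redis client in scope.
function myStore(): SnapshotStore {
  return {
    async save(id, snapshot) {
      await redis.set(id, JSON.stringify(snapshot));
    },
    async load(id) {
      const data = await redis.get(id);
      return data ? JSON.parse(data) : null;
    },
    async delete(id) {
      await redis.del(id);
    },
    async list(options) {
      // Implementation: return { id, updatedAt } entries, honoring options.prefix and options.limit
      throw new Error('list() is not implemented');
    },
    async close() {
      // Cleanup
    },
  };
}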
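
For tests or local development, a complete in-memory version of the same idea might look like the sketch below. `memoryStore`, `SnapshotStoreLike`, and `StoredState` are hypothetical names, and the stored state is typed as `unknown` so the example does not depend on the library's types. Note that it stores state with JSON.stringify, so Map-based resume state would need to be serialized first.

```typescript
// Local copy of the store shape above, with state typed loosely (an assumption).
type StoredState = unknown;
interface SnapshotStoreLike {
  save(id: string, state: StoredState): Promise<void>;
  load(id: string): Promise<StoredState | null>;
  delete(id: string): Promise<void>;
  list(options?: { prefix?: string; limit?: number }): Promise<Array<{ id: string; updatedAt: string }>>;
  close(): Promise<void>;
}

function memoryStore(): SnapshotStoreLike {
  // Each row keeps the JSON text plus the time it was last written.
  const rows = new Map<string, { json: string; updatedAt: string }>();
  return {
    async save(id, state) {
      rows.set(id, { json: JSON.stringify(state), updatedAt: new Date().toISOString() });
    },
    async load(id) {
      const row = rows.get(id);
      return row ? JSON.parse(row.json) : null;
    },
    async delete(id) {
      rows.delete(id);
    },
    async list(options) {
      const prefix = options?.prefix ?? '';
      const limit = options?.limit ?? Infinity;
      return Array.from(rows.entries())
        .filter(([id]) => id.startsWith(prefix)) // keep ids under the requested prefix
        .slice(0, limit)                         // cap the number of results
        .map(([id, row]) => ({ id, updatedAt: row.updatedAt }));
    },
    async close() {
      rows.clear(); // nothing to disconnect; just drop the data
    },
  };
}
```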

Learn about Human-in-the-Loop →