State and Resumption

awaitly can save workflow state and resume from where it left off. This enables crash recovery, long-running workflows, and idempotent operations.

Enable caching so steps don’t re-execute:

const workflow = createWorkflow('workflow', deps, {
  cache: new Map(),
});
const result = await workflow.run(async ({ step, deps }) => {
  // Cached with key 'user:1'
  const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });
  return user;
});

Keys must be:

  • Unique per step: Different steps need different keys
  • Stable: Same input should produce same key
  • Deterministic: No timestamps or random values

// Good keys
{ key: 'user:123' }
{ key: `posts:${userId}` }
{ key: `order:${orderId}:validate` }
// Bad keys
{ key: `user:${Date.now()}` } // Changes every call
{ key: `user:${Math.random()}` } // Random
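
One way to keep keys stable and deterministic is to derive them from a fixed prefix plus the step's identifying inputs. The sketch below uses a hypothetical `stepKey` helper; it is not part of awaitly, and the rule of rejecting `:` inside a part is an assumption made here to avoid accidental collisions.

```typescript
// Hypothetical helper (not part of awaitly): builds a deterministic key
// from a fixed prefix and the identifying inputs of a step.
type KeyPart = string | number;

function stepKey(prefix: string, ...parts: KeyPart[]): string {
  const all = [prefix, ...parts].map((part) => String(part));
  for (const text of all) {
    // Reject empty parts and the separator itself so that two different
    // inputs can never collapse into the same key.
    if (text.length === 0 || text.includes(':')) {
      throw new Error(`Invalid key part: "${text}"`);
    }
  }
  return all.join(':');
}

const orderId = 'o-42';
const validateKey = stepKey('order', orderId, 'validate'); // 'order:o-42:validate'
```

Because the key depends only on its inputs, calling `stepKey` with the same arguments in a later run yields the same key.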

Pass a function (a thunk), not the result of calling it. With a thunk, the step can skip the call entirely when a cached value exists:

// With thunk - can be cached (ID required as first argument)
const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });
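
To see why the thunk matters, consider a minimal stand-in for a cached step. `cachedStep` and `fetchUserCounted` below are illustrative names and not awaitly's implementation. Because `cachedStep` receives a function, it can return the cached value without invoking it; had the caller passed `fetchUserCounted('1')` directly, the fetch would already have started before any cache check.

```typescript
// Illustrative stand-in for a cached step; not awaitly's implementation.
const thunkCache = new Map<string, unknown>();
let fetchCount = 0;

// Hypothetical dependency that counts how often it actually runs.
async function fetchUserCounted(id: string): Promise<{ id: string }> {
  fetchCount += 1;
  return { id };
}

async function cachedStep<T>(key: string, thunk: () => Promise<T>): Promise<T> {
  if (thunkCache.has(key)) {
    // Cache hit: the thunk is never called, so no work is done.
    return thunkCache.get(key) as T;
  }
  const value = await thunk();
  thunkCache.set(key, value);
  return value;
}

async function runTwice(): Promise<number> {
  await cachedStep('user:1', () => fetchUserCounted('1')); // miss: runs the fetch
  await cachedStep('user:1', () => fetchUserCounted('1')); // hit: thunk skipped
  return fetchCount;
}
```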

The cache persists across workflow runs:

const cache = new Map();
const workflow = createWorkflow('workflow', deps, { cache });

// First run - fetches user
await workflow.run(async ({ step, deps }) => {
  const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });
  return user;
});

// Second run - uses cached value (no fetch)
await workflow.run(async ({ step, deps }) => {
  const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });
  return user;
});

Use createResumeStateCollector to capture step results:

import { createWorkflow, createResumeStateCollector } from 'awaitly/workflow';

const collector = createResumeStateCollector();
const workflow = createWorkflow('workflow', { fetchUser, fetchPosts },
  { onEvent: collector.handleEvent }
);

await workflow.run(async ({ step, deps }) => {
  const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });
  const posts = await step('fetchPosts', () => deps.fetchPosts(user.id), { key: `posts:${user.id}` });
  return { user, posts };
});

// Get collected state
const state = collector.getResumeState();

The ResumeState returned by collector.getResumeState() is JSON-serializable, so it can be persisted after a run and later passed to workflow.run(fn, { resumeState }) or to the creation options. Saving looks like this:

const collector = createResumeStateCollector();
const workflow = createWorkflow('workflow', deps, { onEvent: collector.handleEvent });

await workflow.run(async ({ step, deps }) => {
  const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });
  return user;
});

const state = collector.getResumeState();
await store.save('wf-123', state);

To resume, load the state and pass it to the next run:

const savedState = await store.load('wf-123');

// Fresh collector for the resumed run
const collector = createResumeStateCollector();
const workflow = createWorkflow('workflow', { fetchUser, fetchPosts },
  { onEvent: collector.handleEvent }
);

await workflow.run(async ({ step, deps }) => {
  const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });
  const posts = await step('fetchPosts', () => deps.fetchPosts(user.id), { key: `posts:${user.id}` });
  return { user, posts };
}, { resumeState: savedState });

You can also pass resumeState at creation time: createWorkflow('workflow', deps, { resumeState: savedState }).

Load the state before running:

const savedState = await store.load('wf-123');
await workflow.run(fn, { resumeState: savedState });

Save state after each critical step using the collector:

const store = postgres(process.env.DATABASE_URL!);
const collector = createResumeStateCollector();
const workflow = createWorkflow('workflow', deps, { onEvent: collector.handleEvent });

const savedState = await store.load('wf-123');

const result = await workflow.run(async ({ step, deps }) => {
  const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });

  // Checkpoint after the first step
  const state = collector.getResumeState();
  await store.save('wf-123', state);

  const posts = await step('fetchPosts', () => deps.fetchPosts(user.id), { key: `posts:${user.id}` });
  return { user, posts };
}, { resumeState: savedState ?? undefined });

// Final save once the run has finished
const finalState = collector.getResumeState();
await store.save('wf-123', finalState);

If the workflow crashes, resume from the last saved state:

const savedState = await store.load('wf-123');

await workflow.run(async ({ step, deps }) => {
  const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });
  const posts = await step('fetchPosts', () => deps.fetchPosts(user.id), { key: `posts:${user.id}` });
  return { user, posts };
}, { resumeState: savedState });
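
The idea behind checkpoint-and-resume can be sketched without awaitly. Everything in the block below (`runSteps`, `Checkpoint`, `SketchStep`, the demo steps) is a hypothetical illustration of the concept, not the library's internals: completed step results are recorded by key, persisted after each step, and skipped when a later run starts from the persisted state.

```typescript
// Conceptual sketch of checkpoint-and-resume; awaitly's internals may differ.
type Checkpoint = Map<string, unknown>; // step key -> completed result
type SketchStep = { key: string; run: () => Promise<unknown> };

async function runSteps(
  steps: SketchStep[],
  saved: Checkpoint,
  persist: (state: Checkpoint) => Promise<void>,
): Promise<Checkpoint> {
  const state: Checkpoint = new Map(saved);
  for (const { key, run } of steps) {
    if (state.has(key)) continue; // completed in an earlier run: skip it
    state.set(key, await run());
    await persist(new Map(state)); // checkpoint after every completed step
  }
  return state;
}

// Demo: the second step throws once to simulate a crash mid-workflow.
const executed: string[] = [];
let crashOnce = true;
const demoSteps: SketchStep[] = [
  {
    key: 'user:1',
    run: async () => {
      executed.push('user:1');
      return { id: '1' };
    },
  },
  {
    key: 'posts:1',
    run: async () => {
      if (crashOnce) {
        crashOnce = false;
        throw new Error('simulated crash');
      }
      executed.push('posts:1');
      return ['first post'];
    },
  },
];

async function crashThenResume(): Promise<string[]> {
  let durable: Checkpoint = new Map();
  const persist = async (state: Checkpoint): Promise<void> => {
    durable = state;
  };
  await runSteps(demoSteps, durable, persist).catch(() => undefined); // run 1 crashes
  await runSteps(demoSteps, durable, persist); // run 2 resumes from the checkpoint
  return executed;
}
```

In this sketch the first step runs only once across both runs, because the second run starts from the checkpoint saved before the crash.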

From loaded resume state, check step completion via the steps Map:

const savedState = await store.load('wf-123');
if (savedState?.steps?.get('user:1')) {
  console.log('User step already completed');
}

For event streams, use the isStepComplete type guard from awaitly/workflow to narrow event types.

Use a store (e.g. from postgres(), mongo(), or libsql()) to persist resume state. Save collector.getResumeState() after runs and pass the loaded state to workflow.run(fn, { resumeState }) or creation options:

import { postgres } from 'awaitly-postgres';
const store = postgres(process.env.DATABASE_URL!);
const collector = createResumeStateCollector();
const workflow = createWorkflow('workflow', deps, { onEvent: collector.handleEvent });
await workflow.run(fn);
await store.save('wf-123', collector.getResumeState());
const savedState = await store.load('wf-123');
await workflow.run(fn, { resumeState: savedState });

See Persistence for the full interface and custom adapters (e.g. Redis).
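
For local tests, an in-memory stand-in with the same `save`/`load` call shape used on this page can be handy. This is only a sketch: `SimpleStore` and `createInMemoryStore` are hypothetical names, returning `null` for a missing ID is an assumption, and the real store interface is the one described in Persistence.

```typescript
// Hypothetical store shape mirroring the save/load calls used on this page.
interface SimpleStore<S> {
  save(id: string, state: S): Promise<void>;
  load(id: string): Promise<S | null>;
}

function createInMemoryStore<S>(): SimpleStore<S> {
  const states = new Map<string, S>();
  return {
    async save(id, state) {
      states.set(id, state); // overwrite: the latest checkpoint wins
    },
    async load(id) {
      const state = states.get(id);
      return state === undefined ? null : state; // null means "nothing saved yet"
    },
  };
}
```

State kept this way disappears when the process exits, so it suits tests rather than crash recovery.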

For process-local caching without persistence, use createMemoryCache from awaitly/persistence:

import { createMemoryCache } from 'awaitly/persistence';
const cache = createMemoryCache({ maxSize: 1000, ttl: 7 * 24 * 60 * 60 });
const workflow = createWorkflow('workflow', deps, { cache });
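
As a rough picture of what a size cap and a time-to-live usually mean for an in-process cache, here is a small Map-backed sketch. It is not createMemoryCache's implementation: `createBoundedTtlCache` and `SketchCache` are hypothetical names, the TTL unit (milliseconds) and the oldest-first eviction policy are assumptions of this sketch, and the library's own units and policy may differ.

```typescript
// Illustrative only; not createMemoryCache's implementation.
// Assumption: the TTL is expressed in milliseconds in this sketch.
interface SketchCache<V> {
  get(key: string): V | undefined;
  set(key: string, value: V): void;
}

function createBoundedTtlCache<V>(
  maxSize: number,
  ttlMs: number,
  now: () => number = Date.now, // injectable clock, handy for tests
): SketchCache<V> {
  const entries = new Map<string, { value: V; expiresAt: number }>();
  return {
    get(key) {
      const entry = entries.get(key);
      if (entry === undefined) return undefined;
      if (entry.expiresAt <= now()) {
        entries.delete(key); // expired: drop it and report a miss
        return undefined;
      }
      return entry.value;
    },
    set(key, value) {
      entries.delete(key); // re-inserting moves the key to the newest position
      if (entries.size >= maxSize) {
        // A Map iterates in insertion order, so its first key is the oldest.
        const oldest = entries.keys().next().value;
        if (oldest !== undefined) entries.delete(oldest);
      }
      entries.set(key, { value, expiresAt: now() + ttlMs });
    },
  };
}
```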

Use step keys for idempotent operations:

const result = await workflow.run(async ({ step, deps }) => {
  // Payment only processes once, even on retry
  const payment = await step(
    'processPayment',
    () => deps.processPayment(orderId, amount),
    { key: `payment:${orderId}` }
  );
  return payment;
});

Errors are cached by default. If a step fails, subsequent runs return the same error:

// First run - fetchUser returns err('NOT_FOUND')
const first = await workflow.run(async ({ step, deps }) => {
  const user = await step('fetchUser', () => deps.fetchUser('999'), { key: 'user:999' });
  return user;
});
// first.error === 'NOT_FOUND'

// Second run - returns cached error (no fetch)
const second = await workflow.run(async ({ step, deps }) => {
  const user = await step('fetchUser', () => deps.fetchUser('999'), { key: 'user:999' });
  return user;
});
// second.error === 'NOT_FOUND' (from cache)

To retry on error, clear the cache key first:

cache.delete('user:999');
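
The effect of clearing a key can be sketched without awaitly. `Outcome`, `cachedAttempt`, and `flakyLookup` below are hypothetical stand-ins that cache failures as well as successes; awaitly's own result and cache types may differ. Deleting the key is what lets the operation run again.

```typescript
// Illustrative stand-ins; awaitly's own result and cache types may differ.
type Outcome<T> = { ok: true; value: T } | { ok: false; error: string };

const outcomeCache = new Map<string, Outcome<string>>();
let lookups = 0;

// Hypothetical lookup that fails the first time and succeeds afterwards.
async function flakyLookup(id: string): Promise<Outcome<string>> {
  lookups += 1;
  if (lookups === 1) {
    return { ok: false, error: 'NOT_FOUND' };
  }
  return { ok: true, value: `user-${id}` };
}

async function cachedAttempt(
  key: string,
  thunk: () => Promise<Outcome<string>>,
): Promise<Outcome<string>> {
  const hit = outcomeCache.get(key);
  if (hit !== undefined) return hit; // failures are served from the cache too
  const outcome = await thunk();
  outcomeCache.set(key, outcome);
  return outcome;
}

async function retryAfterClearing(): Promise<Outcome<string>[]> {
  const first = await cachedAttempt('user:999', () => flakyLookup('999'));
  const second = await cachedAttempt('user:999', () => flakyLookup('999')); // cached error
  outcomeCache.delete('user:999'); // clear the key so the next call re-runs
  const third = await cachedAttempt('user:999', () => flakyLookup('999'));
  return [first, second, third];
}
```
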

Use Case                     Caching Helps
Idempotent operations        Yes - payments, API calls
Resume after crash           Yes - completed steps skipped
Expensive computations       Yes - don’t recompute
Time-sensitive data          No - data may be stale
Non-idempotent operations    Careful - may cause issues

A complete example that checkpoints after each step and resumes from saved state:

import { createWorkflow, createResumeStateCollector } from 'awaitly/workflow';
import { postgres } from 'awaitly-postgres';

const store = postgres(process.env.DATABASE_URL!);

async function runWithRecovery(workflowId: string) {
  const savedState = await store.load(workflowId);
  const collector = createResumeStateCollector();
  const workflow = createWorkflow('workflow', { fetchUser, processPayment, sendConfirmation },
    { onEvent: collector.handleEvent }
  );

  const result = await workflow.run(async ({ step, deps }) => {
    const user = await step(
      'fetchUser',
      () => deps.fetchUser('1'),
      { key: 'user:1' }
    );
    // Checkpoint after user fetch
    await store.save(workflowId, collector.getResumeState());

    const payment = await step(
      'processPayment',
      () => deps.processPayment(user.id, 100),
      { key: `payment:${user.id}` }
    );
    // Checkpoint after payment
    await store.save(workflowId, collector.getResumeState());

    await step(
      'sendConfirmation',
      () => deps.sendConfirmation(user.email, payment.id),
      { key: `confirm:${payment.id}` }
    );
    return { user, payment };
  }, { resumeState: savedState ?? undefined });

  // Final save
  await store.save(workflowId, collector.getResumeState());
  return result;
}
