State and Resumption
awaitly can save workflow state and resume from where it left off. This enables crash recovery, long-running workflows, and idempotent operations.
Step Caching
Section titled “Step Caching”Enable caching so steps don’t re-execute:
const workflow = createWorkflow('workflow', deps, { cache: new Map(),});
const result = await workflow.run(async ({ step, deps }) => { // Cached with key 'user:1' const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' }); return user;});Key Requirements
Section titled “Key Requirements”Keys must be:
- Unique per step: Different steps need different keys
- Stable: Same input should produce same key
- Deterministic: No timestamps or random values
// Good keys{ key: 'user:123' }{ key: `posts:${userId}` }{ key: `order:${orderId}:validate` }
// Bad keys{ key: `user:${Date.now()}` } // Changes every call{ key: `user:${Math.random()}` } // RandomThunks Required for Caching
Section titled “Thunks Required for Caching”Pass a function, not the result of calling:
// With thunk - can be cached (ID required as first argument)const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });Cache Scope
Section titled “Cache Scope”The cache persists across workflow runs:
const cache = new Map();const workflow = createWorkflow('workflow', deps, { cache });
// First run - fetches userawait workflow.run(async ({ step, deps }) => { const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' }); return user;});
// Second run - uses cached value (no fetch)await workflow.run(async ({ step, deps }) => { const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' }); return user;});Collecting State for Persistence
Section titled “Collecting State for Persistence”Use createResumeStateCollector to capture step results:
import { createWorkflow, createResumeStateCollector } from 'awaitly/workflow';
const collector = createResumeStateCollector();
const workflow = createWorkflow('workflow', { fetchUser, fetchPosts }, { onEvent: collector.handleEvent });
await workflow.run(async ({ step, deps }) => { const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' }); const posts = await step('fetchPosts', () => deps.fetchPosts(user.id), { key: `posts:${user.id}` }); return { user, posts };});
// Get collected stateconst state = collector.getResumeState();Saving and Loading State
Section titled “Saving and Loading State”Use createResumeStateCollector to capture step results during a run, then persist the returned ResumeState (JSON-serializable). To resume, pass the loaded state to workflow.run(fn, { resumeState }) or to creation options:
const collector = createResumeStateCollector();const workflow = createWorkflow('workflow', deps, { onEvent: collector.handleEvent });
await workflow.run(async ({ step, deps }) => { const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' }); return user;});
const state = collector.getResumeState();await store.save('wf-123', state);To resume, load the state and pass it to the next run:
const savedState = await store.load('wf-123');
const workflow = createWorkflow('workflow', { fetchUser, fetchPosts }, { onEvent: collector.handleEvent });
await workflow.run(async ({ step, deps }) => { const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' }); const posts = await step('fetchPosts', () => deps.fetchPosts(user.id), { key: `posts:${user.id}` }); return { user, posts };}, { resumeState: savedState });You can also pass resumeState at creation time: createWorkflow('workflow', deps, { resumeState: savedState }).
Async state loading
Section titled “Async state loading”Load the state before running:
const savedState = await store.load('wf-123');await workflow.run(fn, { resumeState: savedState });Crash Recovery Pattern
Section titled “Crash Recovery Pattern”Save state after each critical step using the collector:
const store = postgres(process.env.DATABASE_URL!);const collector = createResumeStateCollector();const workflow = createWorkflow('workflow', deps, { onEvent: collector.handleEvent });
const savedState = await store.load('wf-123');
const result = await workflow.run(async ({ step, deps }) => { const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });
const state = collector.getResumeState(); await store.save('wf-123', state);
const posts = await step('fetchPosts', () => deps.fetchPosts(user.id), { key: `posts:${user.id}` }); return { user, posts };}, { resumeState: savedState ?? undefined });
const finalState = collector.getResumeState();await store.save('wf-123', finalState);If the workflow crashes, resume from the last saved state:
const savedState = await store.load('wf-123');
await workflow.run(async ({ step, deps }) => { const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' }); const posts = await step('fetchPosts', () => deps.fetchPosts(user.id), { key: `posts:${user.id}` }); return { user, posts };}, { resumeState: savedState });Checking Step Completion
Section titled “Checking Step Completion”From loaded resume state, check step completion via the steps Map:
const savedState = await store.load('wf-123');
if (savedState?.steps?.get('user:1')) { console.log('User step already completed');}For event streams, use the isStepComplete type guard from awaitly/workflow to narrow event types.
Persistence Store
Section titled “Persistence Store”Use a store (e.g. from postgres(), mongo(), or libsql()) to persist resume state. Save collector.getResumeState() after runs and pass the loaded state to workflow.run(fn, { resumeState }) or creation options:
import { postgres } from 'awaitly-postgres';
const store = postgres(process.env.DATABASE_URL!);const collector = createResumeStateCollector();const workflow = createWorkflow('workflow', deps, { onEvent: collector.handleEvent });
await workflow.run(fn);await store.save('wf-123', collector.getResumeState());
const savedState = await store.load('wf-123');await workflow.run(fn, { resumeState: savedState });See Persistence for the full interface and custom adapters (e.g. Redis).
In-Memory Step Cache
Section titled “In-Memory Step Cache”For process-local caching without persistence, use createMemoryCache from awaitly/persistence:
import { createMemoryCache } from 'awaitly/persistence';
const cache = createMemoryCache({ maxSize: 1000, ttl: 7 * 24 * 60 * 60 });
const workflow = createWorkflow('workflow', deps, { cache });Idempotency Keys
Section titled “Idempotency Keys”Use step keys for idempotent operations:
const result = await workflow.run(async ({ step, deps }) => { // Payment only processes once, even on retry const payment = await step( 'processPayment', () => deps.processPayment(orderId, amount), { key: `payment:${orderId}` } );
return payment;});Error Caching
Section titled “Error Caching”Errors are cached by default. If a step fails, subsequent runs return the same error:
// First run - fetchUser returns err('NOT_FOUND')await workflow.run(async ({ step, deps }) => { const user = await step('fetchUser', () => deps.fetchUser('999'), { key: 'user:999' }); return user;});// result.error === 'NOT_FOUND'
// Second run - returns cached error (no fetch)await workflow.run(async ({ step, deps }) => { const user = await step('fetchUser', () => deps.fetchUser('999'), { key: 'user:999' }); return user;});// result.error === 'NOT_FOUND' (from cache)To retry on error, clear the cache key first:
cache.delete('user:999');When to Use Caching
Section titled “When to Use Caching”| Use Case | Caching Helps |
|---|---|
| Idempotent operations | Yes - payments, API calls |
| Resume after crash | Yes - completed steps skipped |
| Expensive computations | Yes - don’t recompute |
| Time-sensitive data | No - data may be stale |
| Non-idempotent operations | Careful - may cause issues |
Complete Example
Section titled “Complete Example”import { createWorkflow, createResumeStateCollector } from 'awaitly/workflow';import { postgres } from 'awaitly-postgres';
const store = postgres(process.env.DATABASE_URL!);
async function runWithRecovery(workflowId: string) { const savedState = await store.load(workflowId); const collector = createResumeStateCollector();
const workflow = createWorkflow('workflow', { fetchUser, processPayment, sendConfirmation }, { onEvent: collector.handleEvent } );
const result = await workflow.run(async ({ step, deps }) => { const user = await step( 'fetchUser', () => deps.fetchUser('1'), { key: 'user:1' } );
// Checkpoint after user fetch await store.save(workflowId, collector.getResumeState());
const payment = await step( 'processPayment', () => deps.processPayment(user.id, 100), { key: `payment:${user.id}` } );
// Checkpoint after payment await store.save(workflowId, collector.getResumeState());
await step( 'sendConfirmation', () => deps.sendConfirmation(user.email, payment.id), { key: `confirm:${payment.id}` } );
return { user, payment }; }, { resumeState: savedState ?? undefined });
// Final save await store.save(workflowId, collector.getResumeState());
return result;}