PostgreSQL Persistence
The awaitly-postgres package provides a ready-to-use PostgreSQL persistence adapter for awaitly workflows.
Source code: GitHub
Installation

    npm install awaitly-postgres pg
    # or
    pnpm add awaitly-postgres pg
    # or
    yarn add awaitly-postgres pg

Quick Start

    import { postgres } from 'awaitly-postgres';
    import { createWorkflow, createResumeStateCollector } from 'awaitly/workflow';

    // One-liner setup
    const store = postgres('postgresql://localhost/mydb');
    const collector = createResumeStateCollector();

    const workflow = createWorkflow('workflow', { fetchUser, createOrder }, { onEvent: collector.handleEvent });

    await workflow.run(async ({ step, deps }) => {
      const user = await step('fetchUser', () => deps.fetchUser('123'), { key: 'fetch-user' });
      const order = await step('createOrder', () => deps.createOrder(user), { key: 'create-order' });
      return order;
    });

    await store.save('checkout-123', collector.getResumeState());

    // Later: restore + resume
    const savedState = await store.load('checkout-123');
    await workflow.run(async ({ step, deps }) => {
      const user = await step('fetchUser', () => deps.fetchUser('123'), { key: 'fetch-user' });
      const order = await step('createOrder', () => deps.createOrder(user), { key: 'create-order' });
      return order;
    }, { resumeState: savedState ?? undefined });

Configuration
String Shorthand

    const store = postgres('postgresql://user:password@localhost:5432/dbname');

Object Options

    const store = postgres({
      url: 'postgresql://localhost/mydb',
      table: 'my_workflow_snapshots',            // Default: 'awaitly_snapshots'
      prefix: 'orders:',                         // Default: ''
      autoCreateTable: true,                     // Default: true
      lock: { lockTableName: 'my_workflow_locks' }, // Optional: cross-process locking
    });

Bring Your Own Pool

    import { Pool } from 'pg';
    import { postgres } from 'awaitly-postgres';

    const pool = new Pool({ connectionString: process.env.DATABASE_URL });
    const store = postgres({
      url: 'postgresql://localhost/mydb',
      pool,
    });

Store Interface
The store implements the SnapshotStore interface:

    interface SnapshotStore {
      save(id: string, snapshot: WorkflowSnapshot): Promise<void>;
      load(id: string): Promise<WorkflowSnapshot | null>;
      delete(id: string): Promise<void>;
      list(options?: { prefix?: string; limit?: number }): Promise<Array<{ id: string; updatedAt: string }>>;
      close(): Promise<void>;
    }

Usage Examples
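
When no database is available (unit tests, local experiments), the SnapshotStore interface can be satisfied by an in-memory stand-in. The sketch below is not part of awaitly-postgres: the class name InMemorySnapshotStore, the trimmed WorkflowSnapshot type, and the newest-first ordering of list() are assumptions made for illustration.

```typescript
// Trimmed stand-in for awaitly's WorkflowSnapshot (assumption: the real type has more fields).
type WorkflowSnapshot = { execution: { status: string }; [key: string]: unknown };

// Same shape as the interface documented above.
interface SnapshotStore {
  save(id: string, snapshot: WorkflowSnapshot): Promise<void>;
  load(id: string): Promise<WorkflowSnapshot | null>;
  delete(id: string): Promise<void>;
  list(options?: { prefix?: string; limit?: number }): Promise<Array<{ id: string; updatedAt: string }>>;
  close(): Promise<void>;
}

// Hypothetical in-memory implementation, useful only for tests.
class InMemorySnapshotStore implements SnapshotStore {
  private rows = new Map<string, { snapshot: WorkflowSnapshot; updatedAt: Date }>();

  // Copy through JSON so callers cannot mutate stored state, similar to a JSON column.
  private static clone(snapshot: WorkflowSnapshot): WorkflowSnapshot {
    return JSON.parse(JSON.stringify(snapshot)) as WorkflowSnapshot;
  }

  async save(id: string, snapshot: WorkflowSnapshot): Promise<void> {
    this.rows.set(id, { snapshot: InMemorySnapshotStore.clone(snapshot), updatedAt: new Date() });
  }

  async load(id: string): Promise<WorkflowSnapshot | null> {
    const row = this.rows.get(id);
    return row ? InMemorySnapshotStore.clone(row.snapshot) : null;
  }

  async delete(id: string): Promise<void> {
    this.rows.delete(id);
  }

  async list(
    options: { prefix?: string; limit?: number } = {},
  ): Promise<Array<{ id: string; updatedAt: string }>> {
    const { prefix = '', limit = Number.POSITIVE_INFINITY } = options;
    return Array.from(this.rows.entries())
      .filter(([id]) => id.startsWith(prefix))
      // Newest first (assumption, mirroring the updated_at DESC index).
      .sort((a, b) => b[1].updatedAt.getTime() - a[1].updatedAt.getTime())
      .slice(0, limit)
      .map(([id, row]) => ({ id, updatedAt: row.updatedAt.toISOString() }));
  }

  async close(): Promise<void> {
    this.rows.clear();
  }
}
```

Because the class implements the same interface, code written against SnapshotStore can take either this stand-in or the store returned by postgres().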

    // Save snapshot
    await store.save('wf-123', collector.getResumeState());

    // Load snapshot (returns null if not found)
    const snapshot = await store.load('wf-123');

    // Delete snapshot
    await store.delete('wf-123');

    // List recent workflows
    const workflows = await store.list({ limit: 100 });
    // [{ id: 'wf-123', updatedAt: '2024-01-15T10:30:00Z' }, ...]

    // List with prefix filter
    const orderWorkflows = await store.list({ prefix: 'orders:', limit: 50 });

    // Clean shutdown
    await store.close();

Table Schema
When autoCreateTable is enabled (the default), the adapter creates a table with the following schema:

    CREATE TABLE IF NOT EXISTS awaitly_snapshots (
      id TEXT PRIMARY KEY,
      snapshot JSONB NOT NULL,
      updated_at TIMESTAMPTZ DEFAULT NOW()
    );

    CREATE INDEX IF NOT EXISTS awaitly_snapshots_updated_at_idx
    ON awaitly_snapshots (updated_at DESC);

With Durable Execution
Use the same postgres() store with durable.run:

    import { postgres } from 'awaitly-postgres';
    import { durable } from 'awaitly/durable';

    const store = postgres(process.env.DATABASE_URL!);

    const result = await durable.run(
      { fetchUser, createOrder },
      async ({ step, deps }) => {
        const user = await step('fetchUser', () => deps.fetchUser('123'), { key: 'fetch-user' });
        const order = await step('createOrder', () => deps.createOrder(user), { key: 'create-order' });
        return order;
      },
      { id: 'checkout-123', store }
    );

For cross-process locking, pass the lock option when creating the store so that only one process runs a given workflow ID at a time.
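
As a minimal sketch of enabling the lock option, the helper below fills in a lock table name when none is given. The lock.lockTableName field and the 'awaitly_snapshots' default come from the Object Options section; the StoreOptions type and the withLocking helper are hypothetical, and the package's own option type may differ.

```typescript
// Local mirror of the options shown under "Object Options"
// (assumption: the type exported by awaitly-postgres may carry more fields).
interface StoreOptions {
  url: string;
  table?: string;
  prefix?: string;
  autoCreateTable?: boolean;
  lock?: { lockTableName: string };
}

// Hypothetical helper: keep an explicit lock config, otherwise derive the
// lock table name from the snapshot table name.
function withLocking(options: StoreOptions): StoreOptions {
  const table = options.table ?? 'awaitly_snapshots';
  return {
    ...options,
    lock: options.lock ?? { lockTableName: `${table}_locks` },
  };
}

const lockedOptions = withLocking({
  url: 'postgresql://localhost/mydb',
  table: 'my_workflow_snapshots',
});

// The result would then be passed to the adapter:
//   const store = postgres(lockedOptions);   // postgres from 'awaitly-postgres'
//   await durable.run(deps, fn, { id: 'checkout-123', store });
```

Deriving the name from the snapshot table keeps the two tables visibly paired; an explicit lockTableName works just as well.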
Features
- ✅ One-liner setup - Just pass a connection string
- ✅ Automatic table creation - No manual schema setup required
- ✅ JSONB storage - Native PostgreSQL JSON support
- ✅ Connection pooling - Efficient connection management
- ✅ Pattern matching - List workflows by prefix
- ✅ Timestamps - Automatic updated_at tracking
Production Considerations
Connection Pooling
The pool is managed automatically. For high-load scenarios, bring your own pool with custom settings:

    import { Pool } from 'pg';
    import { postgres } from 'awaitly-postgres';

    const pool = new Pool({
      connectionString: process.env.DATABASE_URL,
      max: 20,
      min: 5,
      idleTimeoutMillis: 30000,
    });

    const store = postgres({ url: process.env.DATABASE_URL!, pool });

Cleanup
List and delete completed workflows:

    const completed = await store.list({ limit: 1000 });
    for (const { id } of completed) {
      const snapshot = await store.load(id);
      if (snapshot?.execution.status === 'completed') {
        await store.delete(id);
      }
    }

Monitoring

    -- Check table size
    SELECT pg_size_pretty(pg_total_relation_size('awaitly_snapshots'));

    -- Count by status
    SELECT (snapshot->'execution')->>'status' AS status, COUNT(*)
    FROM awaitly_snapshots
    GROUP BY 1;

Requirements
- Node.js >= 22
- PostgreSQL >= 12
- pg package