Workflows
You create workflows with createWorkflow (reusable, with dependency inference) or use run() for one-off operations. Execution is always via workflow.run(fn) — the workflow object is not callable.
Type Structure
Section titled “Type Structure”createWorkflow returns a function that produces:
┌─── Your success return type │ ┌─── Union of all dependency error types + UnexpectedError ▼ ▼Result<Value, 'NOT_FOUND' | 'EMAIL_FAILED' | UnexpectedError> ▲ └─── Automatically inferred from dependencies!createWorkflow
Section titled “createWorkflow”Creates a reusable workflow with automatic error type inference from dependencies:
import { ok, err, type AsyncResult } from 'awaitly';import { createWorkflow } from 'awaitly/workflow';
const fetchUser = async (id: string): AsyncResult<User, 'NOT_FOUND'> => { /* ... */ };const sendEmail = async (to: string): AsyncResult<void, 'EMAIL_FAILED'> => { /* ... */ };
// Error types inferred from dependenciesconst workflow = createWorkflow('workflow', { fetchUser, sendEmail });
const result = await workflow.run(async ({ step, deps }) => { const user = await step('fetchUser', () => deps.fetchUser('1')); await step('sendEmail', () => deps.sendEmail(user.email)); return user;});/*result.error is automatically typed as:'NOT_FOUND' | 'EMAIL_FAILED' | UnexpectedError*/// awaitly extracts error types from your dependencies:
fetchUser: AsyncResult<User, 'NOT_FOUND'> ▲ │ extracted ▼sendEmail: AsyncResult<void, 'EMAIL_FAILED'> ▲ │ extracted ▼workflow error = 'NOT_FOUND' | 'EMAIL_FAILED' | UnexpectedErrorWith options
Section titled “With options”const workflow = createWorkflow('workflow', { fetchUser, sendEmail }, { cache: new Map(), // Enable step caching onEvent: (event) => console.log(event), // Event stream streamStore: createMemoryStreamStore(), // Enable streaming });The value returned by createWorkflow is a workflow object with a single .run() method. Use workflow.run(fn) or workflow.run(name, fn) to execute; pass per-run options as workflow.run(fn, config).
For one-off workflows where you specify error types manually:
import { run } from 'awaitly/run';import { type Errors } from 'awaitly';
type RunErrors = Errors<[typeof fetchUser, typeof sendEmail]>;const result = await run<User, RunErrors>( async ({ step }) => { const user = await step('fetchUser', () => fetchUser('1')); await step('sendEmail', () => sendEmail(user.email)); return user; });// result.error is: 'NOT_FOUND' | 'EMAIL_FAILED' | UnexpectedErrorimport { run } from 'awaitly/run';
const result = await run<User, 'NOT_FOUND' | 'EMAIL_FAILED'>( async ({ step }) => { const user = await step('fetchUser', () => fetchUser('1')); await step('sendEmail', () => sendEmail(user.email)); return user; }, { catchUnexpected: (thrown) => ({ type: 'UNEXPECTED' as const, message: String(thrown) }) });// result.error is: 'NOT_FOUND' | 'EMAIL_FAILED' | { type: 'UNEXPECTED', message: string }import { run } from 'awaitly/run';
const result = await run(async ({ step }) => { const user = await step('fetchUser', () => fetchUser('1')); await step('sendEmail', () => sendEmail(user.email)); return user;});// result.error is: UnexpectedError (step error types not preserved at compile time)When to use each
Section titled “When to use each”| Scenario | Use |
|---|---|
| Dependencies known at compile time | createWorkflow |
| Dependencies passed as parameters | run |
| Need step caching or resume | createWorkflow |
| Need automatic error inference | createWorkflow |
| One-off workflow | run |
| Testing with mocks | run |
// Production: dependencies known ahead of timeconst checkout = createWorkflow('workflow', { validateOrder, processPayment, sendConfirmation,});
// Can reuse with different inputsawait checkout.run(async ({ step, deps }) => { /* ... */ });await checkout.run(async ({ step, deps }) => { /* different flow */ });// Testing: dependencies injectedconst mockFetch = async () => ok({ id: '1', name: 'Test' });
const result = await run<User, 'NOT_FOUND'>(async ({ step }) => { return await step('mockFetch', () => mockFetch());});Workflow options
Section titled “Workflow options”Enable step caching to avoid re-executing completed steps:
const workflow = createWorkflow('workflow', deps, { cache: new Map(),});
const result = await workflow.run(async ({ step, deps }) => { // This step runs once, even if workflow is called multiple times const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' }); return user;});/*First call: fetchUser executesSecond call: returns cached result immediately*/resumeState
Section titled “resumeState”Resume a workflow from saved state:
const workflow = createWorkflow('workflow', deps, { resumeState: savedState, // From a previous run});
const result = await workflow.run(async ({ step, deps }) => { // Cached steps return their saved values const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' }); return user;});See Persistence for details.
streamStore
Section titled “streamStore”Enable streaming within workflows:
import { createMemoryStreamStore } from 'awaitly/streaming';
const workflow = createWorkflow('workflow', deps, { streamStore: createMemoryStreamStore(),});
const result = await workflow.run(async ({ step, deps }) => { const writer = step.getWritable<string>({ namespace: 'tokens' });
await writer.write('Hello'); await writer.write(' World'); await writer.close();});See Streaming for details.
onEvent
Section titled “onEvent”Subscribe to workflow events for logging, visualization, or debugging:
const workflow = createWorkflow('workflow', deps, { onEvent: (event) => { switch (event.type) { case 'step_start': console.log('Starting:', event.name); break; case 'step_complete': console.log('Completed:', event.name, event.durationMs, 'ms'); break; case 'step_error': console.error('Failed:', event.name, event.error); break; } },});/*Output:Starting: Fetch userCompleted: Fetch user 45 msStarting: Send emailFailed: Send email EMAIL_FAILED*/catchUnexpected (custom unexpected type)
Section titled “catchUnexpected (custom unexpected type)”By default, thrown exceptions become an UnexpectedError. To replace it with your own type, pass catchUnexpected:
// Default - error union includes UnexpectedErrorconst workflow = createWorkflow('workflow', deps);
const result = await workflow.run(async ({ step, deps }) => { /* ... */ });// result.error: 'NOT_FOUND' | 'ORDER_FAILED' | UnexpectedError// Custom unexpected - your shape replaces UnexpectedErrorconst workflow = createWorkflow('workflow', deps, { catchUnexpected: (cause) => ({ type: 'UNEXPECTED' as const, cause })});
const result = await workflow.run(async ({ step, deps }) => { /* ... */ });// result.error: 'NOT_FOUND' | 'ORDER_FAILED' | { type: 'UNEXPECTED', cause: unknown }// Works with run() tooconst result = await run<User, 'NOT_FOUND' | 'EMAIL_FAILED'>( async ({ step }) => { const user = await step('fetchUser', () => fetchUser('1')); await step('sendEmail', () => sendEmail(user.email)); return user; }, { catchUnexpected: (cause) => ({ type: 'UNEXPECTED' as const, cause }) });// result.error: 'NOT_FOUND' | 'EMAIL_FAILED' | { type: 'UNEXPECTED', cause: unknown }With catchUnexpected:
- Your custom type replaces
UnexpectedErrorin the error union. - Use for exhaustive switch statements or when you want a single typed shape for all unexpected cases.
signal (Cancellation)
Section titled “signal (Cancellation)”Cancel workflows externally using an AbortSignal:
import { isWorkflowCancelled } from 'awaitly/workflow';
const controller = new AbortController();const workflow = createWorkflow('workflow', deps, { signal: controller.signal,});
// Start workflowconst resultPromise = workflow.run(async ({ step, deps }) => { const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'fetch-user' }); await step('sendEmail', () => deps.sendEmail(user.email), { key: 'send-email' }); return user;});
// Cancel from outside (e.g., timeout, user action)setTimeout(() => controller.abort('timeout'), 5000);
const result = await resultPromise;if (!result.ok && isWorkflowCancelled(result.cause)) { console.log('Cancelled:', result.cause.reason);}/*Output (if cancelled):Cancelled: timeout*/Passing signal to operations:
Steps using step.withTimeout can receive the workflow signal:
const result = await workflow.run(async ({ step, deps }) => { // Signal fires on EITHER timeout OR workflow cancellation const data = await step.withTimeout( 'fetchUrl', (signal) => fetch(url, { signal }), { ms: 3000, signal: true } ); return data;});Or access the signal directly via context:
const result = await workflow.run(async ({ step, deps, ctx }) => { // Manual signal access const response = await fetch(url, { signal: ctx.signal });
// Check if cancelled if (ctx.signal?.aborted) { return err('CANCELLED' as const); } return ok(response);});Execution-time options
Section titled “Execution-time options”While createWorkflow accepts options at creation time, you may need different options for specific runs. Pass a config as the second argument to workflow.run(fn, config), or use a named run with workflow.run("run-id", fn, config) for logging and tracing:
const workflow = createWorkflow('workflow', { fetchUser }, { onEvent: defaultLogger, // Creation-time default});
// Normal run - uses creation-time optionsawait workflow.run(async ({ step, deps }) => { /* ... */ });
// Per-run options override creation-timeawait workflow.run(async ({ step, deps }) => { const user = await step('fetchUser', () => deps.fetchUser('1')); return user;}, { onEvent: vizualizer.handleEvent, // Override for this run only signal: controller.signal, // Add signal for this run});
// Named run (workflowId for logging, tracing, resume)await workflow.run('onboard-user-1', async ({ step, deps }) => { return await step('fetchUser', () => deps.fetchUser('1'));});Override semantics
Section titled “Override semantics”Execution-time options override creation-time options:
const workflow = createWorkflow('workflow', deps, { onEvent: handlerA, // Creation-time signal: signalA,});
// handlerB overrides handlerA for this run// signalA is NOT used (signalB takes over)await workflow.run(fn, { onEvent: handlerB, // Wins signal: signalB, // Wins});
// Back to creation-time defaultsawait workflow.run(fn); // Uses handlerA, signalAAvailable execution options (RunConfig)
Section titled “Available execution options (RunConfig)”| Option | Description |
|---|---|
onEvent | Event handler for this run |
onError | Error handler for this run |
signal | AbortSignal for this run |
createContext | Context factory for this run |
resumeState | Resume state for this run (can be async factory) |
deps | Partial override of creation-time deps |
snapshot | Restore from a previously saved snapshot |
shouldRun | Concurrency control hook |
onBeforeStart | Pre-start hook |
onAfterStep | Post-step hook for checkpointing |
streamStore | Stream store for this run |
Composing workflows
Section titled “Composing workflows”Workflows are just functions. Compose them naturally:
const validateInput = createWorkflow('workflow', { validateEmail, validatePassword });const processPayment = createWorkflow('workflow', { chargeCard, saveReceipt });
const checkout = createWorkflow('workflow', { validateEmail, validatePassword, chargeCard, saveReceipt,});
const result = await checkout.run(async ({ step, deps }) => { // Use steps from all dependencies await step('validateEmail', () => deps.validateEmail(input.email)); await step('validatePassword', () => deps.validatePassword(input.password)); await step('chargeCard', () => deps.chargeCard(input.amount)); await step('saveReceipt', () => deps.saveReceipt(input.orderId));});Workflow input (closure)
Section titled “Workflow input (closure)”Pass input to a workflow by capturing it in a closure and calling workflow.run(fn):
const workflow = createWorkflow('workflow', deps);
const userId = '123';const requestId = 'abc';
const result = await workflow.run(async ({ step, deps }) => { console.log('User ID:', userId); const user = await step('fetchUser', () => deps.fetchUser(userId)); return user;});