Workflows and Steps
Workflows compose multiple Result-returning operations. The step() function unwraps Results automatically. If any step fails, the workflow exits early with that error.
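One way to picture the early exit is a step function that throws a private marker when it sees a failed Result, while the runner catches that marker and turns it back into an error Result. The sketch below is a simplified model of that idea. The names miniRun and EarlyExit and the local Result type are hypothetical and are not awaitly's implementation.

```typescript
// Hypothetical, simplified model of step-based early exit; not awaitly's real code.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

const ok = <T>(value: T): Result<T, never> => ({ ok: true, value });
const err = <E>(error: E): Result<never, E> => ({ ok: false, error });

// Private marker thrown by step() to unwind the workflow body.
class EarlyExit {
  error: unknown;
  constructor(error: unknown) {
    this.error = error;
  }
}

type Step = <T, E>(id: string, fn: () => Promise<Result<T, E>>) => Promise<T>;

async function miniRun<T, E>(
  body: (ctx: { step: Step }) => Promise<T>
): Promise<Result<T, E>> {
  const step: Step = async (_id, fn) => {
    const result = await fn();
    if (!result.ok) throw new EarlyExit(result.error); // stop the workflow here
    return result.value; // unwrap the success value
  };

  try {
    return ok(await body({ step }));
  } catch (thrown) {
    // A failed step becomes the workflow's error Result.
    if (thrown instanceof EarlyExit) return err(thrown.error as E);
    throw thrown; // anything else is left unhandled in this sketch
  }
}
```

In this model, code after a failed step never runs because the thrown marker unwinds the workflow body, which matches the behavior described above.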
The step() Function
step() is the core of workflow composition. It unwraps a Result, returning the value on success or exiting the workflow on error:

    import { run } from 'awaitly/run';

    const result = await run(async ({ step }) => {
      // If fetchUser returns err('NOT_FOUND'), workflow exits here
      const user = await step('fetchUser', () => fetchUser('123'));

      // This only runs if fetchUser succeeded
      const posts = await step('fetchPosts', () => fetchPosts(user.id));

      return { user, posts };
    });

    if (result.ok) {
      console.log(result.value.user.name);
    } else {
      // result.error is 'NOT_FOUND' | 'FETCH_ERROR'
      console.log('Failed:', result.error);
    }

Execution flow:
    fetchUser('123')
        │
        ├── ok(user) ──► fetchPosts(user.id)
        │                     │
        │                     ├── ok(posts) ──► return { user, posts }
        │                     │
        │                     └── err ──► exit workflow
        │
        └── err ──► exit workflow

run() - One-Off Workflows
Use run() for simple, one-off workflows. Specify error types as type parameters to preserve them in the result:

    import { ok, err, type AsyncResult } from 'awaitly';
    import { run } from 'awaitly/run';

    const getUser = async (id: string): AsyncResult<User, 'NOT_FOUND'> => {
      const user = await db.find(id);
      return user ? ok(user) : err('NOT_FOUND');
    };

    const getOrders = async (userId: string): AsyncResult<Order[], 'FETCH_ERROR'> => {
      return ok(await db.orders.findByUser(userId));
    };

    const result = await run<{ user: User; orders: Order[] }, 'NOT_FOUND' | 'FETCH_ERROR'>(
      async ({ step }) => {
        const user = await step('getUser', () => getUser('123'));
        const orders = await step('getOrders', () => getOrders(user.id));
        return { user, orders };
      }
    );
    // result.error is: 'NOT_FOUND' | 'FETCH_ERROR' | UnexpectedError

createWorkflow() - Reusable Workflows
For reusable workflows with automatic error type inference, use createWorkflow():

    import { ok, err, type AsyncResult } from 'awaitly';
    import { createWorkflow } from 'awaitly/workflow';

    const fetchUser = async (id: string): AsyncResult<User, 'NOT_FOUND'> => { /* ... */ };
    const sendEmail = async (to: string): AsyncResult<void, 'EMAIL_FAILED'> => { /* ... */ };

    // Error types inferred from dependencies
    const workflow = createWorkflow('workflow', { fetchUser, sendEmail });

    const result = await workflow.run(async ({ step, deps }) => {
      const user = await step('fetchUser', () => deps.fetchUser('1'));
      await step('sendEmail', () => deps.sendEmail(user.email));
      return user;
    });

    // result.error is automatically typed as:
    // 'NOT_FOUND' | 'EMAIL_FAILED' | UnexpectedError
    // awaitly extracts error types from your dependencies:

    fetchUser: AsyncResult<User, 'NOT_FOUND'>
                                  ▲
                                  │ extracted
                                  ▼
    sendEmail: AsyncResult<void, 'EMAIL_FAILED'>
                                  ▲
                                  │ extracted
                                  ▼
    workflow error = 'NOT_FOUND' | 'EMAIL_FAILED' | UnexpectedError

When to Use Each
| Scenario | Use |
|---|---|
| One-off workflow | run() |
| Reusable workflow | createWorkflow() |
| Need automatic error inference | createWorkflow() |
| Need caching or resume | createWorkflow() |
| Testing with mocks | run() |
| Dependencies known at compile time | createWorkflow() |
| Dependencies passed as parameters | run() |
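The automatic error inference that createWorkflow() provides can be pictured as a type-level computation over the dependency object. The following is only a sketch of that idea: ErrorOf, ErrorsOf, and the local Result types are hypothetical names used for illustration, and awaitly's real types may be defined differently.

```typescript
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };
type AsyncResult<T, E> = Promise<Result<T, E>>;

// Pull the error type out of one Result-returning function.
type ErrorOf<F> = F extends (...args: any[]) => AsyncResult<any, infer E> ? E : never;

// Union the error types of every function in a dependency object.
type ErrorsOf<Deps> = { [K in keyof Deps]: ErrorOf<Deps[K]> }[keyof Deps];

type User = { id: string; email: string };

const deps = {
  fetchUser: async (id: string): AsyncResult<User, 'NOT_FOUND'> =>
    id === '1'
      ? { ok: true, value: { id, email: 'alice@example.com' } }
      : { ok: false, error: 'NOT_FOUND' },
  sendEmail: async (_to: string): AsyncResult<void, 'EMAIL_FAILED'> => ({
    ok: true,
    value: undefined,
  }),
};

// Intended to resolve to 'NOT_FOUND' | 'EMAIL_FAILED'
type WorkflowError = ErrorsOf<typeof deps>;

// The compiler rejects any string outside the union in this array.
const handled: WorkflowError[] = ['NOT_FOUND', 'EMAIL_FAILED'];
```

With run(), no dependency object exists to inspect, which is why the error union is written out by hand as type parameters.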
Step Options
Use the first argument (step id) for visualization; use options for caching, retry, and timeout:

    const user = await step('Fetch user', () => deps.fetchUser('1'), {
      key: 'user:1',            // For caching/deduplication
      retry: { attempts: 3 },   // Retry on failure
      timeout: { ms: 5000 },    // Timeout after 5 seconds
    });

Using Thunks for Caching
Pass a function (thunk) instead of calling the operation directly to enable caching:

    // With thunk - can be cached (ID required as first argument)
    const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });

See Why thunks? for the mental model, plus why step('id', somePromise) can be misleading.
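To see why the thunk matters, consider a simplified cache keyed by the step's key. With a thunk, the operation only starts on a cache miss; a promise passed directly would already be running before any cache lookup could happen. The cachedStep helper below is hypothetical and written only for illustration; it is not how awaitly implements caching.

```typescript
// Hypothetical helper illustrating thunk-based caching; not awaitly's real code.
const cache = new Map<string, unknown>();

async function cachedStep<T>(key: string, thunk: () => Promise<T>): Promise<T> {
  if (cache.has(key)) {
    return cache.get(key) as T; // cache hit: the thunk is never invoked
  }
  const value = await thunk(); // cache miss: start the work now
  cache.set(key, value);
  return value;
}

let calls = 0;
const fetchUser = async (id: string) => {
  calls += 1; // count how often the underlying operation really runs
  return { id, name: 'Alice' };
};

async function demo(): Promise<number> {
  await cachedStep('user:1', () => fetchUser('1'));
  await cachedStep('user:1', () => fetchUser('1'));
  return calls; // the second call is served from the cache
}
```

Had the calls been written as cachedStep('user:1', fetchUser('1')), fetchUser would run both times, because the argument is evaluated before the helper can check the cache.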
step.try() - Wrap Throwing Code
Convert exceptions into typed errors:

    const data = await step.try(
      'fetchData',
      async () => {
        const res = await fetch('/api/data');
        if (!res.ok) throw new Error(`HTTP ${res.status}`);
        return res.json();
      },
      { error: 'FETCH_FAILED' as const }
    );

The error type is added to the workflow’s error union automatically.
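Conceptually, this amounts to catching whatever the function throws and substituting a fixed, typed error value. Here is a minimal sketch of that idea, using a hypothetical tryAsync helper and a local Result type rather than awaitly's own:

```typescript
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

// Hypothetical helper: run a throwing function and map any exception to `error`.
async function tryAsync<T, E extends string>(
  fn: () => Promise<T>,
  error: E
): Promise<Result<T, E>> {
  try {
    return { ok: true, value: await fn() };
  } catch {
    // The original exception is discarded in this sketch.
    return { ok: false, error };
  }
}

// JSON.parse throws on malformed input; the wrapper turns that into a typed error.
const parseJson = (text: string) =>
  tryAsync(async () => JSON.parse(text) as unknown, 'PARSE_FAILED');
```

Because the error argument is a string literal type, callers see 'PARSE_FAILED' in the result's error type instead of an untyped exception.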
step.fromResult() - Preserve Error Details
When your operation returns a Result with rich error objects, use step.fromResult() to map the error while keeping its details:

    type ApiError = { code: number; message: string };

    const callApi = async (): AsyncResult<Data, ApiError> => {
      // Returns err({ code: 429, message: 'Rate limited' })
    };

    const data = await step.fromResult(
      'callApi',
      () => callApi(),
      {
        onError: (apiError) => ({
          type: 'API_ERROR' as const,
          ...apiError,
        }),
      }
    );
    // On rate limit: { ok: false, error: { type: 'API_ERROR', code: 429, message: 'Rate limited' } }

step.retry() - Retry with Backoff
    const user = await step.retry(
      'fetchUser',
      () => deps.fetchUser('1'),
      {
        attempts: 3,
        backoff: 'exponential',   // 'fixed' | 'linear' | 'exponential'
        delayMs: 100,             // Base delay
        maxDelayMs: 5000,         // Cap for exponential
        jitter: true,             // Add randomness
        retryOn: (error) => error !== 'NOT_FOUND',  // Don't retry NOT_FOUND
      }
    );

Backoff strategies visualized

    Fixed (delayMs: 100):
    #1: 100ms ████
    #2: 100ms ████
    #3: 100ms ████

    Linear (delayMs: 100):
    #1: 100ms ████
    #2: 200ms ████████
    #3: 300ms ████████████

    Exponential (delayMs: 100):
    #1: 100ms ████
    #2: 200ms ████████
    #3: 400ms ████████████████
    #4: 800ms ████████████████████████████████

step.withTimeout() - Add Timeout
    const data = await step.withTimeout(
      'slowOp',
      () => slowOperation(),
      { ms: 5000 }
    );
    /*
    Output (completes in time):
    data contains the result

    Output (times out):
    { ok: false, error: StepTimeoutError }
    */

If the timeout is reached, the workflow fails with a StepTimeoutError.
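A timeout like this is commonly built by racing the operation against a timer. The sketch below shows that general technique; TimeoutError and withTimeout are illustrative names, and nothing here is taken from awaitly's source. This simple version stops waiting for the slow operation but does not cancel it.

```typescript
// Hypothetical sketch of a timeout wrapper based on Promise.race.
class TimeoutError extends Error {
  ms: number;
  constructor(ms: number) {
    super(`Timed out after ${ms}ms`);
    this.name = 'TimeoutError';
    this.ms = ms;
  }
}

async function withTimeout<T>(fn: () => Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => reject(new TimeoutError(ms)), ms);
  });
  try {
    // Whichever settles first wins; the slower promise is ignored.
    return await Promise.race([fn(), timeout]);
  } finally {
    clearTimeout(timer); // avoid a dangling timer once the race is decided
  }
}
```

Actually stopping the underlying work requires passing an AbortSignal into the operation, as in the signal examples in the cancellation section of this page.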
Combining retry and timeout
    const data = await step.retry(
      'fetchData',
      () => step.withTimeout('fetchData', () => fetchData(), { ms: 2000 }),
      { attempts: 3, backoff: 'exponential' }
    );

Each attempt has a 2-second timeout. The operation is attempted up to 3 times in total.

    Timeline:
    ├── Attempt 1 ──────────────► timeout at 2s
    │   wait 100ms
    ├── Attempt 2 ──────────────► timeout at 2s
    │   wait 200ms
    ├── Attempt 3 ──────────────► timeout at 2s (or success!)

step.sleep() - Pause Execution
Pause execution for a specified duration. Use it for intentional delays between operations (rate limiting, polling intervals, debouncing). It respects workflow cancellation.
Basic usage
    import { seconds, millis } from 'awaitly/workflow';

    // String duration
    await step.sleep("rate-limit-delay", "5s");

    // Duration object
    await step.sleep("my-sleep", seconds(5));
    await step.sleep("short-delay", millis(100));

With options

    await step.sleep("poll-delay", "2s", {
      key: "poll-delay",                            // For caching/deduplication (optional)
      ttl: 5000,                                    // Cache TTL in milliseconds
      description: "Wait before next poll attempt",
      signal: controller.signal                     // Optional AbortSignal for cancellation
    });

Rate limiting example
    const result = await workflow.run(async ({ step, deps }) => {
      const data = await step('fetchData', () => deps.fetchData());

      // Respect rate limits - wait 1 second between requests
      await step.sleep("rate-limit-delay", "1s");

      const moreData = await step('fetchMoreData', () => deps.fetchMoreData());
      return { data, moreData };
    });

Polling example
    const result = await workflow.run(async ({ step, deps }) => {
      let status = await step('checkStatus', () => deps.checkStatus());

      // Poll every 2 seconds until complete
      while (status !== "complete") {
        await step.sleep("poll-interval", "2s");
        status = await step('checkStatus', () => deps.checkStatus());
      }

      return status;
    });

Cancellation
step.sleep supports two cancellation mechanisms:

Workflow cancellation - Cancel the entire workflow:

    const controller = new AbortController();
    const workflow = createWorkflow('workflow', {}, { signal: controller.signal });

    const resultPromise = workflow.run(async ({ step, deps }) => {
      await step.sleep("long-sleep", "10s");
      return "completed";
    });

    // Cancel after 1 second
    setTimeout(() => controller.abort(), 1000);

    const result = await resultPromise;
    // result.ok is false, error is WorkflowCancelledError

User-provided signal - Cancel a specific sleep without affecting the workflow:
    const result = await run(async ({ step }) => {
      const sleepController = new AbortController();

      // Cancel this specific sleep after 1 second
      setTimeout(() => sleepController.abort(), 1000);

      try {
        await step.sleep("long-sleep", "10s", { signal: sleepController.signal });
        return "completed normally";
      } catch {
        // Sleep was cancelled by user signal
        return "sleep cancelled, continuing workflow";
      }
    });

Parallel Steps
Run multiple steps concurrently:

    import { allAsync } from 'awaitly';

    const result = await workflow.run(async ({ step, deps }) => {
      const [user, posts, comments] = await step('fetchUserData', () =>
        allAsync([
          deps.fetchUser('1'),
          deps.fetchPosts('1'),
          deps.fetchComments('1'),
        ])
      );
      return { user, posts, comments };
    });
    /*
    Output (all succeed):
    { ok: true, value: { user, posts, comments } }

    Output (any fails):
    { ok: false, error: <first error> }
    */

To collect every outcome instead of stopping at the first error, use allSettled:

    import { allSettled } from 'awaitly';

    const result = await workflow.run(async ({ step, deps }) => {
      const results = await step('fetchAllSettled', () =>
        Promise.all([
          deps.fetchUser('1'),
          deps.fetchPosts('1'),
          deps.fetchComments('1'),
        ]).then(allSettled)
      );
      // results contains all outcomes, even failures
      return results;
    });

Named Steps for Visualization
Use descriptive step ids (first argument) to see them in workflow diagrams:

    const user = await step('Fetch user', () => fetchUser('1'));
    const posts = await step('Fetch posts', () => fetchPosts(user.id));
    /*
    Workflow diagram shows:
    ┌─────────────┐     ┌─────────────┐
    │ Fetch user  │ ──► │ Fetch posts │
    └─────────────┘     └─────────────┘
    */

See Visualization for details.
Workflow Options
Enable step caching to avoid re-executing completed steps:

    const workflow = createWorkflow('workflow', deps, {
      cache: new Map(),
    });

    const result = await workflow.run(async ({ step, deps }) => {
      // This step runs once, even if the workflow is run multiple times
      const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });
      return user;
    });

resumeState
Resume a workflow from saved state:

    const workflow = createWorkflow('workflow', deps, {
      resumeState: savedState, // From a previous run
    });

    const result = await workflow.run(async ({ step, deps }) => {
      // Cached steps return their saved values
      const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });
      return user;
    });

See Persistence for details.
streamStore
Enable streaming within workflows:

    import { createMemoryStreamStore } from 'awaitly/streaming';

    const workflow = createWorkflow('workflow', deps, {
      streamStore: createMemoryStreamStore(),
    });

    const result = await workflow.run(async ({ step, deps }) => {
      const writer = step.getWritable<string>({ namespace: 'tokens' });

      await writer.write('Hello');
      await writer.write(' World');
      await writer.close();
    });

See Streaming for details.
onEvent
Subscribe to workflow events for logging or visualization:

    const workflow = createWorkflow('workflow', deps, {
      onEvent: (event) => {
        switch (event.type) {
          case 'step_start':
            console.log('Starting:', event.name);
            break;
          case 'step_complete':
            console.log('Completed:', event.name, event.durationMs, 'ms');
            break;
          case 'step_error':
            console.error('Failed:', event.name, event.error);
            break;
        }
      },
    });
    /*
    Output:
    Starting: Fetch user
    Completed: Fetch user 45 ms
    Starting: Send email
    Failed: Send email EMAIL_FAILED
    */

catchUnexpected (custom unexpected type)
By default, thrown exceptions become an UnexpectedError. To replace it with your own type, pass catchUnexpected:

    const workflow = createWorkflow('workflow', deps, {
      catchUnexpected: (thrown) => ({
        type: 'UNEXPECTED' as const,
        message: String(thrown),
      }),
    });
    // result.error is E | { type: 'UNEXPECTED', message: string }

This also works with run():

    const result = await run<User, 'NOT_FOUND'>(
      async ({ step }) => {
        return await step('fetchUser', () => fetchUser('1'));
      },
      {
        catchUnexpected: (thrown) => ({ type: 'UNEXPECTED' as const, message: String(thrown) })
      }
    );
    // result.error is 'NOT_FOUND' | { type: 'UNEXPECTED', message: string }
    // Without catchUnexpected: result.error is 'NOT_FOUND' | UnexpectedError

signal (Cancellation)
Cancel workflows externally using an AbortSignal:

    import { isWorkflowCancelled } from 'awaitly/workflow';

    const controller = new AbortController();
    const workflow = createWorkflow('workflow', deps, {
      signal: controller.signal,
    });

    // Start workflow
    const resultPromise = workflow.run(async ({ step, deps }) => {
      const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'fetch-user' });
      await step('sendEmail', () => deps.sendEmail(user.email), { key: 'send-email' });
      return user;
    });

    // Cancel from outside (e.g., timeout, user action)
    setTimeout(() => controller.abort('timeout'), 5000);

    const result = await resultPromise;
    if (!result.ok && isWorkflowCancelled(result.cause)) {
      console.log('Cancelled:', result.cause.reason);
    }
    /*
    Output (if cancelled):
    Cancelled: timeout
    */

Passing signal to operations:
Steps using step.withTimeout can receive the workflow signal:
    const result = await workflow.run(async ({ step, deps }) => {
      // Signal fires on EITHER timeout OR workflow cancellation
      const data = await step.withTimeout(
        'fetchUrl',
        (signal) => fetch(url, { signal }),
        { ms: 3000, signal: true }
      );
      return data;
    });

Or access the signal directly via context:

    const result = await workflow.run(async ({ step, deps, ctx }) => {
      // Manual signal access
      const response = await fetch(url, { signal: ctx.signal });

      // Check if cancelled
      if (ctx.signal?.aborted) {
        return err('CANCELLED' as const);
      }
      return ok(response);
    });

Execution-Time Options
Override options for specific runs with workflow.run(fn, config), or use a named run with workflow.run("run-id", fn, config):

    const workflow = createWorkflow('workflow', { fetchUser }, {
      onEvent: defaultLogger, // Creation-time default
    });

    // Normal run - uses creation-time options
    await workflow.run(async ({ step, deps }) => { /* ... */ });

    // Per-run options override creation-time
    await workflow.run(async ({ step, deps }) => {
      const user = await step('fetchUser', () => deps.fetchUser('1'));
      return user;
    }, {
      onEvent: visualizer.handleEvent, // Override for this run only
      signal: controller.signal,       // Add signal for this run
    });

Override semantics
Execution-time options override creation-time options:

    const workflow = createWorkflow('workflow', deps, {
      onEvent: handlerA, // Creation-time
      signal: signalA,
    });

    // handlerB overrides handlerA for this run
    // signalA is NOT used (signalB takes over)
    await workflow.run(fn, {
      onEvent: handlerB, // Wins
      signal: signalB,   // Wins
    });

    // Back to creation-time defaults
    await workflow.run(fn); // Uses handlerA, signalA

Available execution options (RunConfig)
| Option | Description |
|---|---|
| onEvent | Event handler for this run |
| onError | Error handler for this run |
| signal | AbortSignal for this run |
| createContext | Context factory for this run |
| resumeState | Resume state for this run (can be async factory) |
| deps | Partial override of creation-time deps |
| snapshot | Restore from a previously saved snapshot |
| shouldRun | Concurrency control hook |
| onBeforeStart | Pre-start hook |
| onAfterStep | Post-step hook for checkpointing |
| streamStore | Stream store for this run |
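The override rule behaves like a shallow merge in which per-run values replace creation-time values and anything left unspecified falls back to the creation-time default. The sketch below works under that assumption; resolveOptions and the Options shape are hypothetical, and the real handling of individual options (for example the partial deps override) may be more involved.

```typescript
type Options = {
  onEvent?: (event: { type: string }) => void;
  signal?: AbortSignal;
};

// Hypothetical resolver: per-run options win, creation-time options fill the gaps.
function resolveOptions(creation: Options, perRun: Options = {}): Options {
  return { ...creation, ...perRun };
}

const handlerA = () => {};
const handlerB = () => {};
const signalA = new AbortController().signal;

const creation: Options = { onEvent: handlerA, signal: signalA };

// Only onEvent is overridden; signal still comes from creation time.
const overridden = resolveOptions(creation, { onEvent: handlerB });

// No per-run config: everything comes from creation time.
const defaults = resolveOptions(creation);
```

Each run resolves its options independently, so an override on one run does not leak into the next.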
Workflow input (closure)
Pass input by capturing it in a closure and calling workflow.run(fn):

    const workflow = createWorkflow('workflow', deps);

    const userId = '123';
    const requestId = 'abc';

    const result = await workflow.run(async ({ step, deps }) => {
      console.log('User ID:', userId);
      const user = await step('fetchUser', () => deps.fetchUser(userId));
      return user;
    });

Composing Workflows
Workflows are just functions. Compose them naturally:

    const validateInput = createWorkflow('workflow', { validateEmail, validatePassword });
    const processPayment = createWorkflow('workflow', { chargeCard, saveReceipt });

    const checkout = createWorkflow('workflow', {
      validateEmail,
      validatePassword,
      chargeCard,
      saveReceipt,
    });

    // `input` is assumed to be captured from the enclosing scope
    const result = await checkout.run(async ({ step, deps }) => {
      // Use steps from all dependencies
      await step('validateEmail', () => deps.validateEmail(input.email));
      await step('validatePassword', () => deps.validatePassword(input.password));
      await step('chargeCard', () => deps.chargeCard(input.amount));
      await step('saveReceipt', () => deps.saveReceipt(input.orderId));
    });

Step Variants at a Glance
| Variant | Use when |
|---|---|
| step('id', fn) | Operation returns a Result |
| step.try(id, fn, opts) | Operation throws exceptions |
| step.fromResult(id, fn, opts) | Need to transform Result errors |
| step.retry(id, fn, opts) | Operation might fail transiently |
| step.withTimeout(id, fn, opts) | Operation might hang |
| step.sleep(id, duration, opts) | Need intentional delay (rate limiting, polling) |
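For step.retry, the delays shown in the backoff diagrams follow simple formulas. The function below is a sketch that reproduces those numbers. backoffDelay is a hypothetical name, jitter is left out, and awaitly's exact calculation is an assumption here.

```typescript
type Backoff = 'fixed' | 'linear' | 'exponential';

// Uncapped delay before retry number `attempt` (1-based).
function rawDelay(strategy: Backoff, attempt: number, delayMs: number): number {
  if (strategy === 'fixed') return delayMs;            // 100, 100, 100, ...
  if (strategy === 'linear') return delayMs * attempt; // 100, 200, 300, ...
  return delayMs * 2 ** (attempt - 1);                 // 100, 200, 400, 800, ...
}

// Apply the optional cap, as maxDelayMs does in the retry options.
function backoffDelay(
  strategy: Backoff,
  attempt: number,
  delayMs: number,
  maxDelayMs: number = Infinity
): number {
  return Math.min(rawDelay(strategy, attempt, delayMs), maxDelayMs);
}

const exponential = [1, 2, 3, 4].map((n) => backoffDelay('exponential', n, 100));
const linearThird = backoffDelay('linear', 3, 100);
const capped = backoffDelay('exponential', 10, 100, 5000);
```

With delayMs: 100, the exponential schedule doubles on each attempt, so a cap such as maxDelayMs: 5000 takes effect from the seventh delay onward (6400ms uncapped).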
Complete Example
    import { ok, err, type AsyncResult } from 'awaitly';
    import { createWorkflow } from 'awaitly/workflow';

    type User = { id: string; name: string; email: string };
    type Post = { id: number; title: string };

    const fetchUser = async (id: string): AsyncResult<User, 'NOT_FOUND'> =>
      id === '1'
        ? ok({ id: '1', name: 'Alice', email: 'alice@example.com' })
        : err('NOT_FOUND');

    const fetchPosts = async (userId: string): AsyncResult<Post[], 'FETCH_ERROR'> =>
      ok([{ id: 1, title: 'Hello World' }]);

    const sendWelcome = async (email: string): AsyncResult<void, 'EMAIL_FAILED'> =>
      ok(undefined);

    const onboardUser = createWorkflow('workflow', { fetchUser, fetchPosts, sendWelcome });
    const result = await onboardUser.run(async ({ step, deps }) => {
      const user = await step('fetchUser', () => deps.fetchUser('1'));
      const posts = await step('fetchPosts', () => deps.fetchPosts(user.id));
      await step('sendWelcome', () => deps.sendWelcome(user.email));

      return { user, postCount: posts.length };
    });
    if (result.ok) {
      console.log(`${result.value.user.name} has ${result.value.postCount} posts`);
    } else {
      // TypeScript knows all possible errors
      switch (result.error) {
        case 'NOT_FOUND':
          console.log('User not found');
          break;
        case 'FETCH_ERROR':
          console.log('Failed to fetch posts');
          break;
        case 'EMAIL_FAILED':
          console.log('Failed to send welcome email');
          break;
      }
    }