Steps
step() executes an operation within a workflow. If the operation fails, the workflow exits early.
Basic usage
Section titled “Basic usage”step() requires an explicit string ID as the first argument: step('id', fn, opts) or step('id', result, opts). The string literal is the step ID used in events, diagrams, and caching.
const workflow = createWorkflow('workflow', { fetchUser, fetchPosts });
const result = await workflow.run(async ({ step, deps }) => { const user = await step('fetchUser', () => deps.fetchUser('1')); const posts = await step('fetchPosts', () => deps.fetchPosts(user.id)); return { user, posts };});Why thunks?
Section titled “Why thunks?”You’ll often see step('id', () => fn()) (a thunk) instead of step('id', fn()) (an already-started Promise). This is the same pattern used by TanStack React Query’s queryFn: the orchestrator needs to control when work starts so it can cache, dedupe, retry, and resume correctly.
See Why thunks? for the mental model and the “why step('id', somePromise) may work” nuance.
Using thunks for caching
Section titled “Using thunks for caching”Pass a function (thunk) as the second argument so the step can be cached. The first argument is the required string ID:
const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });Step options
Section titled “Step options”Pass options as the third argument to step('id', fn, opts):
const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1', // For caching/deduplication retry: { attempts: 3 }, // Retry on failure timeout: { ms: 5000 }, // Timeout after 5 seconds description: 'Load user by ID', // For generated docs});Optional metadata (static analysis & observability)
Section titled “Optional metadata (static analysis & observability)”You can attach optional metadata to steps for diagrams, diagnostics, and observability (e.g. OpenTelemetry, workflow summaries). Three groups:
- Architecture & intent:
intent,domain,owner,tags— for grouping and labeling (e.g.domain: 'payments'). - Effects & dependencies:
stateChanges,emits,calls— what the step mutates, what events it emits, which external services it calls. - Error classification:
errorMeta— map from error tag to{ retryable?, severity?, description? }for diagnostics andstep_errorevents.
See Static Analysis and Documenting workflows for where this metadata is used.
await step('charge', () => deps.charge(orderId), { domain: 'payments', intent: 'charge customer', owner: 'billing-team', calls: ['stripe.charges.create'], errorMeta: { RATE_LIMITED: { retryable: true, severity: 'medium' }, CARD_DECLINED: { retryable: false, severity: 'high' }, },});Step ID and key: label vs instance
Section titled “Step ID and key: label vs instance”The first argument is the step label (what kind of step: 'fetchUser', 'sendEmail'). It must be a string literal and is used in diagrams, events, and static docs.
The key option (optional) is the instance identity (which iteration or entity: user:${id}, payment-${payment.id}). When present, the runtime uses it for cache, snapshots, and events—so per-iteration identity comes from key, not from the first argument.
In loops, use one literal step ID (label) and a per-iteration key (instance) so events and snapshots still get distinct identities per iteration.
step.try - wrap throwing code
Section titled “step.try - wrap throwing code”Convert exceptions into typed errors:
const data = await step.try( 'fetchData', async () => { const res = await fetch('/api/data'); if (!res.ok) throw new Error(`HTTP ${res.status}`); return res.json(); }, { error: 'FETCH_FAILED' as const });/*Output (success):data contains the parsed JSON
Output (failure):Workflow returns { ok: false, error: 'FETCH_FAILED' }*/The error type is added to the workflow’s error union automatically.
step.fromResult - preserve error details
Section titled “step.fromResult - preserve error details”When your operation returns a Result with rich error objects:
type ApiError = { code: number; message: string };
const callApi = async (): AsyncResult<Data, ApiError> => { // Returns err({ code: 429, message: 'Rate limited' })};
const data = await step.fromResult( 'callApi', () => callApi(), { onError: (apiError) => ({ type: 'API_ERROR' as const, ...apiError, }), });/*Output (on rate limit):{ ok: false, error: { type: 'API_ERROR', code: 429, message: 'Rate limited' } }*/step.retry - retry with backoff
Section titled “step.retry - retry with backoff”step.retry requires an ID as the first argument: step.retry(id, operation, options). Use optional key for per-iteration identity (e.g. in loops).
const user = await step.retry( 'fetchUser', () => deps.fetchUser('1'), { attempts: 3, backoff: 'exponential', // 'fixed' | 'linear' | 'exponential' delayMs: 100, // Base delay maxDelayMs: 5000, // Cap for exponential jitter: true, // Add randomness retryOn: (error) => error !== 'NOT_FOUND', // Don't retry NOT_FOUND });Backoff strategies visualized
Section titled “Backoff strategies visualized”Fixed (delayMs: 100):#1: 100ms ████#2: 100ms ████#3: 100ms ████
Linear (delayMs: 100):#1: 100ms ████#2: 200ms ████████#3: 300ms ████████████
Exponential (delayMs: 100):#1: 100ms ████#2: 200ms ████████#3: 400ms ████████████████#4: 800ms ████████████████████████████████step.withTimeout - add timeout
Section titled “step.withTimeout - add timeout”step.withTimeout requires an ID as the first argument: step.withTimeout(id, operation, options). Use optional key for per-iteration identity (e.g. in loops).
const data = await step.withTimeout( 'slowOp', () => slowOperation(), { ms: 5000 });/*Output (completes in time):data contains the result
Output (times out):{ ok: false, error: StepTimeoutError }*/If the timeout is reached, the workflow gets a StepTimeoutError.
Combining retry and timeout
Section titled “Combining retry and timeout”const data = await step.retry( 'fetchData', () => step.withTimeout('fetchData', () => deps.fetchData(), { ms: 2000 }), { attempts: 3, backoff: 'exponential' });Each attempt has a 2-second timeout. The whole operation retries up to 3 times.
Timeline:├── Attempt 1 ──────────────► timeout at 2s│ wait 100ms├── Attempt 2 ──────────────► timeout at 2s│ wait 200ms├── Attempt 3 ──────────────► timeout at 2s (or success!)step.sleep - pause execution
Section titled “step.sleep - pause execution”Pause execution for a specified duration. Use for intentional delays between operations (rate limiting, polling intervals, debouncing). Respects workflow cancellation.
step.sleep requires an ID as the first argument: step.sleep(id, duration, options?). Use optional key for per-iteration identity (e.g. in loops).
Basic usage
Section titled “Basic usage”// String durationawait step.sleep("rate-limit-delay", "5s");
// Duration objectimport { seconds, millis } from 'awaitly/workflow';await step.sleep("my-sleep", seconds(5));await step.sleep("short-delay", millis(100));With options
Section titled “With options”await step.sleep("poll-delay", "2s", { key: "poll-delay", // For caching/deduplication (optional) ttl: 5000, // Cache TTL in milliseconds description: "Wait before next poll attempt", signal: controller.signal // Optional AbortSignal for cancellation});Rate limiting example
Section titled “Rate limiting example”const result = await workflow.run(async ({ step, deps }) => { const data = await step('fetchData', () => deps.fetchData());
// Respect rate limits - wait 1 second between requests await step.sleep("rate-limit-delay", "1s");
const moreData = await step('fetchMoreData', () => deps.fetchMoreData()); return { data, moreData };});Polling example
Section titled “Polling example”const result = await workflow.run(async ({ step, deps }) => { let status = await step('checkStatus', () => deps.checkStatus());
// Poll every 2 seconds until complete while (status !== "complete") { await step.sleep("poll-interval", "2s"); status = await step('checkStatus', () => deps.checkStatus()); }
return status;});Cancellation
Section titled “Cancellation”step.sleep supports two cancellation mechanisms:
Workflow cancellation - Cancel the entire workflow:
const controller = new AbortController();const workflow = createWorkflow('workflow', { }, { signal: controller.signal });
const resultPromise = workflow.run(async ({ step, deps }) => { await step.sleep("long-sleep", "10s"); return "completed";});
// Cancel after 1 secondsetTimeout(() => controller.abort(), 1000);
const result = await resultPromise;// result.ok is false, error is WorkflowCancelledErrorUser-provided signal - Cancel a specific sleep without affecting the workflow:
const result = await run(async ({ step }) => { const sleepController = new AbortController();
// Cancel this specific sleep after 1 second setTimeout(() => sleepController.abort(), 1000);
try { await step.sleep("long-sleep", "10s", { signal: sleepController.signal }); return "completed normally"; } catch { // Sleep was cancelled by user signal return "sleep cancelled, continuing workflow"; }});step.forEach - process collections
Section titled “step.forEach - process collections”Process arrays with automatic indexing, resume support, and static analyzability:
await step.forEach('process-payments', payments, { stepIdPattern: 'payment-{i}', run: async (payment) => { const result = await step('processPayment', () => deps.processPayment(payment)); return result; }});Basic usage
Section titled “Basic usage”// Process items with automatic indexingawait step.forEach('send-emails', users, { stepIdPattern: 'email-{i}', run: async (user) => { await step('sendEmail', () => deps.sendEmail(user.email)); }});Collecting results
Section titled “Collecting results”// Collect all results into an arrayconst results = await step.forEach('fetch-users', userIds, { stepIdPattern: 'user-{i}', collect: 'array', // or 'last' for only the final result run: async (userId) => { return await step('fetchUser', () => deps.fetchUser(userId)); }});// results is User[]Why step.forEach over manual loops?
Section titled “Why step.forEach over manual loops?”When you use a manual for loop, use one literal step ID (label) and a per-iteration key (instance). Prefer step.forEach() when you need full static analyzability.
// Option A: manual loop — one literal ID + key per iterationfor (const payment of payments) { await step('processPayment', () => deps.processPayment(payment), { key: `payment-${payment.id}` });}
// Preferred when static analysis matters: step.forEach() is enumerableawait step.forEach('process-payments', payments, { stepIdPattern: 'payment-{i}', run: (payment) => step('processPayment', () => deps.processPayment(payment))});Manual for loops with dynamic keys like ${item.id}:
- Cannot be enumerated by static analysis tools
- Reduce path generation accuracy for awaitly-analyze
- Make test matrix generation incomplete
Effect-style ergonomics
Section titled “Effect-style ergonomics”Convenience helpers that run through the step engine (events, retry, timeout, and in createWorkflow: cache and onAfterStep). Familiar to Effect users.
step.run - unwrap AsyncResult
Section titled “step.run - unwrap AsyncResult”Unwrap an AsyncResult with a step ID. Use a getter when caching so the operation runs only on cache miss:
// run() - unwraps and exits on errorconst user = await step.run('fetchUser', deps.fetchUser('1'));
// With createWorkflow cache: pass a getter so the second call can hit cacheconst user = await step.run('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });step.andThen - chain AsyncResults
Section titled “step.andThen - chain AsyncResults”Chain a success value into another AsyncResult-returning operation:
const user = { id: '1', name: 'Alice' };const enriched = await step.andThen('enrich', user, (u) => enrichUser(u));step.match - pattern match on Result
Section titled “step.match - pattern match on Result”Handle both success and error with step tracking (emits step_start / step_success for the match step):
const message = await step.match('handleUser', userResult, { ok: async (user) => `Hello ${user.name}`, err: async () => 'nope',});step.all - parallel with named results
Section titled “step.all - parallel with named results”Alias for step.parallel. Runs multiple operations in parallel and returns an object keyed by name. Only caches when you pass an explicit key (omitted key = no cache, matching core).
const { user, posts } = await step.all('fetchAll', { user: () => deps.fetchUser('1'), posts: () => deps.fetchPosts('1'),});step.map - parallel over an array
Section titled “step.map - parallel over an array”Map over an array with parallel execution. Only caches when you pass an explicit key (omitted key = no cache, matching core).
const users = await step.map('fetchUsers', ['1', '2', '3'], (id) => deps.fetchUser(id));step.withFallback - primary with fallback
Section titled “step.withFallback - primary with fallback”Run a primary operation; if it fails, run a fallback. Same early-exit semantics as step()—the workflow’s error union includes both primary and fallback error types. Use a string ID as the first argument; optional on filter and key.
const user = await step.withFallback('getUser', () => deps.fetchUser(id), { fallback: () => deps.getUserFromCache(id), on: (err) => err === 'NOT_FOUND',});step.withResource - acquire / use / release
Section titled “step.withResource - acquire / use / release”Manage a resource with acquire, use, and release. The first argument is the step ID; the second is an options object with acquire, use, and release. Optional key for caching. Release runs on success and on early exit.
const data = await step.withResource('useDb', { acquire: () => connect(), use: (db) => query(db), release: (db) => db.close(),});step.workflow - child workflow as step
Section titled “step.workflow - child workflow as step”Run a child workflow as a step. First argument is the step ID; second is a getter that returns the child workflow run (e.g. () => childWorkflow.run(...)). Optional key. Events and cache go through the step engine; awaitly-analyze detects child workflow refs when the getter calls childWorkflow.run(...) and draws them as workflow-ref nodes.
const result = await step.workflow('child-call', () => childWorkflow.run(async ({ step }) => { return await step('child-step', () => Promise.resolve(ok(42))); }));Parallel steps
Section titled “Parallel steps”Run multiple steps concurrently:
import { allAsync } from 'awaitly';
const result = await workflow.run(async ({ step, deps }) => { const [user, posts, comments] = await step('fetchUserData', () => allAsync([ deps.fetchUser('1'), deps.fetchPosts('1'), deps.fetchComments('1'), ]) ); return { user, posts, comments };});/*Output (all succeed):{ ok: true, value: { user, posts, comments } }
Output (any fails):{ ok: false, error: <first error> }*/import { allSettled } from 'awaitly';
const result = await workflow.run(async ({ step, deps }) => { const results = await step('fetchAllSettled', () => Promise.all([ deps.fetchUser('1'), deps.fetchPosts('1'), deps.fetchComments('1'), ]).then(allSettled) ); // results contains all outcomes, even failures return results;});Named steps for visualization and static docs
Section titled “Named steps for visualization and static docs”The required string ID (first argument) is what appears in workflow diagrams and in statically generated documentation. awaitly-analyze extracts it and uses it in Mermaid diagrams, step tables, and doc generators. You can set description or markdown in options for richer docs. See Documenting workflows and Visualization for details.
const user = await step('fetchUser', () => deps.fetchUser('1'));const posts = await step('fetchPosts', () => deps.fetchPosts(user.id));/*Workflow diagram and generated docs show:┌─────────────┐ ┌─────────────┐│ fetchUser │ ──► │ fetchPosts │└─────────────┘ └─────────────┘*/Step variants at a glance
Section titled “Step variants at a glance”| Variant | Use when |
|---|---|
step('id', fn) | Operation returns a Result |
step.run(id, result, opts?) | Unwrap AsyncResult directly (use getter when caching) |
step.andThen(id, value, fn, opts?) | Chain AsyncResult from a success value |
step.match(id, result, { ok, err }, opts?) | Pattern match on Result with step tracking |
step.all(id, shape, opts?) | Parallel named operations (alias for parallel; cache only with key) |
step.map(id, items, mapper, opts?) | Parallel over array (cache only with key) |
step.try(id, fn, opts) | Operation throws exceptions |
step.fromResult(id, fn, opts) | Need to transform Result errors |
step.retry(id, fn, opts) | Operation might fail transiently |
step.withTimeout(id, fn, opts) | Operation might hang |
step.sleep(id, duration, opts) | Need intentional delay (rate limiting, polling) |
step.forEach(id, items, opts) | Processing collections with static analyzability |
step.withFallback(id, primary, opts?) | Primary fails, run fallback |
step.withResource(id, { acquire, use, release }, opts?) | Resource scope |
step.workflow(id, getter, opts?) | Child workflow as step |