Skip to content

Steps

step() executes an operation within a workflow. If the operation fails, the workflow exits early.

step() requires an explicit string ID as the first argument: step('id', fn, opts) or step('id', result, opts). The string literal is the step ID used in events, diagrams, and caching.

const workflow = createWorkflow('workflow', { fetchUser, fetchPosts });
const result = await workflow.run(async ({ step, deps }) => {
const user = await step('fetchUser', () => deps.fetchUser('1'));
const posts = await step('fetchPosts', () => deps.fetchPosts(user.id));
return { user, posts };
});

You’ll often see step('id', () => fn()) (a thunk) instead of step('id', fn()) (an already-started Promise). This is the same pattern used by TanStack React Query’s queryFn: the orchestrator needs to control when work starts so it can cache, dedupe, retry, and resume correctly.

See Why thunks? for the mental model and the “why step('id', somePromise) may work” nuance.

Pass a function (thunk) as the second argument so the step can be cached. The first argument is the required string ID:

const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });

Pass options as the third argument to step('id', fn, opts):

const user = await step('fetchUser', () => deps.fetchUser('1'), {
key: 'user:1', // For caching/deduplication
retry: { attempts: 3 }, // Retry on failure
timeout: { ms: 5000 }, // Timeout after 5 seconds
description: 'Load user by ID', // For generated docs
});

Optional metadata (static analysis & observability)

Section titled “Optional metadata (static analysis & observability)”

You can attach optional metadata to steps for diagrams, diagnostics, and observability (e.g. OpenTelemetry, workflow summaries). Three groups:

  • Architecture & intent: intent, domain, owner, tags — for grouping and labeling (e.g. domain: 'payments').
  • Effects & dependencies: stateChanges, emits, calls — what the step mutates, what events it emits, which external services it calls.
  • Error classification: errorMeta — map from error tag to { retryable?, severity?, description? } for diagnostics and step_error events.

See Static Analysis and Documenting workflows for where this metadata is used.

await step('charge', () => deps.charge(orderId), {
domain: 'payments',
intent: 'charge customer',
owner: 'billing-team',
calls: ['stripe.charges.create'],
errorMeta: {
RATE_LIMITED: { retryable: true, severity: 'medium' },
CARD_DECLINED: { retryable: false, severity: 'high' },
},
});

The first argument is the step label (what kind of step: 'fetchUser', 'sendEmail'). It must be a string literal and is used in diagrams, events, and static docs.

The key option (optional) is the instance identity (which iteration or entity: user:${id}, payment-${payment.id}). When present, the runtime uses it for cache, snapshots, and events—so per-iteration identity comes from key, not from the first argument.

In loops, use one literal step ID (label) and a per-iteration key (instance) so events and snapshots still get distinct identities per iteration.

Convert exceptions into typed errors:

const data = await step.try(
'fetchData',
async () => {
const res = await fetch('/api/data');
if (!res.ok) throw new Error(`HTTP ${res.status}`);
return res.json();
},
{ error: 'FETCH_FAILED' as const }
);
/*
Output (success):
data contains the parsed JSON
Output (failure):
Workflow returns { ok: false, error: 'FETCH_FAILED' }
*/

The error type is added to the workflow’s error union automatically.

When your operation returns a Result with rich error objects:

type ApiError = { code: number; message: string };
const callApi = async (): AsyncResult<Data, ApiError> => {
// Returns err({ code: 429, message: 'Rate limited' })
};
const data = await step.fromResult(
'callApi',
() => callApi(),
{
onError: (apiError) => ({
type: 'API_ERROR' as const,
...apiError,
}),
}
);
/*
Output (on rate limit):
{ ok: false, error: { type: 'API_ERROR', code: 429, message: 'Rate limited' } }
*/

step.retry requires an ID as the first argument: step.retry(id, operation, options). Use optional key for per-iteration identity (e.g. in loops).

const user = await step.retry(
'fetchUser',
() => deps.fetchUser('1'),
{
attempts: 3,
backoff: 'exponential', // 'fixed' | 'linear' | 'exponential'
delayMs: 100, // Base delay
maxDelayMs: 5000, // Cap for exponential
jitter: true, // Add randomness
retryOn: (error) => error !== 'NOT_FOUND', // Don't retry NOT_FOUND
}
);
Fixed (delayMs: 100):
#1: 100ms ████
#2: 100ms ████
#3: 100ms ████
Linear (delayMs: 100):
#1: 100ms ████
#2: 200ms ████████
#3: 300ms ████████████
Exponential (delayMs: 100):
#1: 100ms ████
#2: 200ms ████████
#3: 400ms ████████████████
#4: 800ms ████████████████████████████████

step.withTimeout requires an ID as the first argument: step.withTimeout(id, operation, options). Use optional key for per-iteration identity (e.g. in loops).

const data = await step.withTimeout(
'slowOp',
() => slowOperation(),
{ ms: 5000 }
);
/*
Output (completes in time):
data contains the result
Output (times out):
{ ok: false, error: StepTimeoutError }
*/

If the timeout is reached, the workflow gets a StepTimeoutError.

const data = await step.retry(
'fetchData',
() => step.withTimeout('fetchData', () => deps.fetchData(), { ms: 2000 }),
{ attempts: 3, backoff: 'exponential' }
);

Each attempt has a 2-second timeout. The whole operation retries up to 3 times.

Timeline:
├── Attempt 1 ──────────────► timeout at 2s
│ wait 100ms
├── Attempt 2 ──────────────► timeout at 2s
│ wait 200ms
├── Attempt 3 ──────────────► timeout at 2s (or success!)

Pause execution for a specified duration. Use for intentional delays between operations (rate limiting, polling intervals, debouncing). Respects workflow cancellation.

step.sleep requires an ID as the first argument: step.sleep(id, duration, options?). Use optional key for per-iteration identity (e.g. in loops).

// String duration
await step.sleep("rate-limit-delay", "5s");
// Duration object
import { seconds, millis } from 'awaitly/workflow';
await step.sleep("my-sleep", seconds(5));
await step.sleep("short-delay", millis(100));
await step.sleep("poll-delay", "2s", {
key: "poll-delay", // For caching/deduplication (optional)
ttl: 5000, // Cache TTL in milliseconds
description: "Wait before next poll attempt",
signal: controller.signal // Optional AbortSignal for cancellation
});
const result = await workflow.run(async ({ step, deps }) => {
const data = await step('fetchData', () => deps.fetchData());
// Respect rate limits - wait 1 second between requests
await step.sleep("rate-limit-delay", "1s");
const moreData = await step('fetchMoreData', () => deps.fetchMoreData());
return { data, moreData };
});
const result = await workflow.run(async ({ step, deps }) => {
let status = await step('checkStatus', () => deps.checkStatus());
// Poll every 2 seconds until complete
while (status !== "complete") {
await step.sleep("poll-interval", "2s");
status = await step('checkStatus', () => deps.checkStatus());
}
return status;
});

step.sleep supports two cancellation mechanisms:

Workflow cancellation - Cancel the entire workflow:

const controller = new AbortController();
const workflow = createWorkflow('workflow', { }, { signal: controller.signal });
const resultPromise = workflow.run(async ({ step, deps }) => {
await step.sleep("long-sleep", "10s");
return "completed";
});
// Cancel after 1 second
setTimeout(() => controller.abort(), 1000);
const result = await resultPromise;
// result.ok is false, error is WorkflowCancelledError

User-provided signal - Cancel a specific sleep without affecting the workflow:

const result = await run(async ({ step }) => {
const sleepController = new AbortController();
// Cancel this specific sleep after 1 second
setTimeout(() => sleepController.abort(), 1000);
try {
await step.sleep("long-sleep", "10s", {
signal: sleepController.signal
});
return "completed normally";
} catch {
// Sleep was cancelled by user signal
return "sleep cancelled, continuing workflow";
}
});

Process arrays with automatic indexing, resume support, and static analyzability:

await step.forEach('process-payments', payments, {
stepIdPattern: 'payment-{i}',
run: async (payment) => {
const result = await step('processPayment', () => deps.processPayment(payment));
return result;
}
});
// Process items with automatic indexing
await step.forEach('send-emails', users, {
stepIdPattern: 'email-{i}',
run: async (user) => {
await step('sendEmail', () => deps.sendEmail(user.email));
}
});
// Collect all results into an array
const results = await step.forEach('fetch-users', userIds, {
stepIdPattern: 'user-{i}',
collect: 'array', // or 'last' for only the final result
run: async (userId) => {
return await step('fetchUser', () => deps.fetchUser(userId));
}
});
// results is User[]

When you use a manual for loop, use one literal step ID (label) and a per-iteration key (instance). Prefer step.forEach() when you need full static analyzability.

// Option A: manual loop — one literal ID + key per iteration
for (const payment of payments) {
await step('processPayment', () => deps.processPayment(payment), { key: `payment-${payment.id}` });
}
// Preferred when static analysis matters: step.forEach() is enumerable
await step.forEach('process-payments', payments, {
stepIdPattern: 'payment-{i}',
run: (payment) => step('processPayment', () => deps.processPayment(payment))
});

Manual for loops with dynamic keys like ${item.id}:

  • Cannot be enumerated by static analysis tools
  • Reduce path generation accuracy for awaitly-analyze
  • Make test matrix generation incomplete

Convenience helpers that run through the step engine (events, retry, timeout, and in createWorkflow: cache and onAfterStep). Familiar to Effect users.

Unwrap an AsyncResult with a step ID. Use a getter when caching so the operation runs only on cache miss:

// run() - unwraps and exits on error
const user = await step.run('fetchUser', deps.fetchUser('1'));
// With createWorkflow cache: pass a getter so the second call can hit cache
const user = await step.run('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });

Chain a success value into another AsyncResult-returning operation:

const user = { id: '1', name: 'Alice' };
const enriched = await step.andThen('enrich', user, (u) => enrichUser(u));

Handle both success and error with step tracking (emits step_start / step_success for the match step):

const message = await step.match('handleUser', userResult, {
ok: async (user) => `Hello ${user.name}`,
err: async () => 'nope',
});

Alias for step.parallel. Runs multiple operations in parallel and returns an object keyed by name. Only caches when you pass an explicit key (omitted key = no cache, matching core).

const { user, posts } = await step.all('fetchAll', {
user: () => deps.fetchUser('1'),
posts: () => deps.fetchPosts('1'),
});

Map over an array with parallel execution. Only caches when you pass an explicit key (omitted key = no cache, matching core).

const users = await step.map('fetchUsers', ['1', '2', '3'], (id) => deps.fetchUser(id));

Run a primary operation; if it fails, run a fallback. Same early-exit semantics as step()—the workflow’s error union includes both primary and fallback error types. Use a string ID as the first argument; optional on filter and key.

const user = await step.withFallback('getUser', () => deps.fetchUser(id), {
fallback: () => deps.getUserFromCache(id),
on: (err) => err === 'NOT_FOUND',
});

step.withResource - acquire / use / release

Section titled “step.withResource - acquire / use / release”

Manage a resource with acquire, use, and release. The first argument is the step ID; the second is an options object with acquire, use, and release. Optional key for caching. Release runs on success and on early exit.

const data = await step.withResource('useDb', {
acquire: () => connect(),
use: (db) => query(db),
release: (db) => db.close(),
});

Run a child workflow as a step. First argument is the step ID; second is a getter that returns the child workflow run (e.g. () => childWorkflow.run(...)). Optional key. Events and cache go through the step engine; awaitly-analyze detects child workflow refs when the getter calls childWorkflow.run(...) and draws them as workflow-ref nodes.

const result = await step.workflow('child-call', () =>
childWorkflow.run(async ({ step }) => {
return await step('child-step', () => Promise.resolve(ok(42)));
})
);

Run multiple steps concurrently:

import { allAsync } from 'awaitly';
const result = await workflow.run(async ({ step, deps }) => {
const [user, posts, comments] = await step('fetchUserData', () =>
allAsync([
deps.fetchUser('1'),
deps.fetchPosts('1'),
deps.fetchComments('1'),
])
);
return { user, posts, comments };
});
/*
Output (all succeed):
{ ok: true, value: { user, posts, comments } }
Output (any fails):
{ ok: false, error: <first error> }
*/

Named steps for visualization and static docs

Section titled “Named steps for visualization and static docs”

The required string ID (first argument) is what appears in workflow diagrams and in statically generated documentation. awaitly-analyze extracts it and uses it in Mermaid diagrams, step tables, and doc generators. You can set description or markdown in options for richer docs. See Documenting workflows and Visualization for details.

const user = await step('fetchUser', () => deps.fetchUser('1'));
const posts = await step('fetchPosts', () => deps.fetchPosts(user.id));
/*
Workflow diagram and generated docs show:
┌─────────────┐ ┌─────────────┐
│ fetchUser │ ──► │ fetchPosts │
└─────────────┘ └─────────────┘
*/
VariantUse when
step('id', fn)Operation returns a Result
step.run(id, result, opts?)Unwrap AsyncResult directly (use getter when caching)
step.andThen(id, value, fn, opts?)Chain AsyncResult from a success value
step.match(id, result, { ok, err }, opts?)Pattern match on Result with step tracking
step.all(id, shape, opts?)Parallel named operations (alias for parallel; cache only with key)
step.map(id, items, mapper, opts?)Parallel over array (cache only with key)
step.try(id, fn, opts)Operation throws exceptions
step.fromResult(id, fn, opts)Need to transform Result errors
step.retry(id, fn, opts)Operation might fail transiently
step.withTimeout(id, fn, opts)Operation might hang
step.sleep(id, duration, opts)Need intentional delay (rate limiting, polling)
step.forEach(id, items, opts)Processing collections with static analyzability
step.withFallback(id, primary, opts?)Primary fails, run fallback
step.withResource(id, { acquire, use, release }, opts?)Resource scope
step.workflow(id, getter, opts?)Child workflow as step

Learn about Workflows →