
Workflows and Steps

Workflows compose multiple Result-returning operations. The step() function unwraps Results automatically. If any step fails, the workflow exits early with that error.

step() is the core of workflow composition. It unwraps a Result, returning the value on success or exiting the workflow on error:

import { run } from 'awaitly/run';
const result = await run(async ({ step }) => {
// If fetchUser returns err('NOT_FOUND'), workflow exits here
const user = await step('fetchUser', () => fetchUser('123'));
// This only runs if fetchUser succeeded
const posts = await step('fetchPosts', () => fetchPosts(user.id));
return { user, posts };
});
if (result.ok) {
console.log(result.value.user.name);
} else {
// result.error is 'NOT_FOUND' | 'FETCH_ERROR'
console.log('Failed:', result.error);
}

Execution flow:
fetchUser('123')
├── ok(user) ──► fetchPosts(user.id)
│ │
│ ├── ok(posts) ──► return { user, posts }
│ │
│ └── err ──► exit workflow
└── err ──► exit workflow
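
The early exit in the diagram can be modelled without the library. The sketch below is a mental model only, not awaitly's implementation: `miniRun`, `EarlyExit`, `Step`, and the local `Result` type are hypothetical names, and it assumes a failed step unwinds the callback by throwing an internal sentinel that the runner catches.

```typescript
// Mental-model sketch only; not awaitly's actual implementation.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };
type Step<E> = <V>(id: string, op: () => Promise<Result<V, E>>) => Promise<V>;

// Hypothetical sentinel used to unwind the callback when a step fails.
class EarlyExit<E> {
  constructor(readonly error: E) {}
}

async function miniRun<T, E>(
  fn: (ctx: { step: Step<E> }) => Promise<T>
): Promise<Result<T, E>> {
  const step: Step<E> = async (_id, op) => {
    const result = await op();
    if (!result.ok) throw new EarlyExit(result.error); // exit the workflow here
    return result.value; // unwrap on success
  };
  try {
    return { ok: true, value: await fn({ step }) };
  } catch (thrown) {
    if (thrown instanceof EarlyExit) return { ok: false, error: thrown.error as E };
    throw thrown; // unrelated exceptions are not handled in this sketch
  }
}

// Demo: fetchPosts never runs because fetchUser fails first.
type AppError = 'NOT_FOUND' | 'FETCH_ERROR';
const calls: string[] = [];

const fetchUser = async (id: string): Promise<Result<{ id: string }, AppError>> => {
  calls.push(`fetchUser:${id}`);
  return { ok: false, error: 'NOT_FOUND' };
};
const fetchPosts = async (userId: string): Promise<Result<string[], AppError>> => {
  calls.push(`fetchPosts:${userId}`);
  return { ok: true, value: [] };
};

const demo = miniRun<{ posts: string[] }, AppError>(async ({ step }) => {
  const user = await step('fetchUser', () => fetchUser('123'));
  const posts = await step('fetchPosts', () => fetchPosts(user.id));
  return { posts };
});
```

In this sketch `demo` resolves to a failed Result carrying `'NOT_FOUND'`, and `calls` records only the `fetchUser` call.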

Use run() for simple, one-off workflows. Specify error types as type parameters to preserve them in the result:

import { ok, err, type AsyncResult } from 'awaitly';
import { run } from 'awaitly/run';
const getUser = async (id: string): AsyncResult<User, 'NOT_FOUND'> => {
const user = await db.find(id);
return user ? ok(user) : err('NOT_FOUND');
};
const getOrders = async (userId: string): AsyncResult<Order[], 'FETCH_ERROR'> => {
return ok(await db.orders.findByUser(userId));
};
const result = await run<{ user: User; orders: Order[] }, 'NOT_FOUND' | 'FETCH_ERROR'>(
async ({ step }) => {
const user = await step('getUser', () => getUser('123'));
const orders = await step('getOrders', () => getOrders(user.id));
return { user, orders };
}
);
// result.error is: 'NOT_FOUND' | 'FETCH_ERROR' | UnexpectedError

For reusable workflows with automatic error type inference, use createWorkflow():

import { ok, err, type AsyncResult } from 'awaitly';
import { createWorkflow } from 'awaitly/workflow';
const fetchUser = async (id: string): AsyncResult<User, 'NOT_FOUND'> => { /* ... */ };
const sendEmail = async (to: string): AsyncResult<void, 'EMAIL_FAILED'> => { /* ... */ };
// Error types inferred from dependencies
const workflow = createWorkflow('workflow', { fetchUser, sendEmail });
const result = await workflow.run(async ({ step, deps }) => {
const user = await step('fetchUser', () => deps.fetchUser('1'));
await step('sendEmail', () => deps.sendEmail(user.email));
return user;
});
// result.error is automatically typed as:
// 'NOT_FOUND' | 'EMAIL_FAILED' | UnexpectedError

| Scenario | Use |
| --- | --- |
| One-off workflow | run() |
| Reusable workflow | createWorkflow() |
| Need automatic error inference | createWorkflow() |
| Need caching or resume | createWorkflow() |
| Testing with mocks | run() |
| Dependencies known at compile time | createWorkflow() |
| Dependencies passed as parameters | run() |

Use the first argument (step id) for visualization; use options for caching, retry, and timeout:

const user = await step('Fetch user', () => deps.fetchUser('1'), {
key: 'user:1', // For caching/deduplication
retry: { attempts: 3 }, // Retry on failure
timeout: { ms: 5000 }, // Timeout after 5 seconds
});

Pass a function (thunk) instead of calling directly to enable caching:

// With thunk - can be cached (ID required as first argument)
const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });

See Why thunks? for the mental model, plus why step('id', somePromise) can be misleading.
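
A minimal sketch of a keyed step cache shows why the thunk matters. It is an illustration under stated assumptions, not awaitly's internals: `cachedStep` and the `Map`-based `stepCache` are hypothetical. Because the operation is wrapped in a function, the cache can decide whether to call it at all; a promise passed directly would already be running.

```typescript
// Hypothetical cache and helper; not part of awaitly's API.
const stepCache = new Map<string, unknown>();

async function cachedStep<T>(key: string, thunk: () => Promise<T>): Promise<T> {
  if (stepCache.has(key)) return stepCache.get(key) as T; // hit: thunk is never invoked
  const value = await thunk(); // miss: run the operation now
  stepCache.set(key, value);
  return value;
}

let fetchCount = 0;
const fetchUser = async (id: string) => {
  fetchCount += 1; // count real executions
  return { id, name: 'Alice' };
};

const demo = (async () => {
  const first = await cachedStep('user:1', () => fetchUser('1'));
  const second = await cachedStep('user:1', () => fetchUser('1')); // served from cache
  return { first, second, fetchCount };
})();
```

Here the second call returns the cached object and `fetchUser` executes only once.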

Convert exceptions into typed errors:

const data = await step.try(
'fetchData',
async () => {
const res = await fetch('/api/data');
if (!res.ok) throw new Error(`HTTP ${res.status}`);
return res.json();
},
{ error: 'FETCH_FAILED' as const }
);

The error type is added to the workflow’s error union automatically.
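
Conceptually, this is a try/catch that swaps whatever was thrown for a fixed, typed error value. The sketch below illustrates that idea only; `tryAsync` and the local `Result` type are hypothetical and do not reflect how step.try is implemented.

```typescript
// Illustrative sketch; tryAsync is a hypothetical helper, not awaitly's API.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

async function tryAsync<T, E extends string>(
  fn: () => Promise<T>,
  error: E
): Promise<Result<T, E>> {
  try {
    return { ok: true, value: await fn() };
  } catch {
    return { ok: false, error }; // the thrown value is replaced by the typed error
  }
}

// JSON.parse throws on malformed input, so this becomes a typed failure.
const failed = tryAsync(async () => JSON.parse('{ not json'), 'PARSE_FAILED');
// A function that does not throw passes its value through.
const succeeded = tryAsync(async () => 42, 'NEVER_USED');
```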

step.fromResult() - Preserve Error Details


When your operation returns a Result with rich error objects:

type ApiError = { code: number; message: string };
const callApi = async (): AsyncResult<Data, ApiError> => {
// Returns err({ code: 429, message: 'Rate limited' })
};
const data = await step.fromResult(
'callApi',
() => callApi(),
{
onError: (apiError) => ({
type: 'API_ERROR' as const,
...apiError,
}),
}
);
// On rate limit: { ok: false, error: { type: 'API_ERROR', code: 429, message: 'Rate limited' } }
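
The onError mapping amounts to transforming only the error side of a Result. The sketch below is a hypothetical `mapError` helper written for illustration (it is not an awaitly export) and reuses the `ApiError` shape from the example above.

```typescript
// Illustrative sketch; mapError is a hypothetical helper.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };
type ApiError = { code: number; message: string };

function mapError<T, E, F>(result: Result<T, E>, onError: (error: E) => F): Result<T, F> {
  // Successes pass through untouched; only failures are transformed.
  return result.ok ? result : { ok: false, error: onError(result.error) };
}

const rateLimited: Result<string, ApiError> = {
  ok: false,
  error: { code: 429, message: 'Rate limited' },
};

const mapped = mapError(rateLimited, (apiError) => ({
  type: 'API_ERROR' as const,
  ...apiError, // keep code and message from the original error
}));
```

`mapped.error` keeps the original `code` and `message` and gains the `type` tag.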

Retry operations that might fail transiently with step.retry():

const user = await step.retry(
'fetchUser',
() => deps.fetchUser('1'),
{
attempts: 3,
backoff: 'exponential', // 'fixed' | 'linear' | 'exponential'
delayMs: 100, // Base delay
maxDelayMs: 5000, // Cap for exponential
jitter: true, // Add randomness
retryOn: (error) => error !== 'NOT_FOUND', // Don't retry NOT_FOUND
}
);

Delay before each retry, by backoff strategy:

Fixed (delayMs: 100):
#1: 100ms ████
#2: 100ms ████
#3: 100ms ████
Linear (delayMs: 100):
#1: 100ms ████
#2: 200ms ████████
#3: 300ms ████████████
Exponential (delayMs: 100):
#1: 100ms ████
#2: 200ms ████████
#3: 400ms ████████████████
#4: 800ms ████████████████████████████████
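
The schedules above follow simple formulas. The sketch below computes them; `backoffDelay` and `schedule` are hypothetical helpers, jitter is left out, and applying `maxDelayMs` to every strategy is an assumption made for brevity.

```typescript
// Illustrative sketch of the delay formulas; not awaitly's implementation.
type Backoff = 'fixed' | 'linear' | 'exponential';

// attempt is 1-based, matching the #1, #2, ... labels above.
function backoffDelay(
  strategy: Backoff,
  attempt: number,
  delayMs: number,
  maxDelayMs = Infinity
): number {
  const raw =
    strategy === 'fixed' ? delayMs :
    strategy === 'linear' ? delayMs * attempt :
    delayMs * 2 ** (attempt - 1); // doubles on every attempt
  return Math.min(raw, maxDelayMs); // cap (assumed to apply to all strategies here)
}

const schedule = (strategy: Backoff, attempts: number): number[] =>
  Array.from({ length: attempts }, (_, i) => backoffDelay(strategy, i + 1, 100, 5000));

console.log(schedule('fixed', 3));       // [ 100, 100, 100 ]
console.log(schedule('linear', 3));      // [ 100, 200, 300 ]
console.log(schedule('exponential', 4)); // [ 100, 200, 400, 800 ]
```

With a 5000 ms cap, the seventh exponential delay (6400 ms uncapped) is clamped to 5000 ms.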

Guard against operations that might hang with step.withTimeout():

const data = await step.withTimeout(
'slowOp',
() => slowOperation(),
{ ms: 5000 }
);
/*
Output (completes in time):
data contains the result
Output (times out):
{ ok: false, error: StepTimeoutError }
*/

If the timeout is reached, the workflow gets a StepTimeoutError.

const data = await step.retry(
'fetchData',
() => step.withTimeout('fetchData', () => fetchData(), { ms: 2000 }),
{ attempts: 3, backoff: 'exponential' }
);

Each attempt has a 2-second timeout, and the operation is attempted up to 3 times in total.

Timeline:
├── Attempt 1 ──────────────► timeout at 2s
│ wait 100ms
├── Attempt 2 ──────────────► timeout at 2s
│ wait 200ms
├── Attempt 3 ──────────────► timeout at 2s (or success!)

Pause execution for a specified duration. Use for intentional delays between operations (rate limiting, polling intervals, debouncing). Respects workflow cancellation.

// String duration
await step.sleep("rate-limit-delay", "5s");
// Duration object
import { seconds, millis } from 'awaitly/workflow';
await step.sleep("my-sleep", seconds(5));
await step.sleep("short-delay", millis(100));

Pass options as a third argument:

await step.sleep("poll-delay", "2s", {
key: "poll-delay", // For caching/deduplication (optional)
ttl: 5000, // Cache TTL in milliseconds
description: "Wait before next poll attempt",
signal: controller.signal // Optional AbortSignal for cancellation
});

Rate limiting between requests:

const result = await workflow.run(async ({ step, deps }) => {
const data = await step('fetchData', () => deps.fetchData());
// Respect rate limits - wait 1 second between requests
await step.sleep("rate-limit-delay", "1s");
const moreData = await step('fetchMoreData', () => deps.fetchMoreData());
return { data, moreData };
});

Polling until an operation completes:

const result = await workflow.run(async ({ step, deps }) => {
let status = await step('checkStatus', () => deps.checkStatus());
// Poll every 2 seconds until complete
while (status !== "complete") {
await step.sleep("poll-interval", "2s");
status = await step('checkStatus', () => deps.checkStatus());
}
return status;
});

step.sleep supports two cancellation mechanisms:

Workflow cancellation - Cancel the entire workflow:

const controller = new AbortController();
const workflow = createWorkflow('workflow', { }, { signal: controller.signal });
const resultPromise = workflow.run(async ({ step, deps }) => {
await step.sleep("long-sleep", "10s");
return "completed";
});
// Cancel after 1 second
setTimeout(() => controller.abort(), 1000);
const result = await resultPromise;
// result.ok is false, error is WorkflowCancelledError

User-provided signal - Cancel a specific sleep without affecting the workflow:

const result = await run(async ({ step }) => {
const sleepController = new AbortController();
// Cancel this specific sleep after 1 second
setTimeout(() => sleepController.abort(), 1000);
try {
await step.sleep("long-sleep", "10s", {
signal: sleepController.signal
});
return "completed normally";
} catch {
// Sleep was cancelled by user signal
return "sleep cancelled, continuing workflow";
}
});
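
A cancellable sleep of this kind can be built from setTimeout and an AbortSignal. The sketch below shows the general technique and is not awaitly's implementation; `abortableSleep` is a hypothetical helper that rejects with a plain Error when aborted.

```typescript
// Illustrative sketch of a sleep that can be cancelled through an AbortSignal.
function abortableSleep(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal?.aborted) {
      reject(new Error('sleep aborted')); // already cancelled before starting
      return;
    }
    const onAbort = () => {
      clearTimeout(timer); // stop the pending timer so nothing lingers
      reject(new Error('sleep aborted'));
    };
    const timer = setTimeout(() => {
      signal?.removeEventListener('abort', onAbort);
      resolve();
    }, ms);
    signal?.addEventListener('abort', onAbort, { once: true });
  });
}

// Demo: a 10-second sleep is cancelled after about 10 ms.
const sleepController = new AbortController();
setTimeout(() => sleepController.abort(), 10);

const demo = abortableSleep(10000, sleepController.signal).then(
  () => 'completed normally',
  () => 'sleep cancelled, continuing'
);
```

Catching the rejection, as the example above does with try/catch, lets the surrounding code carry on after the sleep is cancelled.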

Run multiple steps concurrently:

import { allAsync } from 'awaitly';
const result = await workflow.run(async ({ step, deps }) => {
const [user, posts, comments] = await step('fetchUserData', () =>
allAsync([
deps.fetchUser('1'),
deps.fetchPosts('1'),
deps.fetchComments('1'),
])
);
return { user, posts, comments };
});
/*
Output (all succeed):
{ ok: true, value: { user, posts, comments } }
Output (any fails):
{ ok: false, error: <first error> }
*/
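
To make the all-or-first-error behaviour concrete, here is a sketch of combining several Results. It is not the real allAsync: `allResults` is a hypothetical helper, it handles only same-typed values rather than tuples, and reporting the first failure in array order is an assumption.

```typescript
// Illustrative sketch; allResults is a hypothetical stand-in, not allAsync itself.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

async function allResults<T, E>(ops: Promise<Result<T, E>>[]): Promise<Result<T[], E>> {
  const results = await Promise.all(ops); // the operations are already running concurrently
  const values: T[] = [];
  for (const result of results) {
    if (!result.ok) return result; // assumption: first failure in array order wins
    values.push(result.value);
  }
  return { ok: true, value: values };
}

const succeed = async (n: number): Promise<Result<number, string>> => ({ ok: true, value: n });
const fail = async (reason: string): Promise<Result<number, string>> => ({ ok: false, error: reason });

const allOk = allResults([succeed(1), succeed(2), succeed(3)]);
const oneFails = allResults([succeed(1), fail('FETCH_ERROR'), fail('EMAIL_FAILED')]);
```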

Use descriptive step ids (first argument) to see them in workflow diagrams:

const user = await step('Fetch user', () => fetchUser('1'));
const posts = await step('Fetch posts', () => fetchPosts(user.id));
/*
Workflow diagram shows:
┌─────────────┐ ┌─────────────┐
│ Fetch user │ ──► │ Fetch posts │
└─────────────┘ └─────────────┘
*/

See Visualization for details.

Enable step caching to avoid re-executing completed steps:

const workflow = createWorkflow('workflow', deps, {
cache: new Map(),
});
const result = await workflow.run(async ({ step, deps }) => {
// This step runs once, even if workflow is called multiple times
const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });
return user;
});

Resume a workflow from saved state:

const workflow = createWorkflow('workflow', deps, {
resumeState: savedState, // From a previous run
});
const result = await workflow.run(async ({ step, deps }) => {
// Cached steps return their saved values
const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });
return user;
});

See Persistence for details.

Enable streaming within workflows:

import { createMemoryStreamStore } from 'awaitly/streaming';
const workflow = createWorkflow('workflow', deps, {
streamStore: createMemoryStreamStore(),
});
const result = await workflow.run(async ({ step, deps }) => {
const writer = step.getWritable<string>({ namespace: 'tokens' });
await writer.write('Hello');
await writer.write(' World');
await writer.close();
});

See Streaming for details.

Subscribe to workflow events for logging or visualization:

const workflow = createWorkflow('workflow', deps, {
onEvent: (event) => {
switch (event.type) {
case 'step_start':
console.log('Starting:', event.name);
break;
case 'step_complete':
console.log('Completed:', event.name, event.durationMs, 'ms');
break;
case 'step_error':
console.error('Failed:', event.name, event.error);
break;
}
},
});
/*
Output:
Starting: Fetch user
Completed: Fetch user 45 ms
Starting: Send email
Failed: Send email EMAIL_FAILED
*/

By default, thrown exceptions become an UnexpectedError. To replace it with your own type, pass catchUnexpected:

const workflow = createWorkflow('workflow', deps, {
catchUnexpected: (thrown) => ({
type: 'UNEXPECTED' as const,
message: String(thrown),
}),
});
// result.error is E | { type: 'UNEXPECTED', message: string }

This also works with run():

const result = await run<User, 'NOT_FOUND'>(
async ({ step }) => {
return await step('fetchUser', () => fetchUser('1'));
},
{ catchUnexpected: (thrown) => ({ type: 'UNEXPECTED' as const, message: String(thrown) }) }
);
// result.error is 'NOT_FOUND' | { type: 'UNEXPECTED', message: string }
// Without catchUnexpected: result.error is 'NOT_FOUND' | UnexpectedError

Cancel workflows externally using an AbortSignal:

import { isWorkflowCancelled } from 'awaitly/workflow';
const controller = new AbortController();
const workflow = createWorkflow('workflow', deps, {
signal: controller.signal,
});
// Start workflow
const resultPromise = workflow.run(async ({ step, deps }) => {
const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'fetch-user' });
await step('sendEmail', () => deps.sendEmail(user.email), { key: 'send-email' });
return user;
});
// Cancel from outside (e.g., timeout, user action)
setTimeout(() => controller.abort('timeout'), 5000);
const result = await resultPromise;
if (!result.ok && isWorkflowCancelled(result.cause)) {
console.log('Cancelled:', result.cause.reason);
}
/*
Output (if cancelled):
Cancelled: timeout
*/

Passing signal to operations:

Steps using step.withTimeout can receive the workflow signal:

const result = await workflow.run(async ({ step, deps }) => {
// Signal fires on EITHER timeout OR workflow cancellation
const data = await step.withTimeout(
'fetchUrl',
(signal) => fetch(url, { signal }),
{ ms: 3000, signal: true }
);
return data;
});

Or access the signal directly via context:

const result = await workflow.run(async ({ step, deps, ctx }) => {
// Manual signal access
const response = await fetch(url, { signal: ctx.signal });
// Check if cancelled
if (ctx.signal?.aborted) {
return err('CANCELLED' as const);
}
return ok(response);
});

Override options for specific runs with workflow.run(fn, config), or use a named run with workflow.run("run-id", fn, config):

const workflow = createWorkflow('workflow', { fetchUser }, {
onEvent: defaultLogger, // Creation-time default
});
// Normal run - uses creation-time options
await workflow.run(async ({ step, deps }) => { /* ... */ });
// Per-run options override creation-time
await workflow.run(async ({ step, deps }) => {
const user = await step('fetchUser', () => deps.fetchUser('1'));
return user;
}, {
onEvent: visualizer.handleEvent, // Override for this run only
signal: controller.signal, // Add signal for this run
});

Execution-time options override creation-time options:

const workflow = createWorkflow('workflow', deps, {
onEvent: handlerA, // Creation-time
signal: signalA,
});
// handlerB overrides handlerA for this run
// signalA is NOT used (signalB takes over)
await workflow.run(fn, {
onEvent: handlerB, // Wins
signal: signalB, // Wins
});
// Back to creation-time defaults
await workflow.run(fn); // Uses handlerA, signalA

| Option | Description |
| --- | --- |
| onEvent | Event handler for this run |
| onError | Error handler for this run |
| signal | AbortSignal for this run |
| createContext | Context factory for this run |
| resumeState | Resume state for this run (can be async factory) |
| deps | Partial override of creation-time deps |
| snapshot | Restore from a previously saved snapshot |
| shouldRun | Concurrency control hook |
| onBeforeStart | Pre-start hook |
| onAfterStep | Post-step hook for checkpointing |
| streamStore | Stream store for this run |
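
The override rule can be pictured as a key-by-key merge in which per-run values win. The sketch below shows that mental model only: `RunOptions` and `resolveOptions` are hypothetical, only two options are modelled, and the shallow merge is an assumption (the deps option, for instance, is described as a partial override and may be merged differently).

```typescript
// Mental-model sketch of option precedence; not awaitly's implementation.
type RunOptions = {
  onEvent?: (event: string) => void;
  signal?: AbortSignal;
};

function resolveOptions(creation: RunOptions, perRun: RunOptions = {}): RunOptions {
  return { ...creation, ...perRun }; // the later spread wins, key by key
}

const handlerA = (_event: string) => {};
const handlerB = (_event: string) => {};
const signalA = new AbortController().signal;

const creationOptions: RunOptions = { onEvent: handlerA, signal: signalA };

// Only onEvent is overridden for this run; signal falls back to the creation-time value.
const overridden = resolveOptions(creationOptions, { onEvent: handlerB });
// With no per-run options, the creation-time defaults apply unchanged.
const defaults = resolveOptions(creationOptions);
```

One caveat of object spread in this sketch: a per-run key explicitly set to `undefined` would also replace the creation-time value.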

Pass input by capturing it in a closure and calling workflow.run(fn):

const workflow = createWorkflow('workflow', deps);
const userId = '123';
const requestId = 'abc';
const result = await workflow.run(async ({ step, deps }) => {
console.log('User ID:', userId);
const user = await step('fetchUser', () => deps.fetchUser(userId));
return user;
});

Workflows are just functions. Compose them naturally:

const validateInput = createWorkflow('workflow', { validateEmail, validatePassword });
const processPayment = createWorkflow('workflow', { chargeCard, saveReceipt });
const checkout = createWorkflow('workflow', { validateEmail,
validatePassword,
chargeCard,
saveReceipt,
});
const result = await checkout.run(async ({ step, deps }) => {
// Use steps from all dependencies; `input` is captured from the enclosing scope
await step('validateEmail', () => deps.validateEmail(input.email));
await step('validatePassword', () => deps.validatePassword(input.password));
await step('chargeCard', () => deps.chargeCard(input.amount));
await step('saveReceipt', () => deps.saveReceipt(input.orderId));
});

| Variant | Use when |
| --- | --- |
| step('id', fn) | Operation returns a Result |
| step.try(id, fn, opts) | Operation throws exceptions |
| step.fromResult(id, fn, opts) | Need to transform Result errors |
| step.retry(id, fn, opts) | Operation might fail transiently |
| step.withTimeout(id, fn, opts) | Operation might hang |
| step.sleep(id, duration, opts) | Need intentional delay (rate limiting, polling) |

import { ok, err, type AsyncResult } from 'awaitly';
import { createWorkflow } from 'awaitly/workflow';
type User = { id: string; name: string; email: string };
type Post = { id: number; title: string };
const fetchUser = async (id: string): AsyncResult<User, 'NOT_FOUND'> =>
id === '1' ? ok({ id: '1', name: 'Alice', email: 'alice@example.com' }) : err('NOT_FOUND');
const fetchPosts = async (userId: string): AsyncResult<Post[], 'FETCH_ERROR'> =>
ok([{ id: 1, title: 'Hello World' }]);
const sendWelcome = async (email: string): AsyncResult<void, 'EMAIL_FAILED'> =>
ok(undefined);
const onboardUser = createWorkflow('workflow', { fetchUser, fetchPosts, sendWelcome });
const result = await onboardUser.run(async ({ step, deps }) => {
const user = await step('fetchUser', () => deps.fetchUser('1'));
const posts = await step('fetchPosts', () => deps.fetchPosts(user.id));
await step('sendWelcome', () => deps.sendWelcome(user.email));
return { user, postCount: posts.length };
});
if (result.ok) {
console.log(`${result.value.user.name} has ${result.value.postCount} posts`);
} else {
// TypeScript knows all possible errors
switch (result.error) {
case 'NOT_FOUND':
console.log('User not found');
break;
case 'FETCH_ERROR':
console.log('Failed to fetch posts');
break;
case 'EMAIL_FAILED':
console.log('Failed to send welcome email');
break;
}
}
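
If the compiler should flag a missing case, the switch can end with a `never` check. This is a general TypeScript technique rather than anything awaitly-specific; `describeError` and `WorkflowError` are hypothetical, and the sketch assumes the error has already been narrowed to the three string literals (the inferred union shown earlier also includes UnexpectedError, which would need its own handling).

```typescript
// Illustrative sketch of exhaustive error handling with a never check.
type WorkflowError = 'NOT_FOUND' | 'FETCH_ERROR' | 'EMAIL_FAILED';

function describeError(error: WorkflowError): string {
  switch (error) {
    case 'NOT_FOUND':
      return 'User not found';
    case 'FETCH_ERROR':
      return 'Failed to fetch posts';
    case 'EMAIL_FAILED':
      return 'Failed to send welcome email';
    default: {
      // If a new member is added to WorkflowError without a case above,
      // this assignment stops compiling.
      const unreachable: never = error;
      return unreachable;
    }
  }
}

console.log(describeError('FETCH_ERROR')); // Failed to fetch posts
```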

Learn about Control Flow patterns →