Skip to content

Workflows

You create workflows with createWorkflow (reusable, with dependency inference) or use run() for one-off operations. Execution is always via workflow.run(fn) — the workflow object is not callable.

createWorkflow returns a function that produces:
┌─── Your success return type
│ ┌─── Union of all dependency error types + UnexpectedError
▼ ▼
Result<Value, 'NOT_FOUND' | 'EMAIL_FAILED' | UnexpectedError>
└─── Automatically inferred from dependencies!

Creates a reusable workflow with automatic error type inference from dependencies:

import { ok, err, type AsyncResult } from 'awaitly';
import { createWorkflow } from 'awaitly/workflow';
const fetchUser = async (id: string): AsyncResult<User, 'NOT_FOUND'> => { /* ... */ };
const sendEmail = async (to: string): AsyncResult<void, 'EMAIL_FAILED'> => { /* ... */ };
// Error types inferred from dependencies
const workflow = createWorkflow('workflow', { fetchUser, sendEmail });
const result = await workflow.run(async ({ step, deps }) => {
const user = await step('fetchUser', () => deps.fetchUser('1'));
await step('sendEmail', () => deps.sendEmail(user.email));
return user;
});
/*
result.error is automatically typed as:
'NOT_FOUND' | 'EMAIL_FAILED' | UnexpectedError
*/
const workflow = createWorkflow('workflow', { fetchUser, sendEmail },
{
cache: new Map(), // Enable step caching
onEvent: (event) => console.log(event), // Event stream
streamStore: createMemoryStreamStore(), // Enable streaming
}
);

The value returned by createWorkflow is a workflow object with a single .run() method. Use workflow.run(fn) or workflow.run(name, fn) to execute; pass per-run options as workflow.run(fn, config).

For one-off workflows where you specify error types manually:

import { run } from 'awaitly/run';
import { type Errors } from 'awaitly';
type RunErrors = Errors<[typeof fetchUser, typeof sendEmail]>;
const result = await run<User, RunErrors>(
async ({ step }) => {
const user = await step('fetchUser', () => fetchUser('1'));
await step('sendEmail', () => sendEmail(user.email));
return user;
}
);
// result.error is: 'NOT_FOUND' | 'EMAIL_FAILED' | UnexpectedError
ScenarioUse
Dependencies known at compile timecreateWorkflow
Dependencies passed as parametersrun
Need step caching or resumecreateWorkflow
Need automatic error inferencecreateWorkflow
One-off workflowrun
Testing with mocksrun

Enable step caching to avoid re-executing completed steps:

const workflow = createWorkflow('workflow', deps, {
cache: new Map(),
});
const result = await workflow.run(async ({ step, deps }) => {
// This step runs once, even if workflow is called multiple times
const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });
return user;
});
/*
First call: fetchUser executes
Second call: returns cached result immediately
*/

Resume a workflow from saved state:

const workflow = createWorkflow('workflow', deps, {
resumeState: savedState, // From a previous run
});
const result = await workflow.run(async ({ step, deps }) => {
// Cached steps return their saved values
const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'user:1' });
return user;
});

See Persistence for details.

Enable streaming within workflows:

import { createMemoryStreamStore } from 'awaitly/streaming';
const workflow = createWorkflow('workflow', deps, {
streamStore: createMemoryStreamStore(),
});
const result = await workflow.run(async ({ step, deps }) => {
const writer = step.getWritable<string>({ namespace: 'tokens' });
await writer.write('Hello');
await writer.write(' World');
await writer.close();
});

See Streaming for details.

Subscribe to workflow events for logging, visualization, or debugging:

const workflow = createWorkflow('workflow', deps, {
onEvent: (event) => {
switch (event.type) {
case 'step_start':
console.log('Starting:', event.name);
break;
case 'step_complete':
console.log('Completed:', event.name, event.durationMs, 'ms');
break;
case 'step_error':
console.error('Failed:', event.name, event.error);
break;
}
},
});
/*
Output:
Starting: Fetch user
Completed: Fetch user 45 ms
Starting: Send email
Failed: Send email EMAIL_FAILED
*/

By default, thrown exceptions become an UnexpectedError. To replace it with your own type, pass catchUnexpected:

// Default - error union includes UnexpectedError
const workflow = createWorkflow('workflow', deps);
const result = await workflow.run(async ({ step, deps }) => { /* ... */ });
// result.error: 'NOT_FOUND' | 'ORDER_FAILED' | UnexpectedError

With catchUnexpected:

  • Your custom type replaces UnexpectedError in the error union.
  • Use for exhaustive switch statements or when you want a single typed shape for all unexpected cases.

Cancel workflows externally using an AbortSignal:

import { isWorkflowCancelled } from 'awaitly/workflow';
const controller = new AbortController();
const workflow = createWorkflow('workflow', deps, {
signal: controller.signal,
});
// Start workflow
const resultPromise = workflow.run(async ({ step, deps }) => {
const user = await step('fetchUser', () => deps.fetchUser('1'), { key: 'fetch-user' });
await step('sendEmail', () => deps.sendEmail(user.email), { key: 'send-email' });
return user;
});
// Cancel from outside (e.g., timeout, user action)
setTimeout(() => controller.abort('timeout'), 5000);
const result = await resultPromise;
if (!result.ok && isWorkflowCancelled(result.cause)) {
console.log('Cancelled:', result.cause.reason);
}
/*
Output (if cancelled):
Cancelled: timeout
*/

Passing signal to operations:

Steps using step.withTimeout can receive the workflow signal:

const result = await workflow.run(async ({ step, deps }) => {
// Signal fires on EITHER timeout OR workflow cancellation
const data = await step.withTimeout(
'fetchUrl',
(signal) => fetch(url, { signal }),
{ ms: 3000, signal: true }
);
return data;
});

Or access the signal directly via context:

const result = await workflow.run(async ({ step, deps, ctx }) => {
// Manual signal access
const response = await fetch(url, { signal: ctx.signal });
// Check if cancelled
if (ctx.signal?.aborted) {
return err('CANCELLED' as const);
}
return ok(response);
});

While createWorkflow accepts options at creation time, you may need different options for specific runs. Pass a config as the second argument to workflow.run(fn, config), or use a named run with workflow.run("run-id", fn, config) for logging and tracing:

const workflow = createWorkflow('workflow', { fetchUser }, {
onEvent: defaultLogger, // Creation-time default
});
// Normal run - uses creation-time options
await workflow.run(async ({ step, deps }) => { /* ... */ });
// Per-run options override creation-time
await workflow.run(async ({ step, deps }) => {
const user = await step('fetchUser', () => deps.fetchUser('1'));
return user;
}, {
onEvent: vizualizer.handleEvent, // Override for this run only
signal: controller.signal, // Add signal for this run
});
// Named run (workflowId for logging, tracing, resume)
await workflow.run('onboard-user-1', async ({ step, deps }) => {
return await step('fetchUser', () => deps.fetchUser('1'));
});

Execution-time options override creation-time options:

const workflow = createWorkflow('workflow', deps, {
onEvent: handlerA, // Creation-time
signal: signalA,
});
// handlerB overrides handlerA for this run
// signalA is NOT used (signalB takes over)
await workflow.run(fn, {
onEvent: handlerB, // Wins
signal: signalB, // Wins
});
// Back to creation-time defaults
await workflow.run(fn); // Uses handlerA, signalA
OptionDescription
onEventEvent handler for this run
onErrorError handler for this run
signalAbortSignal for this run
createContextContext factory for this run
resumeStateResume state for this run (can be async factory)
depsPartial override of creation-time deps
snapshotRestore from a previously saved snapshot
shouldRunConcurrency control hook
onBeforeStartPre-start hook
onAfterStepPost-step hook for checkpointing
streamStoreStream store for this run

Workflows are just functions. Compose them naturally:

const validateInput = createWorkflow('workflow', { validateEmail, validatePassword });
const processPayment = createWorkflow('workflow', { chargeCard, saveReceipt });
const checkout = createWorkflow('workflow', { validateEmail,
validatePassword,
chargeCard,
saveReceipt,
});
const result = await checkout.run(async ({ step, deps }) => {
// Use steps from all dependencies
await step('validateEmail', () => deps.validateEmail(input.email));
await step('validatePassword', () => deps.validatePassword(input.password));
await step('chargeCard', () => deps.chargeCard(input.amount));
await step('saveReceipt', () => deps.saveReceipt(input.orderId));
});

Pass input to a workflow by capturing it in a closure and calling workflow.run(fn):

const workflow = createWorkflow('workflow', deps);
const userId = '123';
const requestId = 'abc';
const result = await workflow.run(async ({ step, deps }) => {
console.log('User ID:', userId);
const user = await step('fetchUser', () => deps.fetchUser(userId));
return user;
});

Learn about Tagged Errors →