flow()

flow() is the smallest workflow API in awaitly. It is designed to feel close to Effect.gen, but with plain async/await.

  • Pass a deps object once
  • Call deps directly inside the body
  • Each dep call becomes a tracked step automatically
  • Errors are inferred from dep return types

import { flow } from 'awaitly/flow';
import { ok, err, type AsyncResult } from 'awaitly';

type User = { id: string; name: string };
type Order = { id: string; userId: string };

const getUser = async (id: string): AsyncResult<User, 'USER_NOT_FOUND'> =>
  id === 'missing' ? err('USER_NOT_FOUND') : ok({ id, name: 'Ada' });

const createOrder = async (user: User): AsyncResult<Order, 'ORDER_FAILED'> =>
  ok({ id: 'order-1', userId: user.id });

const result = await flow({ getUser, createOrder }, async (d) => {
  const user = await d.getUser('u1');
  const order = await d.createOrder(user);
  return order;
});
// result.error: 'USER_NOT_FOUND' | 'ORDER_FAILED' | UnexpectedError
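As a rough illustration of how an error union like the one in the comment above can fall out of dep return types, here is a type-level sketch. ErrorOf, ErrorsOf, and the local Ok / Err / AsyncResult shapes are hypothetical stand-ins written for this example, not awaitly's own definitions, and the sketch leaves out the UnexpectedError member that flow() adds.

```typescript
// Local stand-ins for the Result shapes (assumed; not awaitly's definitions).
type Ok<T> = { ok: true; value: T };
type Err<E> = { ok: false; error: E };
type AsyncResult<T, E> = Promise<Ok<T> | Err<E>>;

type User = { id: string; name: string };
type Order = { id: string; userId: string };

// Hypothetical helper: pull the error type out of a single dep.
// Non-function values (constants, helper objects) contribute `never`.
type ErrorOf<F> = F extends (...args: any[]) => AsyncResult<any, infer E> ? E : never;

// Hypothetical helper: union the errors of every entry on the deps object.
type ErrorsOf<D> = { [K in keyof D]: ErrorOf<D[K]> }[keyof D];

type Deps = {
  getUser: (id: string) => AsyncResult<User, 'USER_NOT_FOUND'>;
  createOrder: (user: User) => AsyncResult<Order, 'ORDER_FAILED'>;
  CONFIG: { maxRetries: number };
};

// Resolves to 'USER_NOT_FOUND' | 'ORDER_FAILED'.
type FlowErrors = ErrorsOf<Deps>;

// The switch has no default branch, so it only type-checks while every
// member of the inferred union is handled.
function describeError(e: FlowErrors): string {
  switch (e) {
    case 'USER_NOT_FOUND':
      return 'no such user';
    case 'ORDER_FAILED':
      return 'order could not be created';
  }
}
```

Adding a dep with a new error type to Deps would widen FlowErrors, and describeError would stop compiling until the new case is handled.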

flow() passes two arguments to your body:

  • d: wrapped deps (functions return unwrapped success values; Err short-circuits the flow)
  • c: flow context for per-call control (optional; declare it only if you need it)

// Simple form: deps only
await flow({ getUser }, async (d) => d.getUser('a'));

// With context: a custom step id via c.key
await flow({ getUser }, async (d, c) => {
  const userA = await d.getUser('a');
  const userB = await c.key('user:b', () => c.raw.getUser('b'));
  return { userA, userB };
});

Non-function values on the deps object pass through untouched, so you can colocate constants or helper objects alongside dep functions:

const CONFIG = { maxRetries: 3 } as const;

await flow({ getUser, CONFIG }, async (d) => {
  console.log(d.CONFIG.maxRetries); // 3 (non-functions are not wrapped)
  return d.getUser('u1');
});
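The behaviour of d described above (Ok values unwrapped, Err ending the body, non-functions passed through) can be modelled in a few lines. This is a toy sketch under assumptions: miniFlow and ShortCircuit are hypothetical names, the Result shapes are local stand-ins, the typing of d is deliberately loose, and awaitly's real engine also tracks steps and converts thrown exceptions, which this omits.

```typescript
// Local stand-ins for the Result shapes (assumed; not awaitly's definitions).
type Ok<T> = { ok: true; value: T };
type Err<E> = { ok: false; error: E };
type Result<T, E> = Ok<T> | Err<E>;
const ok = <T>(value: T): Ok<T> => ({ ok: true, value });
const err = <E>(error: E): Err<E> => ({ ok: false, error });

// Private signal used to unwind the body when a dep returns Err.
class ShortCircuit {
  error: unknown;
  constructor(error: unknown) {
    this.error = error;
  }
}

// Hypothetical toy runner: wraps function deps, passes other values through.
async function miniFlow<T>(
  deps: Record<string, unknown>,
  body: (d: any) => Promise<T>,
): Promise<Result<T, unknown>> {
  const wrapped: Record<string, unknown> = {};
  for (const [name, dep] of Object.entries(deps)) {
    wrapped[name] =
      typeof dep === 'function'
        ? async (...args: unknown[]) => {
            const r: Result<unknown, unknown> = await dep(...args);
            if (!r.ok) throw new ShortCircuit(r.error); // Err: stop the body here
            return r.value; // Ok: hand back the unwrapped value
          }
        : dep; // non-function: left untouched
  }
  try {
    return ok(await body(wrapped));
  } catch (e) {
    if (e instanceof ShortCircuit) return err(e.error);
    throw e; // real exceptions are out of scope for this sketch
  }
}

const getUser = async (id: string): Promise<Result<{ id: string }, string>> =>
  id === 'missing' ? err('USER_NOT_FOUND') : ok({ id });

async function demo() {
  const calls: string[] = [];
  const result = await miniFlow({ getUser, LIMIT: 3 }, async (d) => {
    calls.push(`limit=${d.LIMIT}`);
    const user = await d.getUser('missing'); // returns Err, so the body stops here
    calls.push(`after ${user.id}`); // never runs in this demo
    return user;
  });
  return { result, calls };
}
```

Using a thrown sentinel is one common way to get early exit out of plain async/await; whether awaitly does it this way internally is not something this sketch claims.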

Use c when implicit dep-name step IDs are not enough.

c.key runs one operation under a custom step id.

const user = await c.key('user:primary', () => c.raw.getUser('u1'));

Use this when you call the same dep multiple times and want distinct step IDs.

Callback errors are constrained to the flow’s inferred error union (the union of all dep errors), so novel error variants must come from a dep.

c.all runs multiple Result-returning operations in parallel under a single named scope and returns their results by name. The scope is tracked as one named group (scope_start / scope_end events under that name), while each operation keeps its object key in the returned shape.

const { profile, posts } = await c.all('loadUserBundle', {
  profile: () => c.raw.getUser('u1'),
  posts: () => c.raw.getPosts('u1'),
});

If any operation returns Err, the flow exits early with that error (fail-fast, mirroring step.all). The name (“loadUserBundle”) appears as a scope_start / scope_end event for visualization and tracing.
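The fail-fast behaviour described above can be modelled roughly as follows. allNamed and Failed are hypothetical names for this sketch, not awaitly's implementation. It emits no scope events, its return type is looser than the named shape c.all gives back, and operations still in flight are simply ignored rather than cancelled, which is an assumption of the sketch.

```typescript
// Local stand-ins for the Result shapes (assumed; not awaitly's definitions).
type Ok<T> = { ok: true; value: T };
type Err<E> = { ok: false; error: E };
type Result<T, E> = Ok<T> | Err<E>;
const ok = <T>(value: T): Ok<T> => ({ ok: true, value });
const err = <E>(error: E): Err<E> => ({ ok: false, error });

type Ops = Record<string, () => Promise<Result<unknown, unknown>>>;

// Private signal that turns an Err into a rejection so Promise.all settles early.
class Failed {
  error: unknown;
  constructor(error: unknown) {
    this.error = error;
  }
}

// Hypothetical helper: run named operations concurrently, fail on the first Err.
async function allNamed(ops: Ops): Promise<Result<Record<string, unknown>, unknown>> {
  // Every operation is started before any of them is awaited.
  const tasks = Object.entries(ops).map(async ([name, op]) => {
    const r = await op();
    if (!r.ok) throw new Failed(r.error);
    return [name, r.value] as const;
  });
  try {
    const entries = await Promise.all(tasks); // rejects on the first Failed
    const values: Record<string, unknown> = {};
    for (const [name, value] of entries) values[name] = value;
    return ok(values);
  } catch (e) {
    if (e instanceof Failed) return err(e.error);
    throw e; // genuine exceptions are not handled in this sketch
  }
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function demo() {
  let slowFinished = false;
  const result = await allNamed({
    profile: async () => err('USER_NOT_FOUND'),
    posts: async () => {
      await sleep(50);
      slowFinished = true;
      return ok(['p1']);
    },
  });
  // The Err from `profile` is returned without waiting for `posts`.
  return { result, slowFinishedAtReturn: slowFinished };
}
```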

c.raw is the original deps object — functions still return Result / AsyncResult. Use c.raw.* inside c.key / c.all callbacks so the engine sees a single intended step boundary. Calling d.* inside a c.key callback would create a nested auto-step and is not the intended use.

Pass onEvent to observe step lifecycle. Each dep call emits step_start / step_success / step_error keyed by the dep name (or by the id you pass to c.key); c.all emits a scope_start / scope_end pair.

await flow(
  { getUser, getPosts },
  async (d, c) => {
    const user = await d.getUser('u1');
    const { posts } = await c.all('fetchUserBundle', {
      posts: () => c.raw.getPosts(user.id),
    });
    return { user, posts };
  },
  {
    onEvent: (event) => {
      if (event.type === 'step_start') console.log('step_start:', event.stepId);
    },
  },
);

onEvent is typed as WorkflowEvent<unknown> here to keep the surface small. For fully typed event payloads, use run() directly.
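Because the event arrives loosely typed, a handler will usually narrow on event.type before reading other fields. A minimal sketch of a tally handler follows. LooseEvent and makeStepTally are hypothetical names, and the idea that step_success and step_error carry stepId is an assumption of this sketch (the example above only reads it on step_start).

```typescript
// Assumed minimal event shape for this sketch; the real WorkflowEvent has more to it.
type LooseEvent = { type: string; stepId?: string };

// Hypothetical helper: records which steps started and how each one ended.
function makeStepTally() {
  const started: string[] = [];
  const outcomes = new Map<string, 'success' | 'error'>();

  const onEvent = (event: LooseEvent): void => {
    if (event.stepId === undefined) return; // e.g. scope_start / scope_end
    switch (event.type) {
      case 'step_start':
        started.push(event.stepId);
        break;
      case 'step_success':
        outcomes.set(event.stepId, 'success');
        break;
      case 'step_error':
        outcomes.set(event.stepId, 'error');
        break;
      default:
        break; // other event types are ignored here
    }
  };

  return { onEvent, started, outcomes };
}

// Feeding events by hand, with ids as they would look for d.getUser and c.key('user:b', ...).
function demo() {
  const tally = makeStepTally();
  tally.onEvent({ type: 'step_start', stepId: 'getUser' });
  tally.onEvent({ type: 'step_success', stepId: 'getUser' });
  tally.onEvent({ type: 'step_start', stepId: 'user:b' });
  tally.onEvent({ type: 'step_error', stepId: 'user:b' });
  tally.onEvent({ type: 'scope_start' });
  return tally;
}
```

If the real event shape matches these assumptions, tally.onEvent could be passed as the onEvent option and the tally inspected after the flow settles.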

  • Domain errors come from your dep AsyncResult/Result types.
  • Thrown exceptions are converted to UnexpectedError by default.
  • You can map unexpected throws with catchUnexpected.

const result = await flow(
  { getUser },
  async (d) => d.getUser('u1'),
  { catchUnexpected: () => 'UNEXPECTED' as const }
);
// result.error: 'USER_NOT_FOUND' | 'UNEXPECTED'
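The split between domain errors and unexpected throws can be pictured with a small model. guard below is a hypothetical helper written for illustration, not awaitly's catchUnexpected implementation, and the Result shapes are local stand-ins: returned Ok and Err values pass through untouched, and only a thrown exception reaches the mapper.

```typescript
// Local stand-ins for the Result shapes (assumed; not awaitly's definitions).
type Ok<T> = { ok: true; value: T };
type Err<E> = { ok: false; error: E };
type Result<T, E> = Ok<T> | Err<E>;
const ok = <T>(value: T): Ok<T> => ({ ok: true, value });
const err = <E>(error: E): Err<E> => ({ ok: false, error });

// Hypothetical helper: map thrown exceptions into the error channel.
async function guard<T, E, U>(
  body: () => Promise<Result<T, E>>,
  catchUnexpected: (cause: unknown) => U,
): Promise<Result<T, E | U>> {
  try {
    return await body(); // Ok and domain Err pass through unchanged
  } catch (cause) {
    return err(catchUnexpected(cause)); // only thrown exceptions are mapped
  }
}

async function loadUser(id: string): Promise<Result<{ id: string }, 'USER_NOT_FOUND'>> {
  if (id === 'boom') throw new Error('connection reset'); // unexpected: thrown
  if (id === 'missing') return err('USER_NOT_FOUND' as const); // domain: returned
  return ok({ id });
}

// The error type here is 'USER_NOT_FOUND' | 'UNEXPECTED'.
const run = (id: string) => guard(() => loadUser(id), () => 'UNEXPECTED' as const);
```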

Use flow() when:

  • You want Effect.gen-like ergonomics with plain async/await
  • Your workflow can be expressed as dep calls + optional c.key / c.all
  • You want automatic error inference without full workflow setup

flow() deliberately stays small. Reach for run() or createWorkflow() when you need:

  • Per-call retry, timeout, or cache: flow() has no per-call options object; use step.retry / step.withTimeout / a cache key via step() inside run().
  • step.map / step.race / step.try: only c.key and c.all are surfaced here.
  • Fully typed event payloads: flow()’s onEvent is WorkflowEvent<unknown>.
  • Resume / state persistence: use createWorkflow(...).runWithState().

Both surfaces share the same engine — moving from flow() to run() is a local refactor, not a paradigm shift.