
Control Flow

awaitly provides patterns for controlling how operations execute: sequentially, in parallel, or racing to first success.

By default, steps execute sequentially. Each step waits for the previous one to complete:

const result = await workflow.run(async ({ step, deps }) => {
  const user = await step('fetchUser', () => deps.fetchUser('1')); // First
  const posts = await step('fetchPosts', () => deps.fetchPosts(user.id)); // Second (after user)
  const comments = await step('fetchComments', () => deps.fetchComments(posts[0].id)); // Third (after posts)
  return { user, posts, comments };
});

Execution timeline:

fetchUser ─────┐
               └──► fetchPosts ─────┐
                                    └──► fetchComments ─────┐
                                                            └──► return

Use sequential execution when operations depend on each other’s results.

Use allAsync to run independent operations concurrently:

import { allAsync } from 'awaitly';

const result = await workflow.run(async ({ step, deps }) => {
  const [user, notifications, settings] = await step('fetchUserData', () =>
    allAsync([
      deps.fetchUser('1'),
      deps.fetchNotifications('1'),
      deps.fetchSettings('1'),
    ])
  );
  return { user, notifications, settings };
});

Execution timeline:

┌─ fetchUser ────────────────────┐
│                                │
├─ fetchNotifications ───────┐   │
│                            │   │
└─ fetchSettings ────────────┴───┴──► all complete ──► return

allAsync stops on the first error:

const result = await allAsync([
  fetchUser('1'),     // Takes 100ms, succeeds
  fetchPosts('999'),  // Takes 50ms, fails with 'NOT_FOUND'
  fetchComments('1'), // Takes 200ms, never completes (cancelled)
]);
// result.ok === false
// result.error === 'NOT_FOUND'
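
To make the fail-fast behavior concrete, here is a minimal sketch built on plain promises. The `Result` type, `failFastAll`, and `after` are hypothetical names written for this illustration; this is not awaitly's implementation, and it makes no claim about how awaitly treats the operations still in flight.

```typescript
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

// Hypothetical helper for illustration only; not awaitly's implementation.
function failFastAll<T, E>(
  tasks: Promise<Result<T, E>>[]
): Promise<Result<T[], E>> {
  return new Promise((resolve) => {
    const values = new Array<T>(tasks.length);
    let remaining = tasks.length;
    if (remaining === 0) {
      resolve({ ok: true, value: [] });
      return;
    }
    tasks.forEach((task, index) => {
      void task.then((result) => {
        if (result.ok) {
          values[index] = result.value;
          remaining -= 1;
          if (remaining === 0) resolve({ ok: true, value: values });
        } else {
          // The first error settles the combined promise. Later calls to
          // resolve() are ignored, so slower results are simply dropped.
          resolve({ ok: false, error: result.error });
        }
      });
    });
  });
}

// Simulated operation that settles with `result` after `ms` milliseconds.
const after = <T, E>(ms: number, result: Result<T, E>): Promise<Result<T, E>> =>
  new Promise((resolve) => setTimeout(() => resolve(result), ms));

failFastAll([
  after<string, 'NOT_FOUND'>(100, { ok: true, value: 'user' }),
  after<string, 'NOT_FOUND'>(50, { ok: false, error: 'NOT_FOUND' }),
  after<string, 'NOT_FOUND'>(200, { ok: true, value: 'comments' }),
]).then((result) => console.log(result)); // { ok: false, error: 'NOT_FOUND' }
```

In this sketch the combined result is available after roughly 50ms, as soon as the failing operation settles. The two slower promises still run to completion in the background; their results are just ignored.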

Use allSettledAsync to run all operations and collect results, even if some fail:

import { allSettledAsync } from 'awaitly';

const result = await allSettledAsync([
  fetchUser('1'),     // Succeeds
  fetchPosts('999'),  // Fails
  fetchComments('1'), // Succeeds
]);
// Returns ok([values]) only if ALL succeed
// Returns err([SettledError]) if ANY fail
if (result.ok) {
  const [user, posts, comments] = result.value;
} else {
  for (const error of result.error) {
    console.log('Failed:', error.error);
  }
}

Separate successes from failures:

import { partition } from 'awaitly';

const results = await Promise.all([
  fetchUser('1'),
  fetchUser('2'),
  fetchUser('999'),
]);
const [successes, failures] = partition(results);
// successes: [User, User]
// failures: ['NOT_FOUND']
console.log(`Found ${successes.length} users, ${failures.length} not found`);
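
The splitting logic itself is small. As a rough sketch, assuming results are plain `{ ok, value }` / `{ ok, error }` objects, it could look like the following. `Result`, `partitionResults`, and the sample data are hypothetical names for this illustration; awaitly's own `partition` may differ in details.

```typescript
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

// Hypothetical re-implementation for illustration; awaitly's partition may differ.
function partitionResults<T, E>(results: Result<T, E>[]): [T[], E[]] {
  const successes: T[] = [];
  const failures: E[] = [];
  for (const result of results) {
    if (result.ok) {
      successes.push(result.value);
    } else {
      failures.push(result.error);
    }
  }
  return [successes, failures];
}

// Made-up sample data: two lookups succeed, one fails.
const sampleResults: Result<string, 'NOT_FOUND'>[] = [
  { ok: true, value: 'Ada' },
  { ok: true, value: 'Grace' },
  { ok: false, error: 'NOT_FOUND' },
];

const [foundUsers, missingUsers] = partitionResults(sampleResults);
console.log(foundUsers.length, missingUsers.length); // 2 1
```

Both output arrays keep the relative order of the inputs, which makes it straightforward to report which lookups failed.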

Use anyAsync to get the first successful result:

import { anyAsync } from 'awaitly';

const result = await anyAsync([
  fetchFromPrimary(id), // Main API
  fetchFromBackup(id),  // Backup API
  fetchFromCache(id),   // Local cache
]);
if (result.ok) {
  // Got data from whichever responded first successfully
  console.log(result.value);
}

Execution timeline:

┌─ fetchFromPrimary ──────────────────┐
│                                     │
├─ fetchFromBackup ────────┐          │
│                          │          │
└─ fetchFromCache ─┐       │          │
                   │       │          │
                   └───────┴──────────┴──► first success wins
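
A rough sketch of "first success wins" on plain promises follows. `Result`, `firstSuccess`, and `after` are hypothetical names, and returning an array of every error when all operations fail is an assumption made for this sketch, not a statement about what `anyAsync` returns.

```typescript
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

// Hypothetical helper; the all-failed shape (an array of errors) is an assumption.
function firstSuccess<T, E>(
  tasks: Promise<Result<T, E>>[]
): Promise<Result<T, E[]>> {
  return new Promise((resolve) => {
    const errors = new Array<E>(tasks.length);
    let failed = 0;
    if (tasks.length === 0) {
      resolve({ ok: false, error: [] });
      return;
    }
    tasks.forEach((task, index) => {
      void task.then((result) => {
        if (result.ok) {
          // The first success settles the combined promise; later ones are ignored.
          resolve({ ok: true, value: result.value });
        } else {
          errors[index] = result.error;
          failed += 1;
          // Report failure only once every operation has failed.
          if (failed === tasks.length) resolve({ ok: false, error: errors });
        }
      });
    });
  });
}

// Simulated operation that settles with `result` after `ms` milliseconds.
const after = <T, E>(ms: number, result: Result<T, E>): Promise<Result<T, E>> =>
  new Promise((resolve) => setTimeout(() => resolve(result), ms));

firstSuccess([
  after<string, string>(60, { ok: true, value: 'primary' }),
  after<string, string>(40, { ok: true, value: 'backup' }),
  after<string, string>(10, { ok: false, error: 'CACHE_MISS' }),
]).then((result) => console.log(result)); // { ok: true, value: 'backup' }
```

The cache answers first here, but with an error, so the sketch keeps waiting and the backup's success at 40ms wins.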

Some operations depend on others. Combine sequential and parallel:

const result = await workflow.run(async ({ step, deps }) => {
  // First: fetch user
  const user = await step('fetchUser', () => deps.fetchUser('1'));

  // Then: fetch user's data in parallel
  const [posts, friends, settings] = await step('fetchPostsData', () =>
    allAsync([
      deps.fetchPosts(user.id),
      deps.fetchFriends(user.id),
      deps.fetchSettings(user.id),
    ])
  );
  return { user, posts, friends, settings };
});

Execution timeline:

fetchUser ─────┐
               └──► ┌─ fetchPosts ────────────┐
                    │                         │
                    ├─ fetchFriends ──────┐   │
                    │                     │   │
                    └─ fetchSettings ─────┴───┴──► return

Give parallel groups a name for visualization using step.all(name, ...) (Effect-style) or step.parallel(name, ...):

// Effect-style: named results
const result = await workflow.run(async ({ step, deps }) => {
  const { user, posts } = await step.all('Fetch user data', {
    user: () => deps.fetchUser('1'),
    posts: () => deps.fetchPosts('1'),
  });
  return { user, posts };
});

// Or step.parallel with allAsync
const result = await workflow.run(async ({ step, deps }) => {
  const [user, posts] = await step.parallel('Fetch user data', () =>
    allAsync([deps.fetchUser('1'), deps.fetchPosts('1')])
  );
  return { user, posts };
});
// Visualization shows "Fetch user data" as a single step

Add a timeout to parallel operations:

const result = await workflow.run(async ({ step, deps }) => {
  const data = await step.withTimeout(
    'fetchUserData',
    () => allAsync([deps.fetchUser('1'), deps.fetchPosts('1')]),
    { ms: 5000 }
  );
  return data;
});
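
Conceptually, a timeout races the operation against a timer. The sketch below shows that idea with plain promises. `Result`, `withDeadline`, `after`, and the `'TIMEOUT'` error value are all hypothetical; the error that `step.withTimeout` actually produces is not covered on this page.

```typescript
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

// Hypothetical helper; the 'TIMEOUT' error value is an assumption for this sketch.
function withDeadline<T, E>(
  operation: () => Promise<Result<T, E>>,
  ms: number
): Promise<Result<T, E | 'TIMEOUT'>> {
  return new Promise((resolve) => {
    // If the timer fires first, the combined promise settles with a timeout error.
    const timer = setTimeout(() => resolve({ ok: false, error: 'TIMEOUT' }), ms);
    void operation().then((result) => {
      // If the operation finishes first, cancel the timer and pass the result through.
      clearTimeout(timer);
      resolve(result);
    });
  });
}

// Simulated operation that settles with `result` after `ms` milliseconds.
const after = <T, E>(ms: number, result: Result<T, E>): Promise<Result<T, E>> =>
  new Promise((resolve) => setTimeout(() => resolve(result), ms));

withDeadline(() => after<string, never>(100, { ok: true, value: 'slow' }), 20)
  .then((result) => console.log(result)); // { ok: false, error: 'TIMEOUT' }
```

Note that in this sketch the slow operation is not aborted when the deadline passes; its eventual result is only ignored.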

Process arrays with typed errors:

const userIds = ['1', '2', '3'];
const results = await allAsync(
  userIds.map((id) => fetchUser(id))
);
if (results.ok) {
  // results.value is User[]
  console.log(results.value.map((u) => u.name));
}

For sequential processing where you need static analysis support (workflow diagrams, path enumeration), use step.forEach():

// ✅ Statically analyzable - awaitly-analyze can enumerate paths
await step.forEach('process-payments', payments, {
  stepIdPattern: 'payment-{i}',
  run: async (payment) => {
    await step('processPayment', () => deps.processPayment(payment));
  }
});

// ❌ Dynamic keys - defeats static analysis
for (const payment of payments) {
  await step('processPayment', () => deps.processPayment(payment), { key: `payment-${payment.id}` });
}

See step.forEach for full documentation.

Errors from parallel operations preserve their types:

const fetchUser = async (id: string): AsyncResult<User, 'USER_NOT_FOUND'> => { /* ... */ };
const fetchPosts = async (id: string): AsyncResult<Post[], 'POSTS_ERROR'> => { /* ... */ };
const result = await allAsync([fetchUser('1'), fetchPosts('1')]);
// result.error is: 'USER_NOT_FOUND' | 'POSTS_ERROR'

For large sets, limit concurrent operations:

import { processInBatches } from 'awaitly/batch';

const result = await processInBatches(
  userIds,
  (id) => fetchUser(id),
  {
    batchSize: 20,  // Process 20 at a time
    concurrency: 5, // But only 5 concurrent
  }
);

See Batch Processing for details.
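
For intuition about what a concurrency limit does, here is a minimal worker-pool sketch on plain promises. `mapWithConcurrency` and `demoConcurrency` are hypothetical names; this is not the implementation behind `processInBatches` and it ignores batching and error handling entirely.

```typescript
// Hypothetical helper; not the implementation behind processInBatches.
async function mapWithConcurrency<T, R>(
  items: readonly T[],
  limit: number,
  worker: (item: T) => Promise<R>
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let next = 0;

  // Each runner claims the next unprocessed index until the list is exhausted.
  const runner = async (): Promise<void> => {
    while (next < items.length) {
      const index = next;
      next += 1;
      results[index] = await worker(items[index]);
    }
  };

  // Start at most `limit` runners (and at least one when there is work to do).
  const runnerCount = Math.min(Math.max(1, limit), items.length);
  await Promise.all(Array.from({ length: runnerCount }, () => runner()));
  return results;
}

// Demo: double six numbers with at most two operations in flight,
// recording the highest number of simultaneous operations observed.
async function demoConcurrency(): Promise<{ results: number[]; peak: number }> {
  let active = 0;
  let peak = 0;
  const results = await mapWithConcurrency([1, 2, 3, 4, 5, 6], 2, async (n) => {
    active += 1;
    peak = Math.max(peak, active);
    await new Promise((resolve) => setTimeout(resolve, 10));
    active -= 1;
    return n * 2;
  });
  return { results, peak };
}

demoConcurrency().then(({ results, peak }) => console.log(results, peak));
```

Results come back in input order even though operations finish at different times, because each runner writes to the index it claimed.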

| Pattern | Function | Behavior |
| --- | --- | --- |
| Sequential | Default step() chaining | Wait for each to complete |
| Parallel (fail-fast) | allAsync([...]) | Stop on first error |
| Parallel (collect all) | allSettledAsync([...]) | Collect all outcomes |
| Race to first | anyAsync([...]) | Return first success |
| Partition | partition([...]) | Separate successes/failures |

import { allAsync, ok, err, type AsyncResult } from 'awaitly';
import { createWorkflow } from 'awaitly/workflow';

type User = { id: string; name: string };
type Notification = { id: string; message: string };

// `db` and `notificationService` stand in for application-specific clients.
const fetchUser = async (id: string): AsyncResult<User, 'USER_NOT_FOUND'> => {
  const user = await db.users.find(id);
  return user ? ok(user) : err('USER_NOT_FOUND');
};

const sendNotification = async (
  userId: string,
  message: string
): AsyncResult<Notification, 'SEND_FAILED'> => {
  try {
    const notification = await notificationService.send(userId, message);
    return ok(notification);
  } catch {
    return err('SEND_FAILED');
  }
};

const notifyUsers = createWorkflow('workflow', { fetchUser, sendNotification });

const result = await notifyUsers.run(async ({ step, deps }) => {
  const userIds = ['1', '2', '3', '4', '5'];

  // Fetch all users in parallel; step() unwraps the result to User[]
  const users = await step('fetchUsers', () =>
    allAsync(userIds.map((id) => deps.fetchUser(id)))
  );

  // Send notifications in parallel
  const notifications = await step('sendNotifications', () =>
    allAsync(
      users.map((user) =>
        deps.sendNotification(user.id, 'Hello!')
      )
    )
  );
  return { notified: notifications.length };
});

if (result.ok) {
  console.log(`Notified ${result.value.notified} users`);
}

Learn about Errors and Retries →