Parallel Operations

Execute multiple operations in parallel while maintaining typed error handling.

Use step.all (Effect-style) or allAsync to run operations concurrently:

import { createWorkflow } from 'awaitly/workflow';
const workflow = createWorkflow('workflow', { fetchUser, fetchPosts, fetchComments });
// Preferred: step.all — named results, step tracking, cache when key provided
const result = await workflow.run(async ({ step }) => {
  const { user, posts, comments } = await step.all('fetchAll', {
    user: () => fetchUser('1'),
    posts: () => fetchPosts('1'),
    comments: () => fetchComments('1'),
  });
  return { user, posts, comments };
});

Or with allAsync and a single step:

import { allAsync } from 'awaitly';
const result = await workflow.run(async ({ step }) => {
  const [user, posts, comments] = await step('fetchUserData', () =>
    allAsync([fetchUser('1'), fetchPosts('1'), fetchComments('1')])
  );
  return { user, posts, comments };
});

For mapping over an array in parallel, use step.map:

const users = await step.map('fetchUsers', ['1', '2', '3'], (id) => fetchUser(id));

allAsync stops on the first error:

const result = await allAsync([
  fetchUser('1'), // Takes 100ms, succeeds
  fetchPosts('999'), // Takes 50ms, fails with 'NOT_FOUND'
  fetchComments('1'), // Takes 200ms, already started; its result is ignored
]);
// result.ok === false
// result.error === 'NOT_FOUND'
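
The fail-fast behaviour described above can be modelled without the library. The sketch below is not awaitly's implementation: `allFailFast` and `after` are hypothetical helpers, and the local `Result` type only mirrors the `ok` / `value` / `error` shape used on this page.

```typescript
// Local stand-in for the Result shape used on this page (an assumption).
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

// Hypothetical helper: resolves with the first error seen, or with all values.
function allFailFast<T, E>(tasks: Promise<Result<T, E>>[]): Promise<Result<T[], E>> {
  return new Promise<Result<T[], E>>((resolve) => {
    const values: T[] = new Array(tasks.length);
    let remaining = tasks.length;
    if (remaining === 0) resolve({ ok: true, value: [] });
    tasks.forEach((task, index) => {
      task.then((result) => {
        if (!result.ok) {
          // The first error settles the combined promise; later calls to resolve are no-ops.
          resolve({ ok: false, error: result.error });
          return;
        }
        values[index] = result.value; // keep input order, not completion order
        remaining -= 1;
        if (remaining === 0) resolve({ ok: true, value: values });
      });
    });
  });
}

// Hypothetical helper: a Result that becomes available after `ms` milliseconds.
const after = <T, E>(ms: number, result: Result<T, E>): Promise<Result<T, E>> =>
  new Promise<Result<T, E>>((resolve) => setTimeout(() => resolve(result), ms));

const failFastDemo = allFailFast<string, string>([
  after<string, string>(100, { ok: true, value: 'user' }),
  after<string, string>(50, { ok: false, error: 'NOT_FOUND' }),
  after<string, string>(200, { ok: true, value: 'comments' }),
]);

failFastDemo.then((result) => console.log(result)); // { ok: false, error: 'NOT_FOUND' }
```

In this model the 200ms task still runs to completion, because a started promise cannot be cancelled; only its result is dropped.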

Use allSettledAsync to run all operations and collect all errors (if any fail):

import { allSettledAsync } from 'awaitly';
const result = await allSettledAsync([
  fetchUser('1'), // Succeeds
  fetchPosts('999'), // Fails
  fetchComments('1'), // Succeeds
]);
// Returns ok([values]) only if ALL succeed
// Returns err([SettledError]) if ANY fail, collecting all errors
if (result.ok) {
  // All operations succeeded
  const [user, posts, comments] = result.value;
  console.log('All succeeded:', user, posts, comments);
} else {
  // At least one failed, so result.error is SettledError[]
  for (const settled of result.error) {
    console.log('Failed:', settled.error);
  }
}
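
To make the contrast with fail-fast concrete, here is a minimal sketch of the collect-all-errors behaviour. It is a model, not the library's code: `collectAll`, `succeedWith`, `failWith`, and the `SettledFailure` shape (including its `index` field) are assumptions made for illustration.

```typescript
// Local stand-in for the Result shape used on this page (an assumption).
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

// Hypothetical failure record; only the `error` field is taken from the example above.
type SettledFailure<E> = { index: number; error: E };

// Hypothetical helper: waits for every task, then reports all failures together.
async function collectAll<T, E>(
  tasks: Promise<Result<T, E>>[]
): Promise<Result<T[], SettledFailure<E>[]>> {
  const results = await Promise.all(tasks);
  const values: T[] = [];
  const failures: SettledFailure<E>[] = [];
  results.forEach((result, index) => {
    if (result.ok) values.push(result.value);
    else failures.push({ index, error: result.error });
  });
  // ok only when nothing failed; otherwise every failure is returned.
  if (failures.length > 0) return { ok: false, error: failures };
  return { ok: true, value: values };
}

// Hypothetical task factories for the demo.
const succeedWith = async (value: string): Promise<Result<string, string>> => ({ ok: true, value });
const failWith = async (error: string): Promise<Result<string, string>> => ({ ok: false, error });

const settledDemo = collectAll([
  succeedWith('user'),
  failWith('NOT_FOUND'),
  failWith('RATE_LIMITED'),
]);

settledDemo.then((result) => {
  if (!result.ok) console.log(result.error.map((failure) => failure.error));
});
```

Unlike the fail-fast model, no failure hides another: both `'NOT_FOUND'` and `'RATE_LIMITED'` reach the caller.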

Note: Unlike Promise.allSettled(), allSettledAsync returns a Result: ok if all operations succeed, err if any fail. This is consistent with awaitly’s philosophy that all functions return Results. If you need partial success, await the Results with Promise.all() and then split them with partition():

import { partition } from 'awaitly';
const results = await Promise.all([
  fetchUser('1'),
  fetchUser('2'),
  fetchUser('999'),
]);
const [successes, failures] = partition(results);
// successes: [User, User]
// failures: ['NOT_FOUND']
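
Splitting a list of Results into successes and failures takes only a few lines. The sketch below uses a hypothetical `partitionResults` helper over a local `Result` type; it illustrates the idea and is not the library's `partition`.

```typescript
// Local stand-in for the Result shape used on this page (an assumption).
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

// Hypothetical helper: separates unwrapped values from unwrapped errors.
function partitionResults<T, E>(results: Result<T, E>[]): [T[], E[]] {
  const successes: T[] = [];
  const failures: E[] = [];
  for (const result of results) {
    if (result.ok) successes.push(result.value);
    else failures.push(result.error);
  }
  return [successes, failures];
}

const sampleResults: Result<string, 'NOT_FOUND'>[] = [
  { ok: true, value: 'user-1' },
  { ok: true, value: 'user-2' },
  { ok: false, error: 'NOT_FOUND' },
];

const [foundUsers, lookupErrors] = partitionResults(sampleResults);
console.log(foundUsers); // [ 'user-1', 'user-2' ]
console.log(lookupErrors); // [ 'NOT_FOUND' ]
```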

Give parallel groups a name for visualization using step.all (Effect-style, named results) or step.parallel:

// step.all: named results
const namedResult = await workflow.run(async ({ step }) => {
  const { user, posts } = await step.all('Fetch user data', {
    user: () => fetchUser('1'),
    posts: () => fetchPosts('1'),
  });
  return { user, posts };
});
// step.parallel + allAsync
const parallelResult = await workflow.run(async ({ step }) => {
  const [user, posts] = await step.parallel('Fetch user data', () =>
    allAsync([fetchUser('1'), fetchPosts('1')])
  );
  return { user, posts };
});

Use anyAsync to get the first successful result:

import { anyAsync } from 'awaitly';
// Try multiple API endpoints and use the first one that succeeds
const result = await anyAsync([
  fetchFromPrimary(id),
  fetchFromBackup(id),
  fetchFromCache(id),
]);
if (result.ok) {
  // Got data from whichever source succeeded first
  console.log(result.value);
}
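
"First successful result" is not the same as "first to respond": an early failure is skipped. The sketch below models that with a hypothetical `firstSuccess` helper. Returning every error when nothing succeeds is an assumption of this sketch, and the library may report that case differently.

```typescript
// Local stand-in for the Result shape used on this page (an assumption).
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

// Hypothetical helper: resolves with the first ok Result, ignoring earlier failures.
function firstSuccess<T, E>(tasks: Promise<Result<T, E>>[]): Promise<Result<T, E[]>> {
  return new Promise<Result<T, E[]>>((resolve) => {
    const errors: E[] = new Array(tasks.length);
    let remaining = tasks.length;
    if (remaining === 0) resolve({ ok: false, error: [] });
    tasks.forEach((task, index) => {
      task.then((result) => {
        if (result.ok) {
          resolve({ ok: true, value: result.value }); // first success wins
          return;
        }
        errors[index] = result.error;
        remaining -= 1;
        // Assumption: if every task failed, report all errors in input order.
        if (remaining === 0) resolve({ ok: false, error: errors });
      });
    });
  });
}

// Hypothetical helper: a Result that becomes available after `ms` milliseconds.
const after = <T, E>(ms: number, result: Result<T, E>): Promise<Result<T, E>> =>
  new Promise<Result<T, E>>((resolve) => setTimeout(() => resolve(result), ms));

const firstSuccessDemo = firstSuccess<string, string>([
  after<string, string>(10, { ok: false, error: 'PRIMARY_DOWN' }), // responds first, but fails
  after<string, string>(30, { ok: true, value: 'from backup' }), // first success
  after<string, string>(60, { ok: true, value: 'from cache' }), // too late
]);

firstSuccessDemo.then((result) => console.log(result)); // { ok: true, value: 'from backup' }
```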

For large sets, use processInBatches:

import { processInBatches } from 'awaitly/batch';
const result = await processInBatches(
  userIds,
  (id) => fetchUser(id),
  { batchSize: 20, concurrency: 5 }
);

See Batch Processing for details.
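
The reason to batch is to cap how many operations are in flight at once. The sketch below shows one simple way to do that: run fixed-size chunks one after another. `inBatches` is a hypothetical helper, it models only a batch size (not the separate `concurrency` option shown above), and it may differ from how `processInBatches` actually schedules work.

```typescript
// Local stand-in for the Result shape used on this page (an assumption).
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

// Hypothetical helper: runs `fn` over `items` in sequential chunks of `batchSize`.
async function inBatches<I, T, E>(
  items: I[],
  fn: (item: I) => Promise<Result<T, E>>,
  batchSize: number
): Promise<Result<T[], E>> {
  const values: T[] = [];
  for (let start = 0; start < items.length; start += batchSize) {
    const batch = items.slice(start, start + batchSize);
    // Items within one batch run concurrently; the next batch waits for this one.
    const results = await Promise.all(batch.map((item) => fn(item)));
    for (const result of results) {
      if (!result.ok) return { ok: false, error: result.error }; // stop at the first failing batch
      values.push(result.value);
    }
  }
  return { ok: true, value: values };
}

// Demo: track how many fake fetches overlap.
let inFlight = 0;
let peakInFlight = 0;
const fakeFetchUser = async (id: number): Promise<Result<string, 'NOT_FOUND'>> => {
  inFlight += 1;
  peakInFlight = Math.max(peakInFlight, inFlight);
  await new Promise<void>((resolve) => setTimeout(() => resolve(), 5));
  inFlight -= 1;
  return { ok: true, value: `user-${id}` };
};

const batchDemo = inBatches([1, 2, 3, 4, 5, 6, 7], fakeFetchUser, 3);
batchDemo.then((result) => {
  if (result.ok) console.log(result.value.length, 'users fetched, peak in flight:', peakInFlight);
});
```

With seven items and a batch size of three, no more than three fetches overlap at any moment.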

When some operations depend on others, run the dependency first, then run the dependent operations in parallel:

const result = await workflow.run(async ({ step }) => {
  // Fetch user first
  const user = await step('fetchUser', () => fetchUser('1'));
  // Then fetch user's data in parallel
  const [posts, friends, settings] = await step('fetchUserData', () =>
    allAsync([
      fetchPosts(user.id),
      fetchFriends(user.id),
      fetchSettings(user.id),
    ])
  );
  return { user, posts, friends, settings };
});

Errors from parallel operations are typed:

const fetchUser = async (id: string): AsyncResult<User, 'USER_NOT_FOUND'> => { ... };
const fetchPosts = async (id: string): AsyncResult<Post[], 'POSTS_FETCH_ERROR'> => { ... };
const result = await allAsync([fetchUser('1'), fetchPosts('1')]);
// result.error is: 'USER_NOT_FOUND' | 'POSTS_FETCH_ERROR'
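
The practical benefit of the union error type is that the compiler can check that every error is handled. The sketch below shows this with a hypothetical two-task combinator, `both`, standing in for `allAsync`; the local `Result` and `AsyncResult` types and the sample data are assumptions.

```typescript
// Local stand-ins for the types used on this page (assumptions).
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };
type AsyncResult<T, E> = Promise<Result<T, E>>;

type User = { id: string; name: string };
type Post = { id: string; title: string };

const fetchUser = async (id: string): AsyncResult<User, 'USER_NOT_FOUND'> =>
  id === '1' ? { ok: true, value: { id, name: 'Ada' } } : { ok: false, error: 'USER_NOT_FOUND' };

const fetchPosts = async (id: string): AsyncResult<Post[], 'POSTS_FETCH_ERROR'> =>
  id === '1' ? { ok: true, value: [{ id: 'p1', title: 'Hello' }] } : { ok: false, error: 'POSTS_FETCH_ERROR' };

// Hypothetical combinator: the error type of the result is the union EA | EB.
async function both<A, EA, B, EB>(
  first: AsyncResult<A, EA>,
  second: AsyncResult<B, EB>
): AsyncResult<[A, B], EA | EB> {
  const [a, b] = await Promise.all([first, second]);
  if (!a.ok) return { ok: false, error: a.error };
  if (!b.ok) return { ok: false, error: b.error };
  return { ok: true, value: [a.value, b.value] };
}

// Exhaustive switch: adding a new error to either fetcher makes this fail to compile
// until a matching case is added.
function describeError(error: 'USER_NOT_FOUND' | 'POSTS_FETCH_ERROR'): string {
  switch (error) {
    case 'USER_NOT_FOUND':
      return 'No such user';
    case 'POSTS_FETCH_ERROR':
      return 'Could not load posts';
  }
}

const typedDemo = both(fetchUser('2'), fetchPosts('1')).then((result) =>
  result.ok ? `Loaded ${result.value[0].name}` : describeError(result.error)
);

typedDemo.then((message) => console.log(message)); // No such user
```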

Process an array with typed errors:

const userIds = ['1', '2', '3'];
const results = await allAsync(
  userIds.map((id) => fetchUser(id))
);
if (results.ok) {
  // results.value is User[]
  console.log(results.value.map((u) => u.name));
}
}

Add a timeout to parallel operations:

const result = await workflow.run(async ({ step }) => {
  const data = await step.withTimeout(
    'fetchUserData',
    () => allAsync([fetchUser('1'), fetchPosts('1')]),
    { ms: 5000 }
  );
  return data;
});
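
A timeout around an async operation can be modelled as a race between the operation and a timer. The sketch below is not how `step.withTimeout` is implemented: `withTimeoutSketch` and `fetchAfter` are hypothetical helpers, and the `'TIMEOUT'` error value is an assumption chosen for illustration.

```typescript
// Local stand-in for the Result shape used on this page (an assumption).
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

// Hypothetical helper: whichever settles first, the operation or the timer, wins.
function withTimeoutSketch<T, E>(
  operation: () => Promise<Result<T, E>>,
  ms: number
): Promise<Result<T, E | 'TIMEOUT'>> {
  return new Promise<Result<T, E | 'TIMEOUT'>>((resolve) => {
    const timer = setTimeout(() => resolve({ ok: false, error: 'TIMEOUT' }), ms);
    operation().then((result) => {
      clearTimeout(timer); // harmless if the timer has already fired
      resolve(result); // a no-op if the timeout already settled the promise
    });
  });
}

// Hypothetical operation that succeeds after `ms` milliseconds.
const fetchAfter = (ms: number) => (): Promise<Result<string, 'FETCH_FAILED'>> =>
  new Promise<Result<string, 'FETCH_FAILED'>>((resolve) =>
    setTimeout(() => resolve({ ok: true, value: `done in ${ms}ms` }), ms)
  );

const timedOut = withTimeoutSketch(fetchAfter(50), 10);
const finishedInTime = withTimeoutSketch(fetchAfter(5), 50);

timedOut.then((result) => console.log(result)); // { ok: false, error: 'TIMEOUT' }
finishedInTime.then((result) => console.log(result)); // { ok: true, value: 'done in 5ms' }
```

In this model the timeout only stops waiting; the underlying operation keeps running unless it supports cancellation.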
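
The complete example below combines these patterns: it fetches several users in parallel, then sends each one a notification in parallel.

import {
  allAsync,
  partition,
  ok,
  err,
  type AsyncResult,
} from 'awaitly';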
import { createWorkflow } from 'awaitly/workflow';
type User = { id: string; name: string };
type Notification = { id: string; message: string };
// db and notificationService are application-specific and not shown here.
const fetchUser = async (id: string): AsyncResult<User, 'USER_NOT_FOUND'> => {
  const user = await db.users.find(id);
  return user ? ok(user) : err('USER_NOT_FOUND');
};
const sendNotification = async (
  userId: string,
  message: string
): AsyncResult<Notification, 'SEND_FAILED'> => {
  try {
    const notification = await notificationService.send(userId, message);
    return ok(notification);
  } catch {
    return err('SEND_FAILED');
  }
};
const notifyUsers = createWorkflow('workflow', { fetchUser, sendNotification });
const result = await notifyUsers.run(async ({ step }) => {
  const userIds = ['1', '2', '3', '4', '5'];
  // Fetch all users in parallel; step unwraps the Result, so this is User[]
  const users = await step('fetchUsers', () =>
    allAsync(userIds.map((id) => fetchUser(id)))
  );
  // Send notifications in parallel
  const notifications = await step('sendNotifications', () =>
    allAsync(users.map((user) => sendNotification(user.id, 'Hello!')))
  );
  return {
    notified: notifications.length,
  };
});