AI Integration Patterns

This guide covers patterns for integrating awaitly with AI SDKs like OpenAI, Anthropic, or Vercel AI SDK. The key challenges are handling streaming responses, multi-phase tool calling, and wrapping tool execution in steps.

AI streams are typically single-use AsyncIterables: you must accumulate content while iterating, because you cannot iterate the same stream twice.

const result = await workflow.run(async ({ step, deps }) => {
  // Stream the AI response
  const stream = await step('generateStream', () => deps.generateStream(prompt));

  // Accumulate while iterating (streams are single-use!)
  let text = '';
  for await (const chunk of stream) {
    if (chunk.delta) {
      text += chunk.delta;
    }
  }

  return { text, finishReason: stream.finishReason };
});

AI models with tool calling often complete in multiple phases:

  1. First phase: Model returns finish_reason: "tool_calls" with tool invocations
  2. Tool execution: Your code runs the requested tools
  3. Second phase: Model continues with tool results, may return more tool calls
  4. Final phase: Model returns finish_reason: "stop" with final response

const result = await workflow.run(async ({ step, deps }) => {
  let messages = [{ role: 'user', content: prompt }];

  while (true) {
    const response = await step('chat', () => deps.chat(messages), {
      key: `chat:${messages.length}`,
    });

    // Final response - model is done
    if (response.finishReason === 'stop') {
      return response.content;
    }

    // Tool calls - execute and continue
    if (response.finishReason === 'tool_calls') {
      const toolResults = await executeTools(step, response.toolCalls);
      messages = [
        ...messages,
        { role: 'assistant', tool_calls: response.toolCalls },
        ...toolResults.map(r => ({ role: 'tool', ...r })),
      ];
      continue;
    }

    // Unexpected finish reason
    throw new Error(`Unexpected finish reason: ${response.finishReason}`);
  }
});

When executing tools, wrap each tool call in a step for:

  • Caching: Resume from cached tool results on retry
  • Observability: Track tool execution in workflow events
  • Error handling: Isolate tool failures

The key challenge is generating stable, unique keys for tool calls:

// Most reliable - use the SDK-provided toolCallId
async function executeTools(step, toolCalls) {
  return Promise.all(
    toolCalls.map(async (call) => {
      const result = await step(
        `tool:${call.name}`,
        () => runTool(call.name, call.arguments),
        {
          key: `tool:${call.name}:${call.id}`, // SDK's unique ID
          name: `Tool: ${call.name}`,
        }
      );
      return { tool_call_id: call.id, content: JSON.stringify(result) };
    })
  );
}
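The `runTool` function above is not shown. A minimal sketch, assuming a plain name-to-function registry (the registry and tool bodies here are illustrative, not part of awaitly):

```typescript
// Hypothetical dispatcher assumed by executeTools: look up the tool by name
// in a registry and invoke it with its parsed arguments.
type ToolFn = (args: any) => Promise<unknown>;

const toolRegistry: Record<string, ToolFn> = {
  searchDocs: async ({ query }) => [`Result for ${query}`],
  getWeather: async ({ location }) => ({ location, temp: 72 }),
};

async function runTool(name: string, args: unknown): Promise<unknown> {
  const tool = toolRegistry[name];
  if (!tool) {
    // Return unknown-tool failures as data the model can read,
    // rather than throwing and aborting the whole phase
    return { error: `Unknown tool: ${name}` };
  }
  return tool(args);
}
```

Returning an error payload instead of throwing keeps a single bad tool call from failing the entire multi-phase loop; the model sees the error in the tool message and can recover.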

A pattern for creating step-wrapped tool executors:

// Pattern: Create tool executors that close over step
function createToolExecutors(step: RunStep<AppError>) {
  return {
    searchDocs: async (query: string, toolCallId?: string) => {
      const key = toolCallId
        ? `search:${toolCallId}`
        : `search:${hashArgs({ query })}`;
      return step(
        'search',
        () => searchDocuments(query),
        { key, name: 'Search documents' }
      );
    },
    getWeather: async (location: string, toolCallId?: string) => {
      const key = toolCallId
        ? `weather:${toolCallId}`
        : `weather:${hashArgs({ location })}`;
      return step(
        'weather',
        () => fetchWeather(location),
        { key, name: 'Get weather' }
      );
    },
  };
}

// Usage in workflow
const result = await workflow.run(async ({ step, deps }) => {
  const tools = createToolExecutors(step);

  // Tools are now step-wrapped
  const docs = await tools.searchDocs('awaitly patterns', 'call_123');
  const weather = await tools.getWeather('San Francisco', 'call_456');

  return { docs, weather };
});
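The `hashArgs` fallback is not shown either. Any deterministic hash of the arguments works as a cache key; one minimal sketch (assuming flat argument objects, with an FNV-1a-style hash to keep keys short):

```typescript
// Hypothetical helper: derive a stable cache-key fragment from tool arguments.
// Sorting the keys makes the JSON canonical regardless of property order.
// Assumes flat argument objects; not a cryptographic hash.
function hashArgs(args: Record<string, unknown>): string {
  const canonical = JSON.stringify(args, Object.keys(args).sort());
  let hash = 0x811c9dc5;
  for (let i = 0; i < canonical.length; i++) {
    hash ^= canonical.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash.toString(16);
}
```

The important property is determinism: `hashArgs({ query: 'a' })` must produce the same key on every retry, or cached step results will never be hit.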

Here’s a complete example combining all patterns:

import { ok, err, type AsyncResult } from 'awaitly';
import { createWorkflow } from 'awaitly/workflow';

// Minimal message/response shapes used by the example
type ToolCall = { id: string; name: string; arguments: any };
type Message = { role: string; content?: string; tool_calls?: ToolCall[]; tool_call_id?: string };
type ChatResponse = { content: string; finishReason: string; toolCalls: ToolCall[] };

// Define tool implementations
const toolImplementations = {
  searchDocs: async (args: { query: string }): AsyncResult<string[], 'SEARCH_ERROR'> => {
    // Your search implementation
    return ok(['Result 1', 'Result 2']);
  },
  getWeather: async (args: { location: string }): AsyncResult<{ temp: number }, 'WEATHER_ERROR'> => {
    return ok({ temp: 72 });
  },
};

type ToolName = keyof typeof toolImplementations;
type AppError = 'SEARCH_ERROR' | 'WEATHER_ERROR' | 'AI_ERROR';

// Create workflow with dependencies
const chatWorkflow = createWorkflow('workflow', {
  ...toolImplementations,
  chat: async (messages: Message[]): AsyncResult<ChatResponse, 'AI_ERROR'> => {
    // Your AI SDK call
    return ok({ content: '', finishReason: 'stop', toolCalls: [] });
  },
});

// Execute workflow
const result = await chatWorkflow.run(async ({ step, deps }) => {
  let messages: Message[] = [{ role: 'user', content: 'What is the weather?' }];
  let iterations = 0;
  const maxIterations = 10;

  while (iterations++ < maxIterations) {
    const response = await step(
      'chat',
      () => deps.chat(messages),
      { key: `chat:${iterations}`, name: `Chat iteration ${iterations}` }
    );

    if (response.finishReason === 'stop') {
      return { content: response.content, iterations };
    }

    if (response.finishReason === 'tool_calls') {
      const toolResults = await Promise.all(
        response.toolCalls.map(async (call) => {
          const impl = deps[call.name as ToolName];
          if (!impl) {
            return {
              tool_call_id: call.id,
              content: JSON.stringify({ error: `Unknown tool: ${call.name}` }),
            };
          }
          const result = await step(
            `tool:${call.name}`,
            () => impl(call.arguments),
            {
              key: `tool:${call.name}:${call.id}`,
              name: `Tool: ${call.name}`,
            }
          );
          return {
            tool_call_id: call.id,
            content: JSON.stringify(result),
          };
        })
      );
      messages = [
        ...messages,
        { role: 'assistant', tool_calls: response.toolCalls },
        ...toolResults.map(r => ({ role: 'tool' as const, ...r })),
      ];
      continue;
    }
  }

  return { content: 'Max iterations reached', iterations };
});

if (result.ok) {
  console.log('Response:', result.value.content);
  console.log('Iterations:', result.value.iterations);
} else {
  console.error('Error:', result.error);
}

When using the Vercel AI SDK with awaitly:

import { generateText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { ok, err, type AsyncResult } from 'awaitly';
import { createWorkflow } from 'awaitly/workflow';

const aiWorkflow = createWorkflow('workflow', {
  generateText: async (prompt: string): AsyncResult<string, 'AI_ERROR'> => {
    try {
      const { text } = await generateText({
        model: openai('gpt-4o'),
        prompt,
      });
      return ok(text);
    } catch (e) {
      return err('AI_ERROR', { cause: e });
    }
  },
});

const result = await aiWorkflow.run(async ({ step, deps }) => {
  const response = await step(
    'explain',
    () => deps.generateText('Explain awaitly in one sentence'),
    { key: 'explain', name: 'Generate explanation' }
  );
  return response; // Return the raw value, not ok(response)!
});

AI APIs can fail in various ways. Use step.try for SDK calls that throw:

const result = await workflow.run(async ({ step, deps }) => {
  // For SDK calls that throw on error
  const response = await step.try(
    'openaiChat',
    () => openai.chat.completions.create({
      model: 'gpt-4o',
      messages: [{ role: 'user', content: prompt }],
    }),
    { error: 'AI_ERROR' as const }
  );
  return response.choices[0].message.content;
});

Or wrap SDK calls in Result-returning functions for consistent error handling:

async function chat(messages: Message[]): AsyncResult<ChatResponse, 'AI_ERROR' | 'RATE_LIMITED'> {
  try {
    const response = await openai.chat.completions.create({
      model: 'gpt-4o',
      messages,
    });
    return ok(response);
  } catch (e) {
    // RateLimitError is the SDK's rate-limit error class (e.g. OpenAI.RateLimitError)
    if (e instanceof RateLimitError) {
      return err('RATE_LIMITED', { cause: e });
    }
    return err('AI_ERROR', { cause: e });
  }
}