# AI Integration Patterns
This guide covers patterns for integrating awaitly with AI SDKs like OpenAI, Anthropic, or Vercel AI SDK. The key challenges are handling streaming responses, multi-phase tool calling, and wrapping tool execution in steps.
## Streaming Output Collection

AI streams are typically single-use AsyncIterables. You must accumulate content while iterating; you cannot iterate twice.
```ts
const result = await workflow.run(async ({ step, deps }) => {
  // Stream the AI response
  const stream = await step('generateStream', () => deps.generateStream(prompt));

  // Accumulate while iterating (streams are single-use!)
  let text = '';
  for await (const chunk of stream) {
    if (chunk.delta) {
      text += chunk.delta;
    }
  }

  return { text, finishReason: stream.finishReason };
});
```

```ts
// WRONG: Can't iterate a stream twice
const stream = await step('generateStream', () => deps.generateStream(prompt));

// First iteration consumes the stream
for await (const chunk of stream) {
  console.log(chunk);
}

// Second iteration gets nothing!
let text = '';
for await (const chunk of stream) {
  text += chunk.delta; // Never executes
}
```
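This single-use behavior is not specific to AI SDKs. Any plain async generator works the same way, which makes it easy to verify in isolation (a minimal sketch with no SDK involved):

```typescript
// A plain async generator behaves like an AI stream: it can only be consumed once.
async function* fakeStream(): AsyncGenerator<string> {
  yield 'Hello';
  yield ', ';
  yield 'world';
}

async function demo(): Promise<{ first: string; second: string }> {
  const stream = fakeStream();

  // First pass consumes every chunk.
  let first = '';
  for await (const chunk of stream) {
    first += chunk;
  }

  // Second pass gets nothing: the generator is already exhausted.
  let second = '';
  for await (const chunk of stream) {
    second += chunk;
  }

  return { first, second };
}
```

The second loop exits immediately because a finished generator reports `done: true` on every subsequent `next()` call, which is exactly why the accumulation must happen on the first pass.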
## Tool Calling Phases

AI models with tool calling often complete in multiple phases:
- First phase: Model returns `finish_reason: "tool_calls"` with tool invocations
- Tool execution: Your code runs the requested tools
- Second phase: Model continues with the tool results and may return more tool calls
- Final phase: Model returns `finish_reason: "stop"` with the final response
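The phase sequence can be exercised without any SDK. In this sketch, `makeFakeChat` is a hypothetical stand-in for a model that requests a tool once and then finishes:

```typescript
type FakeResponse =
  | { finishReason: 'tool_calls'; toolCalls: { id: string; name: string }[] }
  | { finishReason: 'stop'; content: string };

// Hypothetical model: the first call asks for a tool, the second call finishes.
function makeFakeChat(): (messages: unknown[]) => FakeResponse {
  let calls = 0;
  return () => {
    calls += 1;
    if (calls === 1) {
      return { finishReason: 'tool_calls', toolCalls: [{ id: 'call_1', name: 'getWeather' }] };
    }
    return { finishReason: 'stop', content: 'It is 72F.' };
  };
}

// Driver loop mirroring the phases above: keep calling until 'stop'.
function runPhases(chat: (messages: unknown[]) => FakeResponse): { phases: number; content: string } {
  const messages: unknown[] = [{ role: 'user', content: 'Weather?' }];
  let phases = 0;
  while (true) {
    phases += 1;
    const response = chat(messages);
    if (response.finishReason === 'stop') {
      return { phases, content: response.content };
    }
    // Append the assistant's tool calls and stubbed tool results, then continue.
    messages.push({ role: 'assistant', tool_calls: response.toolCalls });
    for (const call of response.toolCalls) {
      messages.push({ role: 'tool', tool_call_id: call.id, content: '{"temp":72}' });
    }
  }
}
```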
```ts
const result = await workflow.run(async ({ step, deps }) => {
  let messages = [{ role: 'user', content: prompt }];

  while (true) {
    const response = await step('chat', () => deps.chat(messages), {
      key: `chat:${messages.length}`,
    });

    // Final response - model is done
    if (response.finishReason === 'stop') {
      return response.content;
    }

    // Tool calls - execute and continue
    if (response.finishReason === 'tool_calls') {
      const toolResults = await executeTools(step, response.toolCalls);
      messages = [
        ...messages,
        { role: 'assistant', tool_calls: response.toolCalls },
        ...toolResults.map(r => ({ role: 'tool', ...r })),
      ];
      continue;
    }

    // Unexpected finish reason
    throw new Error(`Unexpected finish reason: ${response.finishReason}`);
  }
});
```
## Tool Execution in Steps

When executing tools, wrap each tool call in a step for:
- Caching: Resume from cached tool results on retry
- Observability: Track tool execution in workflow events
- Error handling: Isolate tool failures
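To see why cached tool results matter on retry, here is a deliberately simplified in-memory sketch of key-based step caching. This is not awaitly's actual implementation, just an illustration of the behavior stable keys enable:

```typescript
// Simplified sketch: a step runner that caches results by key, so a
// retried workflow can skip tool calls that already completed.
function makeCachingStep() {
  const cache = new Map<string, unknown>();
  const stats = { executions: 0 };

  async function step<T>(key: string, fn: () => Promise<T>): Promise<T> {
    if (cache.has(key)) {
      return cache.get(key) as T; // cache hit: the underlying tool is skipped
    }
    stats.executions += 1;
    const result = await fn();
    cache.set(key, result);
    return result;
  }

  return { step, stats };
}
```

If the key changes between attempts (for example, because it was derived from an array index that shifted), the cache misses and the tool runs again, which is exactly the failure mode stable keys prevent.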
### Stable Key Generation

The key challenge is generating stable, unique keys for tool calls:
```ts
// Most reliable - use the SDK-provided toolCallId
async function executeTools(step, toolCalls) {
  return Promise.all(
    toolCalls.map(async (call) => {
      const result = await step(
        `tool:${call.name}`,
        () => runTool(call.name, call.arguments),
        {
          key: `tool:${call.name}:${call.id}`, // SDK's unique ID
          name: `Tool: ${call.name}`,
        }
      );
      return { tool_call_id: call.id, content: JSON.stringify(result) };
    })
  );
}
```

```ts
// When toolCallId isn't available, hash the arguments
import { createHash } from 'crypto';

function hashArgs(args: unknown): string {
  return createHash('sha256')
    .update(JSON.stringify(args))
    .digest('hex')
    .slice(0, 8);
}

async function executeTools(step, toolCalls) {
  return Promise.all(
    toolCalls.map(async (call) => {
      const result = await step(
        `tool:${call.name}`,
        () => runTool(call.name, call.arguments),
        {
          key: `tool:${call.name}:${hashArgs(call.arguments)}`,
          name: `Tool: ${call.name}`,
        }
      );
      return { tool_call_id: call.id, content: JSON.stringify(result) };
    })
  );
}
```
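One caveat with hashing `JSON.stringify` output: it is sensitive to property order, so two logically identical argument objects can produce different keys. If your SDK does not guarantee ordering, sort the keys first. A sketch of an order-insensitive variant:

```typescript
import { createHash } from 'crypto';

// Stable stringify: sort object keys recursively so property order
// does not change the hash. Arrays keep their element order.
function stableStringify(value: unknown): string {
  if (Array.isArray(value)) {
    return `[${value.map(stableStringify).join(',')}]`;
  }
  if (value !== null && typeof value === 'object') {
    const entries = Object.entries(value as Record<string, unknown>)
      .sort(([a], [b]) => (a < b ? -1 : 1))
      .map(([k, v]) => `${JSON.stringify(k)}:${stableStringify(v)}`);
    return `{${entries.join(',')}}`;
  }
  return JSON.stringify(value);
}

function stableHashArgs(args: unknown): string {
  return createHash('sha256').update(stableStringify(args)).digest('hex').slice(0, 8);
}
```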
### Wrapping Tools with Steps

A pattern for creating step-wrapped tool executors:
```ts
// Pattern: Create tool executors that close over step
function createToolExecutors(step: RunStep<AppError>) {
  return {
    searchDocs: async (query: string, toolCallId?: string) => {
      const key = toolCallId
        ? `search:${toolCallId}`
        : `search:${hashArgs({ query })}`;

      return step(
        'search',
        () => searchDocuments(query),
        { key, name: 'Search documents' }
      );
    },

    getWeather: async (location: string, toolCallId?: string) => {
      const key = toolCallId
        ? `weather:${toolCallId}`
        : `weather:${hashArgs({ location })}`;

      return step(
        'weather',
        () => fetchWeather(location),
        { key, name: 'Get weather' }
      );
    },
  };
}
```

```ts
// Usage in workflow
const result = await workflow.run(async ({ step, deps }) => {
  const tools = createToolExecutors(step);

  // Tools are now step-wrapped
  const docs = await tools.searchDocs('awaitly patterns', 'call_123');
  const weather = await tools.getWeather('San Francisco', 'call_456');

  return { docs, weather };
});
```
## Complete Example: Chat with Tools

Here’s a complete example combining all patterns:
```ts
import { ok, err, type AsyncResult } from 'awaitly';
import { createWorkflow } from 'awaitly/workflow';

// Define tool implementations
const toolImplementations = {
  searchDocs: async (args: { query: string }): AsyncResult<string[], 'SEARCH_ERROR'> => {
    // Your search implementation
    return ok(['Result 1', 'Result 2']);
  },
  getWeather: async (args: { location: string }): AsyncResult<{ temp: number }, 'WEATHER_ERROR'> => {
    return ok({ temp: 72 });
  },
};

type ToolName = keyof typeof toolImplementations;
type AppError = 'SEARCH_ERROR' | 'WEATHER_ERROR' | 'AI_ERROR';

// Create workflow with dependencies
const chatWorkflow = createWorkflow('workflow', {
  ...toolImplementations,
  chat: async (messages: Message[]): AsyncResult<ChatResponse, 'AI_ERROR'> => {
    // Your AI SDK call
    return ok({ content: '', finishReason: 'stop', toolCalls: [] });
  },
});

// Execute workflow
const result = await chatWorkflow.run(async ({ step, deps }) => {
  let messages: Message[] = [{ role: 'user', content: 'What is the weather?' }];
  let iterations = 0;
  const maxIterations = 10;

  while (iterations++ < maxIterations) {
    const response = await step(
      'chat',
      () => deps.chat(messages),
      { key: `chat:${iterations}`, name: `Chat iteration ${iterations}` }
    );

    if (response.finishReason === 'stop') {
      return { content: response.content, iterations };
    }

    if (response.finishReason === 'tool_calls') {
      const toolResults = await Promise.all(
        response.toolCalls.map(async (call) => {
          const impl = deps[call.name as ToolName];
          if (!impl) {
            return {
              tool_call_id: call.id,
              content: JSON.stringify({ error: `Unknown tool: ${call.name}` }),
            };
          }

          const result = await step(
            `tool:${call.name}`,
            () => impl(call.arguments),
            {
              key: `tool:${call.name}:${call.id}`,
              name: `Tool: ${call.name}`,
            }
          );

          return {
            tool_call_id: call.id,
            content: JSON.stringify(result),
          };
        })
      );

      messages = [
        ...messages,
        { role: 'assistant', tool_calls: response.toolCalls },
        ...toolResults.map(r => ({ role: 'tool' as const, ...r })),
      ];
      continue;
    }
  }

  return { content: 'Max iterations reached', iterations };
});

if (result.ok) {
  console.log('Response:', result.value.content);
  console.log('Iterations:', result.value.iterations);
} else {
  console.error('Error:', result.error);
}
```
## Vercel AI SDK Integration

When using the Vercel AI SDK with awaitly:
```ts
import { generateText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { ok, err, type AsyncResult } from 'awaitly';
import { createWorkflow } from 'awaitly/workflow';

const aiWorkflow = createWorkflow('workflow', {
  generateText: async (prompt: string): AsyncResult<string, 'AI_ERROR'> => {
    try {
      const { text } = await generateText({
        model: openai('gpt-4o'),
        prompt,
      });
      return ok(text);
    } catch (e) {
      return err('AI_ERROR', { cause: e });
    }
  },
});

const result = await aiWorkflow.run(async ({ step, deps }) => {
  const response = await step(
    'explain',
    () => deps.generateText('Explain awaitly in one sentence'),
    { key: 'explain', name: 'Generate explanation' }
  );

  return response; // Return raw value, not ok(response)!
});
```
## Error Handling for AI Calls

AI APIs can fail in various ways. Use `step.try` for SDK calls that throw:
```ts
const result = await workflow.run(async ({ step, deps }) => {
  // For SDK calls that throw on error
  const response = await step.try(
    'openaiChat',
    () => openai.chat.completions.create({
      model: 'gpt-4o',
      messages: [{ role: 'user', content: prompt }],
    }),
    { error: 'AI_ERROR' as const }
  );

  return response.choices[0].message.content;
});
```

Or wrap SDK calls in Result-returning functions for consistent error handling:
```ts
async function chat(messages: Message[]): AsyncResult<ChatResponse, 'AI_ERROR' | 'RATE_LIMITED'> {
  try {
    const response = await openai.chat.completions.create({
      model: 'gpt-4o',
      messages,
    });
    return ok(response);
  } catch (e) {
    if (e instanceof RateLimitError) {
      return err('RATE_LIMITED', { cause: e });
    }
    return err('AI_ERROR', { cause: e });
  }
}
```
## Related

- Retries & Timeouts - Handling transient AI API failures
- Streaming - Working with awaitly’s stream store
- Troubleshooting - Common issues and solutions