# Streaming

Stream data in real time within workflows. Ideal for AI token streaming, live updates, and processing large datasets incrementally.

## Quick Start

```ts
import { createWorkflow } from 'awaitly/workflow';
import { createMemoryStreamStore, toAsyncIterable } from 'awaitly/streaming';

// 1. Create a stream store
const streamStore = createMemoryStreamStore();

// 2. Pass it to createWorkflow
const workflow = createWorkflow('workflow', deps, { streamStore });

// 3. Write to streams
const result = await workflow.run(async ({ step, deps }) => {
  const writer = step.getWritable<string>({ namespace: 'tokens' });

  await writer.write('Hello');
  await writer.write(' World');
  await writer.close();
});
```

## Stream Stores

Choose a store based on your needs:
```ts
import { createMemoryStreamStore } from 'awaitly/streaming';

const streamStore = createMemoryStreamStore();
```

```ts
import { createFileStreamStore } from 'awaitly/streaming';
import * as fs from 'node:fs/promises';

const streamStore = createFileStreamStore({
  directory: './streams',
  fs,
});
```

## Writing to Streams
Use `step.getWritable<T>()` to create a writer:

```ts
const result = await workflow.run(async ({ step, deps }) => {
  const writer = step.getWritable<string>({ namespace: 'ai-response' });

  // Write items
  const writeResult = await writer.write('token1');
  if (!writeResult.ok) {
    return err(writeResult.error);
  }

  await writer.write('token2');
  await writer.write('token3');

  // Always close when done
  await writer.close();
});
```

## AI Token Streaming Example
```ts
const result = await workflow.run(async ({ step, deps }) => {
  const writer = step.getWritable<string>({ namespace: 'ai-tokens' });

  await step('generateAI', () =>
    deps.generateAI({
      prompt: 'Explain TypeScript',
      onToken: async (token) => {
        await writer.write(token);
      }
    }), { key: 'generate' });

  await writer.close();
});
```

## Reading from Streams
Use `step.getReadable<T>()` to consume a stream:

```ts
const result = await workflow.run(async ({ step, deps }) => {
  const reader = step.getReadable<string>({ namespace: 'tokens' });

  let item = await reader.read();
  while (item.ok) {
    console.log(item.value);
    item = await reader.read();
  }

  if (item.error.type === 'STREAM_ENDED') {
    console.log('Stream finished at position', item.error.finalPosition);
  }
});
```

## Resume from Position
Resume reading from where you left off:

```ts
const reader = step.getReadable<string>({
  namespace: 'tokens',
  startIndex: lastPosition + 1,
});
```
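Conceptually, a stream is an append-only log and a read position is just an index into it, so resuming means iterating from a stored offset. A minimal plain-TypeScript sketch of that idea (illustration only; the real log lives in the stream store):

```typescript
// Illustration only: a stream modeled as an append-only log,
// where resuming = iterating from a saved offset.
function* readFrom<T>(log: readonly T[], startIndex: number): Generator<T> {
  for (let i = startIndex; i < log.length; i++) yield log[i];
}

const log = ['a', 'b', 'c', 'd'];
const lastPosition = 1; // the consumer previously read up to index 1

const resumed = [...readFrom(log, lastPosition + 1)];
console.log(resumed); // ['c', 'd']
```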
## Using AsyncIterable

Convert readers to `for await...of` syntax:

```ts
import { toAsyncIterable } from 'awaitly/streaming';

const result = await workflow.run(async ({ step, deps }) => {
  const reader = step.getReadable<string>({ namespace: 'tokens' });

  for await (const token of toAsyncIterable(reader)) {
    process.stdout.write(token);
  }
});
```

## Stream Transformers
Transform streams with functional operators:

### map / filter

```ts
import { map, filter } from 'awaitly/streaming';

const reader = step.getReadable<number>({ namespace: 'numbers' });

// Filter even numbers, then double them
const evens = filter(reader, n => n % 2 === 0);
const doubled = map(evens, n => n * 2);

for await (const value of doubled) {
  console.log(value); // 4, 8, 12, ...
}
```

### chunk (Batching)
```ts
import { chunk } from 'awaitly/streaming';

const reader = step.getReadable<string>({ namespace: 'items' });
const batches = chunk(reader, 10); // Groups of 10

for await (const batch of batches) {
  await processBatch(batch); // batch is string[]
}
```

### take / skip
```ts
import { take, skip, collect } from 'awaitly/streaming';

const reader = step.getReadable<number>({ namespace: 'numbers' });

// Skip first 5, take next 10
const skipped = skip(reader, 5);
const limited = take(skipped, 10);
const items = await collect(limited); // number[]
```

### reduce
```ts
import { reduce } from 'awaitly/streaming';

const reader = step.getReadable<number>({ namespace: 'numbers' });
const sum = await reduce(reader, (acc, n) => acc + n, 0);
```

### pipe (Composition)
```ts
import { pipe, filter, map, take, collect } from 'awaitly/streaming';

const reader = step.getReadable<number>({ namespace: 'numbers' });

const result = await collect(
  pipe(
    reader,
    s => filter(s, n => n % 2 === 0),
    s => map(s, n => n * 2),
    s => take(s, 10)
  )
);
```
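Under the hood these operators compose like ordinary async iterables. As a rough mental model, here is the same filter → map → take pipeline written with plain async generators (an illustrative sketch, not awaitly's implementation; `numbers` is a stand-in source):

```typescript
// Stand-in source: yields 1..100 (in awaitly this would be a stream reader).
async function* numbers(): AsyncGenerator<number> {
  for (let n = 1; n <= 100; n++) yield n;
}

// filter: keep even numbers only
async function* filterEven(src: AsyncIterable<number>): AsyncGenerator<number> {
  for await (const n of src) if (n % 2 === 0) yield n;
}

// map: double each item
async function* double(src: AsyncIterable<number>): AsyncGenerator<number> {
  for await (const n of src) yield n * 2;
}

// take: stop after `count` items
async function* takeN<T>(src: AsyncIterable<T>, count: number): AsyncGenerator<T> {
  let taken = 0;
  for await (const item of src) {
    if (taken >= count) return;
    taken++;
    yield item;
  }
}

async function demo(): Promise<number[]> {
  const out: number[] = [];
  for await (const v of takeN(double(filterEven(numbers())), 10)) out.push(v);
  return out; // [4, 8, 12, 16, 20, 24, 28, 32, 36, 40]
}
```

Each stage pulls lazily from the previous one, so `take` short-circuits the whole pipeline instead of buffering it.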
## Batch Processing with streamForEach

Process stream items with concurrency and checkpointing:

```ts
const result = await workflow.run(async ({ step, deps }) => {
  const reader = step.getReadable<Order>({ namespace: 'orders' });

  const processed = await step.streamForEach(
    reader,
    async (order) => {
      const result = await deps.processOrder(order);
      return ok(result);
    },
    {
      name: 'process-orders',
      concurrency: 5,         // Process 5 in parallel
      checkpointInterval: 10, // Checkpoint every 10 items
    }
  );

  if (processed.ok) {
    console.log(`Processed ${processed.value.processedCount} orders`);
  }
});
```

## External Stream Access
Consume streams outside workflows (e.g., HTTP handlers):

```ts
import { getStreamReader, toAsyncIterable } from 'awaitly/streaming';

// Express/Fastify handler
app.get('/stream/:workflowId', async (req, res) => {
  const reader = getStreamReader<string>({
    store: streamStore,
    workflowId: req.params.workflowId,
    namespace: 'ai-response',
    startIndex: 0,
    pollTimeout: 30000, // Wait up to 30s for new items
  });

  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  for await (const chunk of toAsyncIterable(reader)) {
    res.write(`data: ${JSON.stringify(chunk)}\n\n`);
  }

  res.end();
});
```

## Multiple Namespaces
Use namespaces for multiple streams per workflow:

```ts
const result = await workflow.run(async ({ step, deps }) => {
  const tokenWriter = step.getWritable<string>({ namespace: 'tokens' });
  const progressWriter = step.getWritable<number>({ namespace: 'progress' });

  await tokenWriter.write('Starting...');
  await progressWriter.write(0);

  // ... do work ...

  await progressWriter.write(100);
  await tokenWriter.write('Done!');

  await tokenWriter.close();
  await progressWriter.close();
});
```

## Error Handling
All stream operations return Results:

```ts
import { isStreamEnded, isStreamWriteError } from 'awaitly/streaming';

// Writing
const writeResult = await writer.write('data');
if (!writeResult.ok) {
  if (isStreamWriteError(writeResult.error)) {
    switch (writeResult.error.reason) {
      case 'closed':
        console.log('Stream already closed');
        break;
      case 'aborted':
        console.log('Stream was aborted');
        break;
      case 'store_error':
        console.log('Storage failed:', writeResult.error.cause);
        break;
    }
  }
}

// Reading
const readResult = await reader.read();
if (!readResult.ok) {
  if (isStreamEnded(readResult.error)) {
    console.log('Stream complete at position', readResult.error.finalPosition);
  }
}
```

## Aborting a Stream
```ts
const writer = step.getWritable<string>({ namespace: 'response' });

try {
  await generateContent(writer);
} catch (error) {
  writer.abort(error); // Signal error to readers
}
```

## Backpressure
Control memory usage when consumers are slower than producers:

```ts
const writer = step.getWritable<string>({
  namespace: 'tokens',
  highWaterMark: 16, // Pause after 16 buffered items
});

for (const item of largeDataset) {
  const result = await writer.write(item);
  if (!result.ok) {
    break;
  }
}
```
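As a mental model, `highWaterMark` behaves like a bounded queue whose `push` promise only resolves once the consumer has drained below the limit. A plain-TypeScript sketch of that behaviour (illustration only, not awaitly's implementation):

```typescript
// Illustration only: a bounded queue where push() awaits capacity,
// mimicking how a highWaterMark pauses a fast producer.
class BoundedQueue<T> {
  private buffer: T[] = [];
  private waiters: Array<() => void> = [];
  constructor(private readonly highWaterMark: number) {}

  // Resolves only once there is room below the high-water mark.
  async push(item: T): Promise<void> {
    while (this.buffer.length >= this.highWaterMark) {
      await new Promise<void>(resolve => this.waiters.push(resolve));
    }
    this.buffer.push(item);
  }

  // Consumer side: removing an item wakes one blocked producer.
  shift(): T | undefined {
    const item = this.buffer.shift();
    this.waiters.shift()?.();
    return item;
  }

  get size(): number {
    return this.buffer.length;
  }
}
```

Because the write loop above awaits each `write`, a full buffer automatically pauses the producer until the consumer catches up.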
## Workflow Events

Stream operations emit events:

```ts
const workflow = createWorkflow('workflow', deps, {
  streamStore,
  onEvent: (event) => {
    switch (event.type) {
      case 'stream_created':
        console.log(`Stream ${event.namespace} created`);
        break;
      case 'stream_write':
        console.log(`Wrote to ${event.namespace} at position ${event.position}`);
        break;
      case 'stream_close':
        console.log(`Stream ${event.namespace} closed`);
        break;
    }
  },
});
```

## API Reference
### Step Methods

| Method | Description |
|---|---|
| `step.getWritable<T>(options?)` | Create a stream writer |
| `step.getReadable<T>(options?)` | Create a stream reader |
| `step.streamForEach(source, fn, options?)` | Batch process with concurrency |

### StreamWriter

| Property/Method | Description |
|---|---|
| `write(value)` | Write item; returns `AsyncResult<void, StreamWriteError>` |
| `close()` | Close stream |
| `abort(reason)` | Abort with error |
| `writable` | Whether stream accepts writes |
| `position` | Number of items written |

### StreamReader

| Property/Method | Description |
|---|---|
| `read()` | Read next item; returns `AsyncResult<T, StreamReadError>` |
| `close()` | Stop reading |
| `readable` | Whether more data may be available |
| `position` | Current read position |

### Transformers

| Function | Description |
|---|---|
| `toAsyncIterable(reader)` | Convert to an async iterable |
| `map(source, fn)` | Transform each item |
| `filter(source, predicate)` | Filter items |
| `chunk(source, size)` | Group into batches |
| `take(source, count)` | Take first N items |
| `skip(source, count)` | Skip first N items |
| `collect(source)` | Collect all items into an array |
| `reduce(source, fn, initial)` | Reduce to a single value |
| `pipe(source, ...transforms)` | Compose transformers |
## Complete Example

```ts
import { createWorkflow } from 'awaitly/workflow';
import {
  createMemoryStreamStore,
  toAsyncIterable,
  map,
  filter,
  collect,
} from 'awaitly/streaming';

const streamStore = createMemoryStreamStore();

const workflow = createWorkflow('workflow', { generateTokens }, { streamStore });

// Producer workflow
const producerResult = await workflow.run(async ({ step, deps }) => {
  const writer = step.getWritable<{ token: string; score: number }>({
    namespace: 'ai-output',
  });

  await step('generateTokens', () =>
    deps.generateTokens({
      prompt: 'Explain streaming',
      onToken: async (token, score) => {
        await writer.write({ token, score });
      },
    }), { key: 'generate' });

  await writer.close();
  return { status: 'complete' };
});

// Consumer (can run concurrently or later)
const consumerResult = await workflow.run(async ({ step, deps }) => {
  const reader = step.getReadable<{ token: string; score: number }>({
    namespace: 'ai-output',
  });

  // Filter high-confidence tokens and extract text
  const highConfidence = filter(
    toAsyncIterable(reader),
    item => item.score > 0.8
  );
  const tokens = map(highConfidence, item => item.token);
  const text = (await collect(tokens)).join('');

  return { text };
});
```

## What’s Next?
You’ve completed the Foundations section. Continue learning with:
- Batch Processing - Process items in bulk with concurrency control
- Human-in-the-Loop - Pause workflows for human approval
- Visualization - Visualize workflow execution