
Streaming

Stream data in real time within workflows. Useful for AI token streaming, live updates, and incremental processing of large datasets.

import { createWorkflow } from 'awaitly/workflow';
import { createMemoryStreamStore } from 'awaitly/streaming';

// 1. Create a stream store
const streamStore = createMemoryStreamStore();

// 2. Pass it to createWorkflow
const workflow = createWorkflow('workflow', deps, { streamStore });

// 3. Write to streams
const result = await workflow.run(async ({ step, deps }) => {
  const writer = step.getWritable<string>({ namespace: 'tokens' });
  await writer.write('Hello');
  await writer.write(' World');
  await writer.close();
});

Choose a stream store based on your needs. The in-memory store is the simplest option, suited to development and single-process deployments:

import { createMemoryStreamStore } from 'awaitly/streaming';
const streamStore = createMemoryStreamStore();

Use step.getWritable<T>() to create a writer:

import { err } from 'awaitly'; // assuming the ok/err Result helpers are exported from the package root

const result = await workflow.run(async ({ step, deps }) => {
  const writer = step.getWritable<string>({ namespace: 'ai-response' });

  // Write items
  const writeResult = await writer.write('token1');
  if (!writeResult.ok) {
    return err(writeResult.error);
  }
  await writer.write('token2');
  await writer.write('token3');

  // Always close when done, so readers see STREAM_ENDED
  await writer.close();
});
Streaming AI tokens as they are generated follows the same pattern:

const result = await workflow.run(async ({ step, deps }) => {
  const writer = step.getWritable<string>({ namespace: 'ai-tokens' });

  await step('generateAI', () => deps.generateAI({
    prompt: 'Explain TypeScript',
    onToken: async (token) => {
      await writer.write(token);
    },
  }), { key: 'generate' });

  await writer.close();
});

Use step.getReadable<T>() to consume a stream:

const result = await workflow.run(async ({ step, deps }) => {
  const reader = step.getReadable<string>({ namespace: 'tokens' });

  let item = await reader.read();
  while (item.ok) {
    console.log(item.value);
    item = await reader.read();
  }

  if (item.error.type === 'STREAM_ENDED') {
    console.log('Stream finished at position', item.error.finalPosition);
  }
});

Resume reading from where you left off:

const reader = step.getReadable<string>({
  namespace: 'tokens',
  startIndex: lastPosition + 1, // lastPosition: the index you last read
});
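As a sketch of the full pattern, a consumer can persist the reader's position as it goes and resume after a restart. The loadPosition and savePosition helpers below are hypothetical placeholders for whatever persistence you use:

const result = await workflow.run(async ({ step, deps }) => {
  const last = await loadPosition('tokens'); // hypothetical: last index read, or -1 initially
  const reader = step.getReadable<string>({
    namespace: 'tokens',
    startIndex: last + 1,
  });

  let item = await reader.read();
  while (item.ok) {
    await handleToken(item.value);                 // hypothetical consumer
    await savePosition('tokens', reader.position); // reader.position: current read position
    item = await reader.read();
  }
});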

Convert a reader to an async iterable to consume it with for await...of:

import { toAsyncIterable } from 'awaitly/streaming';

const result = await workflow.run(async ({ step, deps }) => {
  const reader = step.getReadable<string>({ namespace: 'tokens' });

  for await (const token of toAsyncIterable(reader)) {
    process.stdout.write(token);
  }
});

Transform streams with functional operators:

import { map, filter } from 'awaitly/streaming';

const reader = step.getReadable<number>({ namespace: 'numbers' });

// Filter even numbers, then double them
const evens = filter(reader, n => n % 2 === 0);
const doubled = map(evens, n => n * 2);

for await (const value of doubled) {
  console.log(value); // 4, 8, 12, ...
}

Group items into fixed-size batches with chunk:

import { chunk } from 'awaitly/streaming';

const reader = step.getReadable<string>({ namespace: 'items' });
const batches = chunk(reader, 10); // Groups of 10

for await (const batch of batches) {
  await processBatch(batch); // batch is string[]
}

Slice a stream with skip and take, then materialize it with collect:

import { take, skip, collect } from 'awaitly/streaming';

const reader = step.getReadable<number>({ namespace: 'numbers' });

// Skip first 5, take next 10
const skipped = skip(reader, 5);
const limited = take(skipped, 10);
const items = await collect(limited); // number[]

Fold a stream into a single value with reduce:

import { reduce } from 'awaitly/streaming';

const reader = step.getReadable<number>({ namespace: 'numbers' });
const sum = await reduce(reader, (acc, n) => acc + n, 0);

Compose operators with pipe:

import { pipe, filter, map, take, collect } from 'awaitly/streaming';

const reader = step.getReadable<number>({ namespace: 'numbers' });

const result = await collect(
  pipe(
    reader,
    s => filter(s, n => n % 2 === 0),
    s => map(s, n => n * 2),
    s => take(s, 10)
  )
);

Each stage is wrapped in a lambda because every operator takes its source as the first argument; pipe threads one stage's output into the next.

Process stream items with concurrency and checkpointing:

import { ok } from 'awaitly'; // assuming the ok/err Result helpers are exported from the package root

const result = await workflow.run(async ({ step, deps }) => {
  const reader = step.getReadable<Order>({ namespace: 'orders' });

  const processed = await step.streamForEach(
    reader,
    async (order) => {
      const result = await deps.processOrder(order);
      return ok(result);
    },
    {
      name: 'process-orders',
      concurrency: 5,         // Process 5 in parallel
      checkpointInterval: 10, // Checkpoint every 10 items
    }
  );

  if (processed.ok) {
    console.log(`Processed ${processed.value.processedCount} orders`);
  }
});

Consume streams outside workflows (e.g., HTTP handlers):

import { getStreamReader, toAsyncIterable } from 'awaitly/streaming';

// Express/Fastify handler
app.get('/stream/:workflowId', async (req, res) => {
  const reader = getStreamReader<string>({
    store: streamStore,
    workflowId: req.params.workflowId,
    namespace: 'ai-response',
    startIndex: 0,
    pollTimeout: 30000, // Wait up to 30s for new items
  });

  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  for await (const chunk of toAsyncIterable(reader)) {
    res.write(`data: ${JSON.stringify(chunk)}\n\n`);
  }
  res.end();
});
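On the client, the server-sent events emitted by the handler above can be consumed with the browser's standard EventSource API. A minimal sketch; the endpoint path, workflowId, and render function are placeholders:

const source = new EventSource(`/stream/${workflowId}`);

source.onmessage = (event) => {
  const chunk: string = JSON.parse(event.data); // the handler writes JSON-encoded chunks
  render(chunk); // hypothetical UI update
};

source.onerror = () => {
  source.close(); // simplification: stop instead of letting EventSource auto-reconnect
};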

Use namespaces for multiple streams per workflow:

const result = await workflow.run(async ({ step, deps }) => {
  const tokenWriter = step.getWritable<string>({ namespace: 'tokens' });
  const progressWriter = step.getWritable<number>({ namespace: 'progress' });

  await tokenWriter.write('Starting...');
  await progressWriter.write(0);
  // ... do work ...
  await progressWriter.write(100);
  await tokenWriter.write('Done!');

  await tokenWriter.close();
  await progressWriter.close();
});
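Each namespace is a separate stream, so a consumer can subscribe to just the one it cares about. A minimal sketch, following the reading pattern shown earlier:

import { toAsyncIterable } from 'awaitly/streaming';

const result = await workflow.run(async ({ step, deps }) => {
  const progress = step.getReadable<number>({ namespace: 'progress' });

  for await (const pct of toAsyncIterable(progress)) {
    console.log(`Progress: ${pct}%`);
  }
});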

All stream operations return Results:

import { isStreamEnded, isStreamWriteError } from 'awaitly/streaming';

// Writing
const writeResult = await writer.write('data');
if (!writeResult.ok) {
  if (isStreamWriteError(writeResult.error)) {
    switch (writeResult.error.reason) {
      case 'closed':
        console.log('Stream already closed');
        break;
      case 'aborted':
        console.log('Stream was aborted');
        break;
      case 'store_error':
        console.log('Storage failed:', writeResult.error.cause);
        break;
    }
  }
}

// Reading
const readResult = await reader.read();
if (!readResult.ok) {
  if (isStreamEnded(readResult.error)) {
    console.log('Stream complete at position', readResult.error.finalPosition);
  }
}
To signal a failure to readers, abort the stream instead of closing it:

const writer = step.getWritable<string>({ namespace: 'response' });
try {
  await generateContent(writer);
} catch (error) {
  writer.abort(error); // Signal error to readers
}

Control memory usage when consumers are slower than producers:

const writer = step.getWritable<string>({
  namespace: 'tokens',
  highWaterMark: 16, // Pause after 16 buffered items
});

for (const item of largeDataset) {
  const result = await writer.write(item);
  if (!result.ok) {
    break; // stream was closed or aborted
  }
}

Stream operations emit events:

const workflow = createWorkflow('workflow', deps, {
  streamStore,
  onEvent: (event) => {
    switch (event.type) {
      case 'stream_created':
        console.log(`Stream ${event.namespace} created`);
        break;
      case 'stream_write':
        console.log(`Wrote to ${event.namespace} at position ${event.position}`);
        break;
      case 'stream_close':
        console.log(`Stream ${event.namespace} closed`);
        break;
    }
  },
});
Step methods:

| Method | Description |
| --- | --- |
| step.getWritable<T>(options?) | Create a stream writer |
| step.getReadable<T>(options?) | Create a stream reader |
| step.streamForEach(source, fn, options?) | Batch process with concurrency |
Writer:

| Property/Method | Description |
| --- | --- |
| write(value) | Write item, returns AsyncResult<void, StreamWriteError> |
| close() | Close stream |
| abort(reason) | Abort with error |
| writable | Whether stream accepts writes |
| position | Number of items written |
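A writer's position and writable properties can be inspected while producing. A minimal sketch of the properties above:

const writer = step.getWritable<string>({ namespace: 'log' });

await writer.write('line 1');
await writer.write('line 2');

console.log(writer.position); // 2: number of items written so far
console.log(writer.writable); // true until close() or abort()

await writer.close();
console.log(writer.writable); // false: the stream no longer accepts writes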
Reader:

| Property/Method | Description |
| --- | --- |
| read() | Read next item, returns AsyncResult<T, StreamReadError> |
| close() | Stop reading |
| readable | Whether more data may be available |
| position | Current read position |
Stream functions:

| Function | Description |
| --- | --- |
| toAsyncIterable(reader) | Convert to async iterable |
| map(source, fn) | Transform each item |
| filter(source, predicate) | Filter items |
| chunk(source, size) | Group into batches |
| take(source, count) | Take first N items |
| skip(source, count) | Skip first N items |
| collect(source) | Collect all items into array |
| reduce(source, fn, initial) | Reduce to single value |
| pipe(source, ...transforms) | Compose transformers |
A complete example: a producer streams scored tokens while a consumer filters and collects them.

import { createWorkflow } from 'awaitly/workflow';
import {
  createMemoryStreamStore,
  toAsyncIterable,
  map,
  filter,
  collect,
} from 'awaitly/streaming';

const streamStore = createMemoryStreamStore();
const workflow = createWorkflow('workflow', { generateTokens }, { streamStore });

// Producer workflow
const producerResult = await workflow.run(async ({ step, deps }) => {
  const writer = step.getWritable<{ token: string; score: number }>({
    namespace: 'ai-output',
  });

  await step('generateTokens', () => deps.generateTokens({
    prompt: 'Explain streaming',
    onToken: async (token, score) => {
      await writer.write({ token, score });
    },
  }), { key: 'generate' });

  await writer.close();
  return { status: 'complete' };
});

// Consumer (can run concurrently or later)
const consumerResult = await workflow.run(async ({ step, deps }) => {
  const reader = step.getReadable<{ token: string; score: number }>({
    namespace: 'ai-output',
  });

  // Filter high-confidence tokens and extract text
  const highConfidence = filter(
    toAsyncIterable(reader),
    item => item.score > 0.8
  );
  const tokens = map(highConfidence, item => item.token);
  const text = (await collect(tokens)).join('');

  return { text };
});
