Streaming
Process agent responses token-by-token with real-time guardrails.
Orchestrator Streaming
The simplest way to stream is orchestrator.runStream(), which wraps the agent run with guardrails, approval checks, and state tracking:
import { createAgentOrchestrator } from '@directive-run/ai';
import type { AgentLike } from '@directive-run/ai';
const orchestrator = createAgentOrchestrator({
  runner,
  autoApproveToolCalls: true,
});

const agent: AgentLike = {
  name: 'assistant',
  instructions: 'You are a helpful assistant.',
  model: 'gpt-4',
};
// Start a streaming run – returns the stream, a result promise, and an abort handle
const { stream, result, abort } = orchestrator.runStream<string>(agent, 'Explain WebAssembly');
for await (const chunk of stream) {
  switch (chunk.type) {
    case 'token':
      process.stdout.write(chunk.data); // Append each token as it arrives
      break;
    case 'tool_start':
      console.log(`\nCalling tool: ${chunk.tool}`); // Agent is invoking a tool
      break;
    case 'tool_end':
      console.log(`Tool done: ${chunk.result}`); // Tool returned a result
      break;
    case 'guardrail_triggered':
      console.warn(`Guardrail ${chunk.guardrailName}: ${chunk.reason}`); // Safety check fired
      break;
    case 'approval_required':
      // Pause and show UI – call orchestrator.approve(chunk.requestId) to continue
      break;
    case 'done':
      console.log(`\n\nDone: ${chunk.totalTokens} tokens in ${chunk.duration}ms`);
      break;
    case 'error':
      console.error(chunk.error);
      break;
  }
}
// Await the completed result after the stream closes
const finalResult = await result;
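When autoApproveToolCalls is off, you resolve approval_required chunks yourself. A minimal sketch of that loop, using the orchestrator.approve() call noted above; confirmWithUser is a hypothetical UI prompt, not part of the library:

// Minimal approval loop – confirmWithUser is a hypothetical UI helper
for await (const chunk of stream) {
  if (chunk.type === 'approval_required') {
    const ok = await confirmWithUser(`Allow tool "${chunk.toolName}"?`);
    if (ok) orchestrator.approve(chunk.requestId);
    // How a denial is expressed depends on your orchestrator configuration
  } else if (chunk.type === 'approval_resolved') {
    console.log(`Request ${chunk.requestId}: approved=${chunk.approved}`);
  }
}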
Chunk Types
Every stream chunk has a type discriminant:
| Type | Fields | When |
|---|---|---|
| token | data, tokenCount | Each token from the agent |
| tool_start | tool, toolCallId, arguments | Agent starts calling a tool |
| tool_end | tool, toolCallId, result | Tool call completes |
| message | message | Full message added to conversation |
| guardrail_triggered | guardrailName, reason, partialOutput, stopped | A guardrail blocked content |
| approval_required | requestId, toolName | Tool call needs approval |
| approval_resolved | requestId, approved | Approval decision made |
| progress | phase, message | Status update (starting, generating, tool_calling, finishing) |
| done | totalTokens, duration, droppedTokens | Stream completed |
| error | error, partialOutput? | An error occurred |
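As a mental model, the table maps onto a discriminated union along these lines (a sketch; the exact field types in @directive-run/ai may differ):

// Sketch of the chunk union implied by the table – field types are assumptions
type StreamChunk =
  | { type: 'token'; data: string; tokenCount: number }
  | { type: 'tool_start'; tool: string; toolCallId: string; arguments: unknown }
  | { type: 'tool_end'; tool: string; toolCallId: string; result: unknown }
  | { type: 'message'; message: unknown }
  | { type: 'guardrail_triggered'; guardrailName: string; reason: string; partialOutput: string; stopped: boolean }
  | { type: 'approval_required'; requestId: string; toolName: string }
  | { type: 'approval_resolved'; requestId: string; approved: boolean }
  | { type: 'progress'; phase: 'starting' | 'generating' | 'tool_calling' | 'finishing'; message: string }
  | { type: 'done'; totalTokens: number; duration: number; droppedTokens: number }
  | { type: 'error'; error: Error; partialOutput?: string };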
Cancellation
Abort a stream at any time:
const { stream, result, abort } = orchestrator.runStream(agent, input);
// Cancel after a timeout using the abort handle
setTimeout(() => abort(), 5000);
// Or pass an AbortSignal for external cancellation control
const controller = new AbortController();
const { stream: s2 } = orchestrator.runStream(agent, input, {
  signal: controller.signal,
});
// Trigger cancellation from anywhere that holds the controller
controller.abort();
Stack Streaming
The AgentStack offers two streaming methods. stack.stream() yields raw token strings, while stack.streamChunks() yields the same rich StreamChunk types as the orchestrator:
import { createAgentStack } from '@directive-run/ai';
const stack = createAgentStack({
  runner,
  streaming: { runner: myStreamingRunner },
  agents: { chat: { agent: chatAgent, capabilities: ['chat'] } },
});

// Simple stream – yields one raw token string at a time
const tokenStream = stack.stream('chat', 'Hello!');
for await (const token of tokenStream) {
  process.stdout.write(token);
}

// Rich stream – yields typed chunks (tokens, tool calls, guardrails, progress)
const { stream, result, abort } = stack.streamChunks('chat', 'Hello!');
for await (const chunk of stream) {
  if (chunk.type === 'token') process.stdout.write(chunk.data);
}
const finalResult = await result;
Both methods automatically track tokens, record observability spans, and publish to the message bus.
Provider Streaming Runners
Directive ships pre-built streaming runners for OpenAI and Anthropic. These handle SSE parsing, token extraction, and lifecycle hooks automatically:
OpenAI Streaming
import { createOpenAIRunner, createOpenAIStreamingRunner } from '@directive-run/ai/openai';
const runner = createOpenAIRunner({ apiKey: process.env.OPENAI_API_KEY! });
const streamingRunner = createOpenAIStreamingRunner({
  apiKey: process.env.OPENAI_API_KEY!,
  hooks: {
    onAfterCall: ({ durationMs, tokenUsage }) => {
      console.log(`${durationMs}ms – ${tokenUsage.inputTokens}in/${tokenUsage.outputTokens}out`);
    },
  },
});

const stack = createAgentStack({
  runner,
  streaming: { runner: streamingRunner },
  agents: { chat: { instructions: 'You are a helpful assistant.' } },
});
Anthropic Streaming
import { createAnthropicRunner, createAnthropicStreamingRunner } from '@directive-run/ai/anthropic';
const runner = createAnthropicRunner({ apiKey: process.env.ANTHROPIC_API_KEY! });
const streamingRunner = createAnthropicStreamingRunner({
  apiKey: process.env.ANTHROPIC_API_KEY!,
});

const stack = createAgentStack({
  runner,
  streaming: { runner: streamingRunner },
  agents: { chat: { instructions: 'You are a helpful assistant.' } },
});
Both streaming runners return tokenUsage with input/output breakdown and support the same hooks interface as the standard runners.
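Because the hooks interface is shared, the onAfterCall logging shown for OpenAI attaches to the Anthropic streaming runner unchanged:

// Same hooks shape as the OpenAI example above – tokenUsage exposes the
// input/output breakdown on both providers
const anthropicStreaming = createAnthropicStreamingRunner({
  apiKey: process.env.ANTHROPIC_API_KEY!,
  hooks: {
    onAfterCall: ({ durationMs, tokenUsage }) => {
      console.log(`${durationMs}ms – ${tokenUsage.inputTokens}in/${tokenUsage.outputTokens}out`);
    },
  },
});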
Standalone Streaming
For streaming outside the orchestrator (e.g., direct agent runs without guardrails/approvals), use createStreamingRunner:
import { createStreamingRunner } from '@directive-run/ai';
import type { StreamRunOptions } from '@directive-run/ai';
// Build a streaming runner by wrapping your LLM SDK's streaming API
const streamRunner = createStreamingRunner(
  async (agent, input, callbacks) => {
    // Start a streaming completion request
    const stream = await openai.chat.completions.create({
      model: agent.model ?? 'gpt-4',
      messages: [
        { role: 'system', content: agent.instructions ?? '' },
        { role: 'user', content: input },
      ],
      stream: true,
    });

    const messages: unknown[] = []; // No intermediate messages in this minimal example
    let fullContent = '';
    for await (const chunk of stream) {
      if (callbacks.signal?.aborted) break; // Stop if the caller cancelled
      const token = chunk.choices[0]?.delta?.content ?? '';
      if (token) {
        callbacks.onToken?.(token); // Push each token to the stream
        fullContent += token;
      }
    }

    // Return the final assembled result
    return {
      output: fullContent,
      messages,
      toolCalls: [],
      totalTokens: Math.ceil(fullContent.length / 4), // Rough estimate: ~4 characters per token
    };
  }
);
// Use the runner with backpressure and guardrail options
const { stream, result, abort } = streamRunner(agent, 'Hello', {
  backpressure: 'buffer',
  guardrailCheckInterval: 50,
});
Backpressure
Control what happens when the consumer is slower than the producer:
// Buffer – keeps all tokens in memory (lossless, default)
const { stream } = streamRunner(agent, input, {
  backpressure: 'buffer',
});

// Drop – discards tokens when the buffer fills up (lossy, but fast)
const { stream: s2 } = streamRunner(agent, input, {
  backpressure: 'drop',
  bufferSize: 100,
});

// Block – pauses the producer until the consumer catches up (lossless, may slow response)
const { stream: s3 } = streamRunner(agent, input, {
  backpressure: 'block',
  bufferSize: 500,
});
The done chunk includes a droppedTokens count when using the drop strategy.
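To see how lossy a drop stream was, read droppedTokens from the final done chunk:

// With backpressure: 'drop', the done chunk reports how many tokens
// were discarded because the consumer fell behind
for await (const chunk of s2) {
  if (chunk.type === 'done' && chunk.droppedTokens > 0) {
    console.warn(`Dropped ${chunk.droppedTokens} of ${chunk.totalTokens} tokens`);
  }
}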
Streaming Guardrails
Evaluate guardrails on partial output as tokens arrive, without waiting for the full response:
import {
  createStreamingRunner,
  createLengthStreamingGuardrail,
  createPatternStreamingGuardrail,
} from '@directive-run/ai';

const streamRunner = createStreamingRunner(baseRunner, {
  streamingGuardrails: [
    // Halt the stream if the output grows too long
    createLengthStreamingGuardrail({
      maxTokens: 2000,
      warnAt: 1500, // Emit a warning chunk at 75%
    }),
    // Halt the stream when sensitive data patterns appear
    createPatternStreamingGuardrail({
      patterns: [
        { regex: /\b\d{3}-\d{2}-\d{4}\b/, name: 'SSN' },
        { regex: /\bpassword\s*[:=]/i, name: 'Password leak' },
      ],
    }),
  ],
});
const { stream } = streamRunner(agent, input, {
  guardrailCheckInterval: 50, // Evaluate guardrails every 50 tokens
  stopOnGuardrail: true, // Terminate the stream on any guardrail failure
});

for await (const chunk of stream) {
  if (chunk.type === 'guardrail_triggered') {
    console.warn(`${chunk.guardrailName}: ${chunk.reason}`);
    if (chunk.stopped) break; // Stream was halted by the guardrail
  }
}
Combining Guardrails
Merge multiple streaming guardrails into one:
import { combineStreamingGuardrails } from '@directive-run/ai';
// Merge multiple streaming guardrails into a single checker
const combined = combineStreamingGuardrails([
  createLengthStreamingGuardrail({ maxTokens: 2000 }),
  createPatternStreamingGuardrail({ patterns: [...] }),
]);

const streamRunner = createStreamingRunner(baseRunner, {
  streamingGuardrails: [combined],
});
Adapting Output Guardrails
Reuse existing output guardrails as streaming guardrails:
import { adaptOutputGuardrail } from '@directive-run/ai';
// Reuse an existing output guardrail as a streaming guardrail
const streamingVersion = adaptOutputGuardrail(
  'pii-streaming', // Name for logging and error messages
  myOutputGuardrail, // Your existing guardrail function
  { minTokens: 100 }, // Wait for 100 tokens before first check
);
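The minTokens option delays the first evaluation until the partial output is long enough to be meaningful. The adapted guardrail then plugs into streamingGuardrails like any other:

// Register the adapted guardrail on a streaming runner
const streamRunner = createStreamingRunner(baseRunner, {
  streamingGuardrails: [streamingVersion],
});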
Stream Operators
Transform, filter, and inspect streams with composable operators:
Collect Tokens
Consume an entire stream and return the concatenated text:
import { collectTokens } from '@directive-run/ai';
// Consume the entire stream and return the concatenated text
const { stream } = orchestrator.runStream(agent, input);
const fullOutput = await collectTokens(stream);
Tap
Observe chunks without modifying the stream (logging, metrics):
import { tapStream } from '@directive-run/ai';
const { stream } = orchestrator.runStream(agent, input);
// Observe each chunk for side effects (logging, metrics) without modifying it
const logged = tapStream(stream, (chunk) => {
  if (chunk.type === 'token') tokenCount++;
  if (chunk.type === 'error') reportError(chunk.error);
});

for await (const chunk of logged) {
  // Chunks are unchanged – tap only inspects them
}
Filter
Keep only specific chunk types:
import { filterStream } from '@directive-run/ai';
const { stream } = orchestrator.runStream(agent, input);
// Keep only the chunk types you care about
const tokensOnly = filterStream(stream, ['token', 'done']);
for await (const chunk of tokensOnly) {
  // TypeScript narrows chunk.type to 'token' | 'done'
}
Map
Transform chunks:
import { mapStream } from '@directive-run/ai';
const { stream } = orchestrator.runStream(agent, input);
// Transform each chunk as it flows through the stream
const uppercased = mapStream(stream, (chunk) => {
  if (chunk.type === 'token') {
    return { ...chunk, data: chunk.data.toUpperCase() };
  }
  return chunk; // Pass non-token chunks through unchanged
});
Framework Integration
The streaming API is framework-agnostic – orchestrator.runStream() works the same everywhere. The framework layer handles reactive UI updates as chunks arrive.
React
import { useState, useCallback } from 'react';
import { useAgentOrchestrator, useFact } from '@directive-run/react';
function ChatStream() {
  const orchestrator = useAgentOrchestrator({ runner, autoApproveToolCalls: true });
  const agent = useFact(orchestrator.system, '__agent');
  const [output, setOutput] = useState('');

  const send = useCallback(async (input: string) => {
    setOutput(''); // Clear previous output before starting a new stream
    const { stream } = orchestrator.runStream(myAgent, input);
    // Append each token to state as it arrives
    for await (const chunk of stream) {
      if (chunk.type === 'token') setOutput((prev) => prev + chunk.data);
    }
  }, [orchestrator]);

  return (
    <div>
      <p>{output}</p>
      {agent?.status === 'running' && <span className="cursor" />}
    </div>
  );
}
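runStream() also returns an abort handle, so you can cancel an in-flight stream when the component unmounts. A sketch extending the component above with a ref (useRef and useEffect imported from React):

// Keep the latest abort handle in a ref and call it on unmount
const abortRef = useRef<(() => void) | null>(null);

const send = useCallback(async (input: string) => {
  abortRef.current?.(); // Cancel any stream still running from a previous send
  setOutput('');
  const { stream, abort } = orchestrator.runStream(myAgent, input);
  abortRef.current = abort;
  for await (const chunk of stream) {
    if (chunk.type === 'token') setOutput((prev) => prev + chunk.data);
  }
}, [orchestrator]);

// Abort on unmount so navigating away doesn't leave a stream running
useEffect(() => () => abortRef.current?.(), []);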
Vue
<script setup lang="ts">
import { ref, onUnmounted } from 'vue';
import { createAgentOrchestrator } from '@directive-run/ai';
import { useFact } from '@directive-run/vue';

const orchestrator = createAgentOrchestrator({ runner, autoApproveToolCalls: true });
onUnmounted(() => orchestrator.dispose());

const agent = useFact(orchestrator.system, '__agent');
const output = ref('');

async function send(input: string) {
  output.value = ''; // Reset before each new stream
  const { stream } = orchestrator.runStream(myAgent, input);
  for await (const chunk of stream) {
    if (chunk.type === 'token') output.value += chunk.data; // Append tokens reactively
  }
}
</script>

<template>
  <p>{{ output }}</p>
  <span v-if="agent?.status === 'running'" class="cursor" />
</template>
Svelte
<script>
import { createAgentOrchestrator } from '@directive-run/ai';
import { useFact } from '@directive-run/svelte';
import { onDestroy } from 'svelte';

const orchestrator = createAgentOrchestrator({ runner, autoApproveToolCalls: true });
onDestroy(() => orchestrator.dispose());

const agent = useFact(orchestrator.system, '__agent');
let output = '';

async function send(input) {
  output = ''; // Clear previous response
  const { stream } = orchestrator.runStream(myAgent, input);
  for await (const chunk of stream) {
    if (chunk.type === 'token') output += chunk.data; // Svelte reactively updates the template
  }
}
</script>

<p>{output}</p>
{#if $agent?.status === 'running'}<span class="cursor" />{/if}
Solid
import { createSignal, onCleanup } from 'solid-js';
import { createAgentOrchestrator } from '@directive-run/ai';
import { useFact } from '@directive-run/solid';

function ChatStream() {
  const orchestrator = createAgentOrchestrator({ runner, autoApproveToolCalls: true });
  onCleanup(() => orchestrator.dispose());
  const agent = useFact(orchestrator.system, '__agent');
  const [output, setOutput] = createSignal('');

  async function send(input: string) {
    setOutput(''); // Reset signal before streaming
    const { stream } = orchestrator.runStream(myAgent, input);
    for await (const chunk of stream) {
      if (chunk.type === 'token') setOutput((prev) => prev + chunk.data);
    }
  }

  return (
    <div>
      <p>{output()}</p>
      {agent()?.status === 'running' && <span class="cursor" />}
    </div>
  );
}
Lit
import { LitElement, html } from 'lit';
import { createAgentOrchestrator } from '@directive-run/ai';
import { FactController } from '@directive-run/lit';
class ChatStream extends LitElement {
  private orchestrator = createAgentOrchestrator({ runner, autoApproveToolCalls: true });
  private agent = new FactController(this, this.orchestrator.system, '__agent');
  private output = '';

  disconnectedCallback() {
    super.disconnectedCallback();
    this.orchestrator.dispose();
  }

  async send(input: string) {
    this.output = '';
    this.requestUpdate(); // Clear the display immediately
    const { stream } = this.orchestrator.runStream(myAgent, input);
    for await (const chunk of stream) {
      if (chunk.type === 'token') {
        this.output += chunk.data;
        this.requestUpdate(); // Re-render after each token
      }
    }
  }

  render() {
    return html`
      <p>${this.output}</p>
      ${this.agent.value?.status === 'running' ? html`<span class="cursor"></span>` : ''}
    `;
  }
}
Next Steps
- Agent Orchestrator – Full orchestrator API
- Guardrails – Input/output validation
- Multi-Agent – Parallel and sequential patterns

