
SSE Transport

Turn any streamable source into a Server-Sent Events HTTP response.


Overview

createSSETransport converts token streams into a standard SSE byte stream. It accepts any object implementing the SSEStreamable interface (an object with a .stream() method). It works with any WinterCG-compatible runtime (Node 18+, Deno, Bun, Cloudflare Workers, Next.js App Router).

import { createSSETransport, createStreamingRunner } from '@directive-run/ai';

const transport = createSSETransport({
  maxResponseChars: 10_000,
  heartbeatIntervalMs: 15_000,
  errorMessages: {
    INPUT_GUARDRAIL_FAILED: 'Your message was flagged by our safety filter.',
  },
});

// Create a streamable wrapper that SSE transport can use
const streamRunner = createStreamingRunner(myStreamingCallbackRunner);

const docsAgent = {
  name: 'docs-qa',
  instructions: 'Answer questions using the provided documentation context.',
  model: 'claude-sonnet-4-5-20250929',
};

// SSE transport accepts any SSEStreamable – an object with a .stream(agentOrInput, input?) method
const streamable = {
  stream: (input: string) => streamRunner(docsAgent, input),
};

// Next.js route handler
export async function POST(request: Request) {
  const { message } = await request.json();

  return transport.toResponse(streamable, message);
}

API

createSSETransport(config?)

Returns an SSETransport with two methods: toResponse() and toStream().

SSETransportConfig

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| maxResponseChars | number | Infinity | Truncate the response after this many characters |
| truncationMessage | string | '\n\n*[Response truncated]*' | Text appended when truncation occurs |
| heartbeatIntervalMs | number | 0 (disabled) | Send a heartbeat event at this interval to keep the connection alive |
| errorMessages | Record<string, string> \| (error) => string | – | Map error codes to user-facing messages, or provide a function |
| headers | Record<string, string> | – | Extra headers merged into the SSE response |

SSEEvent

The transport emits a discriminated union of five event types. Import for client-side type safety:

import type { SSEEvent } from '@directive-run/ai';
type SSEEvent =
  | { type: 'text'; text: string }
  | { type: 'truncated'; text: string }
  | { type: 'done' }
  | { type: 'error'; message: string }
  | { type: 'heartbeat'; timestamp: number };

toResponse(streamable, input, opts?)

Creates a full Response object with SSE headers (Content-Type: text/event-stream, Cache-Control: no-cache, Connection: keep-alive). Pass it directly as the return value from a route handler.

export async function POST(request: Request) {
  const { message } = await request.json();

  return transport.toResponse(streamable, message);
}

toStream(streamable, input, opts?)

Returns just the ReadableStream<Uint8Array> for frameworks like Express or Koa where you pipe the stream into res.write() manually.

app.post('/api/chat', async (req, res) => {
  const stream = transport.toStream(streamable, req.body.message);
  const reader = stream.getReader();
  res.setHeader('Content-Type', 'text/event-stream');

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    res.write(value);
  }
  res.end();
});

Both methods accept an optional { signal?: AbortSignal } for cancellation.


SSE Event Types

Each SSE frame is a JSON-encoded data: line. Clients parse the type discriminant to handle each event:

| Type | Fields | When |
| --- | --- | --- |
| text | text: string | Each token from the agent stream |
| truncated | text: string | The response exceeded maxResponseChars and was cut short |
| done | – | The stream completed successfully |
| error | message: string | An error occurred (message is user-facing) |
| heartbeat | timestamp: number | Keep-alive ping at the configured interval (Unix ms) |

Client-side parsing

Since the transport uses data: framing with custom JSON event types (not named SSE events), use fetch with a streaming reader rather than EventSource. EventSource only supports GET requests and expects standard SSE event: fields, which this transport does not use.

const res = await fetch('/api/chat', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ message }),
});

if (!res.ok) {
  // Handle HTTP errors (429, 400, etc.) before parsing SSE
  const err = await res.json();
  showError(err.error);

  return;
}

const reader = res.body!.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  // Append new bytes – a single SSE frame may split across two reads
  buffer += decoder.decode(value, { stream: true });

  const lines = buffer.split('\n');
  buffer = lines.pop() ?? '';  // Retain the incomplete trailing line

  for (const line of lines) {
    if (!line.startsWith('data: ')) continue;
    const data = line.slice(6).trim();
    if (!data) continue;

    const event = JSON.parse(data);

    switch (event.type) {
      case 'text':
        appendToUI(event.text);
        break;
      case 'truncated':
        appendToUI(event.text);  // Show truncation notice
        break;
      case 'done':
        finishStream();
        break;
      case 'error':
        showError(event.message);
        break;
      case 'heartbeat':
        // Connection is alive, no action needed
        break;
    }
  }
}

Buffering is required

A single SSE frame can split across two reader.read() calls. Always retain the incomplete trailing line in a buffer (as shown above) rather than splitting on '\n' and parsing every fragment. Without buffering, JSON.parse will throw on partial frames.
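The retain-the-tail pattern from the parsing loop above can be factored into a small pure helper. This is an illustrative sketch, not part of the library; feedSSE and ParseResult are hypothetical names:

```typescript
interface ParseResult {
  events: Array<{ type: string; [key: string]: unknown }>;
  remainder: string;
}

// Feed each newly decoded chunk together with the previous remainder;
// get back complete events plus the unparsed tail to carry forward.
function feedSSE(buffer: string, chunk: string): ParseResult {
  const combined = buffer + chunk;
  const lines = combined.split('\n');
  const remainder = lines.pop() ?? ''; // incomplete trailing line
  const events: ParseResult['events'] = [];

  for (const line of lines) {
    if (!line.startsWith('data: ')) continue;
    const data = line.slice(6).trim();
    if (!data) continue;
    events.push(JSON.parse(data)); // only ever sees complete frames
  }
  return { events, remainder };
}
```

Because the incomplete tail is always returned rather than parsed, JSON.parse never throws on a frame that was split across two reads.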


Error Mapping

Map internal error codes to user-friendly messages. Pass a record of code-to-message pairs, or a function for full control:

// Record-based mapping
const transport = createSSETransport({
  errorMessages: {
    INPUT_GUARDRAIL_FAILED: 'Your message was flagged by our safety filter.',
    RATE_LIMIT_EXCEEDED: 'Too many requests. Please wait a moment.',
    CIRCUIT_OPEN: 'The service is temporarily unavailable.',
  },
});

// Function-based mapping
const transport = createSSETransport({
  errorMessages: (error) => {
    if (error instanceof RateLimitError) {
      return 'Slow down, please.';
    }

    return 'Something went wrong. Please try again.';
  },
});

When an error has a code property that matches a key in the record, that message is sent. Otherwise the default message is used: "AI service temporarily unavailable. Please try again."
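The resolution order described above can be sketched as a pure function. This is an illustration of the documented behavior, not the library's internals; resolveErrorMessage and ErrorMessages are hypothetical names:

```typescript
const DEFAULT_ERROR_MESSAGE =
  'AI service temporarily unavailable. Please try again.';

type ErrorMessages = Record<string, string> | ((error: unknown) => string);

function resolveErrorMessage(error: unknown, messages?: ErrorMessages): string {
  if (typeof messages === 'function') {
    try {
      return messages(error);
    } catch {
      // A throwing mapper falls back to the default message
      return DEFAULT_ERROR_MESSAGE;
    }
  }
  // Record-based mapping: match on the error's code property
  const code = (error as { code?: string } | null)?.code;
  if (messages && code && code in messages) return messages[code];
  return DEFAULT_ERROR_MESSAGE;
}
```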

Throwing error mappers

If a function-based errorMessages mapper throws, the transport catches the exception and falls back to the default error message. This prevents a broken mapper from crashing the SSE stream.


Truncation

Protect against runaway responses by capping the total character count:

const transport = createSSETransport({
  maxResponseChars: 8_000,
  truncationMessage: '\n\n---\n*Response limit reached.*',
});

When the limit is hit, the transport sends the truncation message as a truncated event, sends a done event, and aborts the underlying token stream. The final stream result is still awaited to ensure metrics and token counts are recorded.
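The event sequence at the cap can be modeled as a generator over token chunks. This is a simplified sketch of the documented behavior, assuming truncation fires when the next token would exceed the cap; toSSEEvents is a hypothetical name, not a library export:

```typescript
type Event =
  | { type: 'text'; text: string }
  | { type: 'truncated'; text: string }
  | { type: 'done' };

// Stream tokens until the character cap, then emit the truncation
// message as a `truncated` event followed by `done`.
function* toSSEEvents(
  tokens: Iterable<string>,
  maxResponseChars: number,
  truncationMessage: string,
): Generator<Event> {
  let sent = 0;
  for (const token of tokens) {
    if (sent + token.length > maxResponseChars) {
      // The truncation message itself is not counted against the cap
      yield { type: 'truncated', text: truncationMessage };
      yield { type: 'done' };
      return;
    }
    sent += token.length;
    yield { type: 'text', text: token };
  }
  yield { type: 'done' };
}
```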

Truncation sizing

The truncationMessage length is not counted against maxResponseChars. Suggested values:

| Use case | maxResponseChars |
| --- | --- |
| Chat widget | 8,000–12,000 |
| Docs Q&A | 15,000–25,000 |
| Summarization | 3,000–5,000 |

Heartbeat

Long-running responses can be dropped by proxies and load balancers that enforce idle timeouts. Enable heartbeat to send periodic keep-alive events:

const transport = createSSETransport({
  heartbeatIntervalMs: 15_000,  // Send a heartbeat every 15 seconds
});

Heartbeat events are { type: "heartbeat", timestamp: 1707836400000 } where timestamp is Unix milliseconds (Date.now()). The timer is cleaned up automatically when the stream closes or errors.

Proxy idle timeouts

Most reverse proxies enforce idle-connection timeouts: nginx defaults to 60s, AWS ALB to 60s, and Cloudflare to 100s. Set heartbeatIntervalMs to 15,000–25,000 ms to stay well within these limits.
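On the client, heartbeat timestamps can double as a liveness check: if neither tokens nor heartbeats arrive for a few multiples of the configured interval, the connection is likely dead and worth aborting. A minimal staleness predicate (illustrative; isStreamStale is a hypothetical helper, not part of the library):

```typescript
// Consider the stream dead if no event (token or heartbeat) has arrived
// within `toleranceFactor` heartbeat intervals.
function isStreamStale(
  lastEventAtMs: number,
  nowMs: number,
  heartbeatIntervalMs: number,
  toleranceFactor = 3,
): boolean {
  return nowMs - lastEventAtMs > heartbeatIntervalMs * toleranceFactor;
}
```

Update lastEventAtMs on every parsed event (including heartbeat events), and abort and retry the fetch when the predicate turns true.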


Abort Signal Propagation

Pass an AbortSignal to cancel the stream from the server side. This is useful for tying the stream lifetime to the HTTP request:

export async function POST(request: Request) {
  const { message } = await request.json();

  return transport.toResponse(streamable, message, {
    signal: request.signal,  // Cancels the agent stream if the client disconnects
  });
}

The signal is forwarded to the streaming runner, which aborts the underlying LLM call.


createAnthropicStreamingRunner

A built-in streaming runner that calls the Anthropic Messages API with server-sent events. It is re-exported from the @directive-run/ai entry point (and also available from @directive-run/ai/anthropic). Pair it with the SSE transport for an end-to-end Anthropic streaming pipeline.

import { createAnthropicStreamingRunner } from '@directive-run/ai/anthropic';
import { createStreamingRunner } from '@directive-run/ai';

const anthropicStreamingRunner = createAnthropicStreamingRunner({
  apiKey: process.env.ANTHROPIC_API_KEY!,
  model: 'claude-sonnet-4-5-20250929',
  maxTokens: 4096,
});

const streamRunner = createStreamingRunner(anthropicStreamingRunner);

const chatAgent = { name: 'chat', instructions: 'You are helpful.', model: 'claude-sonnet-4-5-20250929' };

// Create a streamable wrapper for SSE transport
const streamable = {
  stream: (input: string) => streamRunner(chatAgent, input),
};

AnthropicStreamingRunnerOptions

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| apiKey | string | required | Anthropic API key |
| model | string | 'claude-sonnet-4-5-20250929' | Default model (overridden by agent.model) |
| maxTokens | number | 4096 | Maximum tokens to generate |
| baseURL | string | 'https://api.anthropic.com/v1' | API base URL |
| fetch | typeof fetch | globalThis.fetch | Custom fetch implementation |

The runner reads each SSE event from the Anthropic API, emits tokens via callbacks.onToken(), tracks input/output token counts from message_start and message_delta events, and returns the assembled result.
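For reference, pulling text tokens out of parsed Anthropic stream events looks roughly like this, based on the public Messages API streaming format (content_block_delta events carry text_delta fragments). This is a sketch, not the runner's actual source; extractToken is a hypothetical name:

```typescript
// Minimal shape of the Anthropic streaming events we care about here.
interface AnthropicStreamEvent {
  type: string;
  delta?: { type?: string; text?: string };
  usage?: { output_tokens?: number };
}

// Return the next text fragment, or null for non-text events
// (message_start, message_delta, message_stop, ping, ...).
function extractToken(event: AnthropicStreamEvent): string | null {
  if (event.type === 'content_block_delta' && event.delta?.type === 'text_delta') {
    return event.delta.text ?? null;
  }
  return null;
}
```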


Full Example: Next.js Route Handler

A complete Next.js App Router endpoint combining RAG enrichment, SSE transport, and a streaming runner:

// app/api/chat/route.ts
import { createAnthropicStreamingRunner } from '@directive-run/ai/anthropic';
import { createOpenAIEmbedder } from '@directive-run/ai/openai';
import {
  createStreamingRunner,
  createRAGEnricher,
  createJSONFileStore,
  createSSETransport,
} from '@directive-run/ai';

const apiKey = process.env.ANTHROPIC_API_KEY!;

const enricher = createRAGEnricher({
  embedder: createOpenAIEmbedder({ apiKey: process.env.OPENAI_API_KEY! }),
  storage: createJSONFileStore({ filePath: './embeddings.json' }),
  topK: 5,
  minSimilarity: 0.3,
});

const docsAgent = {
  name: 'docs-qa',
  instructions: 'Answer questions using the provided documentation context.',
  model: 'claude-sonnet-4-5-20250929',
};

const streamRunner = createStreamingRunner(
  createAnthropicStreamingRunner({ apiKey }),
);

// SSE transport accepts any SSEStreamable
const streamable = {
  stream: (input: string) => streamRunner(docsAgent, input),
};

const transport = createSSETransport({
  maxResponseChars: 10_000,
  heartbeatIntervalMs: 15_000,
  errorMessages: {
    INPUT_GUARDRAIL_FAILED: 'Your message was flagged by our safety filter.',
  },
});

export async function POST(request: Request) {
  const { message, history } = await request.json();

  const enrichedInput = await enricher.enrich(message, {
    history,
    filter: (chunk) => chunk.metadata.type === 'docs',
  });

  return transport.toResponse(streamable, enrichedInput, {
    signal: request.signal,
  });
}

Multi-Agent + SSE

Serve multiple agents behind a single endpoint by routing each request to an agent by ID. Each agent streams through the same transport:

import { createMultiAgentOrchestrator, createStreamingRunner, createSSETransport } from '@directive-run/ai';
import { createAnthropicRunner, createAnthropicStreamingRunner } from '@directive-run/ai/anthropic';

const apiKey = process.env.ANTHROPIC_API_KEY!;

const researcher = { name: 'researcher', instructions: 'Research topics.', model: 'claude-sonnet-4-5-20250929' };
const writer = { name: 'writer', instructions: 'Write articles.', model: 'claude-sonnet-4-5-20250929' };

const streamRunner = createStreamingRunner(
  createAnthropicStreamingRunner({ apiKey }),
);

// Orchestrator for non-streaming multi-agent runs; the streaming
// path below calls streamRunner directly per agent.
const orchestrator = createMultiAgentOrchestrator({
  runner: createAnthropicRunner({ apiKey }),
  agents: {
    researcher: { agent: researcher },
    writer: { agent: writer },
  },
});

const agentMap: Record<string, typeof researcher> = { researcher, writer };

const transport = createSSETransport({
  maxResponseChars: 10_000,
  heartbeatIntervalMs: 15_000,
});

// Route requests to the correct agent via a body field
export async function POST(request: Request) {
  const { message, agentId } = await request.json();
  const agentDef = agentMap[agentId];
  if (!agentDef) {
    return new Response(JSON.stringify({ error: `Unknown agent: ${agentId}` }), { status: 400 });
  }

  const streamable = {
    stream: (input: string) => streamRunner(agentDef, input),
  };

  return transport.toResponse(streamable, message, { signal: request.signal });
}
