Subscriptions

Subscriptions handle push-based data such as WebSocket connections, Server-Sent Events, and streaming AI responses. They produce the same ResourceState<T> as queries, and they are started and stopped automatically as the key changes.


Basic Subscription

import { createSubscription } from "@directive-run/query";

const prices = createSubscription({
  name: "prices",
  key: (facts) => facts.ticker ? { ticker: facts.ticker } : null,
  subscribe: (params, { onData, onError, signal }) => {
    const ws = new WebSocket(`wss://api.example.com/${params.ticker}`);
    ws.onmessage = (e) => onData(JSON.parse(e.data));
    ws.onerror = () => onError(new Error("Connection lost"));
    signal.addEventListener("abort", () => ws.close());
    return () => ws.close(); // cleanup function
  },
});

How It Works

  1. When key returns a non-null value, subscribe is called with that value as params
  2. Push data via onData() – each call updates the ResourceState
  3. Report errors via onError() – each call sets the error state
  4. The signal aborts when the key changes or the system stops
  5. Return a cleanup function to tear down resources
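
The steps above can be sketched with a small stand-alone runner. This is an illustration only, not the library's implementation: createRunner, setKey, stop, and the State shape are hypothetical names, and the order in which the signal aborts and the cleanup function runs is an assumption.

```typescript
// Hypothetical, simplified model of the subscription lifecycle.
type Callbacks<T> = {
  onData: (value: T) => void;
  onError: (error: Error) => void;
  signal: AbortSignal;
};
type Subscribe<P, T> = (params: P, cb: Callbacks<T>) => void | (() => void);
type State<T> = { data: T | undefined; error: Error | undefined };

function createRunner<P, T>(subscribe: Subscribe<P, T>) {
  const state: State<T> = { data: undefined, error: undefined };
  let controller: AbortController | undefined;
  let cleanup: (() => void) | undefined;

  function stop(): void {
    controller?.abort(); // step 4: the signal fires
    cleanup?.();         // step 5: the returned cleanup function runs
    controller = undefined;
    cleanup = undefined;
  }

  function setKey(params: P | null): void {
    stop(); // a key change ends the previous subscription first
    if (params === null) return; // step 1: a null key means "do not subscribe"
    const current = new AbortController();
    controller = current;
    const result = subscribe(params, {
      onData: (value) => {
        if (current.signal.aborted) return; // ignore late pushes
        state.data = value;                 // step 2
        state.error = undefined;
      },
      onError: (error) => {
        if (!current.signal.aborted) state.error = error; // step 3
      },
      signal: current.signal,
    });
    cleanup = typeof result === "function" ? result : undefined;
  }

  return { state, setKey, stop };
}

// Usage: record the order of lifecycle events for two keys.
const log: string[] = [];
const runner = createRunner<{ ticker: string }, number>(
  (params, { onData, signal }) => {
    log.push(`subscribe:${params.ticker}`);
    signal.addEventListener("abort", () => log.push(`abort:${params.ticker}`));
    onData(params.ticker.length); // push one value immediately
    return () => log.push(`cleanup:${params.ticker}`);
  },
);

runner.setKey({ ticker: "AAPL" });
runner.setKey({ ticker: "MSFT" }); // tears down AAPL, then subscribes to MSFT
runner.setKey(null);               // tears down MSFT, no new subscription
```

In this model, switching the key always tears down the old subscription before the new one starts, which is why both the abort listener and the cleanup function can safely close the same resource.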

Callbacks

Replace Data

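subscribe: (params, { onData }) => {
  const ws = new WebSocket(`wss://api.example.com/${params.ticker}`); // as in the basic example
  ws.onmessage = (e) => onData(JSON.parse(e.data)); // replaces current data
  return () => ws.close();
},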

Accumulate Data

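subscribe: (params, { onData }) => {
  const ws = new WebSocket(`wss://api.example.com/${params.ticker}`); // as in the basic example
  ws.onmessage = (e) => {
    onData((current) => [...(current || []), JSON.parse(e.data)]); // updater function
  };
  return () => ws.close();
},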
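
The difference between the two callback styles can be sketched as a single helper that accepts either a plain value or an updater function. This is a guess at the semantics described above, not the library's code: applyUpdate and the Update type are hypothetical names.

```typescript
// Hypothetical helper: a plain value replaces the current data,
// a function receives the current data and returns the next value.
type Update<T> = T | ((current: T | undefined) => T);

function applyUpdate<T>(current: T | undefined, update: Update<T>): T {
  // Caveat: this check is ambiguous if T is itself a function type.
  return typeof update === "function"
    ? (update as (current: T | undefined) => T)(current)
    : update;
}

// Replace: the previous value is discarded.
const replaced = applyUpdate<number[]>(undefined, [1]);

// Accumulate: the updater appends to whatever is already there.
const appended = applyUpdate<number[]>(
  replaced,
  (cur: number[] | undefined) => [...(cur ?? []), 2],
);

// The first accumulated message starts from undefined, hence the fallback to [].
const first = applyUpdate<number[]>(
  undefined,
  (cur: number[] | undefined) => [...(cur ?? []), 7],
);
```

The fallback to an empty array matters because no data exists before the first message arrives.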

Server-Sent Events

const notifications = createSubscription({
  name: "notifications",
  key: () => ({ all: true }),
  subscribe: (params, { onData, onError, signal }) => {
    const source = new EventSource("/api/notifications");
    source.onmessage = (e) => onData(JSON.parse(e.data));
    source.onerror = () => onError(new Error("SSE connection lost"));
    signal.addEventListener("abort", () => source.close());
    return () => source.close();
  },
});

AI Streaming

Subscriptions are ideal for streaming LLM responses:

const chat = createSubscription({
  name: "chat",
  key: (facts) => facts.prompt ? { prompt: facts.prompt } : null,
  subscribe: (params, { onData, onError, signal }) => {
    let fullResponse = "";

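    fetch("/api/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ prompt: params.prompt }),
      signal,
    }).then(async (res) => {
      if (!res.ok || !res.body) throw new Error(`Chat request failed: ${res.status}`);
      const reader = res.body.getReader();
      const decoder = new TextDecoder();

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        // stream: true keeps multi-byte characters intact across chunk boundaries
        fullResponse += decoder.decode(value, { stream: true });
        onData(fullResponse); // update on each chunk
      }
    }).catch((err) => {
      if (!signal.aborted) onError(err);
    });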
  },
});
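
When a response body is decoded chunk by chunk, a multi-byte UTF-8 character can straddle two chunks. The sketch below uses only the standard TextEncoder and TextDecoder APIs to show why passing { stream: true } to decode() matters in that case. The decodeChunks helper and the sample text are illustrative.

```typescript
// "€" occupies three bytes in UTF-8 (E2 82 AC), so splitting after two
// bytes cuts the character in half.
const bytes = new TextEncoder().encode("€5");
const chunks: Uint8Array[] = [bytes.slice(0, 2), bytes.slice(2)];

function decodeChunks(parts: Uint8Array[], stream: boolean): string {
  const decoder = new TextDecoder();
  let text = "";
  for (const part of parts) {
    // With stream: true the decoder buffers an incomplete sequence
    // until the remaining bytes arrive in a later chunk.
    text += decoder.decode(part, { stream });
  }
  if (stream) text += decoder.decode(); // flush anything still buffered
  return text;
}

const streamed = decodeChunks(chunks, true);  // character survives the split
const naive = decodeChunks(chunks, false);    // contains U+FFFD replacement characters
```

Without the stream option, each call treats its chunk as a complete input, so the split character is replaced rather than reassembled.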

With createQuerySystem

import { createQuerySystem } from "@directive-run/query";

const app = createQuerySystem({
  facts: { ticker: "" },
  subscriptions: {
    prices: {
      key: (f) => f.ticker ? { ticker: f.ticker } : null,
      subscribe: (params, { onData, onError, signal }) => {
        const ws = new WebSocket(`wss://api.example.com/${params.ticker}`);
        ws.onmessage = (e) => onData(JSON.parse(e.data));
        ws.onerror = () => onError(new Error("Connection lost"));
        signal.addEventListener("abort", () => ws.close());
        return () => ws.close();
      },
    },
  },
});

app.facts.ticker = "AAPL"; // subscription starts
app.read("prices");         // once a message has arrived: { data: { price: 150.25 }, isSuccess: true, ... }
app.subscriptions.prices.setData({ price: 200 }); // manual data push

With AI Single-Agent Orchestrator

Combine subscriptions with @directive-run/ai for streaming agent responses:

import { createQuerySystem } from "@directive-run/query";
import { createAgentOrchestrator } from "@directive-run/ai";

const app = createQuerySystem({
  facts: { prompt: "" },
  subscriptions: {
    agent: {
      key: (f) => f.prompt ? { prompt: f.prompt } : null,
      subscribe: (params, { onData, onError, signal }) => {
        const orchestrator = createAgentOrchestrator({ /* config */ });
        orchestrator.stream(params.prompt, {
          onToken: (token) => onData((prev) => (prev || "") + token),
          onError: (err) => onError(err),
          signal,
        });
        return () => orchestrator.abort();
      },
    },
  },
});

app.facts.prompt = "Analyze this data..."; // agent starts streaming