Skip to main content

Guides

4 min read

DAG Pipeline

Build workflows where agents run in parallel, depend on upstream results, and branch conditionally.


The Problem

Your pipeline isn't linear. A researcher and a fact-checker can run in parallel. A writer depends on both of their outputs. If the fact-checker fails, you want to skip the writer but still get the research. Sequential pipelines waste time; manual Promise orchestration is brittle.

The Solution

Use the dag() pattern to define nodes with dependencies, conditional edges, and error strategies:

import { dag } from '@directive-run/ai';

const contentPipeline = dag(
  {
    researcher: {
      agent: 'researcher',
    },
    factChecker: {
      agent: 'fact-checker',
    },
    writer: {
      agent: 'writer',
      deps: ['researcher', 'factChecker'],
      transform: (context) => {
        const research = context.outputs.researcher;
        const facts = context.outputs.factChecker;

        return `Research:\n${research}\n\nVerified facts:\n${facts}`;
      },
    },
    editor: {
      agent: 'editor',
      deps: ['writer'],
    },
  },
  // Merge function: combine all outputs into a final result
  (context) => ({
    research: context.outputs.researcher,
    facts: context.outputs.factChecker,
    draft: context.outputs.writer,
    final: context.outputs.editor,
  }),
  {
    onNodeError: 'skip-downstream',
    maxConcurrent: 3,
  },
);

// Run with: orchestrator.runPattern('contentPipeline', input) — see Full Example below

How It Works

    ┌────────────┐   ┌──────────────┐
    │ researcher │   │ factChecker  │
    └──────┬─────┘   └──────┬───────┘
           └────────┬───────┘

             ┌────────────┐
             │   writer   │
             └──────┬─────┘

             ┌────────────┐
             │   editor   │
             └──────┬─────┘
                    ╎ conditional

             ┌────────────┐
             │    seo     │
             └────────────┘
  • Nodes define agents and their dependencies. Nodes with no deps run immediately. Nodes with deps wait for all dependencies to complete.
  • transform shapes the input for a node based on upstream outputs. The context object has outputs, statuses, errors, and the original input.
  • deps creates edges in the DAG. writer waits for both researcher and factChecker.
  • onNodeError controls failure behavior:
    • "fail" — abort the entire DAG on any node failure
    • "skip-downstream" — skip nodes that depend on the failed node, but run everything else
    • "continue" — run everything, passing undefined for failed upstream outputs
  • maxConcurrent limits how many nodes run simultaneously.
  • when adds conditional edges — a node only runs if the condition is true.

Full Example

A content pipeline with conditional review and timeout handling:

import { createMultiAgentOrchestrator, dag } from '@directive-run/ai';

const orchestrator = createMultiAgentOrchestrator({
  runner, // See Running Agents (/ai/running-agents) for setup
  agents: {
    researcher: {
      agent: { name: 'researcher', instructions: 'Research the topic. Return structured findings.' },
    },
    'fact-checker': {
      agent: { name: 'fact-checker', instructions: 'Verify claims. Return confirmed and unconfirmed facts.' },
    },
    writer: {
      agent: { name: 'writer', instructions: 'Write a blog post from research and verified facts.' },
    },
    editor: {
      agent: { name: 'editor', instructions: 'Edit for clarity and grammar. Return final version.' },
    },
    'seo-optimizer': {
      agent: { name: 'seo-optimizer', instructions: 'Optimize the final post for SEO. Add meta description and keywords.' },
    },
  },
  patterns: {
    contentPipeline: dag(
      {
        researcher: {
          agent: 'researcher',
          timeout: 30000,
        },
        factChecker: {
          agent: 'fact-checker',
          timeout: 20000,
        },
        writer: {
          agent: 'writer',
          deps: ['researcher', 'factChecker'],
          transform: (context) => {
            const research = context.outputs.researcher;
            const facts = context.outputs.factChecker;

            return `Research:\n${research}\n\nFacts:\n${facts}`;
          },
        },
        editor: {
          agent: 'editor',
          deps: ['writer'],
        },
        seo: {
          agent: 'seo-optimizer',
          deps: ['editor'],
          // Only run SEO if the input requested it
          when: (context) => context.input.includes('[SEO]'),
        },
      },
      (context) => ({
        research: context.outputs.researcher,
        facts: context.outputs.factChecker,
        draft: context.outputs.writer,
        final: context.outputs.editor ?? context.outputs.writer,
        seo: context.outputs.seo,
        statuses: context.statuses,
      }),
      {
        onNodeError: 'skip-downstream',
        maxConcurrent: 2,
        timeout: 120000,
      },
    ),
  },
});

// Run the pipeline
const result = await orchestrator.runPattern(
  'contentPipeline',
  '[SEO] Write about quantum computing breakthroughs in 2025'
);

console.log('Statuses:', result.statuses);
// { researcher: 'completed', factChecker: 'completed', writer: 'completed', editor: 'completed', seo: 'completed' }

// If fact-checker failed with skip-downstream:
// { researcher: 'completed', factChecker: 'failed', writer: 'skipped', editor: 'skipped', seo: 'skipped' }
Previous
Smart Model Routing

We care about your data. We'll never share your email.

Powered by Directive. This signup uses a Directive module with facts, derivations, constraints, and resolvers – zero useState, zero useEffect. Read how it works

Directive - Constraint-Driven State Management for TypeScript