GenAI Foundations / Advanced Track Module 2 / 15
GenAI Foundations Advanced ⏱ 45 min
DEV

Multi-Agent Systems and Orchestration Patterns

Supervisor, parallel, and sequential multi-agent patterns. Design systems where specialized agents collaborate, with state management and failure handling.

How to Use This Lesson

  • Start with the user problem, then map the pattern to architecture and failure modes.
  • If a code or design example is included, change one assumption and reason through the impact.
  • Use role callouts, checklists, and Q&A sections as implementation or interview prep notes.

Prerequisites: intermediate/02-ai-agents-from-zero

When Multi-Agent Systems Are Actually Worth It

Multi-agent systems are genuinely harder to build, debug, and operate than single-agent systems. Use them only when you have a real need:

Parallel tasks. If your workflow has three independent sub-tasks that each take 10 seconds, running them in sequence takes 30 seconds. Running three agents in parallel takes about 10 seconds: the duration of the slowest sub-task plus dispatch overhead. When latency matters and sub-tasks are independent, parallelism wins.

Specialization. A single agent with a 5,000-word system prompt is often worse than five agents that each have a focused 500-word system prompt. When your task requires distinct domain expertise (research, writing, code review, legal compliance), give each concern its own agent.

Fault isolation. If the code-review agent fails, the documentation agent shouldn’t fail with it. Specialized agents can retry or degrade independently without cascading failures.

Context window management. A single agent on a long multi-step task eventually fills its context window. Chaining agents resets the context at each handoff, keeping each agent’s working set small.

Pattern 1: Supervisor / Orchestrator

The supervisor pattern is the most common and most flexible. One orchestrator agent receives the user request, decides which specialist agents to invoke, passes them tasks, collects their outputs, and assembles the final response.

The orchestrator is stateful: it knows what has been done and what remains. The specialist agents are stateless: they receive a task and return a result.

Supervisor Pattern: Orchestrator Dispatches to Specialists

flowchart TD
  U([User Request]) --> ORCH[Orchestrator Agent<br/>Plans and dispatches]

  ORCH -->|research task| RA[Researcher Agent<br/>Web search + summarize]
  ORCH -->|writing task| WA[Writer Agent<br/>Draft from outline]
  ORCH -->|review task| REV[Review Agent<br/>Fact-check and critique]

  RA -->|research results| ORCH
  WA -->|draft content| ORCH
  REV -->|review notes| ORCH

  ORCH --> VAL{Validation<br/>All tasks complete?}
  VAL -->|Yes| OUT([Final Response])
  VAL -->|No| ORCH

  style U fill:#dbeafe,stroke:#2563eb,color:#1d4ed8
  style ORCH fill:#fef3c7,stroke:#d97706,color:#92400e
  style RA fill:#f3e8ff,stroke:#7c3aed,color:#7c3aed
  style WA fill:#f3e8ff,stroke:#7c3aed,color:#7c3aed
  style REV fill:#f3e8ff,stroke:#7c3aed,color:#7c3aed
  style OUT fill:#dcfce7,stroke:#16a34a,color:#15803d

Key design decisions for the supervisor:

  • The orchestrator prompt must include the list of available agents, their capabilities, and when to use each
  • The orchestrator decides the order of operations - it should be explicit in its plan before dispatching
  • The orchestrator should validate each specialist’s output before proceeding
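
The first two decisions above can be sketched as a small agent registry plus a plan check. This is a minimal illustration, not a prescribed design: `AgentSpec`, `REGISTRY`, `build_orchestrator_prompt`, and `validate_plan` are hypothetical names, and the plan is assumed to arrive as a plain list of agent names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AgentSpec:
    """Hypothetical description of one specialist the orchestrator may call."""
    name: str
    capability: str
    use_when: str


REGISTRY = [
    AgentSpec("researcher", "Web search and summarization", "facts or sources are needed"),
    AgentSpec("writer", "Drafts text from an outline", "research is ready to be written up"),
    AgentSpec("reviewer", "Fact-checks and critiques a draft", "a draft exists"),
]


def build_orchestrator_prompt(registry: list[AgentSpec]) -> str:
    """List every available agent, what it does, and when to use it."""
    lines = [
        "You are an orchestrator. State your plan before dispatching.",
        "Available agents:",
    ]
    for spec in registry:
        lines.append(f"- {spec.name}: {spec.capability}. Use when {spec.use_when}.")
    return "\n".join(lines)


def validate_plan(plan: list[str], registry: list[AgentSpec]) -> list[str]:
    """Return the agent names in the plan that are not in the registry."""
    known = {spec.name for spec in registry}
    return [step for step in plan if step not in known]


prompt = build_orchestrator_prompt(REGISTRY)
unknown = validate_plan(["researcher", "writer", "translator"], REGISTRY)
print(prompt)
print("Unknown agents in plan:", unknown)
```

Rejecting a plan that names an unregistered agent before any dispatch happens is cheaper than discovering the problem mid-run.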

Pattern 2: Parallel Dispatch with Merge

When sub-tasks are independent, run them in parallel. The orchestrator fans out to N agents simultaneously, then merges the results when all complete.

Parallel Dispatch: Fan-Out and Merge

flowchart LR
  Q([Query]) --> SPLIT[Decompose<br/>Into parallel tasks]

  SPLIT --> A1[Agent 1<br/>Task A]
  SPLIT --> A2[Agent 2<br/>Task B]
  SPLIT --> A3[Agent 3<br/>Task C]

  A1 -->|Result A| MERGE[Merge Agent<br/>Synthesize results]
  A2 -->|Result B| MERGE
  A3 -->|Result C| MERGE

  MERGE --> OUT([Unified Response])

  style Q fill:#dbeafe,stroke:#2563eb,color:#1d4ed8
  style SPLIT fill:#fef3c7,stroke:#d97706,color:#92400e
  style MERGE fill:#fef3c7,stroke:#d97706,color:#92400e
  style OUT fill:#dcfce7,stroke:#16a34a,color:#15803d

The merge agent is often the most complex. It must:

  • Detect contradictions between results (Agent 1 says X, Agent 2 says not-X)
  • Weight results by source reliability if agents have different trust levels
  • Produce a coherent unified output, not a concatenation
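
The first two merge responsibilities can be sketched without an LLM by reducing each agent's output to a set of true/false claims. That reduction is an assumption made for illustration; a real merge agent would work on free text. `AgentClaims`, `merge_claims`, and the `trust` weights are hypothetical.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class AgentClaims:
    agent: str
    trust: float             # assumed reliability weight in [0, 1]
    claims: dict[str, bool]  # statement -> whether the agent asserts it is true


def merge_claims(results: list[AgentClaims]) -> tuple[dict[str, bool], list[str]]:
    """Trust-weighted vote per statement, plus the statements agents disagree on."""
    votes: dict[str, float] = defaultdict(float)
    seen: dict[str, set[bool]] = defaultdict(set)
    for result in results:
        for statement, value in result.claims.items():
            votes[statement] += result.trust if value else -result.trust
            seen[statement].add(value)
    # A tied vote (score == 0) resolves to False in this sketch.
    merged = {statement: score > 0 for statement, score in votes.items()}
    contradictions = sorted(s for s, values in seen.items() if len(values) > 1)
    return merged, contradictions


results = [
    AgentClaims("agent_1", trust=0.9, claims={"X": True, "Y": True}),
    AgentClaims("agent_2", trust=0.4, claims={"X": False}),
    AgentClaims("agent_3", trust=0.6, claims={"Y": True, "Z": False}),
]
merged, contradictions = merge_claims(results)
print(merged)          # {'X': True, 'Y': True, 'Z': False}
print(contradictions)  # ['X']
```

Returning the contradictions alongside the merged view lets the caller decide whether to surface the disagreement rather than silently resolving it.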
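
asyncio for Parallel Agents

In Python, run parallel agents with asyncio.gather(). Each agent call becomes a coroutine, and await asyncio.gather(task_a(), task_b(), task_c()) runs all three concurrently and returns their results in argument order. asyncio suits I/O-bound work such as network calls to an LLM API. For CPU-bound work, use concurrent.futures.ProcessPoolExecutor instead: threads in standard CPython share the global interpreter lock, so a ThreadPoolExecutor does not speed up pure-Python computation.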
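
A minimal timing sketch of the gather() call described above. `call_agent` is a hypothetical stand-in for an LLM API call, with `asyncio.sleep` simulating network latency.

```python
import asyncio
import time


async def call_agent(name: str, seconds: float) -> str:
    """Stand-in for an I/O-bound agent call; sleep simulates network latency."""
    await asyncio.sleep(seconds)
    return f"{name} done"


async def run_parallel() -> tuple[list[str], float]:
    start = time.perf_counter()
    # gather() returns results in the order the awaitables were passed in.
    results = await asyncio.gather(
        call_agent("task_a", 0.2),
        call_agent("task_b", 0.2),
        call_agent("task_c", 0.2),
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(run_parallel())
print(results, f"{elapsed:.2f}s")
```

The three 0.2-second calls finish in roughly 0.2 seconds in total rather than 0.6, because the event loop overlaps their waiting time.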

Pattern 3: Sequential Pipeline with Handoffs

In the sequential pattern, each agent’s output becomes the next agent’s input. This is appropriate when each step enriches, transforms, or validates the previous step’s output.

Sequential Pipeline with Validation Gates

flowchart LR
  I([Raw Input]) --> A1

  A1[Extractor Agent<br/>Parse entities and intent]
  A1 -->|structured data| V1{Valid?}
  V1 -->|No| ERR1([Error: extraction failed])
  V1 -->|Yes| A2

  A2[Enricher Agent<br/>Fetch additional context]
  A2 -->|enriched data| V2{Valid?}
  V2 -->|No| ERR2([Error: enrichment failed])
  V2 -->|Yes| A3

  A3[Generator Agent<br/>Produce output]
  A3 --> OUT([Final Output])

  style I fill:#dbeafe,stroke:#2563eb,color:#1d4ed8
  style V1 fill:#fef3c7,stroke:#d97706,color:#92400e
  style V2 fill:#fef3c7,stroke:#d97706,color:#92400e
  style OUT fill:#dcfce7,stroke:#16a34a,color:#15803d
  style ERR1 fill:#fee2e2,stroke:#dc2626,color:#991b1b
  style ERR2 fill:#fee2e2,stroke:#dc2626,color:#991b1b

Validation gates between agents are not optional. Each agent should check the output of the previous agent before processing it. A pipeline without gates propagates errors silently and produces meaningless final output.
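
One way to make gates non-optional is to pair every stage with its gate in the pipeline definition itself. The sketch below assumes each stage is a plain function over a dict. `run_pipeline`, `GateError`, and the three stand-in stages are hypothetical, and the gate after the final stage is an addition for illustration.

```python
from typing import Any, Callable

Stage = Callable[[dict[str, Any]], dict[str, Any]]
Gate = Callable[[dict[str, Any]], bool]


class GateError(Exception):
    """Raised when a stage's output fails its validation gate."""


def run_pipeline(data: dict[str, Any], steps: list[tuple[str, Stage, Gate]]) -> dict[str, Any]:
    for name, stage, gate in steps:
        data = stage(data)
        if not gate(data):
            # Stop at the first bad handoff instead of passing bad data downstream.
            raise GateError(f"{name} failed validation")
    return data


# Hypothetical stand-ins for the extractor, enricher, and generator agents.
def extract(d: dict[str, Any]) -> dict[str, Any]:
    return {**d, "entities": d["text"].split()}


def enrich(d: dict[str, Any]) -> dict[str, Any]:
    return {**d, "context": {e: len(e) for e in d["entities"]}}


def generate(d: dict[str, Any]) -> dict[str, Any]:
    return {**d, "output": ", ".join(d["entities"])}


STEPS = [
    ("extraction", extract, lambda d: len(d["entities"]) > 0),
    ("enrichment", enrich, lambda d: set(d["context"]) == set(d["entities"])),
    ("generation", generate, lambda d: bool(d["output"])),
]

print(run_pipeline({"text": "vector databases"}, STEPS)["output"])
```

With an empty input, the extraction gate raises `GateError` immediately, so the enricher and generator never run on an empty entity list.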

State Management Across Agents

In a multi-agent system, state is the shared ground truth that all agents read from and write to. Without explicit state management, agents work with inconsistent views of the world.

State schema design principles:

  • Use a TypedDict or Pydantic model rather than an untyped dict with ad hoc string keys
  • Make state append-only where possible - agents add results, never delete previous results
  • Store the agent name and timestamp with each result - know who wrote what when
  • Include a status field that tracks task completion
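
from datetime import datetime
from typing import Literal

from pydantic import BaseModel, Field

class AgentResult(BaseModel):
    agent_name: str
    task: str
    output: str
    timestamp: datetime
    confidence: float = Field(ge=0.0, le=1.0)  # values outside 0.0-1.0 are rejected

class PipelineState(BaseModel):
    original_query: str
    # default_factory makes the fresh-list-per-instance intent explicit
    results: list[AgentResult] = Field(default_factory=list)
    status: Literal["pending", "in_progress", "complete", "failed"] = "pending"
    error: str | None = None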
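
To illustrate the append-only principle in isolation, here is a sketch that swaps Pydantic for frozen standard-library dataclasses, so that state cannot be mutated in place. `Result`, `State`, and `with_result` are hypothetical names, and the fields are a reduced version of the schema above.

```python
from dataclasses import dataclass, replace
from datetime import datetime, timezone


@dataclass(frozen=True)
class Result:
    agent_name: str
    output: str
    timestamp: datetime


@dataclass(frozen=True)
class State:
    original_query: str
    results: tuple[Result, ...] = ()
    status: str = "pending"


def with_result(state: State, agent_name: str, output: str) -> State:
    """Return a new state with one more result; earlier results are never altered."""
    entry = Result(agent_name, output, datetime.now(timezone.utc))
    return replace(state, results=state.results + (entry,), status="in_progress")


s0 = State("compare vector databases")
s1 = with_result(s0, "researcher", "3 findings")
s2 = with_result(s1, "writer", "draft v1")
print([r.agent_name for r in s2.results])  # ['researcher', 'writer']
```

Because every update returns a new object, each intermediate state (`s0`, `s1`) remains available for debugging or replay.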

Failure Handling

Multi-agent systems fail in more ways than single agents. Design your failure modes explicitly:

| Failure Type | Response |
| --- | --- |
| Agent timeout | Return partial results + warning |
| Agent returns invalid format | Retry once with corrected prompt |
| Agent confidence < threshold | Flag for human review |
| Orchestrator loop exceeds max steps | Halt, return best-effort result |
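
The first three rows of the table can be sketched as a wrapper around a single agent call. This assumes agents return JSON text with a `confidence` field. `call_with_policy`, `AgentCall`, and the demo agents are hypothetical, and on timeout the caller is expected to combine the warning with whatever other agents returned.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable

AgentCall = Callable[[str], Awaitable[str]]  # hypothetical: prompt in, raw text out


async def call_with_policy(
    call: AgentCall, prompt: str, timeout_s: float = 5.0, min_confidence: float = 0.5
) -> dict[str, Any]:
    """Apply the timeout, retry-once, and low-confidence rules to one agent call."""
    for _attempt in range(2):  # the original attempt plus one retry
        try:
            raw = await asyncio.wait_for(call(prompt), timeout=timeout_s)
        except asyncio.TimeoutError:
            return {"status": "timeout", "warning": "agent timed out; result is partial"}
        try:
            parsed = json.loads(raw)
            if not isinstance(parsed, dict):
                raise ValueError("expected a JSON object")
        except ValueError:  # json.JSONDecodeError is a subclass of ValueError
            # Invalid format: retry once with a corrected prompt.
            prompt += "\nRespond with a single valid JSON object."
            continue
        if parsed.get("confidence", 0.0) < min_confidence:
            return {"status": "needs_review", "result": parsed}
        return {"status": "ok", "result": parsed}
    return {"status": "invalid_format"}


async def flaky_agent(prompt: str) -> str:
    """Returns prose first, then JSON once the prompt asks for it."""
    if "valid JSON" in prompt:
        return '{"answer": "42", "confidence": 0.9}'
    return "Sure! Here is my answer..."


async def slow_agent(prompt: str) -> str:
    await asyncio.sleep(1.0)
    return "{}"


async def demo() -> tuple[dict[str, Any], dict[str, Any]]:
    recovered = await call_with_policy(flaky_agent, "Summarize the report.")
    timed_out = await call_with_policy(slow_agent, "Summarize.", timeout_s=0.05)
    return recovered, timed_out


recovered, timed_out = asyncio.run(demo())
print(recovered["status"], timed_out["status"])  # ok timeout
```

Returning a status dict instead of raising keeps one failed specialist from aborting the whole orchestration. The orchestrator can then branch on `status`.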

⚙️ For Developers

Implement a maximum step count on your orchestrator. An orchestrator that loops - calling agents, evaluating results, deciding to call agents again - can run indefinitely if no clear completion condition is met. Set max_iterations = 10 and enforce it. Log a warning when the limit is hit so you can investigate whether your orchestrator’s reasoning is cycling.
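
A minimal sketch of that cap, assuming one orchestrator cycle (dispatch, evaluate, decide) can be modeled as a callable that returns True when the task is complete. `run_bounded` and `step` are hypothetical names.

```python
import logging
from typing import Callable

logger = logging.getLogger("orchestrator")


def run_bounded(step: Callable[[int], bool], max_iterations: int = 10) -> tuple[bool, int]:
    """Call step(i) until it reports completion or the iteration cap is reached.

    Returns (completed, iterations_used).
    """
    for i in range(1, max_iterations + 1):
        if step(i):
            return True, i
    # Hitting the cap is a signal to investigate, not a normal exit path.
    logger.warning("Orchestrator hit max_iterations=%d without completing", max_iterations)
    return False, max_iterations


finished = run_bounded(lambda i: i >= 3)  # completes on the third cycle
cycling = run_bounded(lambda i: False)    # never completes
print(finished, cycling)  # (True, 3) (False, 10)
```

Returning the completion flag with the count lets the caller distinguish a best-effort result from a finished one.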

Code: Simple Supervisor with Researcher and Writer

Supervisor Agent Dispatching to Researcher and Writer

Example code (static). Copy and run locally in your own environment.

import asyncio
import json
from typing import Any

# ── Minimal agent abstraction ─────────────────────────────────────────────────
class Agent:
  def __init__(self, name: str, system_prompt: str):
      self.name = name
      self.system_prompt = system_prompt

  async def run(self, task: str, context: dict[str, Any] | None = None) -> dict[str, Any]:
      """
      In production: call your LLM API here.
      We simulate responses for the demo.
      """
      # context defaults to None, not {}: a mutable default would be shared across calls.
      await asyncio.sleep(0.1)  # simulate API latency
      return self._simulate_response(task, context or {})

  def _simulate_response(self, task: str, context: dict) -> dict[str, Any]:
      if self.name == "researcher":
          return {
              "agent": self.name,
              "task": task,
              "findings": [
                  f"Finding 1: Key data point about '{task[:30]}'",
                  "Finding 2: Supporting evidence with recent statistics",
                  "Finding 3: Counterpoint to consider",
              ],
              "sources": ["source_a.com", "source_b.org"],
              "confidence": 0.85,
          }
      elif self.name == "writer":
          findings = context.get("findings", [])
          return {
              "agent": self.name,
              "task": task,
              "draft": (
                  f"# {task.title()}\n\n"
                  + "\n".join(f"- {f}" for f in findings)
                  + "\n\nConclusion: Based on the research above, we can conclude..."
              ),
              "word_count": 150,
              "confidence": 0.90,
          }
      else:
          return {"agent": self.name, "error": "unknown agent"}


# ── Pipeline state ─────────────────────────────────────────────────────────────
class PipelineState:
  def __init__(self, query: str):
      self.query = query
      self.research: dict[str, Any] = {}
      self.draft: dict[str, Any] = {}
      self.status = "pending"
      self.steps_taken = 0
      self.MAX_STEPS = 10

  def record_step(self):
      self.steps_taken += 1
      if self.steps_taken > self.MAX_STEPS:
          raise RuntimeError(f"Orchestrator exceeded {self.MAX_STEPS} steps")


# ── Specialist agents ──────────────────────────────────────────────────────────
researcher = Agent(
  name="researcher",
  system_prompt=(
      "You are a research specialist. Given a topic, produce a list of "
      "3-5 key findings with sources. Be factual and concise."
  ),
)

writer = Agent(
  name="writer",
  system_prompt=(
      "You are a technical writer. Given research findings, produce a "
      "clear, structured document. Use headers and bullet points."
  ),
)


# ── Supervisor orchestrator ────────────────────────────────────────────────────
async def supervisor(query: str) -> str:
  state = PipelineState(query)
  state.status = "in_progress"

  print(f"[Supervisor] Starting pipeline for: '{query}'")

  # Step 1: Delegate research
  state.record_step()
  print("[Supervisor] Dispatching to researcher...")
  state.research = await researcher.run(task=query)

  if state.research.get("confidence", 0) < 0.5:
      state.status = "failed"
      return "Research confidence too low; cannot proceed."

  print(f"[Researcher] Found {len(state.research['findings'])} findings")

  # Step 2: Delegate writing with research context
  state.record_step()
  print("[Supervisor] Dispatching to writer with research context...")
  state.draft = await writer.run(
      task=f"Write an article about: {query}",
      context={"findings": state.research["findings"]},
  )

  print(f"[Writer] Draft complete ({state.draft['word_count']} words)")

  # Step 3: Assemble final output
  state.status = "complete"
  final = {
      "query": query,
      "sources": state.research.get("sources", []),
      "document": state.draft.get("draft", ""),
      "pipeline_steps": state.steps_taken,
  }

  return json.dumps(final, indent=2)


# ── Run ────────────────────────────────────────────────────────────────────────
async def main():
  result = await supervisor("the impact of vector databases on enterprise AI")
  print("\n[Final Output]")
  print(result)

if __name__ == "__main__":
  asyncio.run(main())

Production Gotcha: Agent Networks Amplify Errors

Agent networks amplify errors. If agent A produces a wrong intermediate result, every downstream agent works from that wrong foundation. Add validation checkpoints between agents, not just at the end. A validation gate that catches a bad extraction result after step 1 costs one retry. Discovering the same error after step 5 costs a full pipeline re-run. Validate early, validate at every handoff.

Interview Notes: Agent Topology

Supervisor, swarm, sequential pipeline, and blackboard patterns trade off control, latency, cost, and debuggability. For enterprise systems, prefer explicit state graphs, bounded delegation, and trace propagation over unbounded autonomous collaboration.

Interview Practice

  1. Compare supervisor, sequential, parallel, and blackboard multi-agent patterns.
  2. Why do multi-agent systems need shared state and trace IDs?
  3. How do you prevent unbounded agent delegation?
  4. When should agents run in parallel?
  5. How do you design fallback behavior for a failed specialist agent?
  6. What should be evaluated: final answer, trajectory, or both?