When Multi-Agent Systems Are Actually Worth It
Multi-agent systems are genuinely harder to build, debug, and operate than single-agent systems. Use them only when you have a real need:
Parallel tasks. If your workflow has three independent sub-tasks that each take 10 seconds, running them in sequence takes 30 seconds. Running them on three agents in parallel takes roughly 10 seconds, the duration of the slowest sub-task. When latency matters and sub-tasks are independent, parallelism wins.
Specialization. A single agent with a 5,000-word system prompt often performs worse than five agents that each have a focused 500-word system prompt. When your task requires distinct domain expertise (research, writing, code review, legal compliance), give each concern its own agent.
Fault isolation. If the code-review agent fails, the documentation agent shouldn’t fail with it. Specialized agents can retry or degrade independently without cascading failures.
Context window management. A single agent on a long multi-step task eventually fills its context window. Chaining agents resets the context at each handoff, keeping each agent’s working set small.
Pattern 1: Supervisor / Orchestrator
The supervisor pattern is the most common and most flexible. One orchestrator agent receives the user request, decides which specialist agents to invoke, passes them tasks, collects their outputs, and assembles the final response.
The orchestrator is stateful - it knows what has been done and what remains. The specialist agents are stateless - they receive a task, return a result.
Supervisor Pattern: Orchestrator Dispatches to Specialists
```mermaid
flowchart TD
    U([User Request]) --> ORCH[Orchestrator Agent<br/>Plans and dispatches]
    ORCH -->|research task| RA[Researcher Agent<br/>Web search + summarize]
    ORCH -->|writing task| WA[Writer Agent<br/>Draft from outline]
    ORCH -->|review task| REV[Review Agent<br/>Fact-check and critique]
    RA -->|research results| ORCH
    WA -->|draft content| ORCH
    REV -->|review notes| ORCH
    ORCH --> VAL{Validation<br/>All tasks complete?}
    VAL -->|Yes| OUT([Final Response])
    VAL -->|No| ORCH
    style U fill:#dbeafe,stroke:#2563eb,color:#1d4ed8
    style ORCH fill:#fef3c7,stroke:#d97706,color:#92400e
    style RA fill:#f3e8ff,stroke:#7c3aed,color:#7c3aed
    style WA fill:#f3e8ff,stroke:#7c3aed,color:#7c3aed
    style REV fill:#f3e8ff,stroke:#7c3aed,color:#7c3aed
    style OUT fill:#dcfce7,stroke:#16a34a,color:#15803d
```
Key design decisions for the supervisor:
- The orchestrator prompt must include the list of available agents, their capabilities, and when to use each
- The orchestrator decides the order of operations, so it should state its plan explicitly before dispatching any tasks
- The orchestrator should validate each specialist’s output before proceeding
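Here is a minimal sketch of the first two decisions. It assumes a hypothetical AgentSpec registry and a plan written as an ordered list of agent names. The registry is rendered into the orchestrator's prompt, and the plan is checked against the registry before anything is dispatched. The prompt wording and field names are illustrative, not a prescribed format.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AgentSpec:
    """Hypothetical registry entry describing one specialist agent."""
    name: str
    capability: str
    use_when: str


def build_orchestrator_prompt(agents: list[AgentSpec]) -> str:
    """Render the available agents into the orchestrator's system prompt."""
    lines = [
        "You are an orchestrator. Before dispatching, write an explicit plan:",
        "an ordered list of agent names, one per line.",
        "Available agents:",
    ]
    for spec in agents:
        lines.append(f"- {spec.name}: {spec.capability} Use when: {spec.use_when}")
    return "\n".join(lines)


def validate_plan(plan: list[str], agents: list[AgentSpec]) -> list[str]:
    """Reject empty plans and plans that name agents the orchestrator does not have."""
    if not plan:
        raise ValueError("Plan is empty")
    known = {spec.name for spec in agents}
    unknown = [step for step in plan if step not in known]
    if unknown:
        raise ValueError(f"Plan references unknown agents: {unknown}")
    return plan


AGENTS = [
    AgentSpec("researcher", "Web search and summarization.", "facts or sources are needed."),
    AgentSpec("writer", "Drafts documents from an outline.", "research is complete."),
    AgentSpec("reviewer", "Fact-checks and critiques drafts.", "a draft exists."),
]

prompt = build_orchestrator_prompt(AGENTS)
plan = validate_plan(["researcher", "writer", "reviewer"], AGENTS)
```

Validating the plan before dispatch catches a hallucinated agent name (for example, a "translator" the system does not have) before any specialist runs.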
Pattern 2: Parallel Dispatch with Merge
When sub-tasks are independent, run them in parallel. The orchestrator fans out to N agents simultaneously, then merges the results when all complete.
Parallel Dispatch: Fan-Out and Merge
```mermaid
flowchart LR
    Q([Query]) --> SPLIT[Decompose<br/>Into parallel tasks]
    SPLIT --> A1[Agent 1<br/>Task A]
    SPLIT --> A2[Agent 2<br/>Task B]
    SPLIT --> A3[Agent 3<br/>Task C]
    A1 -->|Result A| MERGE[Merge Agent<br/>Synthesize results]
    A2 -->|Result B| MERGE
    A3 -->|Result C| MERGE
    MERGE --> OUT([Unified Response])
    style Q fill:#dbeafe,stroke:#2563eb,color:#1d4ed8
    style SPLIT fill:#fef3c7,stroke:#d97706,color:#92400e
    style MERGE fill:#fef3c7,stroke:#d97706,color:#92400e
    style OUT fill:#dcfce7,stroke:#16a34a,color:#15803d
```
The merge agent is often the most complex component. It must:
- Detect contradictions between results (Agent 1 says X, Agent 2 says not-X)
- Weight results by source reliability if agents have different trust levels
- Produce a coherent unified output, not a concatenation
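In Python, run parallel agents with asyncio.gather(). Each agent call becomes a coroutine, and await asyncio.gather(task_a(), task_b(), task_c()) runs all three concurrently. This works because agent calls are I/O-bound: most of their time is spent waiting on network responses. If your LLM client is synchronous, wrap each blocking call with asyncio.to_thread() or a concurrent.futures.ThreadPoolExecutor so it does not stall the event loop. For genuinely CPU-bound work, use concurrent.futures.ProcessPoolExecutor instead. Under CPython's global interpreter lock (GIL), threads do not speed up CPU-bound Python code.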
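Here is a sketch of fan-out and merge, under stated assumptions:

- ask_agent and ask_agent_blocking are hypothetical stand-ins for an async and a synchronous LLM client. Sleeps simulate latency.
- The blocking client goes through asyncio.to_thread() so it overlaps with the coroutines.
- merge is a deliberately simple equality check that flags contradictions. A real merge agent would also weigh source reliability and synthesize prose.

```python
import asyncio
import time


async def ask_agent(agent: str, question: str, answer: str, delay: float) -> dict:
    """Hypothetical async agent call; the sleep stands in for network latency."""
    await asyncio.sleep(delay)
    return {"agent": agent, "question": question, "answer": answer}


def ask_agent_blocking(agent: str, question: str, answer: str, delay: float) -> dict:
    """Hypothetical synchronous SDK call; calling it directly would block the event loop."""
    time.sleep(delay)
    return {"agent": agent, "question": question, "answer": answer}


def merge(results: list[dict]) -> dict:
    """Group answers by question and flag questions where agents disagree."""
    answers: dict[str, dict[str, str]] = {}
    for r in results:
        answers.setdefault(r["question"], {})[r["agent"]] = r["answer"]
    contradictions = {
        q: by_agent for q, by_agent in answers.items() if len(set(by_agent.values())) > 1
    }
    consistent = {
        q: next(iter(by_agent.values()))
        for q, by_agent in answers.items()
        if q not in contradictions
    }
    return {"consistent": consistent, "contradictions": contradictions}


async def fan_out_and_merge() -> tuple[dict, float]:
    start = time.perf_counter()
    results = await asyncio.gather(
        ask_agent("agent_1", "latency_budget", "200ms", 0.2),
        ask_agent("agent_2", "latency_budget", "500ms", 0.2),
        # Run the blocking call in a worker thread so it overlaps with the coroutines.
        asyncio.to_thread(ask_agent_blocking, "agent_3", "index_type", "HNSW", 0.2),
    )
    return merge(list(results)), time.perf_counter() - start


merged, elapsed = asyncio.run(fan_out_and_merge())
print(merged["contradictions"])  # {'latency_budget': {'agent_1': '200ms', 'agent_2': '500ms'}}
print(f"{elapsed:.2f}s")  # roughly 0.2s rather than 0.6s, because the three calls overlap
```

The merge keeps the contradiction visible instead of silently picking one answer. A downstream step (or a human) can then decide which source to trust.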
Pattern 3: Sequential Pipeline with Handoffs
In the sequential pattern, each agent’s output becomes the next agent’s input. This is appropriate when each step enriches, transforms, or validates the previous step’s output.
Sequential Pipeline with Validation Gates
```mermaid
flowchart LR
    I([Raw Input]) --> A1
    A1[Extractor Agent<br/>Parse entities and intent]
    A1 -->|structured data| V1{Valid?}
    V1 -->|No| ERR1([Error: extraction failed])
    V1 -->|Yes| A2
    A2[Enricher Agent<br/>Fetch additional context]
    A2 -->|enriched data| V2{Valid?}
    V2 -->|No| ERR2([Error: enrichment failed])
    V2 -->|Yes| A3
    A3[Generator Agent<br/>Produce output]
    A3 --> OUT([Final Output])
    style I fill:#dbeafe,stroke:#2563eb,color:#1d4ed8
    style V1 fill:#fef3c7,stroke:#d97706,color:#92400e
    style V2 fill:#fef3c7,stroke:#d97706,color:#92400e
    style OUT fill:#dcfce7,stroke:#16a34a,color:#15803d
    style ERR1 fill:#fee2e2,stroke:#dc2626,color:#991b1b
    style ERR2 fill:#fee2e2,stroke:#dc2626,color:#991b1b
```
Validation gates between agents are not optional. Each agent should check the output of the previous agent before processing it. A pipeline without gates propagates errors silently and produces meaningless final output.
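The gates can be sketched as plain predicates that run between stages. Here extractor, enricher, and generator are hypothetical stand-ins for the three agents in the diagram; they make no LLM calls. Each gate is an assumed check on exactly the fields the next stage depends on. A failed gate stops the pipeline with an error naming the stage, mirroring the diagram's error branches.

```python
from typing import Any, Callable, Optional

Data = dict[str, Any]


class PipelineError(RuntimeError):
    """Raised when a validation gate rejects a stage's output."""


# Hypothetical stand-ins for the three agents; real ones would call an LLM.
def extractor(data: Data) -> Data:
    words = data["raw"].split()
    return {**data, "entities": [w for w in words if w.istitle()], "intent": "compare"}


def enricher(data: Data) -> Data:
    return {**data, "context": {e: f"background on {e}" for e in data["entities"]}}


def generator(data: Data) -> Data:
    return {**data, "output": "; ".join(data["context"].values())}


# Gates: cheap checks on the fields the next stage needs.
def extraction_ok(data: Data) -> bool:
    return bool(data.get("entities")) and "intent" in data


def enrichment_ok(data: Data) -> bool:
    return set(data.get("context", {})) == set(data["entities"])


PIPELINE: list[tuple[str, Callable[[Data], Data], Optional[Callable[[Data], bool]]]] = [
    ("extraction", extractor, extraction_ok),
    ("enrichment", enricher, enrichment_ok),
    ("generation", generator, None),
]


def run_pipeline(raw: str) -> Data:
    data: Data = {"raw": raw}
    for name, stage, gate in PIPELINE:
        data = stage(data)
        # Check this stage's output before the next stage consumes it.
        if gate is not None and not gate(data):
            raise PipelineError(f"{name} failed validation")
    return data
```

For example, run_pipeline("compare Qdrant and Milvus") passes both gates. run_pipeline("no entities here") stops at the extraction gate instead of handing an empty entity list to the enricher.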
State Management Across Agents
In a multi-agent system, state is the shared ground truth that all agents read from and write to. Without explicit state management, agents work with inconsistent views of the world.
State schema design principles:
- Use a typed dictionary (TypedDict) or a Pydantic model, never an untyped dict whose keys and value types are implicit
- Make state append-only where possible - agents add results, never delete previous results
- Store the agent name and timestamp with each result - know who wrote what when
- Include a status field that tracks task completion
```python
from datetime import datetime
from typing import Literal

from pydantic import BaseModel, Field


class AgentResult(BaseModel):
    agent_name: str
    task: str
    output: str
    timestamp: datetime
    confidence: float = Field(ge=0.0, le=1.0)  # validated to lie in [0.0, 1.0]


class PipelineState(BaseModel):
    original_query: str
    results: list[AgentResult] = Field(default_factory=list)  # append-only result log
    status: Literal["pending", "in_progress", "complete", "failed"] = "pending"
    error: str | None = None
```
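The append-only rule is easier to keep when the state object itself refuses in-place edits. The sketch below is a standard-library alternative to the Pydantic model above. It assumes frozen dataclasses and a tuple for the result log:

- append returns a new state rather than modifying the old one.
- Every result carries its agent name and a UTC timestamp.
- by_agent answers "who wrote what".

The method names are illustrative.

```python
from dataclasses import dataclass, field, replace
from datetime import datetime, timezone


@dataclass(frozen=True)
class AgentResult:
    agent_name: str
    task: str
    output: str
    confidence: float
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


@dataclass(frozen=True)
class PipelineState:
    original_query: str
    results: tuple[AgentResult, ...] = ()  # a tuple cannot be mutated in place
    status: str = "pending"

    def append(self, result: AgentResult) -> "PipelineState":
        """Return a new state with one more result; earlier results are never touched."""
        if not 0.0 <= result.confidence <= 1.0:
            raise ValueError("confidence must be in [0.0, 1.0]")
        return replace(self, results=self.results + (result,), status="in_progress")

    def by_agent(self, agent_name: str) -> list[AgentResult]:
        """Answer 'who wrote what' by filtering the result log."""
        return [r for r in self.results if r.agent_name == agent_name]


s0 = PipelineState("vector databases in enterprise AI")
s1 = s0.append(AgentResult("researcher", "research", "3 findings", 0.85))
s2 = s1.append(AgentResult("writer", "draft", "# Draft ...", 0.9))
```

Because each write produces a new object, s0 and s1 remain available as snapshots. This makes it straightforward to see what the state looked like before a given agent ran.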
Failure Handling
Multi-agent systems fail in more ways than single agents. Design your failure modes explicitly:
| Failure Type | Response |
|---|---|
| Agent timeout | Return partial results + warning |
| Agent returns invalid format | Retry once with corrected prompt |
| Agent confidence < threshold | Flag for human review |
| Orchestrator loop exceeds max steps | Halt, return best-effort result |
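One way to encode the first three rows of the table is sketched below, under these assumptions:

- An agent is a hypothetical async callable that takes a prompt and returns raw text, expected to be a JSON object with a confidence field.
- The timeout and threshold defaults are placeholders.
- The function returns a status instead of raising, so the orchestrator can still keep the partial results other agents produced.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable

# Hypothetical agent interface: prompt in, raw model text out.
AgentCall = Callable[[str], Awaitable[str]]


async def call_with_policy(
    agent: AgentCall,
    prompt: str,
    timeout_s: float = 5.0,
    min_confidence: float = 0.7,
) -> dict[str, Any]:
    """Apply timeout, retry-once, and low-confidence rules to one agent call."""
    for _attempt in range(2):  # the original call plus at most one retry
        try:
            raw = await asyncio.wait_for(agent(prompt), timeout=timeout_s)
        except asyncio.TimeoutError:
            # Timeout: report it so the caller can return partial results + a warning.
            return {"status": "partial", "warning": f"agent timed out after {timeout_s}s"}
        try:
            result = json.loads(raw)
        except json.JSONDecodeError:
            result = None
        if not isinstance(result, dict):
            # Invalid format: retry with a prompt that restates the required format.
            prompt += "\nReturn only a single JSON object with a 'confidence' field."
            continue
        if result.get("confidence", 0.0) < min_confidence:
            return {"status": "needs_review", "result": result}  # flag for a human
        return {"status": "ok", "result": result}
    return {"status": "failed", "warning": "invalid format after one retry"}


# Hypothetical demo agents that exercise each branch.
async def slow_agent(prompt: str) -> str:
    await asyncio.sleep(1.0)
    return "{}"


async def chatty_agent(prompt: str) -> str:
    # Only answers in JSON once the corrected prompt asks for it.
    if "JSON object" in prompt:
        return '{"answer": 42, "confidence": 0.9}'
    return "Sure! The answer is 42."


async def unsure_agent(prompt: str) -> str:
    return '{"answer": "maybe", "confidence": 0.4}'


async def broken_agent(prompt: str) -> str:
    return "not json"
```

For example, asyncio.run(call_with_policy(chatty_agent, "q")) succeeds on the retry, unsure_agent is routed to needs_review, and broken_agent gives up after its single retry.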
Implement a maximum step count on your orchestrator. An orchestrator that loops (calling agents, evaluating results, deciding to call agents again) can run indefinitely if no clear completion condition is met. Set a hard limit, for example max_iterations = 10, and enforce it. Log a warning when the limit is hit so you can investigate whether your orchestrator's reasoning is cycling.
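Here is a minimal loop guard. It assumes a hypothetical decide_next callable standing in for the orchestrator's LLM reasoning step, which returns the next agent name or None when the task is done. The loop itself enforces the cap, and the warning records the last few steps so a cycle is visible in the logs.

```python
import logging
from typing import Callable, Optional

logger = logging.getLogger("orchestrator")

# Hypothetical: the orchestrator's decision step, given the steps taken so far.
DecideNext = Callable[[list[str]], Optional[str]]


def orchestrate(decide_next: DecideNext, max_iterations: int = 10) -> tuple[list[str], bool]:
    """Run the orchestrator loop with a hard cap; return (steps taken, completed?)."""
    history: list[str] = []
    for _ in range(max_iterations):
        next_agent = decide_next(history)
        if next_agent is None:  # the orchestrator signalled completion
            return history, True
        history.append(next_agent)  # a real system would dispatch and store the result here
    logger.warning(
        "Orchestrator hit max_iterations=%d; last steps: %s", max_iterations, history[-3:]
    )
    return history, False  # best-effort: the caller assembles whatever was produced


def finishes(history: list[str]) -> Optional[str]:
    plan = ["researcher", "writer"]
    return plan[len(history)] if len(history) < len(plan) else None


def cycles(history: list[str]) -> Optional[str]:
    # Never signals completion: alternates between two agents forever.
    return "researcher" if len(history) % 2 == 0 else "reviewer"
```

With these stand-ins, orchestrate(finishes) completes after two steps. orchestrate(cycles) stops at the cap and logs a warning whose last steps show the researcher/reviewer alternation.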
Code: Simple Supervisor with Researcher and Writer
Supervisor Agent Dispatching to Researcher and Writer
```python
import asyncio
import json
from typing import Any


# ── Minimal agent abstraction ─────────────────────────────────────────────────
class Agent:
    def __init__(self, name: str, system_prompt: str):
        self.name = name
        self.system_prompt = system_prompt

    async def run(self, task: str, context: dict[str, Any] | None = None) -> dict[str, Any]:
        """
        In production: call your LLM API here with self.system_prompt.
        We simulate responses for the demo.
        """
        await asyncio.sleep(0.1)  # simulate API latency
        # Default to None rather than {}: a mutable default would be shared across calls.
        return self._simulate_response(task, context or {})

    def _simulate_response(self, task: str, context: dict[str, Any]) -> dict[str, Any]:
        if self.name == "researcher":
            return {
                "agent": self.name,
                "task": task,
                "findings": [
                    f"Finding 1: Key data point about '{task[:30]}'",
                    "Finding 2: Supporting evidence with recent statistics",
                    "Finding 3: Counterpoint to consider",
                ],
                "sources": ["source_a.com", "source_b.org"],
                "confidence": 0.85,
            }
        elif self.name == "writer":
            findings = context.get("findings", [])
            draft = (
                f"# {task.title()}\n\n"
                + "\n".join(f"- {f}" for f in findings)
                + "\n\nConclusion: Based on the research above, we can conclude..."
            )
            return {
                "agent": self.name,
                "task": task,
                "draft": draft,
                "word_count": len(draft.split()),  # count the words in the simulated draft
                "confidence": 0.90,
            }
        else:
            return {"agent": self.name, "error": "unknown agent"}


# ── Pipeline state ─────────────────────────────────────────────────────────────
class PipelineState:
    MAX_STEPS = 10  # hard cap on orchestrator steps

    def __init__(self, query: str):
        self.query = query
        self.research: dict[str, Any] = {}
        self.draft: dict[str, Any] = {}
        self.status = "pending"
        self.steps_taken = 0

    def record_step(self) -> None:
        self.steps_taken += 1
        if self.steps_taken > self.MAX_STEPS:
            raise RuntimeError(f"Orchestrator exceeded {self.MAX_STEPS} steps")


# ── Specialist agents ──────────────────────────────────────────────────────────
researcher = Agent(
    name="researcher",
    system_prompt=(
        "You are a research specialist. Given a topic, produce a list of "
        "3-5 key findings with sources. Be factual and concise."
    ),
)

writer = Agent(
    name="writer",
    system_prompt=(
        "You are a technical writer. Given research findings, produce a "
        "clear, structured document. Use headers and bullet points."
    ),
)


# ── Supervisor orchestrator ────────────────────────────────────────────────────
async def supervisor(query: str) -> str:
    state = PipelineState(query)
    state.status = "in_progress"
    print(f"[Supervisor] Starting pipeline for: '{query}'")

    # Step 1: Delegate research
    state.record_step()
    print("[Supervisor] Dispatching to researcher...")
    state.research = await researcher.run(task=query)
    # Validation gate: require findings and adequate confidence before the handoff.
    if not state.research.get("findings") or state.research.get("confidence", 0) < 0.5:
        state.status = "failed"
        return "Research missing or confidence too low - cannot proceed."
    print(f"[Researcher] Found {len(state.research['findings'])} findings")

    # Step 2: Delegate writing with research context
    state.record_step()
    print("[Supervisor] Dispatching to writer with research context...")
    state.draft = await writer.run(
        task=f"Write an article about: {query}",
        context={"findings": state.research["findings"]},
    )
    # Validation gate: the writer must return a non-empty draft.
    if not state.draft.get("draft"):
        state.status = "failed"
        return "Writer returned no draft - cannot proceed."
    print(f"[Writer] Draft complete ({state.draft['word_count']} words)")

    # Step 3: Assemble final output
    state.status = "complete"
    final = {
        "query": query,
        "sources": state.research.get("sources", []),
        "document": state.draft.get("draft", ""),
        "pipeline_steps": state.steps_taken,
    }
    return json.dumps(final, indent=2)


# ── Run ────────────────────────────────────────────────────────────────────────
async def main() -> None:
    result = await supervisor("the impact of vector databases on enterprise AI")
    print("\n[Final Output]")
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
```
Agent networks amplify errors. If agent A produces a wrong intermediate result, every downstream agent works from that wrong foundation. Add validation checkpoints between agents, not just at the end. A validation gate that catches a bad extraction result after step 1 costs one retry. Discovering the same error after step 5 costs a full pipeline re-run. Validate early, validate at every handoff.
Interview Notes: Agent Topology
Supervisor, swarm, sequential pipeline, and blackboard patterns trade off control, latency, cost, and debuggability. For enterprise systems, prefer explicit state graphs, bounded delegation, and trace propagation over unbounded autonomous collaboration.
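Trace propagation can be sketched in Python with the standard contextvars module, assuming one trace ID per user request. The supervisor sets the ID once. Because asyncio tasks copy the current context when they are created, every specialist launched through asyncio.gather() reads the same ID without it being passed as an argument. The variable name and the in-memory trace_log are illustrative; a real system would attach the ID to log records or tracing spans.

```python
import asyncio
import contextvars
import uuid

# One trace ID per user request; "-" marks code running outside any request.
trace_id_var = contextvars.ContextVar("trace_id", default="-")
trace_log: list[tuple[str, str]] = []  # stand-in for structured log records


async def specialist(name: str) -> str:
    await asyncio.sleep(0.01)  # simulated LLM call
    # The trace ID is not passed as an argument; it is read from the inherited context.
    trace_log.append((trace_id_var.get(), name))
    return f"{name} done"


async def supervisor(query: str) -> str:
    trace_id = uuid.uuid4().hex
    trace_id_var.set(trace_id)
    # gather() wraps each coroutine in a Task, and each Task copies the current context.
    await asyncio.gather(specialist("researcher"), specialist("writer"))
    return trace_id


trace_id = asyncio.run(supervisor("vector databases in enterprise AI"))
```

Every record produced while handling the request shares one ID, so you can filter logs by it to reconstruct the full trajectory. That trajectory is what you evaluate alongside the final answer.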
Interview Practice
- Compare supervisor, sequential, parallel, and blackboard multi-agent patterns.
- Why do multi-agent systems need shared state and trace IDs?
- How do you prevent unbounded agent delegation?
- When should agents run in parallel?
- How do you design fallback behavior for a failed specialist agent?
- What should be evaluated: final answer, trajectory, or both?