Pattern Essence (Brief)
Multi-Agent Collaboration is a pattern where multiple agents with different roles work on one task in rounds and converge on an aligned decision.
For each role, the LLM produces its contribution, while the collaboration policy controls the contribution contract, the round limits, and the readiness criterion.
What this example demonstrates
- a team with 3 roles: `demand_analyst`, `finance_analyst`, `risk_analyst`
- shared context (`shared_context`) for all agents
- a collaboration gateway that validates agent contributions against the contract
- alignment rounds: conflict detection + consensus check
- separate policy vs execution role allowlists
- final decision (`final_decision`) can be `go`, `go_with_caution`, or `no_go` (determined by policy)
- runtime budgets: `max_rounds`, `max_messages`, `max_seconds`
- explicit `stop_reason`, `trace`, and `history` for production monitoring
Architecture
- The coordinator forms a shared state board (`goal` + `shared_context`).
- Each role-specific LLM call returns a structured contribution (`agent`/`stance`/`summary`/`confidence`/`actions`).
- The gateway validates the contribution and blocks invalid or disallowed roles.
- After each round, the system finds conflicts and checks whether consensus has been reached.
- If there is no consensus, round results become context for the next round.
- After alignment, LLM synthesizes a final short operations brief.
Key contract: the LLM proposes a contribution, but acceptance and completion rules are controlled by the policy layer.
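For illustration, a single contribution that satisfies this contract looks like the object below (the values are invented, not model output):

{
  "agent": "risk_analyst",
  "stance": "caution",
  "summary": "Failed payment rate is close to the block threshold; proceed only with monitoring.",
  "confidence": 0.72,
  "actions": ["enable payment-failure alerting", "review chargeback alerts before launch"]
}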
Project structure
examples/
└── agent-patterns/
    └── multi-agent-collaboration/
        └── python/
            ├── main.py           # Team loop -> Validate -> Resolve -> Finalize
            ├── llm.py            # role contributions + final synthesis
            ├── gateway.py        # contribution contract + conflict/consensus policy
            ├── signals.py        # deterministic shared context
            └── requirements.txt
How to run
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/multi-agent-collaboration/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ is required.
Option via export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Option via .env (optional)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
The commands above are for macOS/Linux shells. On Windows, set the environment variables directly (for example via PowerShell or cmd, as shown below) or use python-dotenv to load .env automatically.
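For example (same variable names as above):

PowerShell:
$env:OPENAI_API_KEY = "sk-..."
python main.py

cmd.exe:
set OPENAI_API_KEY=sk-...
python main.py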
Task
Imagine a production case before a campaign launch:
"Prepare a go/no-go brief for Checkout v2 in the US market for 2026-03-02. We need an aligned decision across growth, finance, and risk."
A single agent can easily miss part of the risks or economics here. So a team loop is used:
- each role gives its own conclusion
- the system checks disagreements
- the next round starts only if needed
- the final response appears after alignment
Solution
In this example:
- role agents work through one shared context
- gateway keeps a strict JSON contract and execution limits
- `detect_conflicts(...)` and `decide_round_outcome(...)` determine whether to continue
- if the team does not align within `max_rounds`, the run stops in a controlled way
- if it does align, a separate finalize step composes a short final brief
Code
signals.py — shared facts for the team
from __future__ import annotations
from typing import Any
def build_shared_context(*, report_date: str, region: str) -> dict[str, Any]:
return {
"report_date": report_date,
"region": region,
"campaign": {
"name": "Checkout v2 Launch",
"window": "2026-03-02",
"channel": "US paid + lifecycle",
},
"demand_signals": {
"projected_orders": 15200,
"conversion_lift_pct": 12.4,
"traffic_risk": "medium",
},
"finance_signals": {
"projected_revenue_usd": 684000.0,
"expected_margin_pct": 19.2,
"promo_cost_usd": 94000.0,
},
"risk_signals": {
"failed_payment_rate": 0.028,
"chargeback_alerts": 4,
"critical_incidents": 0,
},
"policy_limits": {
"payment_failure_block_threshold": 0.03,
"max_chargeback_alerts_for_go": 5,
},
}
What matters most here (in plain words)
- All roles see the same `shared_context`.
- Facts are deterministic: separated from LLM decisions.
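A minimal sketch of how these deterministic facts relate to the policy limits (run from the example directory; this check is not part of the example, the roles receive the same numbers through `shared_context`):

from signals import build_shared_context

ctx = build_shared_context(report_date="2026-03-02", region="US")
risk = ctx["risk_signals"]
limits = ctx["policy_limits"]

# Deterministic comparison of facts vs policy limits -- no LLM involved.
print(risk["failed_payment_rate"] < limits["payment_failure_block_threshold"])  # True (0.028 < 0.03)
print(risk["chargeback_alerts"] <= limits["max_chargeback_alerts_for_go"])      # True (4 <= 5)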
gateway.py — policy boundary for collaboration
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_rounds: int = 3
max_messages: int = 12
max_seconds: int = 40
min_go_votes: int = 2
ALLOWED_STANCES = {"go", "caution", "block"}
def _is_number(value: Any) -> bool:
return isinstance(value, (int, float)) and not isinstance(value, bool)
def validate_contribution(raw: Any, *, allowed_agents: set[str]) -> dict[str, Any]:
if not isinstance(raw, dict):
raise StopRun("invalid_contribution:not_object")
required = {"agent", "stance", "summary", "confidence", "actions"}
if not required.issubset(raw.keys()):
raise StopRun("invalid_contribution:missing_keys")
agent = raw["agent"]
stance = raw["stance"]
summary = raw["summary"]
confidence = raw["confidence"]
actions = raw["actions"]
if not isinstance(agent, str) or not agent.strip():
raise StopRun("invalid_contribution:agent")
agent = agent.strip()
if agent not in allowed_agents:
raise StopRun(f"invalid_contribution:agent_not_allowed:{agent}")
if not isinstance(stance, str) or stance.strip() not in ALLOWED_STANCES:
raise StopRun("invalid_contribution:stance")
stance = stance.strip()
if not isinstance(summary, str) or not summary.strip():
raise StopRun("invalid_contribution:summary")
if not _is_number(confidence):
raise StopRun("invalid_contribution:confidence_type")
confidence = float(confidence)
if not (0.0 <= confidence <= 1.0):
raise StopRun("invalid_contribution:confidence_range")
if not isinstance(actions, list) or not actions:
raise StopRun("invalid_contribution:actions")
normalized_actions: list[str] = []
for item in actions:
if not isinstance(item, str) or not item.strip():
raise StopRun("invalid_contribution:action_item")
normalized_actions.append(item.strip())
# Ignore unknown keys to tolerate extra LLM fields.
return {
"agent": agent,
"stance": stance,
"summary": summary.strip(),
"confidence": round(confidence, 3),
"actions": normalized_actions[:3],
}
def detect_conflicts(contributions: list[dict[str, Any]]) -> list[str]:
if not contributions:
return ["no_contributions"]
stances = {item["stance"] for item in contributions}
conflicts: list[str] = []
if "go" in stances and "caution" in stances and "block" not in stances:
conflicts.append("go_vs_caution")
if "block" in stances and len(stances) > 1:
conflicts.append("blocking_vs_non_block")
if len(stances) == 3:
conflicts.append("high_divergence")
return conflicts
def decide_round_outcome(
contributions: list[dict[str, Any]],
*,
min_go_votes: int,
) -> str | None:
go_votes = sum(1 for item in contributions if item["stance"] == "go")
caution_votes = sum(1 for item in contributions if item["stance"] == "caution")
block_votes = sum(1 for item in contributions if item["stance"] == "block")
if block_votes >= 2:
return "no_go"
if block_votes > 0:
return None
if go_votes >= min_go_votes and caution_votes == 0:
return "go"
if go_votes >= min_go_votes and caution_votes > 0:
return "go_with_caution"
return None
class CollaborationGateway:
def __init__(self, *, allow: set[str], budget: Budget):
self.allow = set(allow)
self.budget = budget
self.message_count = 0
def _consume_message_budget(self) -> None:
self.message_count += 1
if self.message_count > self.budget.max_messages:
raise StopRun("max_messages")
def accept(self, raw: Any, *, expected_agent: str) -> dict[str, Any]:
if expected_agent not in self.allow:
raise StopRun(f"agent_denied:{expected_agent}")
self._consume_message_budget()
contribution = validate_contribution(raw, allowed_agents=self.allow)
if contribution["agent"] != expected_agent:
raise StopRun(f"invalid_contribution:agent_mismatch:{expected_agent}")
return contribution
What matters most here (in plain words)
- The gateway decides what exactly counts as a valid team contribution.
- We ask roles not to add extra fields, but the gateway tolerates them as protection against LLM "verbosity" (the contract is enforced through the required keys).
- `detect_conflicts(...)` and `decide_round_outcome(...)` separate the concepts of conflict and readiness for finalization.
- The gateway only enforces the execution allowlist passed from `main.py` (the policy/execution separation lives in `main.py`, not in the gateway).
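A minimal sketch of the gateway rejecting a malformed contribution (run from the example directory; the payload is invented):

from gateway import Budget, CollaborationGateway, StopRun

gateway = CollaborationGateway(
    allow={"demand_analyst", "finance_analyst", "risk_analyst"},
    budget=Budget(),
)

# Missing "confidence" and "actions" -> fails the required-keys check.
bad = {"agent": "risk_analyst", "stance": "go", "summary": "looks fine"}
try:
    gateway.accept(bad, expected_agent="risk_analyst")
except StopRun as exc:
    print(exc.reason)  # invalid_contribution:missing_keys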
llm.py — role contributions and final synthesis
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
COMMON_RULES = """
Return exactly one JSON object with this shape:
{
"agent": "<role_name>",
"stance": "go|caution|block",
"summary": "one short paragraph",
"confidence": 0.0,
"actions": ["action 1", "action 2"]
}
Rules:
- Use only the provided facts.
- Keep actions concrete and operational.
- Do not output markdown or extra keys.
""".strip()
AGENT_PROMPTS = {
"demand_analyst": (
"You are Demand Analyst. Focus on demand capacity, conversion, and traffic risks. "
"Decide whether launch is feasible from growth and operational demand perspective."
),
"finance_analyst": (
"You are Finance Analyst. Focus on revenue, margin, campaign cost, and downside exposure. "
"Decide if launch economics are acceptable."
),
"risk_analyst": (
"You are Risk Analyst. Focus on payment reliability, chargebacks, and incidents. "
"Prioritize safety and compliance risk containment."
),
"legal_analyst": (
"You are Legal Analyst. Focus on regulatory, compliance, consumer protection, and policy constraints. "
"Flag launch blockers and required mitigations."
),
}
FINAL_SYSTEM_PROMPT = """
You are a launch readiness editor.
Write a short operations brief in English.
Include:
- final decision (go/go_with_caution/no_go)
- why the team agreed
- top 2 immediate actions
Use only evidence from collaboration history.
""".strip()
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def _round_summaries(history: list[dict[str, Any]], limit: int = 2) -> list[dict[str, Any]]:
summaries: list[dict[str, Any]] = []
for row in history[-limit:]:
summaries.append(
{
"round": row.get("round"),
"decision": row.get("decision"),
"conflicts": row.get("conflicts", []),
"stances": [
{
"agent": item.get("agent"),
"stance": item.get("stance"),
"confidence": item.get("confidence"),
}
for item in row.get("contributions", [])
],
}
)
return summaries
def propose_contribution(
*,
role: str,
goal: str,
shared_context: dict[str, Any],
history: list[dict[str, Any]],
open_conflicts: list[str],
) -> dict[str, Any]:
system = AGENT_PROMPTS.get(role)
if not system:
raise ValueError(f"unknown_role:{role}")
payload = {
"goal": goal,
"role": role,
"shared_context": shared_context,
"recent_rounds": _round_summaries(history, limit=2),
"open_conflicts": open_conflicts,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": f"{system}\n\n{COMMON_RULES}"},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
return json.loads(text)
except json.JSONDecodeError:
return {"invalid": True, "raw": text}
def compose_final_answer(
*,
goal: str,
final_decision: str,
history: list[dict[str, Any]],
) -> str:
payload = {
"goal": goal,
"final_decision": final_decision,
"history": history,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
messages=[
{"role": "system", "content": FINAL_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = (completion.choices[0].message.content or "").strip()
if not text:
raise LLMEmpty("llm_empty")
return text
What matters most here (in plain words)
- Each role has its own system focus, but the same structured response contract.
- History of previous rounds becomes context for the next contribution.
- The final brief is generated in a separate step after alignment, not "on the fly".
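As a small illustration of how previous rounds are compacted before being sent back to the roles, `_round_summaries` (a private helper of this file) can be called on a hand-written history entry; the values below are invented:

from llm import _round_summaries

history = [
    {
        "round": 1,
        "decision": None,
        "conflicts": ["go_vs_caution"],
        "contributions": [
            {"agent": "demand_analyst", "stance": "go", "confidence": 0.85},
            {"agent": "risk_analyst", "stance": "caution", "confidence": 0.7},
        ],
    }
]

# Prints only round/decision/conflicts plus agent stances -- summaries and actions are dropped.
print(_round_summaries(history, limit=2))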
main.py — Team Loop -> Validate -> Resolve -> Finalize
from __future__ import annotations
import json
import time
from typing import Any
from gateway import (
Budget,
CollaborationGateway,
StopRun,
decide_round_outcome,
detect_conflicts,
)
from llm import LLMEmpty, LLMTimeout, compose_final_answer, propose_contribution
from signals import build_shared_context
REPORT_DATE = "2026-03-02"
REGION = "US"
GOAL = (
"Prepare a go/no-go launch brief for Checkout v2 campaign in US on 2026-03-02. "
"Use collaboration across demand, finance, and risk analysts; return one aligned decision."
)
BUDGET = Budget(max_rounds=3, max_messages=12, max_seconds=40, min_go_votes=2)
TEAM_ROLES_POLICY = {
"demand_analyst",
"finance_analyst",
"risk_analyst",
"legal_analyst",
}
LEGAL_ANALYST_ENABLED = False
TEAM_ROLES_EXECUTION = (
TEAM_ROLES_POLICY
if LEGAL_ANALYST_ENABLED
else {"demand_analyst", "finance_analyst", "risk_analyst"}
)
TEAM_SEQUENCE = ["demand_analyst", "finance_analyst", "risk_analyst"]
# Append "legal_analyst" to TEAM_SEQUENCE while keeping LEGAL_ANALYST_ENABLED=False to test the runtime denial path (agent_denied:legal_analyst).
def _latest_stances(contributions: list[dict[str, Any]]) -> list[dict[str, Any]]:
return [
{
"agent": item["agent"],
"stance": item["stance"],
"confidence": item["confidence"],
}
for item in contributions
]
def run_collaboration(goal: str) -> dict[str, Any]:
started = time.monotonic()
shared_context = build_shared_context(report_date=REPORT_DATE, region=REGION)
history: list[dict[str, Any]] = []
trace: list[dict[str, Any]] = []
open_conflicts: list[str] = []
final_decision: str | None = None
gateway = CollaborationGateway(allow=TEAM_ROLES_EXECUTION, budget=BUDGET)
for round_no in range(1, BUDGET.max_rounds + 1):
if (time.monotonic() - started) > BUDGET.max_seconds:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"trace": trace,
"history": history,
}
round_contributions: list[dict[str, Any]] = []
for role in TEAM_SEQUENCE:
try:
raw = propose_contribution(
role=role,
goal=goal,
shared_context=shared_context,
history=history,
open_conflicts=open_conflicts,
)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": f"round_{round_no}:{role}",
"trace": trace,
"history": history,
}
try:
contribution = gateway.accept(raw, expected_agent=role)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": f"round_{round_no}:{role}",
"raw_contribution": raw,
"trace": trace,
"history": history,
}
round_contributions.append(contribution)
trace.append(
{
"round": round_no,
"agent": role,
"stance": contribution["stance"],
"confidence": contribution["confidence"],
"accepted": True,
}
)
conflicts = detect_conflicts(round_contributions)
round_decision = decide_round_outcome(
round_contributions,
min_go_votes=BUDGET.min_go_votes,
)
history_entry = {
"round": round_no,
"contributions": round_contributions,
"conflicts": conflicts,
"decision": round_decision,
}
history.append(history_entry)
trace.append(
{
"round": round_no,
"conflicts": conflicts,
"decision": round_decision or "next_round",
}
)
if round_decision:
final_decision = round_decision
break
open_conflicts = conflicts
if not final_decision:
return {
"status": "stopped",
"stop_reason": "max_rounds_reached",
"trace": trace,
"history": history,
}
try:
answer = compose_final_answer(
goal=goal,
final_decision=final_decision,
history=history,
)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "finalize",
"trace": trace,
"history": history,
}
except LLMEmpty:
return {
"status": "stopped",
"stop_reason": "llm_empty",
"phase": "finalize",
"trace": trace,
"history": history,
}
last_round = history[-1]
return {
"status": "ok",
"stop_reason": "success",
"answer": answer,
"final_decision": final_decision,
"rounds_used": len(history),
"team_summary": {
"report_date": REPORT_DATE,
"region": REGION,
"stances": _latest_stances(last_round["contributions"]),
"conflicts": last_round["conflicts"],
},
"trace": trace,
"history": history,
}
def main() -> None:
result = run_collaboration(GOAL)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
What matters most here (in plain words)
- `history` is shared memory across rounds.
- The completion criterion (`go`/`go_with_caution`/`no_go`) is centralized in a policy function.
- The difference between policy and execution roles is defined in `main.py` via `TEAM_ROLES_POLICY` vs `TEAM_ROLES_EXECUTION`; the gateway only enforces the execution allowlist.
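Because the proposal step and the policy layer are separate, the whole loop can be smoke-tested without an API key by stubbing the two LLM calls. A minimal sketch (a hypothetical helper script, not part of the example files; the stubbed stances are invented):

# offline_smoke_test.py -- hypothetical, run from the example directory
import json

import main

def fake_propose_contribution(*, role, goal, shared_context, history, open_conflicts):
    # Deterministic stand-in for llm.propose_contribution (no API call).
    return {
        "agent": role,
        "stance": "go",
        "summary": f"{role}: within policy limits",
        "confidence": 0.9,
        "actions": ["monitor launch dashboards"],
    }

def fake_compose_final_answer(*, goal, final_decision, history):
    # Deterministic stand-in for llm.compose_final_answer.
    return f"Final decision: {final_decision} after {len(history)} round(s)."

# main.py imports these names directly, so patch them on the main module.
main.propose_contribution = fake_propose_contribution
main.compose_final_answer = fake_compose_final_answer

print(json.dumps(main.run_collaboration(main.GOAL), indent=2))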
requirements.txt
openai==2.21.0
Example output
Below is an example of a valid run where the team aligns in round 1 and returns final_decision="go".
{
"status": "ok",
"stop_reason": "success",
"answer": "Operations Brief: Checkout v2 Campaign Launch — US, 2026-03-02. Final Decision: GO. Demand, finance, and risk agreed the launch is feasible and within policy thresholds.",
"final_decision": "go",
"rounds_used": 1,
"team_summary": {
"report_date": "2026-03-02",
"region": "US",
"stances": [
{"agent": "demand_analyst", "stance": "go", "confidence": 0.9},
{"agent": "finance_analyst", "stance": "go", "confidence": 0.9},
{"agent": "risk_analyst", "stance": "go", "confidence": 0.9}
],
"conflicts": []
},
"trace": [
{"round": 1, "agent": "demand_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
{"round": 1, "agent": "finance_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
{"round": 1, "agent": "risk_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
{"round": 1, "conflicts": [], "decision": "go"}
],
"history": [
{
"round": 1,
"contributions": [
{"agent": "demand_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["...", "..."]},
{"agent": "finance_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["..."]},
{"agent": "risk_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["..."]}
],
"conflicts": [],
"decision": "go"
}
]
}
This is a shortened example: long answer text and summary/actions in history are intentionally trimmed for readability.
Typical stop_reason values
- `success` — run completed correctly; `final_decision` can be `go`, `go_with_caution`, or `no_go`
- `invalid_contribution:*` — an agent contribution failed contract validation
- `invalid_contribution:actions` — a role returned empty or invalid `actions` (this example requires 1-3 actions)
- `agent_denied:<role>` — the role is not allowed by the execution allowlist
- `llm_timeout` — the LLM did not respond within `OPENAI_TIMEOUT_SECONDS`
- `llm_empty` — the finalize step returned empty text
- `max_messages` — the message budget between roles was exceeded
- `max_rounds_reached` — the team did not reach a decision (`final_decision`) within `max_rounds`
- `max_seconds` — the total run time budget was exceeded
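One way a caller could branch on these values for monitoring (a hypothetical hook; the routing labels are invented):

def route_result(result: dict) -> str:
    # Maps a run_collaboration(...) result onto an operational action.
    if result["status"] == "ok":
        return f"publish_brief:{result['final_decision']}"
    reason = result["stop_reason"]
    if reason.startswith("invalid_contribution") or reason.startswith("agent_denied"):
        return "alert_contract_violation"
    if reason in {"llm_timeout", "llm_empty"}:
        return "retry_later"
    return "escalate_to_human"  # max_rounds_reached, max_messages, max_seconds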
What is NOT shown here
- There are no real domain APIs or live data.
- There is no external conflict resolver (for example, a human reviewer).
- There is no multi-tenant auth/ACL for roles and data sources.
- There is no adaptive team sizing (dynamic add/remove of roles).
What to try next
- Add `legal_analyst` to `TEAM_SEQUENCE` with `LEGAL_ANALYST_ENABLED=False` and observe `agent_denied:legal_analyst` in `stop_reason`.
- Set `min_go_votes=3` to require full consensus before finalization.
- Add a human escalation rule if `block` repeats for 2 rounds in a row (a minimal sketch follows below).
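For the escalation idea in the last bullet, one possible rule over `history` (hypothetical, not part of the example):

def needs_human_review(history: list[dict]) -> bool:
    # True if each of the two most recent rounds contains at least one "block" stance.
    if len(history) < 2:
        return False
    return all(
        any(item["stance"] == "block" for item in row["contributions"])
        for row in history[-2:]
    )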