Essence of the pattern (brief)
Multi-Agent Collaboration is a pattern in which several agents with different roles work on the same task in rounds and converge on an aligned decision.
In each role, the LLM produces its contribution, and the collaboration policy controls the contribution contract, the round limits, and the readiness criterion.
What this example demonstrates
- a team of 3 roles: demand_analyst, finance_analyst, risk_analyst
- a shared context (shared_context) for all agents
- a collaboration gateway that validates agent contributions against the contract
- alignment rounds: conflict detection + consensus checking
- separate role allowlists for policy vs execution
- the final decision (final_decision) can be go, go_with_caution, or no_go (determined by policy)
- runtime budgets: max_rounds, max_messages, max_seconds
- explicit stop_reason, trace, and history for production monitoring
Architecture
- The coordinator builds a shared state board (goal + shared_context).
- Each role's LLM call returns a structured contribution (agent/stance/summary/confidence/actions).
- The gateway validates each contribution and blocks invalid or unauthorized roles.
- After each round, the system detects conflicts and checks whether consensus has been reached.
- If there is no consensus, the round's results become the context for the next round.
- After alignment, the LLM synthesizes a short final operations brief.
Key contract: the LLM proposes a contribution, but accepting that contribution and the termination rules are controlled by the policy layer.
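That split can be sketched in a few lines (illustrative names only; the real contract lives in gateway.py below):

```python
# The LLM side proposes; the policy side decides. A toy version of the split:
ALLOWED_STANCES = {"go", "caution", "block"}

def policy_accepts(raw: dict) -> bool:
    # Policy, not the model, decides whether a contribution counts.
    return (
        isinstance(raw, dict)
        and raw.get("stance") in ALLOWED_STANCES
        and isinstance(raw.get("summary"), str)
    )

proposed = {"stance": "maybe", "summary": "looks fine"}  # model output with an invalid stance
print(policy_accepts(proposed))  # False: "maybe" is not an allowed stance
```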
Project structure
examples/
└── agent-patterns/
└── multi-agent-collaboration/
└── python/
├── main.py # Team loop -> Validate -> Resolve -> Finalize
├── llm.py # role contributions + final synthesis
├── gateway.py # contribution contract + conflict/consensus policy
├── signals.py # deterministic shared context
└── requirements.txt
Running the project
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/multi-agent-collaboration/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ is required.
Option via export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Option via .env (optional)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
This is the shell variant (macOS/Linux). On Windows, it is simpler to set variables with set or, if preferred, to use python-dotenv to load .env automatically.
Task
Imagine a production case before a campaign launch:
"Prepare a go/no-go brief for Checkout v2 in the US market on 2026-03-02. An aligned decision across growth, finance, and risk is required."
A single agent can easily miss part of the risks or the economics, so a team cycle is run:
- each role delivers its own conclusion
- the system checks for divergences
- the next round starts only if needed
- the final answer appears after alignment
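The cycle can be sketched as a loop (a deliberately simplified, illustrative version of what main.py implements in full):

```python
# Simplified team cycle: contribute -> decide -> next round only if needed.
def team_cycle(roles, max_rounds, contribute, decide):
    history = []
    for round_no in range(1, max_rounds + 1):
        contributions = [contribute(role, history) for role in roles]
        decision = decide(contributions)
        history.append({"round": round_no, "decision": decision})
        if decision:  # aligned: stop early
            return decision, history
    return None, history  # controlled stop: no alignment within budget

# Toy run: every role votes "go", so the team aligns in round 1.
decision, history = team_cycle(
    roles=["demand", "finance", "risk"],
    max_rounds=3,
    contribute=lambda role, hist: {"agent": role, "stance": "go"},
    decide=lambda cs: "go" if all(c["stance"] == "go" for c in cs) else None,
)
print(decision, len(history))  # go 1
```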
Solution
In this example:
- the role agents work through a single shared context
- the gateway maintains a strict JSON contract and execution limits
- detect_conflicts(...) and decide_round_outcome(...) determine whether to continue
- if the team does not align within max_rounds, the run stops in a controlled way
- if it does align, a separate finalize step composes a short final brief
Code
signals.py — shared facts for the team
from __future__ import annotations
from typing import Any
def build_shared_context(*, report_date: str, region: str) -> dict[str, Any]:
return {
"report_date": report_date,
"region": region,
"campaign": {
"name": "Checkout v2 Launch",
"window": "2026-03-02",
"channel": "US paid + lifecycle",
},
"demand_signals": {
"projected_orders": 15200,
"conversion_lift_pct": 12.4,
"traffic_risk": "medium",
},
"finance_signals": {
"projected_revenue_usd": 684000.0,
"expected_margin_pct": 19.2,
"promo_cost_usd": 94000.0,
},
"risk_signals": {
"failed_payment_rate": 0.028,
"chargeback_alerts": 4,
"critical_incidents": 0,
},
"policy_limits": {
"payment_failure_block_threshold": 0.03,
"max_chargeback_alerts_for_go": 5,
},
}
What matters most here (in plain terms)
- All roles see the same shared_context.
- The facts are deterministic: kept separate from LLM decisions.
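Because the facts are deterministic, threshold checks can be asserted without any LLM call. A minimal illustration reusing the same numbers as signals.py:

```python
# Deterministic facts can be checked directly against the policy limits.
ctx = {
    "risk_signals": {"failed_payment_rate": 0.028, "chargeback_alerts": 4},
    "policy_limits": {
        "payment_failure_block_threshold": 0.03,
        "max_chargeback_alerts_for_go": 5,
    },
}
ok = (
    ctx["risk_signals"]["failed_payment_rate"]
    < ctx["policy_limits"]["payment_failure_block_threshold"]
    and ctx["risk_signals"]["chargeback_alerts"]
    <= ctx["policy_limits"]["max_chargeback_alerts_for_go"]
)
print(ok)  # True: 0.028 < 0.03 and 4 <= 5
```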
gateway.py — policy boundary for the collaboration
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_rounds: int = 3
max_messages: int = 12
max_seconds: int = 40
min_go_votes: int = 2
ALLOWED_STANCES = {"go", "caution", "block"}
def _is_number(value: Any) -> bool:
return isinstance(value, (int, float)) and not isinstance(value, bool)
def validate_contribution(raw: Any, *, allowed_agents: set[str]) -> dict[str, Any]:
if not isinstance(raw, dict):
raise StopRun("invalid_contribution:not_object")
required = {"agent", "stance", "summary", "confidence", "actions"}
if not required.issubset(raw.keys()):
raise StopRun("invalid_contribution:missing_keys")
agent = raw["agent"]
stance = raw["stance"]
summary = raw["summary"]
confidence = raw["confidence"]
actions = raw["actions"]
if not isinstance(agent, str) or not agent.strip():
raise StopRun("invalid_contribution:agent")
agent = agent.strip()
if agent not in allowed_agents:
raise StopRun(f"invalid_contribution:agent_not_allowed:{agent}")
if not isinstance(stance, str) or stance.strip() not in ALLOWED_STANCES:
raise StopRun("invalid_contribution:stance")
stance = stance.strip()
if not isinstance(summary, str) or not summary.strip():
raise StopRun("invalid_contribution:summary")
if not _is_number(confidence):
raise StopRun("invalid_contribution:confidence_type")
confidence = float(confidence)
if not (0.0 <= confidence <= 1.0):
raise StopRun("invalid_contribution:confidence_range")
if not isinstance(actions, list) or not actions:
raise StopRun("invalid_contribution:actions")
normalized_actions: list[str] = []
for item in actions:
if not isinstance(item, str) or not item.strip():
raise StopRun("invalid_contribution:action_item")
normalized_actions.append(item.strip())
# Ignore unknown keys to tolerate extra LLM fields.
return {
"agent": agent,
"stance": stance,
"summary": summary.strip(),
"confidence": round(confidence, 3),
"actions": normalized_actions[:3],
}
def detect_conflicts(contributions: list[dict[str, Any]]) -> list[str]:
if not contributions:
return ["no_contributions"]
stances = {item["stance"] for item in contributions}
conflicts: list[str] = []
if "go" in stances and "caution" in stances and "block" not in stances:
conflicts.append("go_vs_caution")
if "block" in stances and len(stances) > 1:
conflicts.append("blocking_vs_non_block")
if len(stances) == 3:
conflicts.append("high_divergence")
return conflicts
def decide_round_outcome(
contributions: list[dict[str, Any]],
*,
min_go_votes: int,
) -> str | None:
go_votes = sum(1 for item in contributions if item["stance"] == "go")
caution_votes = sum(1 for item in contributions if item["stance"] == "caution")
block_votes = sum(1 for item in contributions if item["stance"] == "block")
if block_votes >= 2:
return "no_go"
if block_votes > 0:
return None
if go_votes >= min_go_votes and caution_votes == 0:
return "go"
if go_votes >= min_go_votes and caution_votes > 0:
return "go_with_caution"
return None
class CollaborationGateway:
def __init__(self, *, allow: set[str], budget: Budget):
self.allow = set(allow)
self.budget = budget
self.message_count = 0
def _consume_message_budget(self) -> None:
self.message_count += 1
if self.message_count > self.budget.max_messages:
raise StopRun("max_messages")
def accept(self, raw: Any, *, expected_agent: str) -> dict[str, Any]:
if expected_agent not in self.allow:
raise StopRun(f"agent_denied:{expected_agent}")
self._consume_message_budget()
contribution = validate_contribution(raw, allowed_agents=self.allow)
if contribution["agent"] != expected_agent:
raise StopRun(f"invalid_contribution:agent_mismatch:{expected_agent}")
return contribution
What matters most here (in plain terms)
- The gateway decides what counts as a valid team contribution.
- Roles are asked not to add extra fields, but the gateway tolerates them as protection against LLM "verbosity" (the contract is enforced via required keys).
- detect_conflicts(...) and decide_round_outcome(...) keep the notions of conflict and readiness-to-finalize separate.
- The gateway enforces only the execution allowlist passed in from main.py (the policy/execution separation lives in main.py, not in the gateway).
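To see how the round policy behaves, decide_round_outcome can be exercised on hand-written stance mixes (the logic below restates the function from gateway.py so the snippet runs standalone):

```python
# decide_round_outcome, restated from gateway.py, applied to three stance mixes.
def decide_round_outcome(contributions, *, min_go_votes):
    go = sum(1 for c in contributions if c["stance"] == "go")
    caution = sum(1 for c in contributions if c["stance"] == "caution")
    block = sum(1 for c in contributions if c["stance"] == "block")
    if block >= 2:
        return "no_go"           # two blockers end the run
    if block > 0:
        return None              # a single blocker: keep negotiating
    if go >= min_go_votes and caution == 0:
        return "go"
    if go >= min_go_votes and caution > 0:
        return "go_with_caution"
    return None                  # not enough go votes yet

mk = lambda *stances: [{"stance": s} for s in stances]
print(decide_round_outcome(mk("go", "go", "go"), min_go_votes=2))        # go
print(decide_round_outcome(mk("go", "go", "caution"), min_go_votes=2))   # go_with_caution
print(decide_round_outcome(mk("block", "block", "go"), min_go_votes=2))  # no_go
```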
llm.py — role contributions and final synthesis
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
COMMON_RULES = """
Return exactly one JSON object with this shape:
{
"agent": "<role_name>",
"stance": "go|caution|block",
"summary": "one short paragraph",
"confidence": 0.0,
"actions": ["action 1", "action 2"]
}
Rules:
- Use only the provided facts.
- Keep actions concrete and operational.
- Do not output markdown or extra keys.
""".strip()
AGENT_PROMPTS = {
"demand_analyst": (
"You are Demand Analyst. Focus on demand capacity, conversion, and traffic risks. "
"Decide whether launch is feasible from growth and operational demand perspective."
),
"finance_analyst": (
"You are Finance Analyst. Focus on revenue, margin, campaign cost, and downside exposure. "
"Decide if launch economics are acceptable."
),
"risk_analyst": (
"You are Risk Analyst. Focus on payment reliability, chargebacks, and incidents. "
"Prioritize safety and compliance risk containment."
),
"legal_analyst": (
"You are Legal Analyst. Focus on regulatory, compliance, consumer protection, and policy constraints. "
"Flag launch blockers and required mitigations."
),
}
FINAL_SYSTEM_PROMPT = """
You are a launch readiness editor.
Write a short operations brief in English.
Include:
- final decision (go/go_with_caution/no_go)
- why the team agreed
- top 2 immediate actions
Use only evidence from collaboration history.
""".strip()
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def _round_summaries(history: list[dict[str, Any]], limit: int = 2) -> list[dict[str, Any]]:
summaries: list[dict[str, Any]] = []
for row in history[-limit:]:
summaries.append(
{
"round": row.get("round"),
"decision": row.get("decision"),
"conflicts": row.get("conflicts", []),
"stances": [
{
"agent": item.get("agent"),
"stance": item.get("stance"),
"confidence": item.get("confidence"),
}
for item in row.get("contributions", [])
],
}
)
return summaries
def propose_contribution(
*,
role: str,
goal: str,
shared_context: dict[str, Any],
history: list[dict[str, Any]],
open_conflicts: list[str],
) -> dict[str, Any]:
system = AGENT_PROMPTS.get(role)
if not system:
raise ValueError(f"unknown_role:{role}")
payload = {
"goal": goal,
"role": role,
"shared_context": shared_context,
"recent_rounds": _round_summaries(history, limit=2),
"open_conflicts": open_conflicts,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": f"{system}\n\n{COMMON_RULES}"},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
return json.loads(text)
except json.JSONDecodeError:
return {"invalid": True, "raw": text}
def compose_final_answer(
*,
goal: str,
final_decision: str,
history: list[dict[str, Any]],
) -> str:
payload = {
"goal": goal,
"final_decision": final_decision,
"history": history,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
messages=[
{"role": "system", "content": FINAL_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = (completion.choices[0].message.content or "").strip()
if not text:
raise LLMEmpty("llm_empty")
return text
What matters most here (in plain terms)
- Each role has its own system focus, but the same structured response contract.
- The history of previous rounds becomes the context for the next contribution.
- The final brief is generated in a separate step after alignment, not "on the fly".
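The history compaction in _round_summaries is what keeps the prompt small across rounds; here is the same idea reimplemented standalone (not imported from llm.py):

```python
# Compact round history down to stances only, keeping the last `limit` rounds.
def round_summaries(history, limit=2):
    return [
        {
            "round": row.get("round"),
            "decision": row.get("decision"),
            "stances": [
                {"agent": c.get("agent"), "stance": c.get("stance")}
                for c in row.get("contributions", [])
            ],
        }
        for row in history[-limit:]
    ]

history = [
    {"round": 1, "decision": None,
     "contributions": [{"agent": "risk_analyst", "stance": "caution",
                        "summary": "long text...", "actions": ["..."]}]},
    {"round": 2, "decision": "go", "contributions": []},
    {"round": 3, "decision": "go", "contributions": []},
]
compact = round_summaries(history, limit=2)
print([row["round"] for row in compact])  # [2, 3]: round 1 is dropped
```

Note that long fields such as summary and actions never reach the next-round prompt, only agent/stance pairs do.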
main.py — Team Loop -> Resolve -> Finalize
from __future__ import annotations
import json
import time
from typing import Any
from gateway import (
Budget,
CollaborationGateway,
StopRun,
decide_round_outcome,
detect_conflicts,
)
from llm import LLMEmpty, LLMTimeout, compose_final_answer, propose_contribution
from signals import build_shared_context
REPORT_DATE = "2026-03-02"
REGION = "US"
GOAL = (
"Prepare a go/no-go launch brief for Checkout v2 campaign in US on 2026-03-02. "
"Use collaboration across demand, finance, and risk analysts; return one aligned decision."
)
BUDGET = Budget(max_rounds=3, max_messages=12, max_seconds=40, min_go_votes=2)
TEAM_ROLES_POLICY = {
"demand_analyst",
"finance_analyst",
"risk_analyst",
"legal_analyst",
}
LEGAL_ANALYST_ENABLED = False
TEAM_ROLES_EXECUTION = (
TEAM_ROLES_POLICY
if LEGAL_ANALYST_ENABLED
else {"demand_analyst", "finance_analyst", "risk_analyst"}
)
TEAM_SEQUENCE = ["demand_analyst", "finance_analyst", "risk_analyst"]
# Set LEGAL_ANALYST_ENABLED=True and append "legal_analyst" to TEAM_SEQUENCE to test runtime denial paths.
def _latest_stances(contributions: list[dict[str, Any]]) -> list[dict[str, Any]]:
return [
{
"agent": item["agent"],
"stance": item["stance"],
"confidence": item["confidence"],
}
for item in contributions
]
def run_collaboration(goal: str) -> dict[str, Any]:
started = time.monotonic()
shared_context = build_shared_context(report_date=REPORT_DATE, region=REGION)
history: list[dict[str, Any]] = []
trace: list[dict[str, Any]] = []
open_conflicts: list[str] = []
final_decision: str | None = None
gateway = CollaborationGateway(allow=TEAM_ROLES_EXECUTION, budget=BUDGET)
for round_no in range(1, BUDGET.max_rounds + 1):
if (time.monotonic() - started) > BUDGET.max_seconds:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"trace": trace,
"history": history,
}
round_contributions: list[dict[str, Any]] = []
for role in TEAM_SEQUENCE:
try:
raw = propose_contribution(
role=role,
goal=goal,
shared_context=shared_context,
history=history,
open_conflicts=open_conflicts,
)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": f"round_{round_no}:{role}",
"trace": trace,
"history": history,
}
try:
contribution = gateway.accept(raw, expected_agent=role)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": f"round_{round_no}:{role}",
"raw_contribution": raw,
"trace": trace,
"history": history,
}
round_contributions.append(contribution)
trace.append(
{
"round": round_no,
"agent": role,
"stance": contribution["stance"],
"confidence": contribution["confidence"],
"accepted": True,
}
)
conflicts = detect_conflicts(round_contributions)
round_decision = decide_round_outcome(
round_contributions,
min_go_votes=BUDGET.min_go_votes,
)
history_entry = {
"round": round_no,
"contributions": round_contributions,
"conflicts": conflicts,
"decision": round_decision,
}
history.append(history_entry)
trace.append(
{
"round": round_no,
"conflicts": conflicts,
"decision": round_decision or "next_round",
}
)
if round_decision:
final_decision = round_decision
break
open_conflicts = conflicts
if not final_decision:
return {
"status": "stopped",
"stop_reason": "max_rounds_reached",
"trace": trace,
"history": history,
}
try:
answer = compose_final_answer(
goal=goal,
final_decision=final_decision,
history=history,
)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "finalize",
"trace": trace,
"history": history,
}
except LLMEmpty:
return {
"status": "stopped",
"stop_reason": "llm_empty",
"phase": "finalize",
"trace": trace,
"history": history,
}
last_round = history[-1]
return {
"status": "ok",
"stop_reason": "success",
"answer": answer,
"final_decision": final_decision,
"rounds_used": len(history),
"team_summary": {
"report_date": REPORT_DATE,
"region": REGION,
"stances": _latest_stances(last_round["contributions"]),
"conflicts": last_round["conflicts"],
},
"trace": trace,
"history": history,
}
def main() -> None:
result = run_collaboration(GOAL)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
Ce qui compte le plus ici (en clair)
historyest une mémoire partagée entre les rounds.- Le critère de terminaison (
go/go_with_caution/no_go) est centralisé dans une fonction policy. - La différence entre rôles policy/execution est définie dans
main.pyviaTEAM_ROLES_POLICYvsTEAM_ROLES_EXECUTION, et gateway enforce seulement l’execution allowlist.
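The policy/execution split reduces to a two-set check; a minimal sketch using the same names as main.py (the gateway only ever sees the execution set):

```python
# Policy set: roles that exist in the design. Execution set: roles enabled now.
TEAM_ROLES_POLICY = {"demand_analyst", "finance_analyst", "risk_analyst", "legal_analyst"}
TEAM_ROLES_EXECUTION = {"demand_analyst", "finance_analyst", "risk_analyst"}

def accept_role(role):
    # Mirrors the first check in CollaborationGateway.accept.
    if role not in TEAM_ROLES_EXECUTION:
        return f"agent_denied:{role}"
    return "accepted"

print(accept_role("risk_analyst"))   # accepted
print(accept_role("legal_analyst"))  # agent_denied:legal_analyst
```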
requirements.txt
openai==2.21.0
Example output
Below is an example of a valid run in which the team aligns in round 1 and returns final_decision="go".
{
"status": "ok",
"stop_reason": "success",
"answer": "Operations Brief: Checkout v2 Campaign Launch — US, 2026-03-02. Final Decision: GO. Demand, finance, and risk agreed the launch is feasible and within policy thresholds.",
"final_decision": "go",
"rounds_used": 1,
"team_summary": {
"report_date": "2026-03-02",
"region": "US",
"stances": [
{"agent": "demand_analyst", "stance": "go", "confidence": 0.9},
{"agent": "finance_analyst", "stance": "go", "confidence": 0.9},
{"agent": "risk_analyst", "stance": "go", "confidence": 0.9}
],
"conflicts": []
},
"trace": [
{"round": 1, "agent": "demand_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
{"round": 1, "agent": "finance_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
{"round": 1, "agent": "risk_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
{"round": 1, "conflicts": [], "decision": "go"}
],
"history": [
{
"round": 1,
"contributions": [
{"agent": "demand_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["...", "..."]},
{"agent": "finance_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["..."]},
{"agent": "risk_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["..."]}
],
"conflicts": [],
"decision": "go"
}
]
}
This is a shortened example: the long answer text and the summary/actions entries in history are intentionally truncated for readability.
Typical stop_reason values
- success — the run completed correctly; final_decision can be go, go_with_caution, or no_go
- invalid_contribution:* — an agent's contribution failed contract validation
- invalid_contribution:actions — a role returned empty or invalid actions (in this example, 1-3 actions are required)
- agent_denied:<role> — the role is not permitted by the execution allowlist
- llm_timeout — the LLM did not respond within OPENAI_TIMEOUT_SECONDS
- llm_empty — the finalize step returned empty text
- max_messages — the message budget across roles was exceeded
- max_rounds_reached — the team did not reach a decision (final_decision) within max_rounds
- max_seconds — the run's total time budget was exceeded
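For production monitoring, runs can be routed by these values. The classifier below is a hypothetical helper (classify is not part of the example code), sketched over the stop_reason vocabulary above:

```python
# Route a run result into a coarse monitoring bucket by stop_reason.
def classify(result):
    reason = result["stop_reason"]
    if reason == "success":
        return "ok"
    if reason.startswith(("invalid_contribution", "agent_denied")):
        return "contract_violation"
    if reason in {"llm_timeout", "llm_empty"}:
        return "llm_failure"
    return "budget_exhausted"  # max_messages / max_rounds_reached / max_seconds

print(classify({"stop_reason": "invalid_contribution:stance"}))  # contract_violation
print(classify({"stop_reason": "max_rounds_reached"}))           # budget_exhausted
```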
What is NOT shown here
- No real domain APIs or live data.
- No external conflict resolver (for example, a human reviewer).
- No multi-tenant auth/ACL for roles and data sources.
- No adaptive team sizing (dynamically adding/removing roles).
What you can try next
- Add legal_analyst to TEAM_SEQUENCE while keeping LEGAL_ANALYST_ENABLED=False, and observe agent_denied:legal_analyst in stop_reason.
- Set min_go_votes=3 to require full consensus before finalization.
- Add an escalation rule to a human if block repeats for 2 rounds in a row.
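The last of those escalation ideas could look like this (should_escalate is a hypothetical helper, not part of the example; it assumes the same history shape that main.py builds):

```python
# Escalate to a human when at least one "block" stance appears in each of the
# last `streak` rounds of history.
def should_escalate(history, streak=2):
    recent = history[-streak:]
    if len(recent) < streak:
        return False  # not enough rounds yet to call it a pattern
    return all(
        any(c["stance"] == "block" for c in row["contributions"])
        for row in recent
    )

history = [
    {"round": 1, "contributions": [{"stance": "go"}, {"stance": "block"}]},
    {"round": 2, "contributions": [{"stance": "caution"}, {"stance": "block"}]},
]
print(should_escalate(history))  # True: "block" appeared in both recent rounds
```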