Multi-Agent Collaboration Agent — Python (full implementation with LLM)

Production-style runnable Multi-Agent Collaboration agent example in Python with team roles, a shared state board, alignment rounds, conflict detection, and stop reasons.
On this page
  1. Pattern Essence (Brief)
  2. What this example demonstrates
  3. Architecture
  4. Project structure
  5. How to run
  6. Task
  7. Solution
  8. Code
  9. signals.py — shared facts for the team
  10. gateway.py — policy boundary for collaboration
  11. llm.py — role contributions and final synthesis
  12. main.py — Team Loop -> Resolve -> Finalize
  13. requirements.txt
  14. Example output
  15. Typical stop_reason values
  16. What is NOT shown here
  17. What to try next

Pattern Essence (Brief)

Multi-Agent Collaboration is a pattern where multiple agents with different roles work on one task in rounds and converge on an aligned decision.

In each role, LLM produces its contribution, while collaboration policy controls the contribution contract, round limits, and readiness criterion.


What this example demonstrates

  • a team with 3 roles: demand_analyst, finance_analyst, risk_analyst
  • shared context (shared_context) for all agents
  • a collaboration gateway that validates agent contributions against the contract
  • alignment rounds: conflict detection + consensus check
  • separate policy vs execution role allowlists
  • final decision (final_decision) can be go, go_with_caution, or no_go (determined by policy)
  • runtime budgets: max_rounds, max_messages, max_seconds
  • explicit stop_reason, trace, and history for production monitoring

Architecture

  1. The coordinator forms a shared state board (goal + shared_context).
  2. Each role-specific LLM model returns a structured contribution (agent/stance/summary/confidence/actions).
  3. Gateway validates the contribution and blocks invalid or disallowed roles.
  4. After each round, the system finds conflicts and checks whether consensus has been reached.
  5. If there is no consensus, round results become context for the next round.
  6. After alignment, LLM synthesizes a final short operations brief.

Key contract: LLM proposes a contribution, but contribution acceptance and completion rules are controlled by the policy layer.


Project structure

TEXT
examples/
└── agent-patterns/
    └── multi-agent-collaboration/
        └── python/
            ├── main.py           # Team loop -> Validate -> Resolve -> Finalize
            ├── llm.py            # role contributions + final synthesis
            ├── gateway.py        # contribution contract + conflict/consensus policy
            ├── signals.py        # deterministic shared context
            └── requirements.txt

How to run

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/multi-agent-collaboration/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option via export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Option via .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows, it is easier to use environment set commands or, if desired, python-dotenv to load .env automatically.


Task

Imagine a production case before a campaign launch:

"Prepare a go/no-go brief for Checkout v2 in the US market for 2026-03-02. We need an aligned decision across growth, finance, and risk."

A single agent can easily miss part of the risks or economics here. So a team loop is used:

  • each role gives its own conclusion
  • the system checks disagreements
  • the next round starts only if needed
  • the final response appears after alignment

Solution

In this example:

  • role agents work through one shared context
  • gateway keeps a strict JSON contract and execution limits
  • detect_conflicts(...) and decide_round_outcome(...) determine whether to continue
  • if the team does not align within max_rounds, the run stops in a controlled way
  • if it does align, a separate finalize step composes a short final brief

Code

signals.py — shared facts for the team

PYTHON
from __future__ import annotations

from typing import Any



def build_shared_context(*, report_date: str, region: str) -> dict[str, Any]:
    return {
        "report_date": report_date,
        "region": region,
        "campaign": {
            "name": "Checkout v2 Launch",
            "window": "2026-03-02",
            "channel": "US paid + lifecycle",
        },
        "demand_signals": {
            "projected_orders": 15200,
            "conversion_lift_pct": 12.4,
            "traffic_risk": "medium",
        },
        "finance_signals": {
            "projected_revenue_usd": 684000.0,
            "expected_margin_pct": 19.2,
            "promo_cost_usd": 94000.0,
        },
        "risk_signals": {
            "failed_payment_rate": 0.028,
            "chargeback_alerts": 4,
            "critical_incidents": 0,
        },
        "policy_limits": {
            "payment_failure_block_threshold": 0.03,
            "max_chargeback_alerts_for_go": 5,
        },
    }

What matters most here (in plain words)

  • All roles see the same shared_context.
  • Facts are deterministic: separated from LLM decisions.

gateway.py — policy boundary for collaboration

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_rounds: int = 3
    max_messages: int = 12
    max_seconds: int = 40
    min_go_votes: int = 2


ALLOWED_STANCES = {"go", "caution", "block"}



def _is_number(value: Any) -> bool:
    return isinstance(value, (int, float)) and not isinstance(value, bool)



def validate_contribution(raw: Any, *, allowed_agents: set[str]) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_contribution:not_object")

    required = {"agent", "stance", "summary", "confidence", "actions"}
    if not required.issubset(raw.keys()):
        raise StopRun("invalid_contribution:missing_keys")

    agent = raw["agent"]
    stance = raw["stance"]
    summary = raw["summary"]
    confidence = raw["confidence"]
    actions = raw["actions"]

    if not isinstance(agent, str) or not agent.strip():
        raise StopRun("invalid_contribution:agent")
    agent = agent.strip()
    if agent not in allowed_agents:
        raise StopRun(f"invalid_contribution:agent_not_allowed:{agent}")

    if not isinstance(stance, str) or stance.strip() not in ALLOWED_STANCES:
        raise StopRun("invalid_contribution:stance")
    stance = stance.strip()

    if not isinstance(summary, str) or not summary.strip():
        raise StopRun("invalid_contribution:summary")

    if not _is_number(confidence):
        raise StopRun("invalid_contribution:confidence_type")
    confidence = float(confidence)
    if not (0.0 <= confidence <= 1.0):
        raise StopRun("invalid_contribution:confidence_range")

    if not isinstance(actions, list) or not actions:
        raise StopRun("invalid_contribution:actions")

    normalized_actions: list[str] = []
    for item in actions:
        if not isinstance(item, str) or not item.strip():
            raise StopRun("invalid_contribution:action_item")
        normalized_actions.append(item.strip())

    # Ignore unknown keys to tolerate extra LLM fields.
    return {
        "agent": agent,
        "stance": stance,
        "summary": summary.strip(),
        "confidence": round(confidence, 3),
        "actions": normalized_actions[:3],
    }



def detect_conflicts(contributions: list[dict[str, Any]]) -> list[str]:
    if not contributions:
        return ["no_contributions"]

    stances = {item["stance"] for item in contributions}
    conflicts: list[str] = []

    if "go" in stances and "caution" in stances and "block" not in stances:
        conflicts.append("go_vs_caution")
    if "block" in stances and len(stances) > 1:
        conflicts.append("blocking_vs_non_block")
    if len(stances) == 3:
        conflicts.append("high_divergence")

    return conflicts



def decide_round_outcome(
    contributions: list[dict[str, Any]],
    *,
    min_go_votes: int,
) -> str | None:
    go_votes = sum(1 for item in contributions if item["stance"] == "go")
    caution_votes = sum(1 for item in contributions if item["stance"] == "caution")
    block_votes = sum(1 for item in contributions if item["stance"] == "block")

    if block_votes >= 2:
        return "no_go"

    if block_votes > 0:
        return None
    if go_votes >= min_go_votes and caution_votes == 0:
        return "go"
    if go_votes >= min_go_votes and caution_votes > 0:
        return "go_with_caution"
    return None


class CollaborationGateway:
    def __init__(self, *, allow: set[str], budget: Budget):
        self.allow = set(allow)
        self.budget = budget
        self.message_count = 0

    def _consume_message_budget(self) -> None:
        self.message_count += 1
        if self.message_count > self.budget.max_messages:
            raise StopRun("max_messages")

    def accept(self, raw: Any, *, expected_agent: str) -> dict[str, Any]:
        if expected_agent not in self.allow:
            raise StopRun(f"agent_denied:{expected_agent}")

        self._consume_message_budget()
        contribution = validate_contribution(raw, allowed_agents=self.allow)

        if contribution["agent"] != expected_agent:
            raise StopRun(f"invalid_contribution:agent_mismatch:{expected_agent}")

        return contribution

What matters most here (in plain words)

  • Gateway decides what exactly counts as a valid team contribution.
  • We ask roles not to add extra fields, but gateway tolerates them as protection from LLM "verbosity" (contract enforced by required keys).
  • detect_conflicts(...) and decide_round_outcome(...) separate the concepts of conflict and readiness for finalization.
  • Gateway only enforces the execution allowlist passed from main.py (policy/execution separation lives in main.py, not in gateway).

llm.py — role contributions and final synthesis

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


COMMON_RULES = """
Return exactly one JSON object with this shape:
{
  "agent": "<role_name>",
  "stance": "go|caution|block",
  "summary": "one short paragraph",
  "confidence": 0.0,
  "actions": ["action 1", "action 2"]
}

Rules:
- Use only the provided facts.
- Keep actions concrete and operational.
- Do not output markdown or extra keys.
""".strip()

AGENT_PROMPTS = {
    "demand_analyst": (
        "You are Demand Analyst. Focus on demand capacity, conversion, and traffic risks. "
        "Decide whether launch is feasible from growth and operational demand perspective."
    ),
    "finance_analyst": (
        "You are Finance Analyst. Focus on revenue, margin, campaign cost, and downside exposure. "
        "Decide if launch economics are acceptable."
    ),
    "risk_analyst": (
        "You are Risk Analyst. Focus on payment reliability, chargebacks, and incidents. "
        "Prioritize safety and compliance risk containment."
    ),
    "legal_analyst": (
        "You are Legal Analyst. Focus on regulatory, compliance, consumer protection, and policy constraints. "
        "Flag launch blockers and required mitigations."
    ),
}

FINAL_SYSTEM_PROMPT = """
You are a launch readiness editor.
Write a short operations brief in English.
Include:
- final decision (go/go_with_caution/no_go)
- why the team agreed
- top 2 immediate actions
Use only evidence from collaboration history.
""".strip()



def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)



def _round_summaries(history: list[dict[str, Any]], limit: int = 2) -> list[dict[str, Any]]:
    summaries: list[dict[str, Any]] = []
    for row in history[-limit:]:
        summaries.append(
            {
                "round": row.get("round"),
                "decision": row.get("decision"),
                "conflicts": row.get("conflicts", []),
                "stances": [
                    {
                        "agent": item.get("agent"),
                        "stance": item.get("stance"),
                        "confidence": item.get("confidence"),
                    }
                    for item in row.get("contributions", [])
                ],
            }
        )
    return summaries



def propose_contribution(
    *,
    role: str,
    goal: str,
    shared_context: dict[str, Any],
    history: list[dict[str, Any]],
    open_conflicts: list[str],
) -> dict[str, Any]:
    system = AGENT_PROMPTS.get(role)
    if not system:
        raise ValueError(f"unknown_role:{role}")

    payload = {
        "goal": goal,
        "role": role,
        "shared_context": shared_context,
        "recent_rounds": _round_summaries(history, limit=2),
        "open_conflicts": open_conflicts,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": f"{system}\n\n{COMMON_RULES}"},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"invalid": True, "raw": text}



def compose_final_answer(
    *,
    goal: str,
    final_decision: str,
    history: list[dict[str, Any]],
) -> str:
    payload = {
        "goal": goal,
        "final_decision": final_decision,
        "history": history,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            messages=[
                {"role": "system", "content": FINAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = (completion.choices[0].message.content or "").strip()
    if not text:
        raise LLMEmpty("llm_empty")
    return text

What matters most here (in plain words)

  • Each role has its own system focus, but the same structured response contract.
  • History of previous rounds becomes context for the next contribution.
  • The final brief is generated in a separate step after alignment, not "on the fly".

main.py — Team Loop -> Resolve -> Finalize

PYTHON
from __future__ import annotations

import json
import time
from typing import Any

from gateway import (
    Budget,
    CollaborationGateway,
    StopRun,
    decide_round_outcome,
    detect_conflicts,
)
from llm import LLMEmpty, LLMTimeout, compose_final_answer, propose_contribution
from signals import build_shared_context

REPORT_DATE = "2026-03-02"
REGION = "US"
GOAL = (
    "Prepare a go/no-go launch brief for Checkout v2 campaign in US on 2026-03-02. "
    "Use collaboration across demand, finance, and risk analysts; return one aligned decision."
)

BUDGET = Budget(max_rounds=3, max_messages=12, max_seconds=40, min_go_votes=2)

TEAM_ROLES_POLICY = {
    "demand_analyst",
    "finance_analyst",
    "risk_analyst",
    "legal_analyst",
}
LEGAL_ANALYST_ENABLED = False
TEAM_ROLES_EXECUTION = (
    TEAM_ROLES_POLICY
    if LEGAL_ANALYST_ENABLED
    else {"demand_analyst", "finance_analyst", "risk_analyst"}
)

TEAM_SEQUENCE = ["demand_analyst", "finance_analyst", "risk_analyst"]
# Set LEGAL_ANALYST_ENABLED=True and append "legal_analyst" to TEAM_SEQUENCE to test runtime denial paths.



def _latest_stances(contributions: list[dict[str, Any]]) -> list[dict[str, Any]]:
    return [
        {
            "agent": item["agent"],
            "stance": item["stance"],
            "confidence": item["confidence"],
        }
        for item in contributions
    ]



def run_collaboration(goal: str) -> dict[str, Any]:
    started = time.monotonic()
    shared_context = build_shared_context(report_date=REPORT_DATE, region=REGION)

    history: list[dict[str, Any]] = []
    trace: list[dict[str, Any]] = []
    open_conflicts: list[str] = []
    final_decision: str | None = None

    gateway = CollaborationGateway(allow=TEAM_ROLES_EXECUTION, budget=BUDGET)

    for round_no in range(1, BUDGET.max_rounds + 1):
        if (time.monotonic() - started) > BUDGET.max_seconds:
            return {
                "status": "stopped",
                "stop_reason": "max_seconds",
                "trace": trace,
                "history": history,
            }

        round_contributions: list[dict[str, Any]] = []

        for role in TEAM_SEQUENCE:
            try:
                raw = propose_contribution(
                    role=role,
                    goal=goal,
                    shared_context=shared_context,
                    history=history,
                    open_conflicts=open_conflicts,
                )
            except LLMTimeout:
                return {
                    "status": "stopped",
                    "stop_reason": "llm_timeout",
                    "phase": f"round_{round_no}:{role}",
                    "trace": trace,
                    "history": history,
                }

            try:
                contribution = gateway.accept(raw, expected_agent=role)
            except StopRun as exc:
                return {
                    "status": "stopped",
                    "stop_reason": exc.reason,
                    "phase": f"round_{round_no}:{role}",
                    "raw_contribution": raw,
                    "trace": trace,
                    "history": history,
                }

            round_contributions.append(contribution)
            trace.append(
                {
                    "round": round_no,
                    "agent": role,
                    "stance": contribution["stance"],
                    "confidence": contribution["confidence"],
                    "accepted": True,
                }
            )

        conflicts = detect_conflicts(round_contributions)
        round_decision = decide_round_outcome(
            round_contributions,
            min_go_votes=BUDGET.min_go_votes,
        )

        history_entry = {
            "round": round_no,
            "contributions": round_contributions,
            "conflicts": conflicts,
            "decision": round_decision,
        }
        history.append(history_entry)

        trace.append(
            {
                "round": round_no,
                "conflicts": conflicts,
                "decision": round_decision or "next_round",
            }
        )

        if round_decision:
            final_decision = round_decision
            break

        open_conflicts = conflicts

    if not final_decision:
        return {
            "status": "stopped",
            "stop_reason": "max_rounds_reached",
            "trace": trace,
            "history": history,
        }

    try:
        answer = compose_final_answer(
            goal=goal,
            final_decision=final_decision,
            history=history,
        )
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "finalize",
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "phase": "finalize",
            "trace": trace,
            "history": history,
        }

    last_round = history[-1]
    return {
        "status": "ok",
        "stop_reason": "success",
        "answer": answer,
        "final_decision": final_decision,
        "rounds_used": len(history),
        "team_summary": {
            "report_date": REPORT_DATE,
            "region": REGION,
            "stances": _latest_stances(last_round["contributions"]),
            "conflicts": last_round["conflicts"],
        },
        "trace": trace,
        "history": history,
    }



def main() -> None:
    result = run_collaboration(GOAL)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (in plain words)

  • history is shared memory across rounds.
  • The completion criterion (go/go_with_caution/no_go) is centralized in a policy function.
  • The difference between policy/execution roles is defined in main.py via TEAM_ROLES_POLICY vs TEAM_ROLES_EXECUTION, and gateway only enforces the execution allowlist.

requirements.txt

TEXT
openai==2.21.0

Example output

Below is an example of a valid run where the team aligns in round 1 and returns final_decision="go".

JSON
{
  "status": "ok",
  "stop_reason": "success",
  "answer": "Operations Brief: Checkout v2 Campaign Launch — US, 2026-03-02. Final Decision: GO. Demand, finance, and risk agreed the launch is feasible and within policy thresholds.",
  "final_decision": "go",
  "rounds_used": 1,
  "team_summary": {
    "report_date": "2026-03-02",
    "region": "US",
    "stances": [
      {"agent": "demand_analyst", "stance": "go", "confidence": 0.9},
      {"agent": "finance_analyst", "stance": "go", "confidence": 0.9},
      {"agent": "risk_analyst", "stance": "go", "confidence": 0.9}
    ],
    "conflicts": []
  },
  "trace": [
    {"round": 1, "agent": "demand_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
    {"round": 1, "agent": "finance_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
    {"round": 1, "agent": "risk_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
    {"round": 1, "conflicts": [], "decision": "go"}
  ],
  "history": [
    {
      "round": 1,
      "contributions": [
        {"agent": "demand_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["...", "..."]},
        {"agent": "finance_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["..."]},
        {"agent": "risk_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["..."]}
      ],
      "conflicts": [],
      "decision": "go"
    }
  ]
}

This is a shortened example: long answer text and summary/actions in history are intentionally trimmed for readability.


Typical stop_reason values

  • success — run completed correctly; final_decision can be go, go_with_caution, or no_go
  • invalid_contribution:* — an agent contribution failed contract validation
  • invalid_contribution:actions — role returned empty or invalid actions (this example requires 1-3 actions)
  • agent_denied:<role> — role is not allowed by the execution allowlist
  • llm_timeout — LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_empty — finalize step returned empty text
  • max_messages — message budget between roles was exceeded
  • max_rounds_reached — the team did not reach a decision (final_decision) within max_rounds
  • max_seconds — total run time budget was exceeded

What is NOT shown here

  • There are no real domain APIs or live data.
  • There is no external conflict resolver (for example, a human reviewer).
  • There is no multi-tenant auth/ACL for roles and data sources.
  • There is no adaptive team sizing (dynamic add/remove of roles).

What to try next

  1. Add legal_analyst to TEAM_SEQUENCE with LEGAL_ANALYST_ENABLED=False and observe agent_denied:legal_analyst in stop_reason.
  2. Set min_go_votes=3 to require full consensus before finalization.
  3. Add a human escalation rule if block repeats for 2 rounds in a row.
⏱️ 13 min readUpdated Mar, 2026Difficulty: ★★☆
Integrated: production controlOnceOnly
Add guardrails to tool-calling agents
Ship this pattern with governance:
  • Budgets (steps / spend caps)
  • Tool permissions (allowlist / blocklist)
  • Kill switch & incident stop
  • Idempotency & dedupe
  • Audit logs & traceability
Integrated mention: OnceOnly is a control layer for production agent systems.
Author

This documentation is curated and maintained by engineers who ship AI agents in production.

The content is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

Patterns and recommendations are grounded in post-mortems, failure modes, and operational incidents in deployed systems, including during the development and operation of governance infrastructure for agents at OnceOnly.