Supervisor Agent — Python (complete implementation with an LLM)

A runnable, production-style Supervisor Agent example in Python, featuring policy review, approve/revise/block/escalate decisions, human approval, budgets, and stop reasons.
On this page
  1. Pattern essence (in brief)
  2. What this example demonstrates
  3. Architecture
  4. Project structure
  5. Running the project
  6. Task
  7. Solution
  8. Code
  9. tools.py — business tools (facts and execution only)
  10. gateway.py — execution boundary (validation and control)
  11. supervisor.py — policy review and human approval
  12. llm.py — worker decisions + final synthesis
  13. main.py — Worker -> Supervisor -> Execute -> Final
  14. requirements.txt
  15. Example output
  16. What is NOT shown here
  17. Typical stop_reason values
  18. What you can try next

Pattern essence (in brief)

Supervisor Agent is a pattern in which a worker agent proposes the next action and a separate supervisor layer decides whether it may be executed.

The LLM decides what to do next; the supervisor policy decides whether it is safe and allowed before execution.


What this example demonstrates

  • a worker loop in which the LLM proposes an action (tool or final)
  • a separate supervisor review before every tool call
  • supervisor decisions: approve, revise, block, escalate
  • simulated human approval for high refund amounts
  • a policy boundary between intent (LLM) and execution (tools)
  • a tool allowlist, run budgets, and loop detection
  • explicit stop_reason values and an audit trace for production monitoring

Architecture

  1. The worker proposes a JSON action.
  2. The action is validated (validate_worker_action).
  3. The supervisor reviews it and returns a decision (approve/revise/block/escalate).
  4. ToolGateway executes only the authorized action.
  5. The observation and the supervisor decision are written to history.
  6. The worker reads history and takes the next step or returns final.

The supervisor does not execute business logic itself: it only decides whether the next step is admissible.
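The loop above can be sketched end to end with stubbed components (all names below are illustrative placeholders, not this project's API; the real components live in llm.py, supervisor.py, and gateway.py):

```python
# Illustrative sketch of the Worker -> Supervisor -> Execute loop.
def worker_propose(history: list) -> dict:
    # A real worker asks the LLM; this stub does one tool step, then final.
    if not history:
        return {"kind": "tool", "name": "get_refund_context", "args": {"user_id": 42}}
    return {"kind": "final", "answer": "done"}

def supervisor_review(action: dict) -> dict:
    # A real supervisor applies policy; this stub approves everything.
    return {"decision": "approve", "action": action}

def execute_tool(action: dict) -> dict:
    return {"status": "ok", "tool": action["name"]}

history: list = []
final_answer = None
while final_answer is None:
    action = worker_propose(history)                    # 1. worker proposes
    decision = supervisor_review(action)                # 2-3. validate + review
    if decision["decision"] != "approve":
        break                                           # block/escalate handling omitted
    if action["kind"] == "final":
        final_answer = action["answer"]                 # 6. worker returns final
    else:
        observation = execute_tool(decision["action"])  # 4. gateway executes
        history.append({"action": action, "observation": observation})  # 5. record
```

The key structural point carries over to the full example: execution only ever happens on the action the supervisor returned, never on the raw worker proposal.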


Project structure

TEXT
examples/
└── agent-patterns/
    └── supervisor-agent/
        └── python/
            ├── main.py           # Worker -> Supervisor Review -> Execute -> Final
            ├── llm.py            # worker decision + final synthesis
            ├── supervisor.py     # policy review + human approval simulation
            ├── gateway.py        # action validation + allowlist + loop/budget control
            ├── tools.py          # deterministic business tools (refund flow)
            └── requirements.txt

Running the project

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/supervisor-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option via export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Option via .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows it is simpler to use set variables or, if preferred, python-dotenv to load .env automatically.


Task

Imagine a real production support case:

"A customer requests a 1200 USD refund for an annual plan paid 10 days ago."

The worker must not execute a refund of this size automatically. It has to:

  • collect the context
  • propose an action
  • let the supervisor check it against the policies
  • escalate to a human if necessary
  • execute only the approved action

Solution

In this example:

  • the worker LLM proposes a sequence of actions
  • the supervisor checks every tool call
  • a high-risk refund is escalated to a human
  • a (simulated) human approves the adjusted amount
  • only then do execution and the final answer take place

That is the Supervisor pattern: execution cannot bypass policy review.


Code

tools.py — business tools (facts and execution only)

PYTHON
from __future__ import annotations

from typing import Any

USERS = {
    42: {"id": 42, "name": "Anna", "country": "US", "tier": "enterprise"},
    7: {"id": 7, "name": "Max", "country": "US", "tier": "pro"},
}

BILLING = {
    42: {
        "plan": "annual_enterprise",
        "currency": "USD",
        "last_charge_usd": 1200.0,
        "days_since_payment": 10,
    },
    7: {
        "plan": "pro_monthly",
        "currency": "USD",
        "last_charge_usd": 49.0,
        "days_since_payment": 35,
    },
}


def get_refund_context(user_id: int) -> dict[str, Any]:
    user = USERS.get(user_id)
    billing = BILLING.get(user_id)
    if not user or not billing:
        return {"error": f"context_not_found_for_user:{user_id}"}

    return {
        "user": user,
        "billing": billing,
        "policy_hint": {
            "auto_refund_limit_usd": 1000.0,
            "refund_window_days": 14,
        },
    }


def issue_refund(
    user_id: int,
    amount_usd: float,
    reason: str,
) -> dict[str, Any]:
    user = USERS.get(user_id)
    billing = BILLING.get(user_id)
    if not user or not billing:
        return {"status": "error", "error": f"refund_user_not_found:{user_id}"}

    if amount_usd <= 0:
        return {"status": "error", "error": "refund_amount_must_be_positive"}

    paid = float(billing["last_charge_usd"])
    if amount_usd > paid:
        return {
            "status": "error",
            "error": "refund_exceeds_last_charge",
            "max_refund_usd": paid,
        }

    return {
        "status": "ok",
        "refund": {
            "user_id": user_id,
            "currency": "USD",
            "amount_usd": round(float(amount_usd), 2),
            "reason": reason.strip(),
            "transaction_id": f"rf_{user_id}_20260226",
        },
    }


def send_refund_email(user_id: int, amount_usd: float, message: str) -> dict[str, Any]:
    user = USERS.get(user_id)
    if not user:
        return {"status": "error", "error": f"email_user_not_found:{user_id}"}

    return {
        "status": "ok",
        "email": {
            "to": f"{user['name'].lower()}@example.com",
            "template": "refund_confirmation_v2",
            "amount_usd": round(float(amount_usd), 2),
            "message": message,
            "email_id": f"em_{user_id}_20260226",
        },
    }

What matters most here (plainly)

  • The tools make no policy decisions: they only return facts or execute an action.
  • Safety constraints here are minimal; the main control sits in the supervisor.
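A quick illustration of that "facts and execution only" contract: the local guards of issue_refund, inlined here with a simplified signature so the snippet runs standalone (the real tool also resolves the user and billing records):

```python
# Simplified re-statement of issue_refund's local guards.
# No policy lives here: only mechanical checks against the billing facts.
def issue_refund_guarded(last_charge_usd: float, amount_usd: float) -> dict:
    if amount_usd <= 0:
        return {"status": "error", "error": "refund_amount_must_be_positive"}
    if amount_usd > last_charge_usd:
        return {"status": "error", "error": "refund_exceeds_last_charge"}
    return {"status": "ok", "amount_usd": round(float(amount_usd), 2)}

print(issue_refund_guarded(1200.0, 1500.0)["error"])   # refund_exceeds_last_charge
print(issue_refund_guarded(1200.0, 800.0)["status"])   # ok
```

Whether an 800 USD refund should be issued at all is not this function's question; that decision belongs to the supervisor.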

gateway.py — execution boundary (validation and control)

PYTHON
from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_steps: int = 8
    max_tool_calls: int = 5
    max_seconds: int = 30


TOOL_ARG_TYPES: dict[str, dict[str, str]] = {
    "get_refund_context": {"user_id": "int"},
    "issue_refund": {"user_id": "int", "amount_usd": "number", "reason": "str?"},
    "send_refund_email": {"user_id": "int", "amount_usd": "number", "message": "str"},
}


def _stable_json(value: Any) -> str:
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(v) for v in value) + "]"
    if isinstance(value, dict):
        parts = []
        for key in sorted(value):
            parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
        return "{" + ",".join(parts) + "}"
    return json.dumps(str(value), ensure_ascii=True)


def _normalize_for_hash(value: Any) -> Any:
    if isinstance(value, str):
        return " ".join(value.strip().split())
    if isinstance(value, list):
        return [_normalize_for_hash(item) for item in value]
    if isinstance(value, dict):
        return {str(key): _normalize_for_hash(value[key]) for key in sorted(value)}
    return value


def args_hash(args: dict[str, Any]) -> str:
    normalized = _normalize_for_hash(args or {})
    raw = _stable_json(normalized)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]


def _is_number(value: Any) -> bool:
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def _validate_tool_args(name: str, args: dict[str, Any]) -> dict[str, Any]:
    spec = TOOL_ARG_TYPES.get(name)
    if spec is None:
        raise StopRun(f"invalid_action:unknown_tool:{name}")

    extra = set(args.keys()) - set(spec.keys())
    if extra:
        raise StopRun(f"invalid_action:extra_tool_args:{name}")

    normalized: dict[str, Any] = {}
    for arg_name, expected in spec.items():
        is_optional = expected.endswith("?")
        expected_base = expected[:-1] if is_optional else expected

        if arg_name not in args:
            if is_optional:
                continue
            raise StopRun(f"invalid_action:missing_required_arg:{name}:{arg_name}")
        value = args[arg_name]

        if expected_base == "int":
            if not isinstance(value, int) or isinstance(value, bool):
                raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
            normalized[arg_name] = value
            continue

        if expected_base == "number":
            if not _is_number(value):
                raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
            normalized[arg_name] = float(value)
            continue

        if expected_base == "str":
            if not isinstance(value, str) or not value.strip():
                raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
            normalized[arg_name] = value.strip()
            continue

        raise StopRun(f"invalid_action:unknown_arg_spec:{name}:{arg_name}")

    return normalized


def validate_worker_action(action: Any) -> dict[str, Any]:
    if not isinstance(action, dict):
        raise StopRun("invalid_action:not_object")

    kind = action.get("kind")
    if kind == "invalid":
        raise StopRun("invalid_action:non_json")

    if kind == "final":
        allowed_keys = {"kind", "answer"}
        if set(action.keys()) - allowed_keys:
            raise StopRun("invalid_action:extra_keys_final")
        answer = action.get("answer")
        if not isinstance(answer, str) or not answer.strip():
            raise StopRun("invalid_action:bad_final_answer")
        return {"kind": "final", "answer": answer.strip()}

    if kind == "tool":
        allowed_keys = {"kind", "name", "args"}
        if set(action.keys()) - allowed_keys:
            raise StopRun("invalid_action:extra_keys_tool")

        name = action.get("name")
        if not isinstance(name, str) or not name.strip():
            raise StopRun("invalid_action:bad_tool_name")

        args = action.get("args", {})
        if args is None:
            args = {}
        if not isinstance(args, dict):
            raise StopRun("invalid_action:bad_tool_args")

        normalized_args = _validate_tool_args(name.strip(), args)
        return {"kind": "tool", "name": name.strip(), "args": normalized_args}

    raise StopRun("invalid_action:bad_kind")


class ToolGateway:
    def __init__(
        self,
        *,
        allow: set[str],
        registry: dict[str, Callable[..., dict[str, Any]]],
        budget: Budget,
    ):
        self.allow = set(allow)
        self.registry = registry
        self.budget = budget
        self.tool_calls = 0
        self.seen_call_counts: dict[str, int] = {}
        self.per_tool_counts: dict[str, int] = {}
        self.read_only_repeat_limit: dict[str, int] = {
            "get_refund_context": 2,
        }
        self.per_tool_limit: dict[str, int] = {
            "get_refund_context": 3,
            "issue_refund": 2,
            "send_refund_email": 2,
        }

    def call(self, name: str, args: dict[str, Any]) -> dict[str, Any]:
        self.tool_calls += 1
        if self.tool_calls > self.budget.max_tool_calls:
            raise StopRun("max_tool_calls")

        if name not in self.allow:
            raise StopRun(f"tool_denied:{name}")

        fn = self.registry.get(name)
        if fn is None:
            raise StopRun(f"tool_missing:{name}")

        count_for_tool = self.per_tool_counts.get(name, 0) + 1
        if count_for_tool > self.per_tool_limit.get(name, 2):
            raise StopRun("loop_detected:per_tool_limit")
        self.per_tool_counts[name] = count_for_tool

        signature = f"{name}:{args_hash(args)}"
        seen = self.seen_call_counts.get(signature, 0) + 1
        allowed_repeats = self.read_only_repeat_limit.get(name, 1)
        if seen > allowed_repeats:
            raise StopRun("loop_detected:signature_repeat")
        self.seen_call_counts[signature] = seen

        try:
            out = fn(**args)
        except TypeError as exc:
            raise StopRun(f"tool_bad_args:{name}") from exc
        except Exception as exc:
            raise StopRun(f"tool_error:{name}") from exc

        if not isinstance(out, dict):
            raise StopRun(f"tool_bad_result:{name}")
        return out

What matters most here (plainly)

  • The gateway performs a minimal schema check on the args (types, required + optional fields) before execution.
  • The gateway controls technical execution: allowlist, budget, and loop detection (both per tool+args signature and on the total number of tool calls).
  • The gateway does not fill in missing fields: it only validates the args contract.
  • For some fields (for example issue_refund.reason), the gateway allows them to be optional because the supervisor can add them via revise. If the supervisor does not, execution stops with tool_bad_args:*.
  • Even when the supervisor has approved, execution still goes only through the controlled layer.
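Loop detection depends on args_hash being stable under key order and stray whitespace. A standalone check (with _stable_json replaced by json.dumps(..., sort_keys=True) for brevity; the normalization logic is the same as in gateway.py):

```python
import hashlib
import json

def _normalize_for_hash(value):
    # As in gateway.py: collapse whitespace in strings, sort dict keys.
    if isinstance(value, str):
        return " ".join(value.strip().split())
    if isinstance(value, list):
        return [_normalize_for_hash(item) for item in value]
    if isinstance(value, dict):
        return {str(key): _normalize_for_hash(value[key]) for key in sorted(value)}
    return value

def args_hash(args: dict) -> str:
    raw = json.dumps(_normalize_for_hash(args or {}), ensure_ascii=True, sort_keys=True)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]

a = args_hash({"user_id": 42, "reason": "  late   charge "})
b = args_hash({"reason": "late charge", "user_id": 42})
print(a == b)  # True: neither key order nor extra whitespace changes the signature
```

This is what makes signature_repeat detection meaningful: a worker that retries the "same" call with cosmetic differences still hits the repeat limit.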

supervisor.py — policy review and human approval

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

HUMAN_APPROVAL_CAP_USD = 800.0


@dataclass(frozen=True)
class Policy:
    auto_refund_limit_usd: float = 1000.0
    max_refund_per_run_usd: float = 2000.0


@dataclass
class RuntimeState:
    executed_refund_total_usd: float = 0.0
    refund_executed: bool = False
    has_context: bool = False


@dataclass
class Decision:
    kind: str  # approve | revise | block | escalate
    reason: str
    revised_action: dict[str, Any] | None = None


def review_action(action: dict[str, Any], state: RuntimeState, policy: Policy) -> Decision:
    kind = action.get("kind")
    if kind == "final":
        if not state.has_context:
            return Decision(kind="block", reason="final_requires_context")
        return Decision(kind="approve", reason="final_with_context")
    if kind != "tool":
        return Decision(kind="block", reason="unknown_action_kind")

    name = action.get("name")
    args = dict(action.get("args") or {})

    if name == "get_refund_context":
        return Decision(kind="approve", reason="read_only_context")

    if name == "issue_refund":
        try:
            amount = float(args.get("amount_usd", 0.0))
        except (TypeError, ValueError):
            return Decision(kind="block", reason="invalid_refund_amount_type")
        if amount <= 0:
            return Decision(kind="block", reason="invalid_refund_amount")

        remaining = policy.max_refund_per_run_usd - state.executed_refund_total_usd
        if remaining <= 0:
            return Decision(kind="block", reason="refund_budget_exhausted")

        if amount > remaining:
            revised = {
                "kind": "tool",
                "name": name,
                "args": {**args, "amount_usd": round(remaining, 2)},
            }
            return Decision(kind="revise", reason="cap_to_remaining_run_budget", revised_action=revised)

        reason = str(args.get("reason", "")).strip()
        if not reason:
            revised = {
                "kind": "tool",
                "name": name,
                "args": {**args, "reason": "Customer requested refund within policy review"},
            }
            return Decision(kind="revise", reason="refund_reason_required", revised_action=revised)

        if amount > policy.auto_refund_limit_usd:
            return Decision(kind="escalate", reason="high_refund_requires_human")

        return Decision(kind="approve", reason="refund_within_auto_limit")

    if name == "send_refund_email":
        if not state.refund_executed:
            return Decision(kind="block", reason="email_before_refund")
        return Decision(kind="approve", reason="email_after_refund")

    return Decision(kind="block", reason=f"unknown_tool_for_supervisor:{name}")


def simulate_human_approval(action: dict[str, Any]) -> dict[str, Any]:
    name = action.get("name")
    args = dict(action.get("args") or {})

    if name != "issue_refund":
        return {"approved": True, "revised_action": action, "comment": "approved_by_human"}

    try:
        requested = float(args.get("amount_usd", 0.0))
    except (TypeError, ValueError):
        return {"approved": False, "comment": "invalid_requested_amount_type"}
    approved_amount = min(requested, HUMAN_APPROVAL_CAP_USD)

    if approved_amount <= 0:
        return {"approved": False, "comment": "invalid_requested_amount"}

    revised_action = {
        "kind": "tool",
        "name": "issue_refund",
        "args": {**args, "amount_usd": round(approved_amount, 2)},
    }
    return {
        "approved": True,
        "revised_action": revised_action,
        "comment": f"approved_with_cap:{approved_amount}",
    }

What matters most here (plainly)

  • The supervisor makes its decisions before execution, not after.
  • final also goes through supervisor review (without context, it returns block).
  • An invalid type for amount_usd does not crash with an exception; it yields a controlled block decision.
  • The supervisor can revise an action to add policy-required fields (for example reason for issue_refund).
  • escalate shows a practical option here: a human can approve the action with an adjusted amount.
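The issue_refund branch of review_action can be exercised in isolation. Below is a condensed sketch using this example's constants, with the decision order preserved (amount check, run budget, cap-to-remaining, missing reason, escalation threshold):

```python
# Condensed decision logic from review_action's issue_refund branch.
AUTO_REFUND_LIMIT_USD = 1000.0
MAX_REFUND_PER_RUN_USD = 2000.0

def review_refund(amount_usd: float, executed_total_usd: float, has_reason: bool) -> str:
    if amount_usd <= 0:
        return "block"                    # invalid amount
    remaining = MAX_REFUND_PER_RUN_USD - executed_total_usd
    if remaining <= 0:
        return "block"                    # run budget exhausted
    if amount_usd > remaining:
        return "revise"                   # cap to remaining run budget
    if not has_reason:
        return "revise"                   # supervisor injects a default reason
    if amount_usd > AUTO_REFUND_LIMIT_USD:
        return "escalate"                 # human approval required
    return "approve"

print(review_refund(1200.0, 0.0, True))   # escalate
print(review_refund(800.0, 0.0, True))    # approve
print(review_refund(2500.0, 0.0, True))   # revise
```

Note that the order matters: a 2500 USD request is first capped via revise, and only the capped amount would then be re-evaluated against the escalation threshold on the next pass.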

llm.py — worker decisions + final synthesis

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


WORKER_SYSTEM_PROMPT = """
You are a support worker agent.
Return exactly one JSON object in one of these shapes:
1) {"kind":"tool","name":"<tool_name>","args":{...}}
2) {"kind":"final","answer":"<short final answer>"}

Rules:
- First, collect facts with get_refund_context.
- Then propose issue_refund when facts are sufficient.
- Use send_refund_email only after refund success is visible in history.
- Use only tools from available_tools.
- Do not output markdown or extra keys.
""".strip()

FINAL_SYSTEM_PROMPT = """
You are a customer support assistant.
Write a short final answer in English.
Include:
- final refund amount in USD
- whether human approval was required
- one reason based on policy
""".strip()

TOOL_CATALOG = [
    {
        "name": "get_refund_context",
        "description": "Get user profile, billing facts, and policy hints",
        "args": {"user_id": "integer"},
    },
    {
        "name": "issue_refund",
        "description": "Issue refund in USD",
        "args": {"user_id": "integer", "amount_usd": "number", "reason": "optional string"},
    },
    {
        "name": "send_refund_email",
        "description": "Send refund confirmation email",
        "args": {"user_id": "integer", "amount_usd": "number", "message": "string"},
    },
]


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def _build_state_summary(history: list[dict[str, Any]]) -> dict[str, Any]:
    tools_used = [
        step.get("action", {}).get("name")
        for step in history
        if isinstance(step, dict)
        and isinstance(step.get("action"), dict)
        and step.get("action", {}).get("kind") == "tool"
    ]

    supervisor_decisions = [
        step.get("supervisor", {}).get("decision")
        for step in history
        if isinstance(step, dict) and isinstance(step.get("supervisor"), dict)
    ]

    return {
        "steps_completed": len(history),
        "tools_used": [t for t in tools_used if t],
        "supervisor_decisions": [d for d in supervisor_decisions if d],
        "last_step": history[-1] if history else None,
    }


def _recent_step_summaries(history: list[dict[str, Any]], limit: int = 3) -> list[dict[str, Any]]:
    out: list[dict[str, Any]] = []
    for step in history[-limit:]:
        out.append(
            {
                "step": step.get("step"),
                "proposed_tool": step.get("action", {}).get("name"),
                "supervisor_decision": step.get("supervisor", {}).get("decision"),
                "executed_tool": step.get("executed_action", {}).get("name"),
                "executed_from": step.get("executed_from"),
                "observation_status": step.get("observation", {}).get("status"),
            }
        )
    return out


def decide_next_action(
    goal: str,
    history: list[dict[str, Any]],
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "state_summary": _build_state_summary(history),
        "recent_step_summaries": _recent_step_summaries(history, limit=3),
        "available_tools": TOOL_CATALOG,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": WORKER_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_final_answer(goal: str, history: list[dict[str, Any]]) -> str:
    payload = {
        "goal": goal,
        "history": history,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            messages=[
                {"role": "system", "content": FINAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = (completion.choices[0].message.content or "").strip()
    if not text:
        raise LLMEmpty("llm_empty")
    return text

What matters most here (plainly)

  • The worker makes decisions but is never allowed to execute an action directly.
  • The worker context includes the supervisor decisions and compact summaries of the last steps, without heavy payloads.
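One detail worth noting: decide_next_action never raises on malformed model output. Bad JSON becomes a tagged {"kind": "invalid"} marker, which validate_worker_action later converts into a clean invalid_action:non_json stop instead of a crash. The fallback in isolation:

```python
import json

def parse_worker_output(text: str) -> dict:
    # Mirrors the fallback in decide_next_action: non-JSON output
    # is tagged rather than raised, so the run can stop cleanly.
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}

print(parse_worker_output('{"kind": "final", "answer": "ok"}')["kind"])  # final
print(parse_worker_output("Sure! Here is my plan...")["kind"])           # invalid
```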

main.py — Worker -> Supervisor -> Execute -> Final

PYTHON
from __future__ import annotations

import json
import time
from typing import Any

from gateway import Budget, StopRun, ToolGateway, args_hash, validate_worker_action
from llm import LLMEmpty, LLMTimeout, compose_final_answer, decide_next_action
from supervisor import Decision, Policy, RuntimeState, review_action, simulate_human_approval
from tools import get_refund_context, issue_refund, send_refund_email

GOAL = (
    "User Anna (user_id=42) requests a refund for a recent annual plan charge of 1200 USD. "
    "Apply supervisor policy before any refund execution and provide a short final response."
)

BUDGET = Budget(max_steps=8, max_tool_calls=5, max_seconds=30)
POLICY = Policy(
    auto_refund_limit_usd=1000.0,
    max_refund_per_run_usd=2000.0,
)

TOOL_REGISTRY = {
    "get_refund_context": get_refund_context,
    "issue_refund": issue_refund,
    "send_refund_email": send_refund_email,
}

ALLOWED_TOOLS = {"get_refund_context", "issue_refund", "send_refund_email"}


def _decision_payload(decision: Decision) -> dict[str, Any]:
    payload = {
        "decision": decision.kind,
        "reason": decision.reason,
    }
    if decision.revised_action:
        payload["revised_action"] = decision.revised_action
    return payload


def run_supervised_flow(goal: str) -> dict[str, Any]:
    started = time.monotonic()
    history: list[dict[str, Any]] = []
    trace: list[dict[str, Any]] = []

    state = RuntimeState()
    gateway = ToolGateway(allow=ALLOWED_TOOLS, registry=TOOL_REGISTRY, budget=BUDGET)

    for step in range(1, BUDGET.max_steps + 1):
        if (time.monotonic() - started) > BUDGET.max_seconds:
            return {
                "status": "stopped",
                "stop_reason": "max_seconds",
                "trace": trace,
                "history": history,
            }

        try:
            raw_action = decide_next_action(goal=goal, history=history)
        except LLMTimeout:
            return {
                "status": "stopped",
                "stop_reason": "llm_timeout",
                "phase": "worker",
                "trace": trace,
                "history": history,
            }

        try:
            action = validate_worker_action(raw_action)
        except StopRun as exc:
            return {
                "status": "stopped",
                "stop_reason": exc.reason,
                "phase": "worker",
                "raw_action": raw_action,
                "trace": trace,
                "history": history,
            }

        decision = review_action(action=action, state=state, policy=POLICY)
        supervisor_info = _decision_payload(decision)

        action_to_execute = action
        human_info: dict[str, Any] | None = None
        executed_from = "original"

        if decision.kind == "block":
            trace_row = {
                "step": step,
                "tool": action.get("name", "final"),
                "supervisor_decision": "block",
                "executed_from": executed_from,
                "stop_reason": f"supervisor_block:{decision.reason}",
                "ok": False,
            }
            if action.get("kind") == "tool":
                trace_row["args_hash"] = args_hash(action.get("args", {}))
            trace.append(trace_row)
            return {
                "status": "stopped",
                "stop_reason": f"supervisor_block:{decision.reason}",
                "phase": "supervisor",
                "action": action,
                "trace": trace,
                "history": history,
            }

        if decision.kind == "revise" and decision.revised_action:
            action_to_execute = decision.revised_action
            executed_from = "supervisor_revised"

        if decision.kind == "escalate":
            human = simulate_human_approval(action)
            human_info = human
            if not human.get("approved"):
                trace.append(
                    {
                        "step": step,
                        "tool": action["name"],
                        "args_hash": args_hash(action["args"]),
                        "supervisor_decision": "escalate",
                        "executed_from": executed_from,
                        "human_approved": False,
                        "stop_reason": "human_rejected",
                        "ok": False,
                    }
                )
                return {
                    "status": "stopped",
                    "stop_reason": "human_rejected",
                    "phase": "human_approval",
                    "action": action,
                    "trace": trace,
                    "history": history,
                }
            revised = human.get("revised_action")
            if isinstance(revised, dict):
                action_to_execute = revised
                executed_from = "human_revised"

        if action_to_execute["kind"] == "final":
            trace.append(
                {
                    "step": step,
                    "tool": "final",
                    "supervisor_decision": decision.kind,
                    "executed_from": executed_from,
                    "ok": True,
                }
            )
            history.append(
                {
                    "step": step,
                    "action": action,
                    "supervisor": supervisor_info,
                    "executed_action": action_to_execute,
                    "executed_from": executed_from,
                    "observation": {"status": "final"},
                }
            )
            return {
                "status": "ok",
                "stop_reason": "success",
                "answer": action_to_execute["answer"],
                "trace": trace,
                "history": history,
            }

        tool_name = action_to_execute["name"]
        tool_args = action_to_execute["args"]

        try:
            observation = gateway.call(tool_name, tool_args)
            trace_row = {
                "step": step,
                "tool": tool_name,
                "args_hash": args_hash(tool_args),
                "supervisor_decision": decision.kind,
                "executed_from": executed_from,
                "ok": True,
            }
            if human_info is not None:
                trace_row["human_approved"] = bool(human_info.get("approved"))
            trace.append(trace_row)
        except StopRun as exc:
            trace_row = {
                "step": step,
                "tool": tool_name,
                "args_hash": args_hash(tool_args),
                "supervisor_decision": decision.kind,
                "executed_from": executed_from,
                "ok": False,
                "stop_reason": exc.reason,
            }
            if human_info is not None:
                trace_row["human_approved"] = bool(human_info.get("approved"))
            trace.append(trace_row)
            return {
                "status": "stopped",
                "stop_reason": exc.reason,
                "phase": "execution",
                "action": action_to_execute,
                "trace": trace,
                "history": history,
            }

        if tool_name == "get_refund_context" and "error" not in observation:
            state.has_context = True

        if tool_name == "issue_refund" and observation.get("status") == "ok":
            amount = float(observation.get("refund", {}).get("amount_usd", 0.0))
            state.executed_refund_total_usd += amount
            state.refund_executed = True

        history_entry = {
            "step": step,
            "action": action,
            "supervisor": supervisor_info,
            "executed_action": action_to_execute,
            "executed_from": executed_from,
            "observation": observation,
        }
        if human_info is not None:
            history_entry["human_approval"] = human_info
        history.append(history_entry)

    try:
        answer = compose_final_answer(goal=goal, history=history)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "finalize",
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "phase": "finalize",
            "trace": trace,
            "history": history,
        }

    return {
        "status": "ok",
        "stop_reason": "success",
        "answer": answer,
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_supervised_flow(GOAL)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (in plain terms)

  • The worker cannot bypass supervisor review: every tool call goes through a policy check.
  • escalate leads to human approval, and only then is the action executed.
  • In trace, you can see the source of the executed action: original, supervisor_revised, or human_revised.
  • An early final without get_refund_context is blocked as supervisor_block:final_requires_context.
  • trace + history provide a full audit: what the worker proposed, what the supervisor decided, and what was actually executed.
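The executed_from provenance in the bullets above can be sketched as a small helper. This is illustrative only; the function name resolve_executed_action is an assumption, not part of the example's modules, and the decision shapes simply mirror the trace fields shown earlier.

```python
# Illustrative sketch (assumed helper name): how the executed action's
# provenance ("executed_from") can be derived from the supervisor/human
# decisions before the gateway call.

def resolve_executed_action(original, supervisor_decision,
                            revised=None, human_revised=None):
    """Return (action_to_execute, executed_from) for the trace row."""
    if human_revised is not None:
        # A human reviewer replaced the action during escalation.
        return human_revised, "human_revised"
    if supervisor_decision == "revise" and revised is not None:
        # The supervisor approved a corrected version of the action.
        return revised, "supervisor_revised"
    # Approved as proposed by the worker.
    return original, "original"

action = {"kind": "tool", "name": "issue_refund",
          "args": {"user_id": 42, "amount_usd": 1000.0}}
revised = {**action, "args": {**action["args"], "reason": "policy_default"}}

print(resolve_executed_action(action, "approve")[1])          # original
print(resolve_executed_action(action, "revise", revised)[1])  # supervisor_revised
```

The point is that provenance is computed once, at the decision boundary, so every trace row records which layer produced the action that actually ran.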

requirements.txt

TEXT
openai==2.21.0

Example output

Below is an example of a valid run in which the worker, right after collecting the context, immediately proposes a partial refund of 1000 USD, and the supervisor returns approve.

JSON
{
  "status": "ok",
  "stop_reason": "success",
  "answer": "A partial refund of 1000 USD has been approved and processed for Anna as per supervisor policy, and a confirmation email has been sent.",
  "trace": [
    {
      "step": 1,
      "tool": "get_refund_context",
      "args_hash": "feaa769a39ae",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 2,
      "tool": "issue_refund",
      "args_hash": "36ffe69cb606",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 3,
      "tool": "send_refund_email",
      "args_hash": "ff6ec44bb2fa",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 4,
      "tool": "final",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    }
  ],
  "history": [
    {"step": 1, "action": {"kind": "tool", "name": "get_refund_context", "args": {"user_id": 42}}, "supervisor": {"decision": "approve", "reason": "read_only_context"}, "executed_action": {"kind": "tool", "name": "get_refund_context", "args": {"user_id": 42}}, "executed_from": "original", "observation": {...}},
    {"step": 2, "action": {"kind": "tool", "name": "issue_refund", "args": {"user_id": 42, "amount_usd": 1000.0, "reason": "..."}}, "supervisor": {"decision": "approve", "reason": "refund_within_auto_limit"}, "executed_action": {"kind": "tool", "name": "issue_refund", "args": {"user_id": 42, "amount_usd": 1000.0, "reason": "..."}}, "executed_from": "original", "observation": {...}},
    {"step": 3, "action": {"kind": "tool", "name": "send_refund_email", "args": {"user_id": 42, "amount_usd": 1000.0, "message": "..."}}, "supervisor": {"decision": "approve", "reason": "email_after_refund"}, "executed_action": {"kind": "tool", "name": "send_refund_email", "args": {"user_id": 42, "amount_usd": 1000.0, "message": "..."}}, "executed_from": "original", "observation": {...}},
    {"step": 4, "action": {"kind": "final", "answer": "..."}, "supervisor": {"decision": "approve", "reason": "final_with_context"}, "executed_action": {"kind": "final", "answer": "..."}, "executed_from": "original", "observation": {"status": "final"}}
  ]
}

This is a shortened example: history is deliberately reduced to the key fields for readability.


What is NOT shown here

  • No real integrated human-in-the-loop UI/queue.
  • No role-based access or multi-tenant isolation.
  • No complex retry/backoff policies for the LLM.
  • No full retry/backoff orchestration for write-tools; the loop guard here is deliberately conservative.
  • Optional args (for example issue_refund.reason) are filled in via supervisor revise as part of the demo contract.
  • No token/cost budgets (cost guardrails).
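To make the last omission concrete, here is a hedged sketch of what a token/cost budget guard could look like if you added one. The names (CostBudget, charge) and the per-1k-token prices are assumptions for illustration, not part of the demo code; StopRun is redefined here so the snippet is self-contained.

```python
# Sketch of a cost guardrail this example deliberately omits.
# Prices are placeholders; plug in your provider's actual rates.

class StopRun(Exception):
    def __init__(self, reason: str):
        self.reason = reason

class CostBudget:
    def __init__(self, max_usd: float):
        self.max_usd = max_usd
        self.spent_usd = 0.0

    def charge(self, prompt_tokens: int, completion_tokens: int,
               in_price_per_1k: float = 0.005,
               out_price_per_1k: float = 0.015) -> None:
        # Accumulate spend after each LLM call, then enforce the cap.
        self.spent_usd += (prompt_tokens / 1000) * in_price_per_1k
        self.spent_usd += (completion_tokens / 1000) * out_price_per_1k
        if self.spent_usd > self.max_usd:
            raise StopRun("cost_budget_exceeded")

budget = CostBudget(max_usd=0.05)
budget.charge(prompt_tokens=2000, completion_tokens=500)  # 0.0175 USD, within budget
```

In the worker loop, charge() would be called after every LLM response, and "cost_budget_exceeded" would surface as one more stop_reason alongside max_tool_calls and max_seconds.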

Typical stop_reason values

  • success — the worker completed the scenario and returned the final answer
  • invalid_action:* — the worker returned invalid action JSON
  • invalid_action:bad_arg_type:* — a value in the tool args has an invalid type (for example amount_usd is not a number)
  • supervisor_block:* — the supervisor blocked the action via policy
  • human_rejected — the escalation to a human was rejected
  • max_tool_calls — the tool-call limit was reached
  • max_seconds — the run's time budget was exceeded
  • llm_timeout — the LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_empty — the LLM's final response is empty
  • tool_denied:* — the tool is not in the execution allowlist
  • tool_missing:* — the tool is missing from the registry
  • tool_bad_args:* — invalid arguments for the tool
  • loop_detected:per_tool_limit — per_tool_limit exceeded for a tool (protection against tool spam even with different args)
  • loop_detected:signature_repeat — the same tool+args is repeated beyond the allowed limit
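For production monitoring, these values group naturally into alerting buckets. The sketch below does that with simple prefix matching; the bucket names are an assumption, only the stop_reason strings come from the list above.

```python
# Sketch: map stop_reason values to alerting buckets for dashboards.
# Bucket names ("policy", "contract", ...) are illustrative choices.

def classify_stop_reason(reason: str) -> str:
    if reason == "success":
        return "ok"
    if reason == "human_rejected" or reason.startswith("supervisor_block:"):
        return "policy"          # governance said no; usually expected
    if reason.startswith(("invalid_action:", "tool_denied:",
                          "tool_missing:", "tool_bad_args:")):
        return "contract"        # worker or registry broke the contract
    if reason in ("max_tool_calls", "max_seconds") or reason.startswith("loop_detected:"):
        return "budget"          # run guards fired; check for loops
    if reason in ("llm_timeout", "llm_empty"):
        return "llm"             # provider-side failure; retry or page
    return "unknown"

print(classify_stop_reason("supervisor_block:final_requires_context"))  # policy
print(classify_stop_reason("loop_detected:signature_repeat"))           # budget
```

A per-bucket counter metric is often more useful than one counter per raw stop_reason, since the `*` suffixes are open-ended.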

What you can try next

  • Lower POLICY.auto_refund_limit_usd to 500 and watch escalate trigger more often.
  • Switch simulate_human_approval to rejection mode and verify human_rejected.
  • Remove send_refund_email from ALLOWED_TOOLS and verify tool_denied:*.
  • Remove reason from issue_refund and see how the supervisor returns revise with the reason auto-filled.
  • Add a risk_score field to supervisor decisions and surface it in trace for alerting.
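For the last exercise, here is one possible shape. Everything beyond the trace fields shown in the example output is hypothetical: score_refund_risk, the risk_score field, and the 1500 USD normalizer are illustrative assumptions.

```python
# Hypothetical sketch: attach a risk_score to supervisor decisions and
# copy it into trace rows so alerts can key on it. Names and the scoring
# formula are assumptions for this exercise.

def score_refund_risk(args: dict, auto_limit_usd: float = 1500.0) -> float:
    # Scale the requested amount against the auto-approval limit, capped at 1.0.
    amount = float(args.get("amount_usd", 0.0))
    return round(min(amount / auto_limit_usd, 1.0), 2)

decision = {"decision": "approve", "reason": "refund_within_auto_limit"}
decision["risk_score"] = score_refund_risk({"user_id": 42, "amount_usd": 1000.0})

trace_row = {
    "step": 2,
    "tool": "issue_refund",
    "supervisor_decision": decision["decision"],
    "risk_score": decision["risk_score"],
    "ok": True,
}
print(trace_row["risk_score"])  # 0.67
```

With the score in trace, a monitor can alert on runs where high-risk actions were approved without escalation.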
⏱️ 19 min read · Updated March 2026 · Difficulty: ★★☆
Embedded: production control · OnceOnly
Add guardrails to tool-calling agents
Ship this pattern with governance:
  • Budgets (steps / cost caps)
  • Tool permissions (allowlist / blocklist)
  • Kill switch & incident stop
  • Idempotency & deduplication
  • Audit logs & traceability
Embedded mention: OnceOnly is a control layer for production agent systems.
Author

This documentation is curated and maintained by engineers who deploy AI agents in production.

The content is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The patterns and recommendations draw on post-mortems, failure modes, and operational incidents in deployed systems, including the development and operation of a governance infrastructure for agents at OnceOnly.