Essence du pattern (bref)
Supervisor Agent est un pattern où un worker agent propose l’action suivante, et une couche supervisor séparée décide si elle peut être exécutée.
Le LLM décide quoi faire ensuite, et la supervisor policy décide si c’est sûr et autorisé avant l’exécution.
Ce que cet exemple démontre
- worker loop où le LLM propose une action (
tooloufinal) - supervisor review séparé avant chaque tool call
- décisions du supervisor :
approve,revise,block,escalate - simulation de human approval pour des montants élevés de remboursement
- policy boundary entre intent (LLM) et execution (tools)
- allowlist de tools, budgets de run et loop detection
stop_reasonexplicites et audit trace pour le monitoring production
Architecture
- Le worker propose un action JSON.
- L’action est validée (
validate_worker_action). - Le supervisor fait un review et renvoie une décision (
approve/revise/block/escalate). - ToolGateway exécute uniquement l’action autorisée.
- Observation + décision du supervisor sont écrites dans
history. - Le worker lit
historyet fait l’étape suivante ou renvoiefinal.
Le supervisor n’exécute pas lui-même la logique métier : il décide seulement si l’étape suivante est admissible.
Structure du projet
examples/
└── agent-patterns/
└── supervisor-agent/
└── python/
├── main.py # Worker -> Supervisor Review -> Execute -> Final
├── llm.py # worker decision + final synthesis
├── supervisor.py # policy review + human approval simulation
├── gateway.py # action validation + allowlist + loop/budget control
├── tools.py # deterministic business tools (refund flow)
└── requirements.txt
Lancer le projet
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/supervisor-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ est requis.
Option via export :
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Option via .env (optionnel)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
C’est la variante shell (macOS/Linux). Sur Windows, il est plus simple d’utiliser des variables set ou, si souhaité, python-dotenv pour charger .env automatiquement.
Tâche
Imagine un cas réel de support en production :
"Un client demande un remboursement de 1200 USD pour un plan annuel payé il y a 10 jours."
Le worker ne doit pas exécuter automatiquement un remboursement de ce montant. Il faut :
- collecter le contexte
- proposer une action
- laisser le supervisor la vérifier selon les policies
- escalader vers un humain si nécessaire
- exécuter uniquement l’action approuvée
Solution
Dans cet exemple :
- le worker LLM propose une séquence d’actions
- le supervisor contrôle chaque tool call
- un high-risk refund est escaladé vers un humain
- un humain (simulation) approuve le montant ajusté
- ce n’est qu’ensuite que l’exécution et la réponse finale ont lieu
C’est le pattern Supervisor : l’exécution ne peut pas contourner la policy review.
Code
tools.py — tools métier (faits et exécution uniquement)
from __future__ import annotations
from typing import Any
USERS = {
42: {"id": 42, "name": "Anna", "country": "US", "tier": "enterprise"},
7: {"id": 7, "name": "Max", "country": "US", "tier": "pro"},
}
BILLING = {
42: {
"plan": "annual_enterprise",
"currency": "USD",
"last_charge_usd": 1200.0,
"days_since_payment": 10,
},
7: {
"plan": "pro_monthly",
"currency": "USD",
"last_charge_usd": 49.0,
"days_since_payment": 35,
},
}
def get_refund_context(user_id: int) -> dict[str, Any]:
user = USERS.get(user_id)
billing = BILLING.get(user_id)
if not user or not billing:
return {"error": f"context_not_found_for_user:{user_id}"}
return {
"user": user,
"billing": billing,
"policy_hint": {
"auto_refund_limit_usd": 1000.0,
"refund_window_days": 14,
},
}
def issue_refund(
user_id: int,
amount_usd: float,
reason: str,
) -> dict[str, Any]:
user = USERS.get(user_id)
billing = BILLING.get(user_id)
if not user or not billing:
return {"status": "error", "error": f"refund_user_not_found:{user_id}"}
if amount_usd <= 0:
return {"status": "error", "error": "refund_amount_must_be_positive"}
paid = float(billing["last_charge_usd"])
if amount_usd > paid:
return {
"status": "error",
"error": "refund_exceeds_last_charge",
"max_refund_usd": paid,
}
return {
"status": "ok",
"refund": {
"user_id": user_id,
"currency": "USD",
"amount_usd": round(float(amount_usd), 2),
"reason": reason.strip(),
"transaction_id": f"rf_{user_id}_20260226",
},
}
def send_refund_email(user_id: int, amount_usd: float, message: str) -> dict[str, Any]:
user = USERS.get(user_id)
if not user:
return {"status": "error", "error": f"email_user_not_found:{user_id}"}
return {
"status": "ok",
"email": {
"to": f"{user['name'].lower()}@example.com",
"template": "refund_confirmation_v2",
"amount_usd": round(float(amount_usd), 2),
"message": message,
"email_id": f"em_{user_id}_20260226",
},
}
Ce qui compte le plus ici (en clair)
- Les tools ne prennent pas de décisions de policy : ils renvoient seulement des faits ou exécutent une action.
- Les contraintes de sécurité ici sont minimales ; le contrôle principal est assuré par le supervisor.
gateway.py — execution boundary (validation et contrôle)
from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_steps: int = 8
max_tool_calls: int = 5
max_seconds: int = 30
TOOL_ARG_TYPES: dict[str, dict[str, str]] = {
"get_refund_context": {"user_id": "int"},
"issue_refund": {"user_id": "int", "amount_usd": "number", "reason": "str?"},
"send_refund_email": {"user_id": "int", "amount_usd": "number", "message": "str"},
}
def _stable_json(value: Any) -> str:
if value is None or isinstance(value, (bool, int, float, str)):
return json.dumps(value, ensure_ascii=True, sort_keys=True)
if isinstance(value, list):
return "[" + ",".join(_stable_json(v) for v in value) + "]"
if isinstance(value, dict):
parts = []
for key in sorted(value):
parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
return "{" + ",".join(parts) + "}"
return json.dumps(str(value), ensure_ascii=True)
def _normalize_for_hash(value: Any) -> Any:
if isinstance(value, str):
return " ".join(value.strip().split())
if isinstance(value, list):
return [_normalize_for_hash(item) for item in value]
if isinstance(value, dict):
return {str(key): _normalize_for_hash(value[key]) for key in sorted(value)}
return value
def args_hash(args: dict[str, Any]) -> str:
normalized = _normalize_for_hash(args or {})
raw = _stable_json(normalized)
return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]
def _is_number(value: Any) -> bool:
return isinstance(value, (int, float)) and not isinstance(value, bool)
def _validate_tool_args(name: str, args: dict[str, Any]) -> dict[str, Any]:
spec = TOOL_ARG_TYPES.get(name)
if spec is None:
raise StopRun(f"invalid_action:unknown_tool:{name}")
extra = set(args.keys()) - set(spec.keys())
if extra:
raise StopRun(f"invalid_action:extra_tool_args:{name}")
normalized: dict[str, Any] = {}
for arg_name, expected in spec.items():
is_optional = expected.endswith("?")
expected_base = expected[:-1] if is_optional else expected
if arg_name not in args:
if is_optional:
continue
raise StopRun(f"invalid_action:missing_required_arg:{name}:{arg_name}")
value = args[arg_name]
if expected_base == "int":
if not isinstance(value, int) or isinstance(value, bool):
raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
normalized[arg_name] = value
continue
if expected_base == "number":
if not _is_number(value):
raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
normalized[arg_name] = float(value)
continue
if expected_base == "str":
if not isinstance(value, str) or not value.strip():
raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
normalized[arg_name] = value.strip()
continue
raise StopRun(f"invalid_action:unknown_arg_spec:{name}:{arg_name}")
return normalized
def validate_worker_action(action: Any) -> dict[str, Any]:
if not isinstance(action, dict):
raise StopRun("invalid_action:not_object")
kind = action.get("kind")
if kind == "invalid":
raise StopRun("invalid_action:non_json")
if kind == "final":
allowed_keys = {"kind", "answer"}
if set(action.keys()) - allowed_keys:
raise StopRun("invalid_action:extra_keys_final")
answer = action.get("answer")
if not isinstance(answer, str) or not answer.strip():
raise StopRun("invalid_action:bad_final_answer")
return {"kind": "final", "answer": answer.strip()}
if kind == "tool":
allowed_keys = {"kind", "name", "args"}
if set(action.keys()) - allowed_keys:
raise StopRun("invalid_action:extra_keys_tool")
name = action.get("name")
if not isinstance(name, str) or not name.strip():
raise StopRun("invalid_action:bad_tool_name")
args = action.get("args", {})
if args is None:
args = {}
if not isinstance(args, dict):
raise StopRun("invalid_action:bad_tool_args")
normalized_args = _validate_tool_args(name.strip(), args)
return {"kind": "tool", "name": name.strip(), "args": normalized_args}
raise StopRun("invalid_action:bad_kind")
class ToolGateway:
def __init__(
self,
*,
allow: set[str],
registry: dict[str, Callable[..., dict[str, Any]]],
budget: Budget,
):
self.allow = set(allow)
self.registry = registry
self.budget = budget
self.tool_calls = 0
self.seen_call_counts: dict[str, int] = {}
self.per_tool_counts: dict[str, int] = {}
self.read_only_repeat_limit: dict[str, int] = {
"get_refund_context": 2,
}
self.per_tool_limit: dict[str, int] = {
"get_refund_context": 3,
"issue_refund": 2,
"send_refund_email": 2,
}
def call(self, name: str, args: dict[str, Any]) -> dict[str, Any]:
self.tool_calls += 1
if self.tool_calls > self.budget.max_tool_calls:
raise StopRun("max_tool_calls")
if name not in self.allow:
raise StopRun(f"tool_denied:{name}")
fn = self.registry.get(name)
if fn is None:
raise StopRun(f"tool_missing:{name}")
count_for_tool = self.per_tool_counts.get(name, 0) + 1
if count_for_tool > self.per_tool_limit.get(name, 2):
raise StopRun("loop_detected:per_tool_limit")
self.per_tool_counts[name] = count_for_tool
signature = f"{name}:{args_hash(args)}"
seen = self.seen_call_counts.get(signature, 0) + 1
allowed_repeats = self.read_only_repeat_limit.get(name, 1)
if seen > allowed_repeats:
raise StopRun("loop_detected:signature_repeat")
self.seen_call_counts[signature] = seen
try:
out = fn(**args)
except TypeError as exc:
raise StopRun(f"tool_bad_args:{name}") from exc
except Exception as exc:
raise StopRun(f"tool_error:{name}") from exc
if not isinstance(out, dict):
raise StopRun(f"tool_bad_result:{name}")
return out
Ce qui compte le plus ici (en clair)
- Gateway fait un schema-check minimal des args (types, champs required + optional) avant l’exécution.
- Gateway contrôle l’exécution technique : allowlist, budget, loop detection (à la fois par
tool+argset par le nombre total de tool calls). - Gateway ne remplit pas les champs manquants : il valide seulement le contrat des args.
- Pour certains champs (par exemple
issue_refund.reason), gateway autorise optional, car le supervisor peut les ajouter viarevise. Si le supervisor ne les ajoute pas, l’exécution s’arrête avectool_bad_args:*. - Même si le supervisor a approuvé, l’exécution passe quand même uniquement par la couche contrôlée.
supervisor.py — policy review et human approval
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
HUMAN_APPROVAL_CAP_USD = 800.0
@dataclass(frozen=True)
class Policy:
auto_refund_limit_usd: float = 1000.0
max_refund_per_run_usd: float = 2000.0
@dataclass
class RuntimeState:
executed_refund_total_usd: float = 0.0
refund_executed: bool = False
has_context: bool = False
@dataclass
class Decision:
kind: str # approve | revise | block | escalate
reason: str
revised_action: dict[str, Any] | None = None
def review_action(action: dict[str, Any], state: RuntimeState, policy: Policy) -> Decision:
kind = action.get("kind")
if kind == "final":
if not state.has_context:
return Decision(kind="block", reason="final_requires_context")
return Decision(kind="approve", reason="final_with_context")
if kind != "tool":
return Decision(kind="block", reason="unknown_action_kind")
name = action.get("name")
args = dict(action.get("args") or {})
if name == "get_refund_context":
return Decision(kind="approve", reason="read_only_context")
if name == "issue_refund":
try:
amount = float(args.get("amount_usd", 0.0))
except (TypeError, ValueError):
return Decision(kind="block", reason="invalid_refund_amount_type")
if amount <= 0:
return Decision(kind="block", reason="invalid_refund_amount")
remaining = policy.max_refund_per_run_usd - state.executed_refund_total_usd
if remaining <= 0:
return Decision(kind="block", reason="refund_budget_exhausted")
if amount > remaining:
revised = {
"kind": "tool",
"name": name,
"args": {**args, "amount_usd": round(remaining, 2)},
}
return Decision(kind="revise", reason="cap_to_remaining_run_budget", revised_action=revised)
reason = str(args.get("reason", "")).strip()
if not reason:
revised = {
"kind": "tool",
"name": name,
"args": {**args, "reason": "Customer requested refund within policy review"},
}
return Decision(kind="revise", reason="refund_reason_required", revised_action=revised)
if amount > policy.auto_refund_limit_usd:
return Decision(kind="escalate", reason="high_refund_requires_human")
return Decision(kind="approve", reason="refund_within_auto_limit")
if name == "send_refund_email":
if not state.refund_executed:
return Decision(kind="block", reason="email_before_refund")
return Decision(kind="approve", reason="email_after_refund")
return Decision(kind="block", reason=f"unknown_tool_for_supervisor:{name}")
def simulate_human_approval(action: dict[str, Any]) -> dict[str, Any]:
name = action.get("name")
args = dict(action.get("args") or {})
if name != "issue_refund":
return {"approved": True, "revised_action": action, "comment": "approved_by_human"}
try:
requested = float(args.get("amount_usd", 0.0))
except (TypeError, ValueError):
return {"approved": False, "comment": "invalid_requested_amount_type"}
approved_amount = min(requested, HUMAN_APPROVAL_CAP_USD)
if approved_amount <= 0:
return {"approved": False, "comment": "invalid_requested_amount"}
revised_action = {
"kind": "tool",
"name": "issue_refund",
"args": {**args, "amount_usd": round(approved_amount, 2)},
}
return {
"approved": True,
"revised_action": revised_action,
"comment": f"approved_with_cap:{approved_amount}",
}
Ce qui compte le plus ici (en clair)
- Le supervisor prend les décisions avant l’exécution, pas après.
finalpasse aussi par supervisor review (sans contexte, il renvoieblock).- Un type invalide pour
amount_usdne plante pas avec une exception ; il renvoie une décisionblockcontrôlée. - Le supervisor peut faire
revisepour ajouter des champs policy-required (par exemplereasonpourissue_refund). escalatemontre ici une option pratique : un humain peut approuver l’action avec un montant ajusté.
llm.py — worker decisions + final synthesis
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
WORKER_SYSTEM_PROMPT = """
You are a support worker agent.
Return exactly one JSON object in one of these shapes:
1) {"kind":"tool","name":"<tool_name>","args":{...}}
2) {"kind":"final","answer":"<short final answer>"}
Rules:
- First, collect facts with get_refund_context.
- Then propose issue_refund when facts are sufficient.
- Use send_refund_email only after refund success is visible in history.
- Use only tools from available_tools.
- Do not output markdown or extra keys.
""".strip()
FINAL_SYSTEM_PROMPT = """
You are a customer support assistant.
Write a short final answer in English.
Include:
- final refund amount in USD
- whether human approval was required
- one reason based on policy
""".strip()
TOOL_CATALOG = [
{
"name": "get_refund_context",
"description": "Get user profile, billing facts, and policy hints",
"args": {"user_id": "integer"},
},
{
"name": "issue_refund",
"description": "Issue refund in USD",
"args": {"user_id": "integer", "amount_usd": "number", "reason": "optional string"},
},
{
"name": "send_refund_email",
"description": "Send refund confirmation email",
"args": {"user_id": "integer", "amount_usd": "number", "message": "string"},
},
]
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def _build_state_summary(history: list[dict[str, Any]]) -> dict[str, Any]:
tools_used = [
step.get("action", {}).get("name")
for step in history
if isinstance(step, dict)
and isinstance(step.get("action"), dict)
and step.get("action", {}).get("kind") == "tool"
]
supervisor_decisions = [
step.get("supervisor", {}).get("decision")
for step in history
if isinstance(step, dict) and isinstance(step.get("supervisor"), dict)
]
return {
"steps_completed": len(history),
"tools_used": [t for t in tools_used if t],
"supervisor_decisions": [d for d in supervisor_decisions if d],
"last_step": history[-1] if history else None,
}
def _recent_step_summaries(history: list[dict[str, Any]], limit: int = 3) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
for step in history[-limit:]:
out.append(
{
"step": step.get("step"),
"proposed_tool": step.get("action", {}).get("name"),
"supervisor_decision": step.get("supervisor", {}).get("decision"),
"executed_tool": step.get("executed_action", {}).get("name"),
"executed_from": step.get("executed_from"),
"observation_status": step.get("observation", {}).get("status"),
}
)
return out
def decide_next_action(
goal: str,
history: list[dict[str, Any]],
) -> dict[str, Any]:
payload = {
"goal": goal,
"state_summary": _build_state_summary(history),
"recent_step_summaries": _recent_step_summaries(history, limit=3),
"available_tools": TOOL_CATALOG,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": WORKER_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
return json.loads(text)
except json.JSONDecodeError:
return {"kind": "invalid", "raw": text}
def compose_final_answer(goal: str, history: list[dict[str, Any]]) -> str:
payload = {
"goal": goal,
"history": history,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
messages=[
{"role": "system", "content": FINAL_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = (completion.choices[0].message.content or "").strip()
if not text:
raise LLMEmpty("llm_empty")
return text
Ce qui compte le plus ici (en clair)
- Le worker prend des décisions, mais n’a pas le droit d’exécuter une action directement.
- Le contexte du worker inclut les décisions du supervisor et des summaries compacts des dernières étapes sans payload "lourd".
main.py — Worker -> Supervisor -> Execute -> Final
from __future__ import annotations
import json
import time
from typing import Any
from gateway import Budget, StopRun, ToolGateway, args_hash, validate_worker_action
from llm import LLMEmpty, LLMTimeout, compose_final_answer, decide_next_action
from supervisor import Decision, Policy, RuntimeState, review_action, simulate_human_approval
from tools import get_refund_context, issue_refund, send_refund_email
GOAL = (
"User Anna (user_id=42) requests a refund for a recent annual plan charge of 1200 USD. "
"Apply supervisor policy before any refund execution and provide a short final response."
)
BUDGET = Budget(max_steps=8, max_tool_calls=5, max_seconds=30)
POLICY = Policy(
auto_refund_limit_usd=1000.0,
max_refund_per_run_usd=2000.0,
)
TOOL_REGISTRY = {
"get_refund_context": get_refund_context,
"issue_refund": issue_refund,
"send_refund_email": send_refund_email,
}
ALLOWED_TOOLS = {"get_refund_context", "issue_refund", "send_refund_email"}
def _decision_payload(decision: Decision) -> dict[str, Any]:
payload = {
"decision": decision.kind,
"reason": decision.reason,
}
if decision.revised_action:
payload["revised_action"] = decision.revised_action
return payload
def run_supervised_flow(goal: str) -> dict[str, Any]:
started = time.monotonic()
history: list[dict[str, Any]] = []
trace: list[dict[str, Any]] = []
state = RuntimeState()
gateway = ToolGateway(allow=ALLOWED_TOOLS, registry=TOOL_REGISTRY, budget=BUDGET)
for step in range(1, BUDGET.max_steps + 1):
if (time.monotonic() - started) > BUDGET.max_seconds:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"trace": trace,
"history": history,
}
try:
raw_action = decide_next_action(goal=goal, history=history)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "worker",
"trace": trace,
"history": history,
}
try:
action = validate_worker_action(raw_action)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": "worker",
"raw_action": raw_action,
"trace": trace,
"history": history,
}
decision = review_action(action=action, state=state, policy=POLICY)
supervisor_info = _decision_payload(decision)
action_to_execute = action
human_info: dict[str, Any] | None = None
executed_from = "original"
if decision.kind == "block":
trace_row = {
"step": step,
"tool": action.get("name", "final"),
"supervisor_decision": "block",
"executed_from": executed_from,
"stop_reason": f"supervisor_block:{decision.reason}",
"ok": False,
}
if action.get("kind") == "tool":
trace_row["args_hash"] = args_hash(action.get("args", {}))
trace.append(trace_row)
return {
"status": "stopped",
"stop_reason": f"supervisor_block:{decision.reason}",
"phase": "supervisor",
"action": action,
"trace": trace,
"history": history,
}
if decision.kind == "revise" and decision.revised_action:
action_to_execute = decision.revised_action
executed_from = "supervisor_revised"
if decision.kind == "escalate":
human = simulate_human_approval(action)
human_info = human
if not human.get("approved"):
trace.append(
{
"step": step,
"tool": action["name"],
"args_hash": args_hash(action["args"]),
"supervisor_decision": "escalate",
"executed_from": executed_from,
"human_approved": False,
"stop_reason": "human_rejected",
"ok": False,
}
)
return {
"status": "stopped",
"stop_reason": "human_rejected",
"phase": "human_approval",
"action": action,
"trace": trace,
"history": history,
}
revised = human.get("revised_action")
if isinstance(revised, dict):
action_to_execute = revised
executed_from = "human_revised"
if action_to_execute["kind"] == "final":
trace.append(
{
"step": step,
"tool": "final",
"supervisor_decision": decision.kind,
"executed_from": executed_from,
"ok": True,
}
)
history.append(
{
"step": step,
"action": action,
"supervisor": supervisor_info,
"executed_action": action_to_execute,
"executed_from": executed_from,
"observation": {"status": "final"},
}
)
return {
"status": "ok",
"stop_reason": "success",
"answer": action_to_execute["answer"],
"trace": trace,
"history": history,
}
tool_name = action_to_execute["name"]
tool_args = action_to_execute["args"]
try:
observation = gateway.call(tool_name, tool_args)
trace_row = {
"step": step,
"tool": tool_name,
"args_hash": args_hash(tool_args),
"supervisor_decision": decision.kind,
"executed_from": executed_from,
"ok": True,
}
if human_info is not None:
trace_row["human_approved"] = bool(human_info.get("approved"))
trace.append(trace_row)
except StopRun as exc:
trace_row = {
"step": step,
"tool": tool_name,
"args_hash": args_hash(tool_args),
"supervisor_decision": decision.kind,
"executed_from": executed_from,
"ok": False,
"stop_reason": exc.reason,
}
if human_info is not None:
trace_row["human_approved"] = bool(human_info.get("approved"))
trace.append(trace_row)
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": "execution",
"action": action_to_execute,
"trace": trace,
"history": history,
}
if tool_name == "get_refund_context" and "error" not in observation:
state.has_context = True
if tool_name == "issue_refund" and observation.get("status") == "ok":
amount = float(observation.get("refund", {}).get("amount_usd", 0.0))
state.executed_refund_total_usd += amount
state.refund_executed = True
history_entry = {
"step": step,
"action": action,
"supervisor": supervisor_info,
"executed_action": action_to_execute,
"executed_from": executed_from,
"observation": observation,
}
if human_info is not None:
history_entry["human_approval"] = human_info
history.append(history_entry)
try:
answer = compose_final_answer(goal=goal, history=history)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "finalize",
"trace": trace,
"history": history,
}
except LLMEmpty:
return {
"status": "stopped",
"stop_reason": "llm_empty",
"phase": "finalize",
"trace": trace,
"history": history,
}
return {
"status": "ok",
"stop_reason": "success",
"answer": answer,
"trace": trace,
"history": history,
}
def main() -> None:
result = run_supervised_flow(GOAL)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
Ce qui compte le plus ici (en clair)
- Le worker ne peut pas contourner supervisor review : chaque tool call passe par un policy check.
escalatemène à human approval, et seulement ensuite l’action est exécutée.- Dans
trace, on voit la source de l’action exécutée :original,supervisor_revisedouhuman_revised. - Un
finalprécoce sansget_refund_contextest bloqué commesupervisor_block:final_requires_context. trace+historydonnent un audit complet : ce que le worker a proposé, ce que le supervisor a décidé, et ce qui a réellement été exécuté.
requirements.txt
openai==2.21.0
Exemple de sortie
Ci-dessous, un exemple d’exécution valide où le worker, juste après avoir collecté le contexte, propose immédiatement un remboursement partiel de 1000 USD, et le supervisor donne approve.
{
"status": "ok",
"stop_reason": "success",
"answer": "A partial refund of 1000 USD has been approved and processed for Anna as per supervisor policy, and a confirmation email has been sent.",
"trace": [
{
"step": 1,
"tool": "get_refund_context",
"args_hash": "feaa769a39ae",
"supervisor_decision": "approve",
"executed_from": "original",
"ok": true
},
{
"step": 2,
"tool": "issue_refund",
"args_hash": "36ffe69cb606",
"supervisor_decision": "approve",
"executed_from": "original",
"ok": true
},
{
"step": 3,
"tool": "send_refund_email",
"args_hash": "ff6ec44bb2fa",
"supervisor_decision": "approve",
"executed_from": "original",
"ok": true
},
{
"step": 4,
"tool": "final",
"supervisor_decision": "approve",
"executed_from": "original",
"ok": true
}
],
"history": [
{"step": 1, "action": {"kind": "tool", "name": "get_refund_context", "args": {"user_id": 42}}, "supervisor": {"decision": "approve", "reason": "read_only_context"}, "executed_action": {"kind": "tool", "name": "get_refund_context", "args": {"user_id": 42}}, "executed_from": "original", "observation": {...}},
{"step": 2, "action": {"kind": "tool", "name": "issue_refund", "args": {"user_id": 42, "amount_usd": 1000.0, "reason": "..."}}, "supervisor": {"decision": "approve", "reason": "refund_within_auto_limit"}, "executed_action": {"kind": "tool", "name": "issue_refund", "args": {"user_id": 42, "amount_usd": 1000.0, "reason": "..."}}, "executed_from": "original", "observation": {...}},
{"step": 3, "action": {"kind": "tool", "name": "send_refund_email", "args": {"user_id": 42, "amount_usd": 1000.0, "message": "..."}}, "supervisor": {"decision": "approve", "reason": "email_after_refund"}, "executed_action": {"kind": "tool", "name": "send_refund_email", "args": {"user_id": 42, "amount_usd": 1000.0, "message": "..."}}, "executed_from": "original", "observation": {...}},
{"step": 4, "action": {"kind": "final", "answer": "..."}, "supervisor": {"decision": "approve", "reason": "final_with_context"}, "executed_action": {"kind": "final", "answer": "..."}, "executed_from": "original", "observation": {"status": "final"}}
]
}
C’est un exemple raccourci : history est volontairement réduit aux champs clés pour la lisibilité.
Ce qui n’est PAS montré ici
- Pas de véritable UI/queue intégrée de human-in-the-loop.
- Pas de role-based access ni d’isolation multi-tenant.
- Pas de policies retry/backoff complexes pour LLM.
- Pas d’orchestration retry/backoff complète pour write-tools ; le loop guard ici est volontairement conservateur.
- Les optional args (par exemple
issue_refund.reason) sont remplis via supervisorrevisedans le cadre du contrat de démo. - Pas de budgets tokens/coût (cost guardrails).
Valeurs stop_reason typiques
success— le worker a terminé le scénario et renvoyé la réponse finaleinvalid_action:*— le worker a renvoyé un action JSON invalideinvalid_action:bad_arg_type:*— dans les tool args, une valeur a un type invalide (par exempleamount_usdn’est pas un number)supervisor_block:*— le supervisor a bloqué l’action via policyhuman_rejected— l’escalade vers un humain a été rejetéemax_tool_calls— la limite de tool calls est atteintemax_seconds— le time budget du run est dépasséllm_timeout— le LLM n’a pas répondu dansOPENAI_TIMEOUT_SECONDSllm_empty— la réponse finale du LLM est videtool_denied:*— le tool n’est pas dans l’execution allowlisttool_missing:*— le tool est absent du registrytool_bad_args:*— arguments invalides pour le toolloop_detected:per_tool_limit—per_tool_limitdépassé pour un tool (protection contre le tool spam même avec des args différents)loop_detected:signature_repeat— le mêmetool+argsest répété au-delà de la limite autorisée
Ce que vous pouvez essayer ensuite
- Réduis
POLICY.auto_refund_limit_usdà500et observe commentescalatese déclenche plus souvent. - Mets
simulate_human_approvalen mode refus et vérifiehuman_rejected. - Retire
send_refund_emaildeALLOWED_TOOLSet vérifietool_denied:*. - Retire
reasondeissue_refundet vérifie comment le supervisor renvoiereviseavec auto-remplissage de la raison. - Ajoute un champ
risk_scoreaux décisions du supervisor et affiche-le danstracepour l’alerting.