Pattern essence (in brief)
Supervisor Agent is a pattern in which a worker agent proposes the next action and a separate supervisor layer decides whether it may be executed.
The LLM decides what to do next, and the supervisor policy decides whether it is safe and permitted before execution.
What this example demonstrates
- a worker loop in which the LLM proposes an action (`tool` or `final`)
- a separate supervisor review before every tool call
- supervisor decisions: `approve`, `revise`, `block`, `escalate`
- simulated human approval for high refund amounts
- a policy boundary between intent (LLM) and execution (tools)
- a tool allowlist, run budgets, and loop detection
- explicit `stop_reason` values and an audit trace for production monitoring
Architecture
- The worker proposes an action as JSON.
- The action is validated (`validate_worker_action`).
- The supervisor reviews it and returns a decision (`approve`/`revise`/`block`/`escalate`).
- `ToolGateway` executes only the permitted action.
- The observation plus the supervisor decision are written to `history`.
- The worker sees `history` and takes the next step or returns `final`.
The supervisor does not run business logic itself: it only decides whether the next step is admissible.
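The loop above can be sketched in a few lines. This is a minimal standalone illustration, not the example's actual code: `propose`, `review`, and `execute` are hypothetical stand-ins for the worker LLM, the supervisor policy, and the tool gateway.

```python
from typing import Any, Callable

def supervised_loop(
    propose: Callable[[list], dict],   # worker: proposes the next action from history
    review: Callable[[dict], str],     # supervisor: returns "approve" or "block"
    execute: Callable[[dict], Any],    # gateway: runs only the approved action
    max_steps: int = 5,
) -> dict:
    history: list[dict] = []
    for _ in range(max_steps):
        action = propose(history)
        if action["kind"] == "final":
            return {"status": "ok", "answer": action["answer"], "history": history}
        # The policy decision happens BEFORE execution; a block ends the run.
        decision = review(action)
        if decision != "approve":
            return {"status": "stopped", "stop_reason": f"supervisor_{decision}", "history": history}
        observation = execute(action)
        history.append({"action": action, "observation": observation})
    return {"status": "stopped", "stop_reason": "max_steps", "history": history}
```

The real example adds `revise` and `escalate` decisions, budgets, and loop detection on top of this skeleton, but the control flow is the same: no action reaches `execute` without passing `review`.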
Project structure

```text
examples/
└── agent-patterns/
    └── supervisor-agent/
        └── python/
            ├── main.py            # Worker -> Supervisor Review -> Execute -> Final
            ├── llm.py             # worker decision + final synthesis
            ├── supervisor.py      # policy review + human approval simulation
            ├── gateway.py         # action validation + allowlist + loop/budget control
            ├── tools.py           # deterministic business tools (refund flow)
            └── requirements.txt
```
How to run

```shell
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/supervisor-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```

Python 3.11+ is required.
Option with export:

```shell
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
```
Option with .env (optional)

```shell
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
```

This is the shell variant (macOS/Linux). On Windows it is easier to set variables with `set` or, if you prefer, to use python-dotenv to load .env automatically.
Task
Imagine a real production support case:
"The customer requests a 1200 USD refund for an annual plan paid 10 days ago."
The worker must not execute a refund of that size automatically. It must:
- gather context
- propose an action
- let the supervisor review it against policy
- escalate to a human if needed
- execute only the approved action
Solution
In this example:
- the worker LLM proposes a sequence of actions
- the supervisor gates every tool call
- a high-risk refund is escalated to a human
- a (simulated) human approves an adjusted amount
- only then do execution and the final answer happen
This is the Supervisor pattern: execution cannot bypass policy review.
Code
tools.py — business tools (facts and execution only)
```python
from __future__ import annotations

from typing import Any

USERS = {
    42: {"id": 42, "name": "Anna", "country": "US", "tier": "enterprise"},
    7: {"id": 7, "name": "Max", "country": "US", "tier": "pro"},
}

BILLING = {
    42: {
        "plan": "annual_enterprise",
        "currency": "USD",
        "last_charge_usd": 1200.0,
        "days_since_payment": 10,
    },
    7: {
        "plan": "pro_monthly",
        "currency": "USD",
        "last_charge_usd": 49.0,
        "days_since_payment": 35,
    },
}

def get_refund_context(user_id: int) -> dict[str, Any]:
    user = USERS.get(user_id)
    billing = BILLING.get(user_id)
    if not user or not billing:
        return {"error": f"context_not_found_for_user:{user_id}"}
    return {
        "user": user,
        "billing": billing,
        "policy_hint": {
            "auto_refund_limit_usd": 1000.0,
            "refund_window_days": 14,
        },
    }

def issue_refund(
    user_id: int,
    amount_usd: float,
    reason: str,
) -> dict[str, Any]:
    user = USERS.get(user_id)
    billing = BILLING.get(user_id)
    if not user or not billing:
        return {"status": "error", "error": f"refund_user_not_found:{user_id}"}
    if amount_usd <= 0:
        return {"status": "error", "error": "refund_amount_must_be_positive"}
    paid = float(billing["last_charge_usd"])
    if amount_usd > paid:
        return {
            "status": "error",
            "error": "refund_exceeds_last_charge",
            "max_refund_usd": paid,
        }
    return {
        "status": "ok",
        "refund": {
            "user_id": user_id,
            "currency": "USD",
            "amount_usd": round(float(amount_usd), 2),
            "reason": reason.strip(),
            "transaction_id": f"rf_{user_id}_20260226",
        },
    }

def send_refund_email(user_id: int, amount_usd: float, message: str) -> dict[str, Any]:
    user = USERS.get(user_id)
    if not user:
        return {"status": "error", "error": f"email_user_not_found:{user_id}"}
    return {
        "status": "ok",
        "email": {
            "to": f"{user['name'].lower()}@example.com",
            "template": "refund_confirmation_v2",
            "amount_usd": round(float(amount_usd), 2),
            "message": message,
            "email_id": f"em_{user_id}_20260226",
        },
    }
```
Key points here (in plain terms)
- The tools make no policy decisions: they only return facts or execute an action.
- Safety checks here are deliberately minimal; the main control lives in the supervisor.
gateway.py — execution boundary (validation and control)
```python
from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable

class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason

@dataclass(frozen=True)
class Budget:
    max_steps: int = 8
    max_tool_calls: int = 5
    max_seconds: int = 30

TOOL_ARG_TYPES: dict[str, dict[str, str]] = {
    "get_refund_context": {"user_id": "int"},
    "issue_refund": {"user_id": "int", "amount_usd": "number", "reason": "str?"},
    "send_refund_email": {"user_id": "int", "amount_usd": "number", "message": "str"},
}

def _stable_json(value: Any) -> str:
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(v) for v in value) + "]"
    if isinstance(value, dict):
        parts = []
        for key in sorted(value):
            parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
        return "{" + ",".join(parts) + "}"
    return json.dumps(str(value), ensure_ascii=True)

def _normalize_for_hash(value: Any) -> Any:
    if isinstance(value, str):
        return " ".join(value.strip().split())
    if isinstance(value, list):
        return [_normalize_for_hash(item) for item in value]
    if isinstance(value, dict):
        return {str(key): _normalize_for_hash(value[key]) for key in sorted(value)}
    return value

def args_hash(args: dict[str, Any]) -> str:
    normalized = _normalize_for_hash(args or {})
    raw = _stable_json(normalized)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]

def _is_number(value: Any) -> bool:
    return isinstance(value, (int, float)) and not isinstance(value, bool)

def _validate_tool_args(name: str, args: dict[str, Any]) -> dict[str, Any]:
    spec = TOOL_ARG_TYPES.get(name)
    if spec is None:
        raise StopRun(f"invalid_action:unknown_tool:{name}")
    extra = set(args.keys()) - set(spec.keys())
    if extra:
        raise StopRun(f"invalid_action:extra_tool_args:{name}")
    normalized: dict[str, Any] = {}
    for arg_name, expected in spec.items():
        is_optional = expected.endswith("?")
        expected_base = expected[:-1] if is_optional else expected
        if arg_name not in args:
            if is_optional:
                continue
            raise StopRun(f"invalid_action:missing_required_arg:{name}:{arg_name}")
        value = args[arg_name]
        if expected_base == "int":
            if not isinstance(value, int) or isinstance(value, bool):
                raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
            normalized[arg_name] = value
            continue
        if expected_base == "number":
            if not _is_number(value):
                raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
            normalized[arg_name] = float(value)
            continue
        if expected_base == "str":
            if not isinstance(value, str) or not value.strip():
                raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
            normalized[arg_name] = value.strip()
            continue
        raise StopRun(f"invalid_action:unknown_arg_spec:{name}:{arg_name}")
    return normalized

def validate_worker_action(action: Any) -> dict[str, Any]:
    if not isinstance(action, dict):
        raise StopRun("invalid_action:not_object")
    kind = action.get("kind")
    if kind == "invalid":
        raise StopRun("invalid_action:non_json")
    if kind == "final":
        allowed_keys = {"kind", "answer"}
        if set(action.keys()) - allowed_keys:
            raise StopRun("invalid_action:extra_keys_final")
        answer = action.get("answer")
        if not isinstance(answer, str) or not answer.strip():
            raise StopRun("invalid_action:bad_final_answer")
        return {"kind": "final", "answer": answer.strip()}
    if kind == "tool":
        allowed_keys = {"kind", "name", "args"}
        if set(action.keys()) - allowed_keys:
            raise StopRun("invalid_action:extra_keys_tool")
        name = action.get("name")
        if not isinstance(name, str) or not name.strip():
            raise StopRun("invalid_action:bad_tool_name")
        args = action.get("args", {})
        if args is None:
            args = {}
        if not isinstance(args, dict):
            raise StopRun("invalid_action:bad_tool_args")
        normalized_args = _validate_tool_args(name.strip(), args)
        return {"kind": "tool", "name": name.strip(), "args": normalized_args}
    raise StopRun("invalid_action:bad_kind")

class ToolGateway:
    def __init__(
        self,
        *,
        allow: set[str],
        registry: dict[str, Callable[..., dict[str, Any]]],
        budget: Budget,
    ):
        self.allow = set(allow)
        self.registry = registry
        self.budget = budget
        self.tool_calls = 0
        self.seen_call_counts: dict[str, int] = {}
        self.per_tool_counts: dict[str, int] = {}
        self.read_only_repeat_limit: dict[str, int] = {
            "get_refund_context": 2,
        }
        self.per_tool_limit: dict[str, int] = {
            "get_refund_context": 3,
            "issue_refund": 2,
            "send_refund_email": 2,
        }

    def call(self, name: str, args: dict[str, Any]) -> dict[str, Any]:
        self.tool_calls += 1
        if self.tool_calls > self.budget.max_tool_calls:
            raise StopRun("max_tool_calls")
        if name not in self.allow:
            raise StopRun(f"tool_denied:{name}")
        fn = self.registry.get(name)
        if fn is None:
            raise StopRun(f"tool_missing:{name}")
        count_for_tool = self.per_tool_counts.get(name, 0) + 1
        if count_for_tool > self.per_tool_limit.get(name, 2):
            raise StopRun("loop_detected:per_tool_limit")
        self.per_tool_counts[name] = count_for_tool
        signature = f"{name}:{args_hash(args)}"
        seen = self.seen_call_counts.get(signature, 0) + 1
        allowed_repeats = self.read_only_repeat_limit.get(name, 1)
        if seen > allowed_repeats:
            raise StopRun("loop_detected:signature_repeat")
        self.seen_call_counts[signature] = seen
        try:
            out = fn(**args)
        except TypeError as exc:
            raise StopRun(f"tool_bad_args:{name}") from exc
        except Exception as exc:
            raise StopRun(f"tool_error:{name}") from exc
        if not isinstance(out, dict):
            raise StopRun(f"tool_bad_result:{name}")
        return out
```
Key points here (in plain terms)
- The gateway runs a minimal schema check on args (types, required and optional fields) before execution.
- The gateway controls technical execution: allowlist, budget, and loop detection (both per `tool+args` signature and by total tool-call count).
- The gateway does not fill in missing fields; it only validates the args contract.
- Some fields (for example `issue_refund.reason`) are allowed to be optional, because the supervisor can add them via `revise`. If the supervisor does not add them, execution stops with `tool_bad_args:*`.
- Even when the supervisor approves, execution still passes only through this controlled layer.
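To make the loop-detection signature concrete, here is the hashing logic from gateway.py above, trimmed so it runs standalone: the signature ignores dict key order and collapses whitespace in strings, so trivially reworded repeats of the same call still count as repeats.

```python
import hashlib
import json
from typing import Any

def _stable_json(value: Any) -> str:
    # Deterministic JSON: sorted keys, ASCII-only output.
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(v) for v in value) + "]"
    if isinstance(value, dict):
        return "{" + ",".join(
            json.dumps(str(k), ensure_ascii=True) + ":" + _stable_json(value[k])
            for k in sorted(value)
        ) + "}"
    return json.dumps(str(value), ensure_ascii=True)

def _normalize_for_hash(value: Any) -> Any:
    # Collapse whitespace in strings so superficially different args hash the same.
    if isinstance(value, str):
        return " ".join(value.strip().split())
    if isinstance(value, list):
        return [_normalize_for_hash(v) for v in value]
    if isinstance(value, dict):
        return {str(k): _normalize_for_hash(value[k]) for k in sorted(value)}
    return value

def args_hash(args: dict[str, Any]) -> str:
    raw = _stable_json(_normalize_for_hash(args or {}))
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]

# Key order and whitespace do not change the signature:
a = args_hash({"user_id": 42, "reason": "  late   charge "})
b = args_hash({"reason": "late charge", "user_id": 42})
assert a == b
```

This is why `loop_detected:signature_repeat` fires even when the worker rephrases string args slightly between retries.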
supervisor.py — policy review and human approval
```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

HUMAN_APPROVAL_CAP_USD = 800.0

@dataclass(frozen=True)
class Policy:
    auto_refund_limit_usd: float = 1000.0
    max_refund_per_run_usd: float = 2000.0

@dataclass
class RuntimeState:
    executed_refund_total_usd: float = 0.0
    refund_executed: bool = False
    has_context: bool = False

@dataclass
class Decision:
    kind: str  # approve | revise | block | escalate
    reason: str
    revised_action: dict[str, Any] | None = None

def review_action(action: dict[str, Any], state: RuntimeState, policy: Policy) -> Decision:
    kind = action.get("kind")
    if kind == "final":
        if not state.has_context:
            return Decision(kind="block", reason="final_requires_context")
        return Decision(kind="approve", reason="final_with_context")
    if kind != "tool":
        return Decision(kind="block", reason="unknown_action_kind")
    name = action.get("name")
    args = dict(action.get("args") or {})
    if name == "get_refund_context":
        return Decision(kind="approve", reason="read_only_context")
    if name == "issue_refund":
        try:
            amount = float(args.get("amount_usd", 0.0))
        except (TypeError, ValueError):
            return Decision(kind="block", reason="invalid_refund_amount_type")
        if amount <= 0:
            return Decision(kind="block", reason="invalid_refund_amount")
        remaining = policy.max_refund_per_run_usd - state.executed_refund_total_usd
        if remaining <= 0:
            return Decision(kind="block", reason="refund_budget_exhausted")
        if amount > remaining:
            revised = {
                "kind": "tool",
                "name": name,
                "args": {**args, "amount_usd": round(remaining, 2)},
            }
            return Decision(kind="revise", reason="cap_to_remaining_run_budget", revised_action=revised)
        reason = str(args.get("reason", "")).strip()
        if not reason:
            revised = {
                "kind": "tool",
                "name": name,
                "args": {**args, "reason": "Customer requested refund within policy review"},
            }
            return Decision(kind="revise", reason="refund_reason_required", revised_action=revised)
        if amount > policy.auto_refund_limit_usd:
            return Decision(kind="escalate", reason="high_refund_requires_human")
        return Decision(kind="approve", reason="refund_within_auto_limit")
    if name == "send_refund_email":
        if not state.refund_executed:
            return Decision(kind="block", reason="email_before_refund")
        return Decision(kind="approve", reason="email_after_refund")
    return Decision(kind="block", reason=f"unknown_tool_for_supervisor:{name}")

def simulate_human_approval(action: dict[str, Any]) -> dict[str, Any]:
    name = action.get("name")
    args = dict(action.get("args") or {})
    if name != "issue_refund":
        return {"approved": True, "revised_action": action, "comment": "approved_by_human"}
    try:
        requested = float(args.get("amount_usd", 0.0))
    except (TypeError, ValueError):
        return {"approved": False, "comment": "invalid_requested_amount_type"}
    approved_amount = min(requested, HUMAN_APPROVAL_CAP_USD)
    if approved_amount <= 0:
        return {"approved": False, "comment": "invalid_requested_amount"}
    revised_action = {
        "kind": "tool",
        "name": "issue_refund",
        "args": {**args, "amount_usd": round(approved_amount, 2)},
    }
    return {
        "approved": True,
        "revised_action": revised_action,
        "comment": f"approved_with_cap:{approved_amount}",
    }
```
Key points here (in plain terms)
- The supervisor makes decisions before execution, not after.
- `final` also goes through supervisor review (without context it returns `block`).
- An invalid type in `amount_usd` does not crash with an exception; it returns a controlled `block` decision.
- The supervisor can use `revise` to add fields required by policy (for example `reason` for `issue_refund`).
- `escalate` shows a practical option here: a human can approve the action with an adjusted amount.
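To see the escalation path in isolation, here is `simulate_human_approval` copied from the module above so it runs standalone: a 1200 USD request is approved, but the amount is capped at `HUMAN_APPROVAL_CAP_USD`.

```python
from typing import Any

HUMAN_APPROVAL_CAP_USD = 800.0

def simulate_human_approval(action: dict[str, Any]) -> dict[str, Any]:
    # Copied from supervisor.py above, trimmed for a standalone run.
    name = action.get("name")
    args = dict(action.get("args") or {})
    if name != "issue_refund":
        return {"approved": True, "revised_action": action, "comment": "approved_by_human"}
    try:
        requested = float(args.get("amount_usd", 0.0))
    except (TypeError, ValueError):
        return {"approved": False, "comment": "invalid_requested_amount_type"}
    approved_amount = min(requested, HUMAN_APPROVAL_CAP_USD)
    if approved_amount <= 0:
        return {"approved": False, "comment": "invalid_requested_amount"}
    return {
        "approved": True,
        "revised_action": {
            "kind": "tool",
            "name": "issue_refund",
            "args": {**args, "amount_usd": round(approved_amount, 2)},
        },
        "comment": f"approved_with_cap:{approved_amount}",
    }

result = simulate_human_approval(
    {"kind": "tool", "name": "issue_refund", "args": {"user_id": 42, "amount_usd": 1200.0}}
)
# Approved, but capped: the executed refund is 800.0, not 1200.0.
assert result["approved"] is True
assert result["revised_action"]["args"]["amount_usd"] == 800.0
```

In the full run this capped `revised_action` is what the gateway executes, and `executed_from` becomes `human_revised` in the trace.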
llm.py — worker decisions + final synthesis
```python
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))

class LLMTimeout(Exception):
    pass

class LLMEmpty(Exception):
    pass

WORKER_SYSTEM_PROMPT = """
You are a support worker agent.
Return exactly one JSON object in one of these shapes:
1) {"kind":"tool","name":"<tool_name>","args":{...}}
2) {"kind":"final","answer":"<short final answer>"}
Rules:
- First, collect facts with get_refund_context.
- Then propose issue_refund when facts are sufficient.
- Use send_refund_email only after refund success is visible in history.
- Use only tools from available_tools.
- Do not output markdown or extra keys.
""".strip()

FINAL_SYSTEM_PROMPT = """
You are a customer support assistant.
Write a short final answer in English.
Include:
- final refund amount in USD
- whether human approval was required
- one reason based on policy
""".strip()

TOOL_CATALOG = [
    {
        "name": "get_refund_context",
        "description": "Get user profile, billing facts, and policy hints",
        "args": {"user_id": "integer"},
    },
    {
        "name": "issue_refund",
        "description": "Issue refund in USD",
        "args": {"user_id": "integer", "amount_usd": "number", "reason": "optional string"},
    },
    {
        "name": "send_refund_email",
        "description": "Send refund confirmation email",
        "args": {"user_id": "integer", "amount_usd": "number", "message": "string"},
    },
]

def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)

def _build_state_summary(history: list[dict[str, Any]]) -> dict[str, Any]:
    tools_used = [
        step.get("action", {}).get("name")
        for step in history
        if isinstance(step, dict)
        and isinstance(step.get("action"), dict)
        and step.get("action", {}).get("kind") == "tool"
    ]
    supervisor_decisions = [
        step.get("supervisor", {}).get("decision")
        for step in history
        if isinstance(step, dict) and isinstance(step.get("supervisor"), dict)
    ]
    return {
        "steps_completed": len(history),
        "tools_used": [t for t in tools_used if t],
        "supervisor_decisions": [d for d in supervisor_decisions if d],
        "last_step": history[-1] if history else None,
    }

def _recent_step_summaries(history: list[dict[str, Any]], limit: int = 3) -> list[dict[str, Any]]:
    out: list[dict[str, Any]] = []
    for step in history[-limit:]:
        out.append(
            {
                "step": step.get("step"),
                "proposed_tool": step.get("action", {}).get("name"),
                "supervisor_decision": step.get("supervisor", {}).get("decision"),
                "executed_tool": step.get("executed_action", {}).get("name"),
                "executed_from": step.get("executed_from"),
                "observation_status": step.get("observation", {}).get("status"),
            }
        )
    return out

def decide_next_action(
    goal: str,
    history: list[dict[str, Any]],
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "state_summary": _build_state_summary(history),
        "recent_step_summaries": _recent_step_summaries(history, limit=3),
        "available_tools": TOOL_CATALOG,
    }
    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": WORKER_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc
    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}

def compose_final_answer(goal: str, history: list[dict[str, Any]]) -> str:
    payload = {
        "goal": goal,
        "history": history,
    }
    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            messages=[
                {"role": "system", "content": FINAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc
    text = (completion.choices[0].message.content or "").strip()
    if not text:
        raise LLMEmpty("llm_empty")
    return text
```
Key points here (in plain terms)
- The worker makes decisions, but it has no right to execute an action directly.
- The worker's context includes supervisor decisions and compact summaries of recent steps, without "heavy" payloads.
main.py — Worker -> Supervisor -> Execute -> Final
```python
from __future__ import annotations

import json
import time
from typing import Any

from gateway import Budget, StopRun, ToolGateway, args_hash, validate_worker_action
from llm import LLMEmpty, LLMTimeout, compose_final_answer, decide_next_action
from supervisor import Decision, Policy, RuntimeState, review_action, simulate_human_approval
from tools import get_refund_context, issue_refund, send_refund_email

GOAL = (
    "User Anna (user_id=42) requests a refund for a recent annual plan charge of 1200 USD. "
    "Apply supervisor policy before any refund execution and provide a short final response."
)

BUDGET = Budget(max_steps=8, max_tool_calls=5, max_seconds=30)
POLICY = Policy(
    auto_refund_limit_usd=1000.0,
    max_refund_per_run_usd=2000.0,
)

TOOL_REGISTRY = {
    "get_refund_context": get_refund_context,
    "issue_refund": issue_refund,
    "send_refund_email": send_refund_email,
}
ALLOWED_TOOLS = {"get_refund_context", "issue_refund", "send_refund_email"}

def _decision_payload(decision: Decision) -> dict[str, Any]:
    payload = {
        "decision": decision.kind,
        "reason": decision.reason,
    }
    if decision.revised_action:
        payload["revised_action"] = decision.revised_action
    return payload

def run_supervised_flow(goal: str) -> dict[str, Any]:
    started = time.monotonic()
    history: list[dict[str, Any]] = []
    trace: list[dict[str, Any]] = []
    state = RuntimeState()
    gateway = ToolGateway(allow=ALLOWED_TOOLS, registry=TOOL_REGISTRY, budget=BUDGET)
    for step in range(1, BUDGET.max_steps + 1):
        if (time.monotonic() - started) > BUDGET.max_seconds:
            return {
                "status": "stopped",
                "stop_reason": "max_seconds",
                "trace": trace,
                "history": history,
            }
        try:
            raw_action = decide_next_action(goal=goal, history=history)
        except LLMTimeout:
            return {
                "status": "stopped",
                "stop_reason": "llm_timeout",
                "phase": "worker",
                "trace": trace,
                "history": history,
            }
        try:
            action = validate_worker_action(raw_action)
        except StopRun as exc:
            return {
                "status": "stopped",
                "stop_reason": exc.reason,
                "phase": "worker",
                "raw_action": raw_action,
                "trace": trace,
                "history": history,
            }
        decision = review_action(action=action, state=state, policy=POLICY)
        supervisor_info = _decision_payload(decision)
        action_to_execute = action
        human_info: dict[str, Any] | None = None
        executed_from = "original"
        if decision.kind == "block":
            trace_row = {
                "step": step,
                "tool": action.get("name", "final"),
                "supervisor_decision": "block",
                "executed_from": executed_from,
                "stop_reason": f"supervisor_block:{decision.reason}",
                "ok": False,
            }
            if action.get("kind") == "tool":
                trace_row["args_hash"] = args_hash(action.get("args", {}))
            trace.append(trace_row)
            return {
                "status": "stopped",
                "stop_reason": f"supervisor_block:{decision.reason}",
                "phase": "supervisor",
                "action": action,
                "trace": trace,
                "history": history,
            }
        if decision.kind == "revise" and decision.revised_action:
            action_to_execute = decision.revised_action
            executed_from = "supervisor_revised"
        if decision.kind == "escalate":
            human = simulate_human_approval(action)
            human_info = human
            if not human.get("approved"):
                trace.append(
                    {
                        "step": step,
                        "tool": action["name"],
                        "args_hash": args_hash(action["args"]),
                        "supervisor_decision": "escalate",
                        "executed_from": executed_from,
                        "human_approved": False,
                        "stop_reason": "human_rejected",
                        "ok": False,
                    }
                )
                return {
                    "status": "stopped",
                    "stop_reason": "human_rejected",
                    "phase": "human_approval",
                    "action": action,
                    "trace": trace,
                    "history": history,
                }
            revised = human.get("revised_action")
            if isinstance(revised, dict):
                action_to_execute = revised
                executed_from = "human_revised"
        if action_to_execute["kind"] == "final":
            trace.append(
                {
                    "step": step,
                    "tool": "final",
                    "supervisor_decision": decision.kind,
                    "executed_from": executed_from,
                    "ok": True,
                }
            )
            history.append(
                {
                    "step": step,
                    "action": action,
                    "supervisor": supervisor_info,
                    "executed_action": action_to_execute,
                    "executed_from": executed_from,
                    "observation": {"status": "final"},
                }
            )
            return {
                "status": "ok",
                "stop_reason": "success",
                "answer": action_to_execute["answer"],
                "trace": trace,
                "history": history,
            }
        tool_name = action_to_execute["name"]
        tool_args = action_to_execute["args"]
        try:
            observation = gateway.call(tool_name, tool_args)
            trace_row = {
                "step": step,
                "tool": tool_name,
                "args_hash": args_hash(tool_args),
                "supervisor_decision": decision.kind,
                "executed_from": executed_from,
                "ok": True,
            }
            if human_info is not None:
                trace_row["human_approved"] = bool(human_info.get("approved"))
            trace.append(trace_row)
        except StopRun as exc:
            trace_row = {
                "step": step,
                "tool": tool_name,
                "args_hash": args_hash(tool_args),
                "supervisor_decision": decision.kind,
                "executed_from": executed_from,
                "ok": False,
                "stop_reason": exc.reason,
            }
            if human_info is not None:
                trace_row["human_approved"] = bool(human_info.get("approved"))
            trace.append(trace_row)
            return {
                "status": "stopped",
                "stop_reason": exc.reason,
                "phase": "execution",
                "action": action_to_execute,
                "trace": trace,
                "history": history,
            }
        if tool_name == "get_refund_context" and "error" not in observation:
            state.has_context = True
        if tool_name == "issue_refund" and observation.get("status") == "ok":
            amount = float(observation.get("refund", {}).get("amount_usd", 0.0))
            state.executed_refund_total_usd += amount
            state.refund_executed = True
        history_entry = {
            "step": step,
            "action": action,
            "supervisor": supervisor_info,
            "executed_action": action_to_execute,
            "executed_from": executed_from,
            "observation": observation,
        }
        if human_info is not None:
            history_entry["human_approval"] = human_info
        history.append(history_entry)
    try:
        answer = compose_final_answer(goal=goal, history=history)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "finalize",
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "phase": "finalize",
            "trace": trace,
            "history": history,
        }
    return {
        "status": "ok",
        "stop_reason": "success",
        "answer": answer,
        "trace": trace,
        "history": history,
    }

def main() -> None:
    result = run_supervised_flow(GOAL)
    print(json.dumps(result, indent=2, ensure_ascii=False))

if __name__ == "__main__":
    main()
```
Key points here (in plain terms)
- The worker cannot bypass the supervisor review: every tool call goes through a policy check.
- `escalate` leads to human approval, and only then is the action executed.
- The `trace` shows the source of each executed action: `original`, `supervisor_revised`, or `human_revised`.
- An early `final` without `get_refund_context` is blocked as `supervisor_block:final_requires_context`.
- `trace` + `history` give a complete audit: what the worker proposed, what the supervisor decided, and what was actually executed.
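For monitoring, the audit trace can be scanned for rows that deviated from the worker's original proposal. This small standalone sketch (the `flag_for_review` helper is illustrative and not part of the example code) filters trace rows worth alerting on:

```python
from typing import Any

def flag_for_review(trace: list[dict[str, Any]]) -> list[dict[str, Any]]:
    # Illustrative helper: pick out trace rows where the executed action
    # was rewritten (by the supervisor or a human) or where execution failed.
    return [
        row
        for row in trace
        if not row.get("ok") or row.get("executed_from") != "original"
    ]

trace = [
    {"step": 1, "tool": "get_refund_context", "executed_from": "original", "ok": True},
    {"step": 2, "tool": "issue_refund", "executed_from": "human_revised", "ok": True},
    {"step": 3, "tool": "send_refund_email", "executed_from": "original", "ok": False,
     "stop_reason": "tool_denied:send_refund_email"},
]
flagged = flag_for_review(trace)
assert [row["step"] for row in flagged] == [2, 3]
```

Rows with `executed_from` of `supervisor_revised` or `human_revised` are exactly the cases where policy overrode worker intent, which is usually the first thing to look at in production.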
requirements.txt

```text
openai==2.21.0
```
Example output
Below is an example of a valid run in which the worker, right after gathering context, proposes a partial refund of 1000 USD and the supervisor approves it.
```json
{
  "status": "ok",
  "stop_reason": "success",
  "answer": "A partial refund of 1000 USD has been approved and processed for Anna as per supervisor policy, and a confirmation email has been sent.",
  "trace": [
    {
      "step": 1,
      "tool": "get_refund_context",
      "args_hash": "feaa769a39ae",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 2,
      "tool": "issue_refund",
      "args_hash": "36ffe69cb606",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 3,
      "tool": "send_refund_email",
      "args_hash": "ff6ec44bb2fa",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 4,
      "tool": "final",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    }
  ],
  "history": [
    {"step": 1, "action": {"kind": "tool", "name": "get_refund_context", "args": {"user_id": 42}}, "supervisor": {"decision": "approve", "reason": "read_only_context"}, "executed_action": {"kind": "tool", "name": "get_refund_context", "args": {"user_id": 42}}, "executed_from": "original", "observation": {...}},
    {"step": 2, "action": {"kind": "tool", "name": "issue_refund", "args": {"user_id": 42, "amount_usd": 1000.0, "reason": "..."}}, "supervisor": {"decision": "approve", "reason": "refund_within_auto_limit"}, "executed_action": {"kind": "tool", "name": "issue_refund", "args": {"user_id": 42, "amount_usd": 1000.0, "reason": "..."}}, "executed_from": "original", "observation": {...}},
    {"step": 3, "action": {"kind": "tool", "name": "send_refund_email", "args": {"user_id": 42, "amount_usd": 1000.0, "message": "..."}}, "supervisor": {"decision": "approve", "reason": "email_after_refund"}, "executed_action": {"kind": "tool", "name": "send_refund_email", "args": {"user_id": 42, "amount_usd": 1000.0, "message": "..."}}, "executed_from": "original", "observation": {...}},
    {"step": 4, "action": {"kind": "final", "answer": "..."}, "supervisor": {"decision": "approve", "reason": "final_with_context"}, "executed_action": {"kind": "final", "answer": "..."}, "executed_from": "original", "observation": {"status": "final"}}
  ]
}
```
This is an abbreviated example: history is intentionally trimmed to key fields for readability.
What is NOT shown here
- No real integrated human-in-the-loop UI/queue.
- No role-based access or multi-tenant isolation.
- No complex retry/backoff policies for the LLM.
- No full retry/backoff orchestration for write tools; the loop guard here is intentionally conservative.
- Optional args (for example `issue_refund.reason`) are filled in via supervisor `revise` within the demo contract.
- No token/cost budgets (cost guardrails).
Typical stop_reason values
- `success` — the worker completed the scenario and returned the final answer
- `invalid_action:*` — the worker returned invalid action JSON
- `invalid_action:bad_arg_type:*` — a tool arg has a value of an invalid type (for example, `amount_usd` is not a number)
- `supervisor_block:*` — the supervisor blocked the action by policy
- `human_rejected` — the escalation to a human was rejected
- `max_tool_calls` — the tool-call limit was exhausted
- `max_seconds` — the run's time budget was exceeded
- `llm_timeout` — the LLM did not respond within `OPENAI_TIMEOUT_SECONDS`
- `llm_empty` — the LLM's final answer is empty
- `tool_denied:*` — the tool is not in the execution allowlist
- `tool_missing:*` — the tool is missing from the registry
- `tool_bad_args:*` — invalid arguments for the tool
- `loop_detected:per_tool_limit` — `per_tool_limit` was exceeded for a tool (protection against tool spam even with different args)
- `loop_detected:signature_repeat` — the same `tool+args` was repeated above the allowed limit
What to try next
- Lower `POLICY.auto_refund_limit_usd` to `500` and watch `escalate` trigger more often.
- Put `simulate_human_approval` into rejection mode and verify `human_rejected`.
- Remove `send_refund_email` from `ALLOWED_TOOLS` and verify `tool_denied:*`.
- Drop `reason` from `issue_refund` and watch the supervisor return `revise` with an auto-filled reason.
- Add a `risk_score` field to supervisor decisions and surface it in `trace` for alerting.