Supervisor Agent — Python (implementación completa con LLM)

Ejemplo ejecutable de agente Supervisor en Python con estilo de producción, con policy review, decisiones approve/revise/block/escalate, human approval, presupuestos y stop reasons.
En esta página
  1. Esencia del patrón (breve)
  2. Qué demuestra este ejemplo
  3. Arquitectura
  4. Estructura del proyecto
  5. Cómo ejecutar
  6. Tarea
  7. Solución
  8. Código
  9. tools.py — tools de negocio (solo hechos y ejecución)
  10. gateway.py — execution boundary (validación y control)
  11. supervisor.py — policy review y human approval
  12. llm.py — worker decisions + final synthesis
  13. main.py — Worker -> Supervisor -> Execute -> Final
  14. requirements.txt
  15. Ejemplo de salida
  16. Qué NO se muestra aquí
  17. Valores típicos de stop_reason
  18. Qué probar después

Esencia del patrón (breve)

Supervisor Agent es un patrón en el que un worker agent propone la siguiente acción y una capa supervisor separada decide si se puede ejecutar.

El LLM decide qué hacer después, y la supervisor policy decide si es seguro y permitido antes de ejecutar.


Qué demuestra este ejemplo

  • worker loop donde el LLM propone una acción (tool o final)
  • supervisor review separado antes de cada tool call
  • decisiones del supervisor: approve, revise, block, escalate
  • simulación de human approval para montos altos de reembolso
  • policy boundary entre intent (LLM) y execution (tools)
  • allowlist de tools, presupuestos de run y loop detection
  • stop_reason explícitos y audit trace para monitoreo en producción

Arquitectura

  1. El worker propone action JSON.
  2. La action se valida (validate_worker_action).
  3. El supervisor hace review y devuelve una decisión (approve/revise/block/escalate).
  4. ToolGateway ejecuta solo la acción permitida.
  5. Observation + decisión del supervisor se escriben en history.
  6. El worker ve history y da el siguiente paso o devuelve final.

El supervisor no ejecuta lógica de negocio por sí mismo: solo decide si el siguiente paso es admisible.


Estructura del proyecto

TEXT
examples/
└── agent-patterns/
    └── supervisor-agent/
        └── python/
            ├── main.py           # Worker -> Supervisor Review -> Execute -> Final
            ├── llm.py            # worker decision + final synthesis
            ├── supervisor.py     # policy review + human approval simulation
            ├── gateway.py        # action validation + allowlist + loop/budget control
            ├── tools.py          # deterministic business tools (refund flow)
            └── requirements.txt

Cómo ejecutar

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/supervisor-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Se requiere Python 3.11+.

Opción con export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Opción con .env (opcional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

Esta es la variante de shell (macOS/Linux). En Windows es más fácil usar variables con set o, si quieres, python-dotenv para cargar .env automáticamente.


Tarea

Imagina un caso real de soporte en producción:

"El cliente solicita un reembolso de 1200 USD por un plan anual pagado hace 10 días."

El worker no debe ejecutar automáticamente un reembolso de ese monto. Debe:

  • recopilar contexto
  • proponer una acción
  • dejar que el supervisor la revise contra políticas
  • escalar a una persona si hace falta
  • ejecutar solo la acción aprobada

Solución

En este ejemplo:

  • el worker LLM propone una secuencia de acciones
  • el supervisor controla cada tool call
  • un high-risk refund se escala a una persona
  • una persona (simulación) aprueba el monto ajustado
  • solo después ocurren la ejecución y la respuesta final

Este es el patrón Supervisor: la ejecución no puede saltarse el policy review.


Código

tools.py — tools de negocio (solo hechos y ejecución)

PYTHON
from __future__ import annotations

from typing import Any

USERS = {
    42: {"id": 42, "name": "Anna", "country": "US", "tier": "enterprise"},
    7: {"id": 7, "name": "Max", "country": "US", "tier": "pro"},
}

BILLING = {
    42: {
        "plan": "annual_enterprise",
        "currency": "USD",
        "last_charge_usd": 1200.0,
        "days_since_payment": 10,
    },
    7: {
        "plan": "pro_monthly",
        "currency": "USD",
        "last_charge_usd": 49.0,
        "days_since_payment": 35,
    },
}


def get_refund_context(user_id: int) -> dict[str, Any]:
    user = USERS.get(user_id)
    billing = BILLING.get(user_id)
    if not user or not billing:
        return {"error": f"context_not_found_for_user:{user_id}"}

    return {
        "user": user,
        "billing": billing,
        "policy_hint": {
            "auto_refund_limit_usd": 1000.0,
            "refund_window_days": 14,
        },
    }


def issue_refund(
    user_id: int,
    amount_usd: float,
    reason: str,
) -> dict[str, Any]:
    user = USERS.get(user_id)
    billing = BILLING.get(user_id)
    if not user or not billing:
        return {"status": "error", "error": f"refund_user_not_found:{user_id}"}

    if amount_usd <= 0:
        return {"status": "error", "error": "refund_amount_must_be_positive"}

    paid = float(billing["last_charge_usd"])
    if amount_usd > paid:
        return {
            "status": "error",
            "error": "refund_exceeds_last_charge",
            "max_refund_usd": paid,
        }

    return {
        "status": "ok",
        "refund": {
            "user_id": user_id,
            "currency": "USD",
            "amount_usd": round(float(amount_usd), 2),
            "reason": reason.strip(),
            "transaction_id": f"rf_{user_id}_20260226",
        },
    }


def send_refund_email(user_id: int, amount_usd: float, message: str) -> dict[str, Any]:
    user = USERS.get(user_id)
    if not user:
        return {"status": "error", "error": f"email_user_not_found:{user_id}"}

    return {
        "status": "ok",
        "email": {
            "to": f"{user['name'].lower()}@example.com",
            "template": "refund_confirmation_v2",
            "amount_usd": round(float(amount_usd), 2),
            "message": message,
            "email_id": f"em_{user_id}_20260226",
        },
    }

Lo más importante aquí (en simple)

  • Las tools no toman decisiones de policy: solo devuelven hechos o ejecutan una acción.
  • Las restricciones de seguridad aquí son mínimas; el control principal lo hace el supervisor.

gateway.py — execution boundary (validación y control)

PYTHON
from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_steps: int = 8
    max_tool_calls: int = 5
    max_seconds: int = 30


TOOL_ARG_TYPES: dict[str, dict[str, str]] = {
    "get_refund_context": {"user_id": "int"},
    "issue_refund": {"user_id": "int", "amount_usd": "number", "reason": "str?"},
    "send_refund_email": {"user_id": "int", "amount_usd": "number", "message": "str"},
}


def _stable_json(value: Any) -> str:
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(v) for v in value) + "]"
    if isinstance(value, dict):
        parts = []
        for key in sorted(value):
            parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
        return "{" + ",".join(parts) + "}"
    return json.dumps(str(value), ensure_ascii=True)


def _normalize_for_hash(value: Any) -> Any:
    if isinstance(value, str):
        return " ".join(value.strip().split())
    if isinstance(value, list):
        return [_normalize_for_hash(item) for item in value]
    if isinstance(value, dict):
        return {str(key): _normalize_for_hash(value[key]) for key in sorted(value)}
    return value


def args_hash(args: dict[str, Any]) -> str:
    normalized = _normalize_for_hash(args or {})
    raw = _stable_json(normalized)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]


def _is_number(value: Any) -> bool:
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def _validate_tool_args(name: str, args: dict[str, Any]) -> dict[str, Any]:
    spec = TOOL_ARG_TYPES.get(name)
    if spec is None:
        raise StopRun(f"invalid_action:unknown_tool:{name}")

    extra = set(args.keys()) - set(spec.keys())
    if extra:
        raise StopRun(f"invalid_action:extra_tool_args:{name}")

    normalized: dict[str, Any] = {}
    for arg_name, expected in spec.items():
        is_optional = expected.endswith("?")
        expected_base = expected[:-1] if is_optional else expected

        if arg_name not in args:
            if is_optional:
                continue
            raise StopRun(f"invalid_action:missing_required_arg:{name}:{arg_name}")
        value = args[arg_name]

        if expected_base == "int":
            if not isinstance(value, int) or isinstance(value, bool):
                raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
            normalized[arg_name] = value
            continue

        if expected_base == "number":
            if not _is_number(value):
                raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
            normalized[arg_name] = float(value)
            continue

        if expected_base == "str":
            if not isinstance(value, str) or not value.strip():
                raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
            normalized[arg_name] = value.strip()
            continue

        raise StopRun(f"invalid_action:unknown_arg_spec:{name}:{arg_name}")

    return normalized


def validate_worker_action(action: Any) -> dict[str, Any]:
    if not isinstance(action, dict):
        raise StopRun("invalid_action:not_object")

    kind = action.get("kind")
    if kind == "invalid":
        raise StopRun("invalid_action:non_json")

    if kind == "final":
        allowed_keys = {"kind", "answer"}
        if set(action.keys()) - allowed_keys:
            raise StopRun("invalid_action:extra_keys_final")
        answer = action.get("answer")
        if not isinstance(answer, str) or not answer.strip():
            raise StopRun("invalid_action:bad_final_answer")
        return {"kind": "final", "answer": answer.strip()}

    if kind == "tool":
        allowed_keys = {"kind", "name", "args"}
        if set(action.keys()) - allowed_keys:
            raise StopRun("invalid_action:extra_keys_tool")

        name = action.get("name")
        if not isinstance(name, str) or not name.strip():
            raise StopRun("invalid_action:bad_tool_name")

        args = action.get("args", {})
        if args is None:
            args = {}
        if not isinstance(args, dict):
            raise StopRun("invalid_action:bad_tool_args")

        normalized_args = _validate_tool_args(name.strip(), args)
        return {"kind": "tool", "name": name.strip(), "args": normalized_args}

    raise StopRun("invalid_action:bad_kind")


class ToolGateway:
    def __init__(
        self,
        *,
        allow: set[str],
        registry: dict[str, Callable[..., dict[str, Any]]],
        budget: Budget,
    ):
        self.allow = set(allow)
        self.registry = registry
        self.budget = budget
        self.tool_calls = 0
        self.seen_call_counts: dict[str, int] = {}
        self.per_tool_counts: dict[str, int] = {}
        self.read_only_repeat_limit: dict[str, int] = {
            "get_refund_context": 2,
        }
        self.per_tool_limit: dict[str, int] = {
            "get_refund_context": 3,
            "issue_refund": 2,
            "send_refund_email": 2,
        }

    def call(self, name: str, args: dict[str, Any]) -> dict[str, Any]:
        self.tool_calls += 1
        if self.tool_calls > self.budget.max_tool_calls:
            raise StopRun("max_tool_calls")

        if name not in self.allow:
            raise StopRun(f"tool_denied:{name}")

        fn = self.registry.get(name)
        if fn is None:
            raise StopRun(f"tool_missing:{name}")

        count_for_tool = self.per_tool_counts.get(name, 0) + 1
        if count_for_tool > self.per_tool_limit.get(name, 2):
            raise StopRun("loop_detected:per_tool_limit")
        self.per_tool_counts[name] = count_for_tool

        signature = f"{name}:{args_hash(args)}"
        seen = self.seen_call_counts.get(signature, 0) + 1
        allowed_repeats = self.read_only_repeat_limit.get(name, 1)
        if seen > allowed_repeats:
            raise StopRun("loop_detected:signature_repeat")
        self.seen_call_counts[signature] = seen

        try:
            out = fn(**args)
        except TypeError as exc:
            raise StopRun(f"tool_bad_args:{name}") from exc
        except Exception as exc:
            raise StopRun(f"tool_error:{name}") from exc

        if not isinstance(out, dict):
            raise StopRun(f"tool_bad_result:{name}")
        return out

Lo más importante aquí (en simple)

  • Gateway hace un schema-check mínimo de args (tipos, campos required + optional) antes de la ejecución.
  • Gateway controla la ejecución técnica: allowlist, budget, loop detection (tanto por tool+args como por el número total de tool calls).
  • Gateway no rellena campos faltantes; solo valida el contrato de args.
  • Para algunos campos (por ejemplo issue_refund.reason), gateway permite optional, porque el supervisor puede agregarlos con revise. Si el supervisor no los agrega, la ejecución se detiene con tool_bad_args:*.
  • Incluso si el supervisor aprueba, la ejecución igual pasa solo por la capa controlada.

supervisor.py — policy review y human approval

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

HUMAN_APPROVAL_CAP_USD = 800.0


@dataclass(frozen=True)
class Policy:
    auto_refund_limit_usd: float = 1000.0
    max_refund_per_run_usd: float = 2000.0


@dataclass
class RuntimeState:
    executed_refund_total_usd: float = 0.0
    refund_executed: bool = False
    has_context: bool = False


@dataclass
class Decision:
    kind: str  # approve | revise | block | escalate
    reason: str
    revised_action: dict[str, Any] | None = None


def review_action(action: dict[str, Any], state: RuntimeState, policy: Policy) -> Decision:
    kind = action.get("kind")
    if kind == "final":
        if not state.has_context:
            return Decision(kind="block", reason="final_requires_context")
        return Decision(kind="approve", reason="final_with_context")
    if kind != "tool":
        return Decision(kind="block", reason="unknown_action_kind")

    name = action.get("name")
    args = dict(action.get("args") or {})

    if name == "get_refund_context":
        return Decision(kind="approve", reason="read_only_context")

    if name == "issue_refund":
        try:
            amount = float(args.get("amount_usd", 0.0))
        except (TypeError, ValueError):
            return Decision(kind="block", reason="invalid_refund_amount_type")
        if amount <= 0:
            return Decision(kind="block", reason="invalid_refund_amount")

        remaining = policy.max_refund_per_run_usd - state.executed_refund_total_usd
        if remaining <= 0:
            return Decision(kind="block", reason="refund_budget_exhausted")

        if amount > remaining:
            revised = {
                "kind": "tool",
                "name": name,
                "args": {**args, "amount_usd": round(remaining, 2)},
            }
            return Decision(kind="revise", reason="cap_to_remaining_run_budget", revised_action=revised)

        reason = str(args.get("reason", "")).strip()
        if not reason:
            revised = {
                "kind": "tool",
                "name": name,
                "args": {**args, "reason": "Customer requested refund within policy review"},
            }
            return Decision(kind="revise", reason="refund_reason_required", revised_action=revised)

        if amount > policy.auto_refund_limit_usd:
            return Decision(kind="escalate", reason="high_refund_requires_human")

        return Decision(kind="approve", reason="refund_within_auto_limit")

    if name == "send_refund_email":
        if not state.refund_executed:
            return Decision(kind="block", reason="email_before_refund")
        return Decision(kind="approve", reason="email_after_refund")

    return Decision(kind="block", reason=f"unknown_tool_for_supervisor:{name}")


def simulate_human_approval(action: dict[str, Any]) -> dict[str, Any]:
    name = action.get("name")
    args = dict(action.get("args") or {})

    if name != "issue_refund":
        return {"approved": True, "revised_action": action, "comment": "approved_by_human"}

    try:
        requested = float(args.get("amount_usd", 0.0))
    except (TypeError, ValueError):
        return {"approved": False, "comment": "invalid_requested_amount_type"}
    approved_amount = min(requested, HUMAN_APPROVAL_CAP_USD)

    if approved_amount <= 0:
        return {"approved": False, "comment": "invalid_requested_amount"}

    revised_action = {
        "kind": "tool",
        "name": "issue_refund",
        "args": {**args, "amount_usd": round(approved_amount, 2)},
    }
    return {
        "approved": True,
        "revised_action": revised_action,
        "comment": f"approved_with_cap:{approved_amount}",
    }

Lo más importante aquí (en simple)

  • El supervisor toma decisiones antes de la ejecución, no después.
  • final también pasa por supervisor review (sin contexto devuelve block).
  • Un tipo inválido en amount_usd no rompe con excepción; devuelve una decisión block controlada.
  • El supervisor puede hacer revise para agregar campos requeridos por policy (por ejemplo reason para issue_refund).
  • escalate aquí muestra una opción práctica: una persona puede aprobar la acción con un monto ajustado.

llm.py — worker decisions + final synthesis

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


WORKER_SYSTEM_PROMPT = """
You are a support worker agent.
Return exactly one JSON object in one of these shapes:
1) {"kind":"tool","name":"<tool_name>","args":{...}}
2) {"kind":"final","answer":"<short final answer>"}

Rules:
- First, collect facts with get_refund_context.
- Then propose issue_refund when facts are sufficient.
- Use send_refund_email only after refund success is visible in history.
- Use only tools from available_tools.
- Do not output markdown or extra keys.
""".strip()

FINAL_SYSTEM_PROMPT = """
You are a customer support assistant.
Write a short final answer in English.
Include:
- final refund amount in USD
- whether human approval was required
- one reason based on policy
""".strip()

TOOL_CATALOG = [
    {
        "name": "get_refund_context",
        "description": "Get user profile, billing facts, and policy hints",
        "args": {"user_id": "integer"},
    },
    {
        "name": "issue_refund",
        "description": "Issue refund in USD",
        "args": {"user_id": "integer", "amount_usd": "number", "reason": "optional string"},
    },
    {
        "name": "send_refund_email",
        "description": "Send refund confirmation email",
        "args": {"user_id": "integer", "amount_usd": "number", "message": "string"},
    },
]


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def _build_state_summary(history: list[dict[str, Any]]) -> dict[str, Any]:
    tools_used = [
        step.get("action", {}).get("name")
        for step in history
        if isinstance(step, dict)
        and isinstance(step.get("action"), dict)
        and step.get("action", {}).get("kind") == "tool"
    ]

    supervisor_decisions = [
        step.get("supervisor", {}).get("decision")
        for step in history
        if isinstance(step, dict) and isinstance(step.get("supervisor"), dict)
    ]

    return {
        "steps_completed": len(history),
        "tools_used": [t for t in tools_used if t],
        "supervisor_decisions": [d for d in supervisor_decisions if d],
        "last_step": history[-1] if history else None,
    }


def _recent_step_summaries(history: list[dict[str, Any]], limit: int = 3) -> list[dict[str, Any]]:
    out: list[dict[str, Any]] = []
    for step in history[-limit:]:
        out.append(
            {
                "step": step.get("step"),
                "proposed_tool": step.get("action", {}).get("name"),
                "supervisor_decision": step.get("supervisor", {}).get("decision"),
                "executed_tool": step.get("executed_action", {}).get("name"),
                "executed_from": step.get("executed_from"),
                "observation_status": step.get("observation", {}).get("status"),
            }
        )
    return out


def decide_next_action(
    goal: str,
    history: list[dict[str, Any]],
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "state_summary": _build_state_summary(history),
        "recent_step_summaries": _recent_step_summaries(history, limit=3),
        "available_tools": TOOL_CATALOG,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": WORKER_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_final_answer(goal: str, history: list[dict[str, Any]]) -> str:
    payload = {
        "goal": goal,
        "history": history,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            messages=[
                {"role": "system", "content": FINAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = (completion.choices[0].message.content or "").strip()
    if not text:
        raise LLMEmpty("llm_empty")
    return text

Lo más importante aquí (en simple)

  • El worker toma decisiones, pero no tiene derecho a ejecutar una acción directamente.
  • El contexto del worker incluye decisiones del supervisor y resúmenes compactos de pasos recientes sin payload "pesado".

main.py — Worker -> Supervisor -> Execute -> Final

PYTHON
from __future__ import annotations

import json
import time
from typing import Any

from gateway import Budget, StopRun, ToolGateway, args_hash, validate_worker_action
from llm import LLMEmpty, LLMTimeout, compose_final_answer, decide_next_action
from supervisor import Decision, Policy, RuntimeState, review_action, simulate_human_approval
from tools import get_refund_context, issue_refund, send_refund_email

GOAL = (
    "User Anna (user_id=42) requests a refund for a recent annual plan charge of 1200 USD. "
    "Apply supervisor policy before any refund execution and provide a short final response."
)

BUDGET = Budget(max_steps=8, max_tool_calls=5, max_seconds=30)
POLICY = Policy(
    auto_refund_limit_usd=1000.0,
    max_refund_per_run_usd=2000.0,
)

TOOL_REGISTRY = {
    "get_refund_context": get_refund_context,
    "issue_refund": issue_refund,
    "send_refund_email": send_refund_email,
}

ALLOWED_TOOLS = {"get_refund_context", "issue_refund", "send_refund_email"}


def _decision_payload(decision: Decision) -> dict[str, Any]:
    payload = {
        "decision": decision.kind,
        "reason": decision.reason,
    }
    if decision.revised_action:
        payload["revised_action"] = decision.revised_action
    return payload


def run_supervised_flow(goal: str) -> dict[str, Any]:
    started = time.monotonic()
    history: list[dict[str, Any]] = []
    trace: list[dict[str, Any]] = []

    state = RuntimeState()
    gateway = ToolGateway(allow=ALLOWED_TOOLS, registry=TOOL_REGISTRY, budget=BUDGET)

    for step in range(1, BUDGET.max_steps + 1):
        if (time.monotonic() - started) > BUDGET.max_seconds:
            return {
                "status": "stopped",
                "stop_reason": "max_seconds",
                "trace": trace,
                "history": history,
            }

        try:
            raw_action = decide_next_action(goal=goal, history=history)
        except LLMTimeout:
            return {
                "status": "stopped",
                "stop_reason": "llm_timeout",
                "phase": "worker",
                "trace": trace,
                "history": history,
            }

        try:
            action = validate_worker_action(raw_action)
        except StopRun as exc:
            return {
                "status": "stopped",
                "stop_reason": exc.reason,
                "phase": "worker",
                "raw_action": raw_action,
                "trace": trace,
                "history": history,
            }

        decision = review_action(action=action, state=state, policy=POLICY)
        supervisor_info = _decision_payload(decision)

        action_to_execute = action
        human_info: dict[str, Any] | None = None
        executed_from = "original"

        if decision.kind == "block":
            trace_row = {
                "step": step,
                "tool": action.get("name", "final"),
                "supervisor_decision": "block",
                "executed_from": executed_from,
                "stop_reason": f"supervisor_block:{decision.reason}",
                "ok": False,
            }
            if action.get("kind") == "tool":
                trace_row["args_hash"] = args_hash(action.get("args", {}))
            trace.append(trace_row)
            return {
                "status": "stopped",
                "stop_reason": f"supervisor_block:{decision.reason}",
                "phase": "supervisor",
                "action": action,
                "trace": trace,
                "history": history,
            }

        if decision.kind == "revise" and decision.revised_action:
            action_to_execute = decision.revised_action
            executed_from = "supervisor_revised"

        if decision.kind == "escalate":
            human = simulate_human_approval(action)
            human_info = human
            if not human.get("approved"):
                trace.append(
                    {
                        "step": step,
                        "tool": action["name"],
                        "args_hash": args_hash(action["args"]),
                        "supervisor_decision": "escalate",
                        "executed_from": executed_from,
                        "human_approved": False,
                        "stop_reason": "human_rejected",
                        "ok": False,
                    }
                )
                return {
                    "status": "stopped",
                    "stop_reason": "human_rejected",
                    "phase": "human_approval",
                    "action": action,
                    "trace": trace,
                    "history": history,
                }
            revised = human.get("revised_action")
            if isinstance(revised, dict):
                action_to_execute = revised
                executed_from = "human_revised"

        if action_to_execute["kind"] == "final":
            trace.append(
                {
                    "step": step,
                    "tool": "final",
                    "supervisor_decision": decision.kind,
                    "executed_from": executed_from,
                    "ok": True,
                }
            )
            history.append(
                {
                    "step": step,
                    "action": action,
                    "supervisor": supervisor_info,
                    "executed_action": action_to_execute,
                    "executed_from": executed_from,
                    "observation": {"status": "final"},
                }
            )
            return {
                "status": "ok",
                "stop_reason": "success",
                "answer": action_to_execute["answer"],
                "trace": trace,
                "history": history,
            }

        tool_name = action_to_execute["name"]
        tool_args = action_to_execute["args"]

        try:
            observation = gateway.call(tool_name, tool_args)
            trace_row = {
                "step": step,
                "tool": tool_name,
                "args_hash": args_hash(tool_args),
                "supervisor_decision": decision.kind,
                "executed_from": executed_from,
                "ok": True,
            }
            if human_info is not None:
                trace_row["human_approved"] = bool(human_info.get("approved"))
            trace.append(trace_row)
        except StopRun as exc:
            trace_row = {
                "step": step,
                "tool": tool_name,
                "args_hash": args_hash(tool_args),
                "supervisor_decision": decision.kind,
                "executed_from": executed_from,
                "ok": False,
                "stop_reason": exc.reason,
            }
            if human_info is not None:
                trace_row["human_approved"] = bool(human_info.get("approved"))
            trace.append(trace_row)
            return {
                "status": "stopped",
                "stop_reason": exc.reason,
                "phase": "execution",
                "action": action_to_execute,
                "trace": trace,
                "history": history,
            }

        if tool_name == "get_refund_context" and "error" not in observation:
            state.has_context = True

        if tool_name == "issue_refund" and observation.get("status") == "ok":
            amount = float(observation.get("refund", {}).get("amount_usd", 0.0))
            state.executed_refund_total_usd += amount
            state.refund_executed = True

        history_entry = {
            "step": step,
            "action": action,
            "supervisor": supervisor_info,
            "executed_action": action_to_execute,
            "executed_from": executed_from,
            "observation": observation,
        }
        if human_info is not None:
            history_entry["human_approval"] = human_info
        history.append(history_entry)

    try:
        answer = compose_final_answer(goal=goal, history=history)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "finalize",
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "phase": "finalize",
            "trace": trace,
            "history": history,
        }

    return {
        "status": "ok",
        "stop_reason": "success",
        "answer": answer,
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_supervised_flow(GOAL)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

Lo más importante aquí (en simple)

  • El worker no puede saltarse el supervisor review: cada tool call pasa policy check.
  • escalate lleva a human approval, y solo después se ejecuta la acción.
  • En trace se ve la fuente de la acción ejecutada: original, supervisor_revised o human_revised.
  • Un final temprano sin get_refund_context se bloquea como supervisor_block:final_requires_context.
  • trace + history dan una auditoría completa: qué propuso el worker, qué decidió el supervisor y qué se ejecutó realmente.

requirements.txt

TEXT
openai==2.21.0

Ejemplo de salida

Abajo hay un ejemplo de ejecución válida donde el worker, justo después de recopilar contexto, propone un reembolso parcial de 1000 USD, y el supervisor da approve.

JSON
{
  "status": "ok",
  "stop_reason": "success",
  "answer": "A partial refund of 1000 USD has been approved and processed for Anna as per supervisor policy, and a confirmation email has been sent.",
  "trace": [
    {
      "step": 1,
      "tool": "get_refund_context",
      "args_hash": "feaa769a39ae",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 2,
      "tool": "issue_refund",
      "args_hash": "36ffe69cb606",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 3,
      "tool": "send_refund_email",
      "args_hash": "ff6ec44bb2fa",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 4,
      "tool": "final",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    }
  ],
  "history": [
    {"step": 1, "action": {"kind": "tool", "name": "get_refund_context", "args": {"user_id": 42}}, "supervisor": {"decision": "approve", "reason": "read_only_context"}, "executed_action": {"kind": "tool", "name": "get_refund_context", "args": {"user_id": 42}}, "executed_from": "original", "observation": {...}},
    {"step": 2, "action": {"kind": "tool", "name": "issue_refund", "args": {"user_id": 42, "amount_usd": 1000.0, "reason": "..."}}, "supervisor": {"decision": "approve", "reason": "refund_within_auto_limit"}, "executed_action": {"kind": "tool", "name": "issue_refund", "args": {"user_id": 42, "amount_usd": 1000.0, "reason": "..."}}, "executed_from": "original", "observation": {...}},
    {"step": 3, "action": {"kind": "tool", "name": "send_refund_email", "args": {"user_id": 42, "amount_usd": 1000.0, "message": "..."}}, "supervisor": {"decision": "approve", "reason": "email_after_refund"}, "executed_action": {"kind": "tool", "name": "send_refund_email", "args": {"user_id": 42, "amount_usd": 1000.0, "message": "..."}}, "executed_from": "original", "observation": {...}},
    {"step": 4, "action": {"kind": "final", "answer": "..."}, "supervisor": {"decision": "approve", "reason": "final_with_context"}, "executed_action": {"kind": "final", "answer": "..."}, "executed_from": "original", "observation": {"status": "final"}}
  ]
}

Este es un ejemplo abreviado: history se recorta intencionalmente a campos clave para legibilidad.


Qué NO se muestra aquí

  • No hay una UI/cola integrada real de human-in-the-loop.
  • No hay role-based access ni aislamiento multi-tenant.
  • No hay políticas complejas de retry/backoff para LLM.
  • No hay orquestación completa de retry/backoff para write-tools; el loop guard aquí es intencionalmente conservador.
  • Optional args (por ejemplo issue_refund.reason) se rellenan mediante supervisor revise dentro del contrato de demo.
  • No hay presupuestos por tokens/costo (cost guardrails).

Valores típicos de stop_reason

  • success — el worker completó el escenario y devolvió la respuesta final
  • invalid_action:* — el worker devolvió action JSON inválido
  • invalid_action:bad_arg_type:* — en tool args hay un valor con tipo inválido (por ejemplo, amount_usd no es number)
  • supervisor_block:* — el supervisor bloqueó la acción por policy
  • human_rejected — la escalada a una persona fue rechazada
  • max_tool_calls — se agotó el límite de tool calls
  • max_seconds — se excedió el time budget del run
  • llm_timeout — el LLM no respondió dentro de OPENAI_TIMEOUT_SECONDS
  • llm_empty — la respuesta final del LLM está vacía
  • tool_denied:* — el tool no está en la execution allowlist
  • tool_missing:* — el tool falta en el registry
  • tool_bad_args:* — argumentos inválidos para el tool
  • loop_detected:per_tool_limit — se excedió per_tool_limit para un tool (protección contra tool spam incluso con args distintos)
  • loop_detected:signature_repeat — el mismo tool+args se repitió por encima del límite permitido

Qué probar después

  • Reduce POLICY.auto_refund_limit_usd a 500 y mira cómo escalate se activa más seguido.
  • Pon simulate_human_approval en modo rechazo y verifica human_rejected.
  • Quita send_refund_email de ALLOWED_TOOLS y verifica tool_denied:*.
  • Quita reason de issue_refund y verifica cómo el supervisor devuelve revise con autocompletado de motivo.
  • Agrega un campo risk_score a las decisiones del supervisor y muéstralo en trace para alerting.
⏱️ 19 min de lecturaActualizado Mar, 2026Dificultad: ★★☆
Integrado: control en producciónOnceOnly
Guardrails para agentes con tool-calling
Lleva este patrón a producción con gobernanza:
  • Presupuestos (pasos / topes de gasto)
  • Permisos de herramientas (allowlist / blocklist)
  • Kill switch y parada por incidente
  • Idempotencia y dedupe
  • Audit logs y trazabilidad
Mención integrada: OnceOnly es una capa de control para sistemas de agentes en producción.
Autor

Esta documentación está curada y mantenida por ingenieros que despliegan agentes de IA en producción.

El contenido es asistido por IA, con responsabilidad editorial humana sobre la exactitud, la claridad y la relevancia en producción.

Los patrones y las recomendaciones se basan en post-mortems, modos de fallo e incidentes operativos en sistemas desplegados, incluido durante el desarrollo y la operación de infraestructura de gobernanza para agentes en OnceOnly.