Supervisor Agent — Python (full implementation with LLM)

Production-style runnable Supervisor agent example in Python with policy review, approve/revise/block/escalate decisions, human approval, budgets, and stop reasons.

Pattern Essence (Brief)

Supervisor Agent is a pattern where a worker agent proposes the next action, and a separate supervisor layer decides whether it can be executed.

The LLM decides what to do next; the supervisor policy decides, before execution, whether that step is safe and allowed.


What this example demonstrates

  • worker loop where LLM proposes an action (tool or final)
  • separate supervisor review before each tool call
  • supervisor decisions: approve, revise, block, escalate
  • human approval simulation for high refund amounts
  • policy boundary between intent (LLM) and execution (tools)
  • tool allowlist, run budgets, and loop detection
  • explicit stop_reason and audit trace for production monitoring

Architecture

  1. Worker proposes action JSON.
  2. Action is validated (validate_worker_action).
  3. Supervisor performs review and returns a decision (approve/revise/block/escalate).
  4. ToolGateway executes only the allowed action.
  5. Observation + supervisor decision are written to history.
  6. Worker sees history and takes the next step or returns final.

Supervisor does not execute business logic itself: it only decides whether the next step is admissible.
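
The steps above can be sketched without an LLM. This is a minimal illustration under stated assumptions, not the project's main.py: the scripted worker, the toy review rule (block instead of escalate), and the names SCRIPT, scripted_worker, review, execute, and run_loop are all made up for this sketch.

```python
from typing import Any

# Hypothetical scripted worker: replays fixed actions instead of calling an LLM.
SCRIPT = [
    {"kind": "tool", "name": "get_context", "args": {"user_id": 42}},
    {"kind": "tool", "name": "refund", "args": {"user_id": 42, "amount_usd": 1200.0}},
    {"kind": "final", "answer": "done"},
]


def scripted_worker(history: list[dict[str, Any]]) -> dict[str, Any]:
    return SCRIPT[len(history)]


def review(action: dict[str, Any]) -> str:
    # Toy policy (assumption): refunds above 1000 USD are blocked outright.
    if action["kind"] == "tool" and action["name"] == "refund":
        if action["args"]["amount_usd"] > 1000.0:
            return "block"
    return "approve"


def execute(action: dict[str, Any]) -> dict[str, Any]:
    # Stand-in for the gateway: only reached for approved tool actions.
    return {"status": "ok", "tool": action["name"]}


def run_loop(max_steps: int = 5) -> dict[str, Any]:
    history: list[dict[str, Any]] = []
    for _ in range(max_steps):
        action = scripted_worker(history)  # step 1: worker proposes
        if action.get("kind") not in {"tool", "final"}:  # step 2: validate shape
            return {"stop_reason": "invalid_action", "history": history}
        decision = review(action)  # step 3: supervisor decides
        if decision == "block":
            return {"stop_reason": "supervisor_block", "history": history}
        if action["kind"] == "final":
            return {"stop_reason": "success", "answer": action["answer"], "history": history}
        observation = execute(action)  # step 4: execute only the allowed action
        # step 5: record the observation and the decision for the next turn
        history.append({"action": action, "decision": decision, "observation": observation})
    return {"stop_reason": "max_steps", "history": history}


result = run_loop()
```

In this run the context lookup executes, the 1200 USD refund is stopped at review, and the execute function is never called for it.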


Project structure

TEXT
examples/
└── agent-patterns/
    └── supervisor-agent/
        └── python/
            ├── main.py           # Worker -> Supervisor Review -> Execute -> Final
            ├── llm.py            # worker decision + final synthesis
            ├── supervisor.py     # policy review + human approval simulation
            ├── gateway.py        # action validation + allowlist + loop/budget control
            ├── tools.py          # deterministic business tools (refund flow)
            └── requirements.txt

How to run

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/supervisor-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option via export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py

Option via .env (optional):

BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows, set the variables with set (cmd) or $env: (PowerShell), or use python-dotenv to load .env automatically.
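
If you prefer not to source the file in the shell, a small standard-library loader can do a similar job. This is only a sketch: it assumes plain KEY=VALUE lines without quoting or export prefixes, and load_env_file is a hypothetical helper that is not part of the example project.

```python
import os
from pathlib import Path


def load_env_file(path: str = ".env") -> dict[str, str]:
    """Read KEY=VALUE lines and set them in os.environ if not already set."""
    loaded: dict[str, str] = {}
    env_path = Path(path)
    if not env_path.exists():
        return loaded
    for raw_line in env_path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        # Skip blanks, comments, and lines that are not KEY=VALUE pairs.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key, value = key.strip(), value.strip()
        # Variables already present in the environment win over the file.
        os.environ.setdefault(key, value)
        loaded[key] = os.environ[key]
    return loaded
```

Because llm.py reads OPENAI_MODEL and OPENAI_TIMEOUT_SECONDS at import time, a loader like this would need to run before llm is imported.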


Task

Imagine a real production support case:

"A customer requests a 1200 USD refund for an annual plan paid 10 days ago."

The worker must not automatically execute a refund of that size. It must:

  • gather context
  • propose an action
  • let the supervisor check it against policies
  • escalate to a human when needed
  • execute only the approved action

Solution

In this example:

  • the worker LLM proposes a sequence of actions
  • the supervisor controls every tool call
  • a high-risk refund is escalated to a human
  • a human (simulation) approves the adjusted amount
  • only then do execution and the final response happen

This is the Supervisor pattern: execution cannot bypass policy review.


Code

tools.py — business tools (facts and execution only)

PYTHON
from __future__ import annotations

from typing import Any

USERS = {
    42: {"id": 42, "name": "Anna", "country": "US", "tier": "enterprise"},
    7: {"id": 7, "name": "Max", "country": "US", "tier": "pro"},
}

BILLING = {
    42: {
        "plan": "annual_enterprise",
        "currency": "USD",
        "last_charge_usd": 1200.0,
        "days_since_payment": 10,
    },
    7: {
        "plan": "pro_monthly",
        "currency": "USD",
        "last_charge_usd": 49.0,
        "days_since_payment": 35,
    },
}


def get_refund_context(user_id: int) -> dict[str, Any]:
    user = USERS.get(user_id)
    billing = BILLING.get(user_id)
    if not user or not billing:
        return {"error": f"context_not_found_for_user:{user_id}"}

    return {
        "user": user,
        "billing": billing,
        "policy_hint": {
            "auto_refund_limit_usd": 1000.0,
            "refund_window_days": 14,
        },
    }


def issue_refund(
    user_id: int,
    amount_usd: float,
    reason: str,
) -> dict[str, Any]:
    user = USERS.get(user_id)
    billing = BILLING.get(user_id)
    if not user or not billing:
        return {"status": "error", "error": f"refund_user_not_found:{user_id}"}

    if amount_usd <= 0:
        return {"status": "error", "error": "refund_amount_must_be_positive"}

    paid = float(billing["last_charge_usd"])
    if amount_usd > paid:
        return {
            "status": "error",
            "error": "refund_exceeds_last_charge",
            "max_refund_usd": paid,
        }

    return {
        "status": "ok",
        "refund": {
            "user_id": user_id,
            "currency": "USD",
            "amount_usd": round(float(amount_usd), 2),
            "reason": reason.strip(),
            "transaction_id": f"rf_{user_id}_20260226",
        },
    }


def send_refund_email(user_id: int, amount_usd: float, message: str) -> dict[str, Any]:
    user = USERS.get(user_id)
    if not user:
        return {"status": "error", "error": f"email_user_not_found:{user_id}"}

    return {
        "status": "ok",
        "email": {
            "to": f"{user['name'].lower()}@example.com",
            "template": "refund_confirmation_v2",
            "amount_usd": round(float(amount_usd), 2),
            "message": message,
            "email_id": f"em_{user_id}_20260226",
        },
    }

What matters most here (in plain words)

  • Tools do not make policy decisions — they only return facts or execute an action.
  • Safety constraints here are minimal; the supervisor performs the primary control.

gateway.py — execution boundary (validation and control)

PYTHON
from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_steps: int = 8
    max_tool_calls: int = 5
    max_seconds: int = 30


TOOL_ARG_TYPES: dict[str, dict[str, str]] = {
    "get_refund_context": {"user_id": "int"},
    "issue_refund": {"user_id": "int", "amount_usd": "number", "reason": "str?"},
    "send_refund_email": {"user_id": "int", "amount_usd": "number", "message": "str"},
}


def _stable_json(value: Any) -> str:
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(v) for v in value) + "]"
    if isinstance(value, dict):
        parts = []
        for key in sorted(value):
            parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
        return "{" + ",".join(parts) + "}"
    return json.dumps(str(value), ensure_ascii=True)


def _normalize_for_hash(value: Any) -> Any:
    if isinstance(value, str):
        return " ".join(value.strip().split())
    if isinstance(value, list):
        return [_normalize_for_hash(item) for item in value]
    if isinstance(value, dict):
        return {str(key): _normalize_for_hash(value[key]) for key in sorted(value)}
    return value


def args_hash(args: dict[str, Any]) -> str:
    normalized = _normalize_for_hash(args or {})
    raw = _stable_json(normalized)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]


def _is_number(value: Any) -> bool:
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def _validate_tool_args(name: str, args: dict[str, Any]) -> dict[str, Any]:
    spec = TOOL_ARG_TYPES.get(name)
    if spec is None:
        raise StopRun(f"invalid_action:unknown_tool:{name}")

    extra = set(args.keys()) - set(spec.keys())
    if extra:
        raise StopRun(f"invalid_action:extra_tool_args:{name}")

    normalized: dict[str, Any] = {}
    for arg_name, expected in spec.items():
        is_optional = expected.endswith("?")
        expected_base = expected[:-1] if is_optional else expected

        if arg_name not in args:
            if is_optional:
                continue
            raise StopRun(f"invalid_action:missing_required_arg:{name}:{arg_name}")
        value = args[arg_name]

        if expected_base == "int":
            if not isinstance(value, int) or isinstance(value, bool):
                raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
            normalized[arg_name] = value
            continue

        if expected_base == "number":
            if not _is_number(value):
                raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
            normalized[arg_name] = float(value)
            continue

        if expected_base == "str":
            if not isinstance(value, str) or not value.strip():
                raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
            normalized[arg_name] = value.strip()
            continue

        raise StopRun(f"invalid_action:unknown_arg_spec:{name}:{arg_name}")

    return normalized


def validate_worker_action(action: Any) -> dict[str, Any]:
    if not isinstance(action, dict):
        raise StopRun("invalid_action:not_object")

    kind = action.get("kind")
    if kind == "invalid":
        raise StopRun("invalid_action:non_json")

    if kind == "final":
        allowed_keys = {"kind", "answer"}
        if set(action.keys()) - allowed_keys:
            raise StopRun("invalid_action:extra_keys_final")
        answer = action.get("answer")
        if not isinstance(answer, str) or not answer.strip():
            raise StopRun("invalid_action:bad_final_answer")
        return {"kind": "final", "answer": answer.strip()}

    if kind == "tool":
        allowed_keys = {"kind", "name", "args"}
        if set(action.keys()) - allowed_keys:
            raise StopRun("invalid_action:extra_keys_tool")

        name = action.get("name")
        if not isinstance(name, str) or not name.strip():
            raise StopRun("invalid_action:bad_tool_name")

        args = action.get("args", {})
        if args is None:
            args = {}
        if not isinstance(args, dict):
            raise StopRun("invalid_action:bad_tool_args")

        normalized_args = _validate_tool_args(name.strip(), args)
        return {"kind": "tool", "name": name.strip(), "args": normalized_args}

    raise StopRun("invalid_action:bad_kind")


class ToolGateway:
    def __init__(
        self,
        *,
        allow: set[str],
        registry: dict[str, Callable[..., dict[str, Any]]],
        budget: Budget,
    ):
        self.allow = set(allow)
        self.registry = registry
        self.budget = budget
        self.tool_calls = 0
        self.seen_call_counts: dict[str, int] = {}
        self.per_tool_counts: dict[str, int] = {}
        self.read_only_repeat_limit: dict[str, int] = {
            "get_refund_context": 2,
        }
        self.per_tool_limit: dict[str, int] = {
            "get_refund_context": 3,
            "issue_refund": 2,
            "send_refund_email": 2,
        }

    def call(self, name: str, args: dict[str, Any]) -> dict[str, Any]:
        self.tool_calls += 1
        if self.tool_calls > self.budget.max_tool_calls:
            raise StopRun("max_tool_calls")

        if name not in self.allow:
            raise StopRun(f"tool_denied:{name}")

        fn = self.registry.get(name)
        if fn is None:
            raise StopRun(f"tool_missing:{name}")

        count_for_tool = self.per_tool_counts.get(name, 0) + 1
        if count_for_tool > self.per_tool_limit.get(name, 2):
            raise StopRun("loop_detected:per_tool_limit")
        self.per_tool_counts[name] = count_for_tool

        signature = f"{name}:{args_hash(args)}"
        seen = self.seen_call_counts.get(signature, 0) + 1
        allowed_repeats = self.read_only_repeat_limit.get(name, 1)
        if seen > allowed_repeats:
            raise StopRun("loop_detected:signature_repeat")
        self.seen_call_counts[signature] = seen

        try:
            out = fn(**args)
        except TypeError as exc:
            raise StopRun(f"tool_bad_args:{name}") from exc
        except Exception as exc:
            raise StopRun(f"tool_error:{name}") from exc

        if not isinstance(out, dict):
            raise StopRun(f"tool_bad_result:{name}")
        return out

What matters most here (in plain words)

  • Gateway performs a minimal args schema-check (types, required + optional fields) before execution.
  • Gateway controls technical execution: allowlist, tool-call budget, and loop detection (by repeated tool+args signature and by a per-tool call limit).
  • Gateway does not fill missing fields — it only validates the args contract.
  • For some fields (for example issue_refund.reason), the gateway schema marks the argument optional because the supervisor can add it via revise. If the supervisor does not add it, execution stops with tool_bad_args:*.
  • Even if supervisor approved, execution still goes only through the controlled layer.
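
The signature-repeat check can be illustrated in isolation. This is a condensed sketch, not the gateway itself: json.dumps with sort_keys stands in for the _stable_json and _normalize_for_hash helpers, and call_signature and RepeatGuard are hypothetical names used only here.

```python
import hashlib
import json
from typing import Any


def call_signature(name: str, args: dict[str, Any]) -> str:
    # Simplification (assumption): sorted-key JSON replaces the example's
    # _stable_json/_normalize_for_hash helpers.
    raw = json.dumps(args, sort_keys=True, ensure_ascii=True)
    return f"{name}:{hashlib.sha256(raw.encode('utf-8')).hexdigest()[:12]}"


class RepeatGuard:
    """Hypothetical helper: rejects a call once its signature repeats too often."""

    def __init__(self, repeat_limit: dict[str, int]):
        self.repeat_limit = repeat_limit
        self.seen: dict[str, int] = {}

    def check(self, name: str, args: dict[str, Any]) -> bool:
        signature = call_signature(name, args)
        count = self.seen.get(signature, 0) + 1
        if count > self.repeat_limit.get(name, 1):
            # The gateway raises StopRun("loop_detected:signature_repeat") at this point.
            return False
        self.seen[signature] = count
        return True


guard = RepeatGuard({"get_refund_context": 2})
results = [
    guard.check("get_refund_context", {"user_id": 42}),  # first read
    guard.check("get_refund_context", {"user_id": 42}),  # allowed repeat
    guard.check("get_refund_context", {"user_id": 42}),  # third identical call
    guard.check("issue_refund", {"amount_usd": 800.0, "user_id": 42}),
    guard.check("issue_refund", {"user_id": 42, "amount_usd": 800.0}),  # same args, new key order
]
```

The read-only tool may repeat once, while a second identical issue_refund call is rejected even when the argument keys arrive in a different order.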

supervisor.py — policy review and human approval

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

HUMAN_APPROVAL_CAP_USD = 800.0


@dataclass(frozen=True)
class Policy:
    auto_refund_limit_usd: float = 1000.0
    max_refund_per_run_usd: float = 2000.0


@dataclass
class RuntimeState:
    executed_refund_total_usd: float = 0.0
    refund_executed: bool = False
    has_context: bool = False


@dataclass
class Decision:
    kind: str  # approve | revise | block | escalate
    reason: str
    revised_action: dict[str, Any] | None = None


def review_action(action: dict[str, Any], state: RuntimeState, policy: Policy) -> Decision:
    kind = action.get("kind")
    if kind == "final":
        if not state.has_context:
            return Decision(kind="block", reason="final_requires_context")
        return Decision(kind="approve", reason="final_with_context")
    if kind != "tool":
        return Decision(kind="block", reason="unknown_action_kind")

    name = action.get("name")
    args = dict(action.get("args") or {})

    if name == "get_refund_context":
        return Decision(kind="approve", reason="read_only_context")

    if name == "issue_refund":
        try:
            amount = float(args.get("amount_usd", 0.0))
        except (TypeError, ValueError):
            return Decision(kind="block", reason="invalid_refund_amount_type")
        if amount <= 0:
            return Decision(kind="block", reason="invalid_refund_amount")

        remaining = policy.max_refund_per_run_usd - state.executed_refund_total_usd
        if remaining <= 0:
            return Decision(kind="block", reason="refund_budget_exhausted")

        # Escalate before any revise: main.py executes a revised action without a
        # second review, so checking the auto limit last would let a revised
        # high refund skip human approval.
        if amount > policy.auto_refund_limit_usd:
            return Decision(kind="escalate", reason="high_refund_requires_human")

        if amount > remaining:
            revised = {
                "kind": "tool",
                "name": name,
                "args": {**args, "amount_usd": round(remaining, 2)},
            }
            return Decision(kind="revise", reason="cap_to_remaining_run_budget", revised_action=revised)

        reason = str(args.get("reason", "")).strip()
        if not reason:
            revised = {
                "kind": "tool",
                "name": name,
                "args": {**args, "reason": "Customer requested refund within policy review"},
            }
            return Decision(kind="revise", reason="refund_reason_required", revised_action=revised)

        return Decision(kind="approve", reason="refund_within_auto_limit")

    if name == "send_refund_email":
        if not state.refund_executed:
            return Decision(kind="block", reason="email_before_refund")
        return Decision(kind="approve", reason="email_after_refund")

    return Decision(kind="block", reason=f"unknown_tool_for_supervisor:{name}")


def simulate_human_approval(action: dict[str, Any]) -> dict[str, Any]:
    name = action.get("name")
    args = dict(action.get("args") or {})

    if name != "issue_refund":
        return {"approved": True, "revised_action": action, "comment": "approved_by_human"}

    try:
        requested = float(args.get("amount_usd", 0.0))
    except (TypeError, ValueError):
        return {"approved": False, "comment": "invalid_requested_amount_type"}
    approved_amount = min(requested, HUMAN_APPROVAL_CAP_USD)

    if approved_amount <= 0:
        return {"approved": False, "comment": "invalid_requested_amount"}

    revised_action = {
        "kind": "tool",
        "name": "issue_refund",
        "args": {**args, "amount_usd": round(approved_amount, 2)},
    }
    return {
        "approved": True,
        "revised_action": revised_action,
        "comment": f"approved_with_cap:{approved_amount}",
    }

What matters most here (in plain words)

  • Supervisor makes decisions before execution, not after.
  • final also goes through supervisor review (without context it returns block).
  • An invalid amount_usd type does not crash with an exception; it returns a controlled block decision.
  • Supervisor can revise to add policy-required fields (for example, reason for issue_refund).
  • escalate here demonstrates a practical option: a human can approve the action with an adjusted amount.
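
The thresholds can be checked with a small worked example for the 1200 USD task. This is a simplified sketch, not review_action itself: it assumes a reason is already present and the run budget is untouched, and classify and human_amount are hypothetical helper names.

```python
AUTO_REFUND_LIMIT_USD = 1000.0  # same value as Policy.auto_refund_limit_usd
HUMAN_APPROVAL_CAP_USD = 800.0  # same value as the cap in simulate_human_approval


def classify(amount_usd: float) -> str:
    # Simplified (assumption): reason is present, run budget is untouched.
    if amount_usd <= 0:
        return "block"
    if amount_usd > AUTO_REFUND_LIMIT_USD:
        return "escalate"
    return "approve"


def human_amount(requested_usd: float) -> float:
    # The simulated human approves at most the cap.
    return round(min(requested_usd, HUMAN_APPROVAL_CAP_USD), 2)


outcomes = {amount: classify(amount) for amount in (0.0, 1000.0, 1200.0)}
approved_for_task = human_amount(1200.0)
```

With these constants, a 1000 USD refund is approved automatically, while the 1200 USD request is escalated and the simulated human approves 800 USD.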

llm.py — worker decisions + final synthesis

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


WORKER_SYSTEM_PROMPT = """
You are a support worker agent.
Return exactly one JSON object in one of these shapes:
1) {"kind":"tool","name":"<tool_name>","args":{...}}
2) {"kind":"final","answer":"<short final answer>"}

Rules:
- First, collect facts with get_refund_context.
- Then propose issue_refund when facts are sufficient.
- Use send_refund_email only after refund success is visible in history.
- Use only tools from available_tools.
- Do not output markdown or extra keys.
""".strip()

FINAL_SYSTEM_PROMPT = """
You are a customer support assistant.
Write a short final answer in English.
Include:
- final refund amount in USD
- whether human approval was required
- one reason based on policy
""".strip()

TOOL_CATALOG = [
    {
        "name": "get_refund_context",
        "description": "Get user profile, billing facts, and policy hints",
        "args": {"user_id": "integer"},
    },
    {
        "name": "issue_refund",
        "description": "Issue refund in USD",
        "args": {"user_id": "integer", "amount_usd": "number", "reason": "optional string"},
    },
    {
        "name": "send_refund_email",
        "description": "Send refund confirmation email",
        "args": {"user_id": "integer", "amount_usd": "number", "message": "string"},
    },
]


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def _build_state_summary(history: list[dict[str, Any]]) -> dict[str, Any]:
    tools_used = [
        step.get("action", {}).get("name")
        for step in history
        if isinstance(step, dict)
        and isinstance(step.get("action"), dict)
        and step.get("action", {}).get("kind") == "tool"
    ]

    supervisor_decisions = [
        step.get("supervisor", {}).get("decision")
        for step in history
        if isinstance(step, dict) and isinstance(step.get("supervisor"), dict)
    ]

    return {
        "steps_completed": len(history),
        "tools_used": [t for t in tools_used if t],
        "supervisor_decisions": [d for d in supervisor_decisions if d],
        "last_step": history[-1] if history else None,
    }


def _recent_step_summaries(history: list[dict[str, Any]], limit: int = 3) -> list[dict[str, Any]]:
    out: list[dict[str, Any]] = []
    for step in history[-limit:]:
        out.append(
            {
                "step": step.get("step"),
                "proposed_tool": step.get("action", {}).get("name"),
                "supervisor_decision": step.get("supervisor", {}).get("decision"),
                "executed_tool": step.get("executed_action", {}).get("name"),
                "executed_from": step.get("executed_from"),
                "observation_status": step.get("observation", {}).get("status"),
            }
        )
    return out


def decide_next_action(
    goal: str,
    history: list[dict[str, Any]],
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "state_summary": _build_state_summary(history),
        "recent_step_summaries": _recent_step_summaries(history, limit=3),
        "available_tools": TOOL_CATALOG,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": WORKER_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_final_answer(goal: str, history: list[dict[str, Any]]) -> str:
    payload = {
        "goal": goal,
        "history": history,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            messages=[
                {"role": "system", "content": FINAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = (completion.choices[0].message.content or "").strip()
    if not text:
        raise LLMEmpty("llm_empty")
    return text

What matters most here (in plain words)

  • Worker makes decisions but has no right to execute an action directly.
  • Worker context includes supervisor decisions and compact summaries of recent steps; only the most recent step (last_step) is passed with its full payload.
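
To make the compact summary concrete, the sketch below applies the same field selection as _recent_step_summaries to a single history entry. The sample entry is invented for illustration, and summarize_step is a hypothetical standalone helper.

```python
from typing import Any


def summarize_step(step: dict[str, Any]) -> dict[str, Any]:
    # Keeps the same fields as _recent_step_summaries in llm.py.
    return {
        "step": step.get("step"),
        "proposed_tool": step.get("action", {}).get("name"),
        "supervisor_decision": step.get("supervisor", {}).get("decision"),
        "executed_tool": step.get("executed_action", {}).get("name"),
        "executed_from": step.get("executed_from"),
        "observation_status": step.get("observation", {}).get("status"),
    }


# Illustrative history entry (values are made up for this sketch).
sample_step = {
    "step": 2,
    "action": {
        "kind": "tool",
        "name": "issue_refund",
        "args": {"user_id": 42, "amount_usd": 1200.0, "reason": "annual plan refund"},
    },
    "supervisor": {"decision": "escalate", "reason": "high_refund_requires_human"},
    "executed_action": {
        "kind": "tool",
        "name": "issue_refund",
        "args": {"user_id": 42, "amount_usd": 800.0, "reason": "annual plan refund"},
    },
    "executed_from": "human_revised",
    "observation": {"status": "ok", "refund": {"amount_usd": 800.0}},
}

summary = summarize_step(sample_step)
```

The summary drops arguments and amounts. Note also that a successful get_refund_context observation has no status key, so its observation_status would be None.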

main.py — Worker -> Supervisor -> Execute -> Final

PYTHON
from __future__ import annotations

import json
import time
from typing import Any

from gateway import Budget, StopRun, ToolGateway, args_hash, validate_worker_action
from llm import LLMEmpty, LLMTimeout, compose_final_answer, decide_next_action
from supervisor import Decision, Policy, RuntimeState, review_action, simulate_human_approval
from tools import get_refund_context, issue_refund, send_refund_email

GOAL = (
    "User Anna (user_id=42) requests a refund for a recent annual plan charge of 1200 USD. "
    "Apply supervisor policy before any refund execution and provide a short final response."
)

BUDGET = Budget(max_steps=8, max_tool_calls=5, max_seconds=30)
POLICY = Policy(
    auto_refund_limit_usd=1000.0,
    max_refund_per_run_usd=2000.0,
)

TOOL_REGISTRY = {
    "get_refund_context": get_refund_context,
    "issue_refund": issue_refund,
    "send_refund_email": send_refund_email,
}

ALLOWED_TOOLS = {"get_refund_context", "issue_refund", "send_refund_email"}


def _decision_payload(decision: Decision) -> dict[str, Any]:
    payload = {
        "decision": decision.kind,
        "reason": decision.reason,
    }
    if decision.revised_action:
        payload["revised_action"] = decision.revised_action
    return payload


def run_supervised_flow(goal: str) -> dict[str, Any]:
    started = time.monotonic()
    history: list[dict[str, Any]] = []
    trace: list[dict[str, Any]] = []

    state = RuntimeState()
    gateway = ToolGateway(allow=ALLOWED_TOOLS, registry=TOOL_REGISTRY, budget=BUDGET)

    for step in range(1, BUDGET.max_steps + 1):
        if (time.monotonic() - started) > BUDGET.max_seconds:
            return {
                "status": "stopped",
                "stop_reason": "max_seconds",
                "trace": trace,
                "history": history,
            }

        try:
            raw_action = decide_next_action(goal=goal, history=history)
        except LLMTimeout:
            return {
                "status": "stopped",
                "stop_reason": "llm_timeout",
                "phase": "worker",
                "trace": trace,
                "history": history,
            }

        try:
            action = validate_worker_action(raw_action)
        except StopRun as exc:
            return {
                "status": "stopped",
                "stop_reason": exc.reason,
                "phase": "worker",
                "raw_action": raw_action,
                "trace": trace,
                "history": history,
            }

        decision = review_action(action=action, state=state, policy=POLICY)
        supervisor_info = _decision_payload(decision)

        action_to_execute = action
        human_info: dict[str, Any] | None = None
        executed_from = "original"

        if decision.kind == "block":
            trace_row = {
                "step": step,
                "tool": action.get("name", "final"),
                "supervisor_decision": "block",
                "executed_from": executed_from,
                "stop_reason": f"supervisor_block:{decision.reason}",
                "ok": False,
            }
            if action.get("kind") == "tool":
                trace_row["args_hash"] = args_hash(action.get("args", {}))
            trace.append(trace_row)
            return {
                "status": "stopped",
                "stop_reason": f"supervisor_block:{decision.reason}",
                "phase": "supervisor",
                "action": action,
                "trace": trace,
                "history": history,
            }

        if decision.kind == "revise" and decision.revised_action:
            action_to_execute = decision.revised_action
            executed_from = "supervisor_revised"

        if decision.kind == "escalate":
            human = simulate_human_approval(action)
            human_info = human
            if not human.get("approved"):
                trace.append(
                    {
                        "step": step,
                        "tool": action["name"],
                        "args_hash": args_hash(action["args"]),
                        "supervisor_decision": "escalate",
                        "executed_from": executed_from,
                        "human_approved": False,
                        "stop_reason": "human_rejected",
                        "ok": False,
                    }
                )
                return {
                    "status": "stopped",
                    "stop_reason": "human_rejected",
                    "phase": "human_approval",
                    "action": action,
                    "trace": trace,
                    "history": history,
                }
            revised = human.get("revised_action")
            if isinstance(revised, dict):
                action_to_execute = revised
                executed_from = "human_revised"

        if action_to_execute["kind"] == "final":
            trace.append(
                {
                    "step": step,
                    "tool": "final",
                    "supervisor_decision": decision.kind,
                    "executed_from": executed_from,
                    "ok": True,
                }
            )
            history.append(
                {
                    "step": step,
                    "action": action,
                    "supervisor": supervisor_info,
                    "executed_action": action_to_execute,
                    "executed_from": executed_from,
                    "observation": {"status": "final"},
                }
            )
            return {
                "status": "ok",
                "stop_reason": "success",
                "answer": action_to_execute["answer"],
                "trace": trace,
                "history": history,
            }

        tool_name = action_to_execute["name"]
        tool_args = action_to_execute["args"]

        try:
            observation = gateway.call(tool_name, tool_args)
            trace_row = {
                "step": step,
                "tool": tool_name,
                "args_hash": args_hash(tool_args),
                "supervisor_decision": decision.kind,
                "executed_from": executed_from,
                "ok": True,
            }
            if human_info is not None:
                trace_row["human_approved"] = bool(human_info.get("approved"))
            trace.append(trace_row)
        except StopRun as exc:
            trace_row = {
                "step": step,
                "tool": tool_name,
                "args_hash": args_hash(tool_args),
                "supervisor_decision": decision.kind,
                "executed_from": executed_from,
                "ok": False,
                "stop_reason": exc.reason,
            }
            if human_info is not None:
                trace_row["human_approved"] = bool(human_info.get("approved"))
            trace.append(trace_row)
            return {
                "status": "stopped",
                "stop_reason": exc.reason,
                "phase": "execution",
                "action": action_to_execute,
                "trace": trace,
                "history": history,
            }

        if tool_name == "get_refund_context" and "error" not in observation:
            state.has_context = True

        if tool_name == "issue_refund" and observation.get("status") == "ok":
            amount = float(observation.get("refund", {}).get("amount_usd", 0.0))
            state.executed_refund_total_usd += amount
            state.refund_executed = True

        history_entry = {
            "step": step,
            "action": action,
            "supervisor": supervisor_info,
            "executed_action": action_to_execute,
            "executed_from": executed_from,
            "observation": observation,
        }
        if human_info is not None:
            history_entry["human_approval"] = human_info
        history.append(history_entry)

    try:
        answer = compose_final_answer(goal=goal, history=history)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "finalize",
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "phase": "finalize",
            "trace": trace,
            "history": history,
        }

    return {
        "status": "ok",
        "stop_reason": "success",
        "answer": answer,
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_supervised_flow(GOAL)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (in plain words)

  • Worker cannot bypass supervisor review: every tool call passes policy check.
  • escalate leads to human approval, and only after that is the action executed.
  • In trace, you can see the source of the executed action: original, supervisor_revised, or human_revised.
  • Early final without get_refund_context is blocked as supervisor_block:final_requires_context.
  • trace + history provide a full audit: what worker proposed, what supervisor decided, and what was actually executed.
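
The routing described above can be condensed into a single function. The following is a minimal sketch, not the code from main.py: the `Decision` dataclass, the `revised_action` field, and the `resolve_action` helper are hypothetical names introduced for illustration, and `StopRun` is redefined here only so the snippet runs on its own.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple


class StopRun(Exception):
    """Stand-in for the example's StopRun: carries a stop_reason string."""

    def __init__(self, reason: str) -> None:
        super().__init__(reason)
        self.reason = reason


@dataclass
class Decision:
    """Hypothetical shape of a supervisor decision (field names are assumed)."""

    kind: str  # "approve" | "revise" | "block" | "escalate"
    reason: str
    revised_action: Optional[Dict[str, Any]] = None


def resolve_action(
    action: Dict[str, Any],
    decision: Decision,
    human_info: Optional[Dict[str, Any]] = None,
) -> Tuple[Dict[str, Any], str]:
    """Return (action_to_execute, executed_from) or raise StopRun."""
    if decision.kind == "approve":
        return action, "original"
    if decision.kind == "revise":
        if decision.revised_action is None:
            raise ValueError("revise decision must carry a revised action")
        return decision.revised_action, "supervisor_revised"
    if decision.kind == "block":
        # The worker's proposal never reaches the tool gateway.
        raise StopRun(f"supervisor_block:{decision.reason}")
    if decision.kind == "escalate":
        # Nothing is executed until a human has explicitly approved.
        if not human_info or not human_info.get("approved"):
            raise StopRun("human_rejected")
        revised = human_info.get("revised_action")
        if revised is not None:
            return revised, "human_revised"
        return action, "original"
    raise ValueError(f"unknown decision kind: {decision.kind!r}")


refund = {"kind": "tool", "name": "issue_refund", "args": {"user_id": 42, "amount_usd": 1000.0}}
_, source = resolve_action(refund, Decision("approve", "refund_within_auto_limit"))
print(source)
```

The point of the sketch is that every branch either names where the executed action came from or stops the run with an explicit reason, so nothing reaches a tool without a recorded decision.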

requirements.txt

TEXT
openai==2.21.0

Example output

Below is an example of a valid run in which the worker, right after gathering context, proposes a partial refund of 1000 USD and the supervisor approves it.

JSON
{
  "status": "ok",
  "stop_reason": "success",
  "answer": "A partial refund of 1000 USD has been approved and processed for Anna as per supervisor policy, and a confirmation email has been sent.",
  "trace": [
    {
      "step": 1,
      "tool": "get_refund_context",
      "args_hash": "feaa769a39ae",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 2,
      "tool": "issue_refund",
      "args_hash": "36ffe69cb606",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 3,
      "tool": "send_refund_email",
      "args_hash": "ff6ec44bb2fa",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 4,
      "tool": "final",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    }
  ],
  "history": [
    {"step": 1, "action": {"kind": "tool", "name": "get_refund_context", "args": {"user_id": 42}}, "supervisor": {"decision": "approve", "reason": "read_only_context"}, "executed_action": {"kind": "tool", "name": "get_refund_context", "args": {"user_id": 42}}, "executed_from": "original", "observation": {...}},
    {"step": 2, "action": {"kind": "tool", "name": "issue_refund", "args": {"user_id": 42, "amount_usd": 1000.0, "reason": "..."}}, "supervisor": {"decision": "approve", "reason": "refund_within_auto_limit"}, "executed_action": {"kind": "tool", "name": "issue_refund", "args": {"user_id": 42, "amount_usd": 1000.0, "reason": "..."}}, "executed_from": "original", "observation": {...}},
    {"step": 3, "action": {"kind": "tool", "name": "send_refund_email", "args": {"user_id": 42, "amount_usd": 1000.0, "message": "..."}}, "supervisor": {"decision": "approve", "reason": "email_after_refund"}, "executed_action": {"kind": "tool", "name": "send_refund_email", "args": {"user_id": 42, "amount_usd": 1000.0, "message": "..."}}, "executed_from": "original", "observation": {...}},
    {"step": 4, "action": {"kind": "final", "answer": "..."}, "supervisor": {"decision": "approve", "reason": "final_with_context"}, "executed_action": {"kind": "final", "answer": "..."}, "executed_from": "original", "observation": {"status": "final"}}
  ]
}

This is a shortened example: history is intentionally trimmed to key fields for readability.
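
A trace in this shape is easy to condense for monitoring. The helper below is a small sketch that assumes only the trace fields visible in the example output (`step`, `supervisor_decision`, `executed_from`, `ok`, and the optional `human_approved`); `summarize_trace` is a hypothetical name, not part of the example project.

```python
from collections import Counter
from typing import Any, Dict, List


def summarize_trace(trace: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Condense a run trace into counts and step lists for quick review."""
    decisions = Counter(row["supervisor_decision"] for row in trace)
    return {
        "decisions": dict(decisions),
        # Steps where what was executed differs from what the worker proposed.
        "modified_steps": [r["step"] for r in trace if r.get("executed_from") != "original"],
        # Steps that did not complete successfully.
        "failed_steps": [r["step"] for r in trace if not r.get("ok")],
        # Steps that went through human approval (approved or rejected).
        "human_reviewed_steps": [r["step"] for r in trace if "human_approved" in r],
    }


example_trace = [
    {"step": 1, "tool": "get_refund_context", "supervisor_decision": "approve", "executed_from": "original", "ok": True},
    {"step": 2, "tool": "issue_refund", "supervisor_decision": "approve", "executed_from": "original", "ok": True},
    {"step": 3, "tool": "send_refund_email", "supervisor_decision": "approve", "executed_from": "original", "ok": True},
    {"step": 4, "tool": "final", "supervisor_decision": "approve", "executed_from": "original", "ok": True},
]
print(summarize_trace(example_trace))
```

For the run above, all four decisions are `approve` and the three step lists are empty, which is what an uneventful run should look like.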


What is NOT shown here

  • There is no real integrated human-in-the-loop UI/queue.
  • There is no role-based access or multi-tenant isolation.
  • There are no advanced retry/backoff policies for LLM.
  • There is no full retry/backoff orchestration for write-tools; the loop guard here is intentionally conservative.
  • Optional args (for example issue_refund.reason) are filled via supervisor revise within the demo contract.
  • There are no token/cost budgets (cost guardrails).

Typical stop_reason values

  • success — worker completed the scenario and returned the final response
  • invalid_action:* — worker returned invalid action JSON
  • invalid_action:bad_arg_type:* — tool args contain a value with an invalid type (for example, amount_usd is not a number)
  • supervisor_block:* — supervisor blocked the action by policy
  • human_rejected — escalation to a human was rejected
  • max_tool_calls — tool call limit is exhausted
  • max_seconds — run time budget is exceeded
  • llm_timeout — LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_empty — final LLM response is empty
  • tool_denied:* — tool is not in the execution allowlist
  • tool_missing:* — tool is missing in the registry
  • tool_bad_args:* — invalid arguments for tool
  • loop_detected:per_tool_limit — per_tool_limit exceeded for a tool (protection from tool spam even with different args)
  • loop_detected:signature_repeat — the same tool+args repeated beyond the allowed repeat limit
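
Because most of these values are prefixes followed by a detail, dashboards and alerts usually want them grouped. The sketch below maps a `stop_reason` to a coarse family by its first segment. The prefixes come from the list above, but the family names (`policy`, `budget`, `gateway`, and so on) and the `classify_stop_reason` helper are assumptions made for illustration.

```python
# Prefix (text before the first ":") -> coarse family for alerting.
# Family names are illustrative, not part of the example project.
STOP_REASON_FAMILIES = {
    "success": "success",
    "invalid_action": "worker_error",
    "supervisor_block": "policy",
    "human_rejected": "policy",
    "max_tool_calls": "budget",
    "max_seconds": "budget",
    "llm_timeout": "llm",
    "llm_empty": "llm",
    "tool_denied": "gateway",
    "tool_missing": "gateway",
    "tool_bad_args": "gateway",
    "loop_detected": "loop",
}


def classify_stop_reason(stop_reason: str) -> str:
    """Return the family for a stop_reason, or "unknown" if unrecognized."""
    prefix = stop_reason.split(":", 1)[0]
    return STOP_REASON_FAMILIES.get(prefix, "unknown")


for reason in ("success", "invalid_action:bad_arg_type:amount_usd", "loop_detected:signature_repeat"):
    print(reason, "->", classify_stop_reason(reason))
```

Returning `unknown` instead of raising keeps monitoring code from failing when a new stop reason is introduced before the mapping is updated.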

What to try next

  • Reduce POLICY.auto_refund_limit_usd to 500 and observe how escalate triggers more often.
  • Set simulate_human_approval to reject mode and verify human_rejected.
  • Remove send_refund_email from ALLOWED_TOOLS and verify tool_denied:*.
  • Remove reason from issue_refund and verify how supervisor returns revise with auto-filled reason.
  • Add a risk_score field to supervisor decisions and output it in trace for alerting.
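
As a starting point for the last item, here is one possible sketch of a `risk_score` attached to trace rows. Everything in it is an assumption: the scoring heuristic (refund amount relative to twice the auto-approval limit), the `ALERT_THRESHOLD` value, and the `score_refund_risk` and `with_risk` helpers are invented for illustration and would need to be replaced by a policy that fits your own data.

```python
from typing import Any, Dict

ALERT_THRESHOLD = 0.75  # assumed cut-off for raising an alert


def score_refund_risk(amount_usd: float, auto_refund_limit_usd: float) -> float:
    """Illustrative heuristic: 0.5 at the auto-approval limit, capped at 1.0."""
    if auto_refund_limit_usd <= 0:
        raise ValueError("auto_refund_limit_usd must be positive")
    ratio = amount_usd / (2.0 * auto_refund_limit_usd)
    return round(min(1.0, max(0.0, ratio)), 2)


def with_risk(trace_row: Dict[str, Any], risk_score: float) -> Dict[str, Any]:
    """Return a copy of a trace row extended with risk_score and an alert flag."""
    row = dict(trace_row)  # copy so the original trace row is not mutated
    row["risk_score"] = risk_score
    row["alert"] = risk_score >= ALERT_THRESHOLD
    return row


row = {"step": 2, "tool": "issue_refund", "supervisor_decision": "escalate", "ok": True}
print(with_risk(row, score_refund_risk(amount_usd=1600.0, auto_refund_limit_usd=1000.0)))
```

Keeping the score in the trace rather than in history keeps it out of the worker's context, so the LLM cannot adapt its proposals to the scoring rule.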

⏱️ 18 min read · Updated Mar, 2026 · Difficulty: ★★☆
Author

This documentation is curated and maintained by engineers who ship AI agents in production.

The content is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

Patterns and recommendations are grounded in post-mortems, failure modes, and operational incidents in deployed systems, including during the development and operation of governance infrastructure for agents at OnceOnly.