Supervisor Agent — Python (Full Implementation with an LLM)

A runnable, production-style supervisor-agent example in Python with policy review, approve/revise/block/escalate decisions, human approval, budgets, and stop reasons.
On this page
  1. Core of the Pattern (Brief)
  2. What This Example Shows
  3. Architecture
  4. Project Structure
  5. Running the Example
  6. Task
  7. Solution
  8. Code
  9. tools.py — business tools (facts and execution only)
  10. gateway.py — execution boundary (validation and control)
  11. supervisor.py — policy review and human approval
  12. llm.py — worker decisions + final synthesis
  13. main.py — Worker -> Supervisor -> Execute -> Final
  14. requirements.txt
  15. Example Output
  16. What This Example Does NOT Show
  17. Typical stop_reason Values
  18. What to Try Next

Core of the Pattern (Brief)

Supervisor Agent is a pattern in which a worker agent proposes the next action and a separate supervisor layer decides whether it may be executed.

The LLM decides what to do next, and the supervisor policy decides whether it is safe and allowed before anything is executed.


What This Example Shows

  • a worker loop in which the LLM proposes an action (tool or final)
  • a separate supervisor review before every tool call
  • supervisor decisions: approve, revise, block, escalate
  • simulated human approval for high refund amounts
  • a policy boundary between intent (LLM) and execution (tools)
  • a tool allowlist, run budgets, and loop detection
  • an explicit stop_reason and an audit trace for production monitoring

Architecture

  1. The worker proposes an action JSON.
  2. The action is validated (validate_worker_action).
  3. The supervisor reviews it and returns a decision (approve/revise/block/escalate).
  4. The ToolGateway executes only the permitted action.
  5. The observation and the supervisor decision are written to history.
  6. The worker sees the history and takes the next step or returns final.

The supervisor does not execute business logic itself: it only decides whether the next step is permissible.
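The loop above can be condensed into a minimal, self-contained sketch. The worker and tool stubs here are hypothetical stand-ins for illustration only; the full implementation follows below.

```python
# Minimal sketch of the supervised loop: the worker proposes,
# the supervisor decides, and only permitted actions reach execution.
# worker_propose and the observation stub are hypothetical stand-ins.

def worker_propose(history):
    # stand-in for the LLM worker: first gather context, then finish
    if not history:
        return {"kind": "tool", "name": "get_refund_context", "args": {"user_id": 42}}
    return {"kind": "final", "answer": "done"}

def supervisor_review(action, has_context):
    # stand-in for the policy layer: block a final without context
    if action["kind"] == "final" and not has_context:
        return "block"
    return "approve"

def run():
    history, has_context = [], False
    for _ in range(8):  # step budget
        action = worker_propose(history)
        decision = supervisor_review(action, has_context)
        if decision == "block":
            return {"stop_reason": "supervisor_block"}
        if action["kind"] == "final":
            return {"stop_reason": "success", "answer": action["answer"]}
        observation = {"status": "ok"}  # stand-in for the tool gateway
        has_context = True
        history.append({"action": action, "observation": observation})
    return {"stop_reason": "max_steps"}
```

The point of the sketch is the ordering: the review sits between proposal and execution, so there is no code path from the worker to a tool that skips the decision.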


Project Structure

TEXT
examples/
└── agent-patterns/
    └── supervisor-agent/
        └── python/
            ├── main.py           # Worker -> Supervisor Review -> Execute -> Final
            ├── llm.py            # worker decision + final synthesis
            ├── supervisor.py     # policy review + human approval simulation
            ├── gateway.py        # action validation + allowlist + loop/budget control
            ├── tools.py          # deterministic business tools (refund flow)
            └── requirements.txt

Running the Example

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/supervisor-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Variant using export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Variant using .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

That is the shell variant (macOS/Linux). On Windows it is easier to set the variables with set, or optionally use python-dotenv to load .env automatically.


Task

Imagine a real production support case:

"A customer requests a refund of 1200 USD for an annual plan that was paid 10 days ago."

The worker must not execute a refund of this size automatically. It has to:

  • gather context
  • propose an action
  • have the supervisor check it against policies
  • escalate to a human when needed
  • execute only the approved action

Solution

In this example:

  • the worker LLM proposes a sequence of actions
  • the supervisor checks every tool call
  • a high-risk refund is escalated to a human
  • a human (simulated) approves an adjusted amount
  • only then do execution and the final answer happen

That is the supervisor pattern: execution cannot bypass the policy review.


Code

tools.py — business tools (facts and execution only)

PYTHON
from __future__ import annotations

from typing import Any

USERS = {
    42: {"id": 42, "name": "Anna", "country": "US", "tier": "enterprise"},
    7: {"id": 7, "name": "Max", "country": "US", "tier": "pro"},
}

BILLING = {
    42: {
        "plan": "annual_enterprise",
        "currency": "USD",
        "last_charge_usd": 1200.0,
        "days_since_payment": 10,
    },
    7: {
        "plan": "pro_monthly",
        "currency": "USD",
        "last_charge_usd": 49.0,
        "days_since_payment": 35,
    },
}


def get_refund_context(user_id: int) -> dict[str, Any]:
    user = USERS.get(user_id)
    billing = BILLING.get(user_id)
    if not user or not billing:
        return {"error": f"context_not_found_for_user:{user_id}"}

    return {
        "user": user,
        "billing": billing,
        "policy_hint": {
            "auto_refund_limit_usd": 1000.0,
            "refund_window_days": 14,
        },
    }


def issue_refund(
    user_id: int,
    amount_usd: float,
    reason: str,
) -> dict[str, Any]:
    user = USERS.get(user_id)
    billing = BILLING.get(user_id)
    if not user or not billing:
        return {"status": "error", "error": f"refund_user_not_found:{user_id}"}

    if amount_usd <= 0:
        return {"status": "error", "error": "refund_amount_must_be_positive"}

    paid = float(billing["last_charge_usd"])
    if amount_usd > paid:
        return {
            "status": "error",
            "error": "refund_exceeds_last_charge",
            "max_refund_usd": paid,
        }

    return {
        "status": "ok",
        "refund": {
            "user_id": user_id,
            "currency": "USD",
            "amount_usd": round(float(amount_usd), 2),
            "reason": reason.strip(),
            "transaction_id": f"rf_{user_id}_20260226",
        },
    }


def send_refund_email(user_id: int, amount_usd: float, message: str) -> dict[str, Any]:
    user = USERS.get(user_id)
    if not user:
        return {"status": "error", "error": f"email_user_not_found:{user_id}"}

    return {
        "status": "ok",
        "email": {
            "to": f"{user['name'].lower()}@example.com",
            "template": "refund_confirmation_v2",
            "amount_usd": round(float(amount_usd), 2),
            "message": message,
            "email_id": f"em_{user_id}_20260226",
        },
    }

What matters most here (in plain terms)

  • The tools make no policy decisions; they only return facts or execute an action.
  • The safety constraints here are minimal; the main control lives in the supervisor.

gateway.py — execution boundary (validation and control)

PYTHON
from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_steps: int = 8
    max_tool_calls: int = 5
    max_seconds: int = 30


TOOL_ARG_TYPES: dict[str, dict[str, str]] = {
    "get_refund_context": {"user_id": "int"},
    "issue_refund": {"user_id": "int", "amount_usd": "number", "reason": "str?"},
    "send_refund_email": {"user_id": "int", "amount_usd": "number", "message": "str"},
}


def _stable_json(value: Any) -> str:
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(v) for v in value) + "]"
    if isinstance(value, dict):
        parts = []
        for key in sorted(value):
            parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
        return "{" + ",".join(parts) + "}"
    return json.dumps(str(value), ensure_ascii=True)


def _normalize_for_hash(value: Any) -> Any:
    if isinstance(value, str):
        return " ".join(value.strip().split())
    if isinstance(value, list):
        return [_normalize_for_hash(item) for item in value]
    if isinstance(value, dict):
        return {str(key): _normalize_for_hash(value[key]) for key in sorted(value)}
    return value


def args_hash(args: dict[str, Any]) -> str:
    normalized = _normalize_for_hash(args or {})
    raw = _stable_json(normalized)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]


def _is_number(value: Any) -> bool:
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def _validate_tool_args(name: str, args: dict[str, Any]) -> dict[str, Any]:
    spec = TOOL_ARG_TYPES.get(name)
    if spec is None:
        raise StopRun(f"invalid_action:unknown_tool:{name}")

    extra = set(args.keys()) - set(spec.keys())
    if extra:
        raise StopRun(f"invalid_action:extra_tool_args:{name}")

    normalized: dict[str, Any] = {}
    for arg_name, expected in spec.items():
        is_optional = expected.endswith("?")
        expected_base = expected[:-1] if is_optional else expected

        if arg_name not in args:
            if is_optional:
                continue
            raise StopRun(f"invalid_action:missing_required_arg:{name}:{arg_name}")
        value = args[arg_name]

        if expected_base == "int":
            if not isinstance(value, int) or isinstance(value, bool):
                raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
            normalized[arg_name] = value
            continue

        if expected_base == "number":
            if not _is_number(value):
                raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
            normalized[arg_name] = float(value)
            continue

        if expected_base == "str":
            if not isinstance(value, str) or not value.strip():
                raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
            normalized[arg_name] = value.strip()
            continue

        raise StopRun(f"invalid_action:unknown_arg_spec:{name}:{arg_name}")

    return normalized


def validate_worker_action(action: Any) -> dict[str, Any]:
    if not isinstance(action, dict):
        raise StopRun("invalid_action:not_object")

    kind = action.get("kind")
    if kind == "invalid":
        raise StopRun("invalid_action:non_json")

    if kind == "final":
        allowed_keys = {"kind", "answer"}
        if set(action.keys()) - allowed_keys:
            raise StopRun("invalid_action:extra_keys_final")
        answer = action.get("answer")
        if not isinstance(answer, str) or not answer.strip():
            raise StopRun("invalid_action:bad_final_answer")
        return {"kind": "final", "answer": answer.strip()}

    if kind == "tool":
        allowed_keys = {"kind", "name", "args"}
        if set(action.keys()) - allowed_keys:
            raise StopRun("invalid_action:extra_keys_tool")

        name = action.get("name")
        if not isinstance(name, str) or not name.strip():
            raise StopRun("invalid_action:bad_tool_name")

        args = action.get("args", {})
        if args is None:
            args = {}
        if not isinstance(args, dict):
            raise StopRun("invalid_action:bad_tool_args")

        normalized_args = _validate_tool_args(name.strip(), args)
        return {"kind": "tool", "name": name.strip(), "args": normalized_args}

    raise StopRun("invalid_action:bad_kind")


class ToolGateway:
    def __init__(
        self,
        *,
        allow: set[str],
        registry: dict[str, Callable[..., dict[str, Any]]],
        budget: Budget,
    ):
        self.allow = set(allow)
        self.registry = registry
        self.budget = budget
        self.tool_calls = 0
        self.seen_call_counts: dict[str, int] = {}
        self.per_tool_counts: dict[str, int] = {}
        self.read_only_repeat_limit: dict[str, int] = {
            "get_refund_context": 2,
        }
        self.per_tool_limit: dict[str, int] = {
            "get_refund_context": 3,
            "issue_refund": 2,
            "send_refund_email": 2,
        }

    def call(self, name: str, args: dict[str, Any]) -> dict[str, Any]:
        self.tool_calls += 1
        if self.tool_calls > self.budget.max_tool_calls:
            raise StopRun("max_tool_calls")

        if name not in self.allow:
            raise StopRun(f"tool_denied:{name}")

        fn = self.registry.get(name)
        if fn is None:
            raise StopRun(f"tool_missing:{name}")

        count_for_tool = self.per_tool_counts.get(name, 0) + 1
        if count_for_tool > self.per_tool_limit.get(name, 2):
            raise StopRun("loop_detected:per_tool_limit")
        self.per_tool_counts[name] = count_for_tool

        signature = f"{name}:{args_hash(args)}"
        seen = self.seen_call_counts.get(signature, 0) + 1
        allowed_repeats = self.read_only_repeat_limit.get(name, 1)
        if seen > allowed_repeats:
            raise StopRun("loop_detected:signature_repeat")
        self.seen_call_counts[signature] = seen

        try:
            out = fn(**args)
        except TypeError as exc:
            raise StopRun(f"tool_bad_args:{name}") from exc
        except Exception as exc:
            raise StopRun(f"tool_error:{name}") from exc

        if not isinstance(out, dict):
            raise StopRun(f"tool_bad_result:{name}")
        return out

What matters most here (in plain terms)

  • The gateway performs a minimal args schema check before execution (types, required and optional fields).
  • The gateway controls technical execution: allowlist, budget, loop detection (both by tool+args signature and by the total number of tool calls).
  • The gateway does not fill in missing fields; it only validates the args contract.
  • Some fields (for example issue_refund.reason) are optional at the gateway level because the supervisor can add them via revise. If the supervisor does not add them, execution stops with tool_bad_args:*.
  • Even after the supervisor has approved, execution still goes only through this controlled layer.
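The signature-based loop detection can be seen in isolation with a condensed version of the hashing (same idea as args_hash above: whitespace in strings is collapsed and keys are sorted before hashing, so cosmetically different retries produce the same signature):

```python
import hashlib
import json

def mini_args_hash(args: dict) -> str:
    # condensed sketch of args_hash: normalize whitespace in strings,
    # sort dict keys, then hash the canonical JSON
    def norm(v):
        if isinstance(v, str):
            return " ".join(v.strip().split())
        if isinstance(v, dict):
            return {k: norm(v[k]) for k in sorted(v)}
        if isinstance(v, list):
            return [norm(x) for x in v]
        return v
    raw = json.dumps(norm(args), ensure_ascii=True, sort_keys=True)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]

# key order and extra whitespace do not change the signature, so a
# retried call with cosmetically different args still counts as a
# repeat for loop detection
a = mini_args_hash({"user_id": 42, "reason": "  double   charge "})
b = mini_args_hash({"reason": "double charge", "user_id": 42})
```

This is why a worker that keeps re-proposing the same call with reshuffled or re-padded arguments still hits loop_detected:signature_repeat.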

supervisor.py — policy review and human approval

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

HUMAN_APPROVAL_CAP_USD = 800.0


@dataclass(frozen=True)
class Policy:
    auto_refund_limit_usd: float = 1000.0
    max_refund_per_run_usd: float = 2000.0


@dataclass
class RuntimeState:
    executed_refund_total_usd: float = 0.0
    refund_executed: bool = False
    has_context: bool = False


@dataclass
class Decision:
    kind: str  # approve | revise | block | escalate
    reason: str
    revised_action: dict[str, Any] | None = None


def review_action(action: dict[str, Any], state: RuntimeState, policy: Policy) -> Decision:
    kind = action.get("kind")
    if kind == "final":
        if not state.has_context:
            return Decision(kind="block", reason="final_requires_context")
        return Decision(kind="approve", reason="final_with_context")
    if kind != "tool":
        return Decision(kind="block", reason="unknown_action_kind")

    name = action.get("name")
    args = dict(action.get("args") or {})

    if name == "get_refund_context":
        return Decision(kind="approve", reason="read_only_context")

    if name == "issue_refund":
        try:
            amount = float(args.get("amount_usd", 0.0))
        except (TypeError, ValueError):
            return Decision(kind="block", reason="invalid_refund_amount_type")
        if amount <= 0:
            return Decision(kind="block", reason="invalid_refund_amount")

        remaining = policy.max_refund_per_run_usd - state.executed_refund_total_usd
        if remaining <= 0:
            return Decision(kind="block", reason="refund_budget_exhausted")

        if amount > remaining:
            revised = {
                "kind": "tool",
                "name": name,
                "args": {**args, "amount_usd": round(remaining, 2)},
            }
            return Decision(kind="revise", reason="cap_to_remaining_run_budget", revised_action=revised)

        reason = str(args.get("reason", "")).strip()
        if not reason:
            revised = {
                "kind": "tool",
                "name": name,
                "args": {**args, "reason": "Customer requested refund within policy review"},
            }
            return Decision(kind="revise", reason="refund_reason_required", revised_action=revised)

        if amount > policy.auto_refund_limit_usd:
            return Decision(kind="escalate", reason="high_refund_requires_human")

        return Decision(kind="approve", reason="refund_within_auto_limit")

    if name == "send_refund_email":
        if not state.refund_executed:
            return Decision(kind="block", reason="email_before_refund")
        return Decision(kind="approve", reason="email_after_refund")

    return Decision(kind="block", reason=f"unknown_tool_for_supervisor:{name}")


def simulate_human_approval(action: dict[str, Any]) -> dict[str, Any]:
    name = action.get("name")
    args = dict(action.get("args") or {})

    if name != "issue_refund":
        return {"approved": True, "revised_action": action, "comment": "approved_by_human"}

    try:
        requested = float(args.get("amount_usd", 0.0))
    except (TypeError, ValueError):
        return {"approved": False, "comment": "invalid_requested_amount_type"}
    approved_amount = min(requested, HUMAN_APPROVAL_CAP_USD)

    if approved_amount <= 0:
        return {"approved": False, "comment": "invalid_requested_amount"}

    revised_action = {
        "kind": "tool",
        "name": "issue_refund",
        "args": {**args, "amount_usd": round(approved_amount, 2)},
    }
    return {
        "approved": True,
        "revised_action": revised_action,
        "comment": f"approved_with_cap:{approved_amount}",
    }

What matters most here (in plain terms)

  • The supervisor makes decisions before execution, not after.
  • final also goes through supervisor review (without context it returns block).
  • An invalid type for amount_usd does not raise an exception; it returns a controlled block decision.
  • The supervisor can use revise to add policy-required fields (for example reason for issue_refund).
  • escalate shows a practical variant here: a human can approve the action with an adjusted amount.
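The decision ladder for issue_refund can be exercised with a condensed review function. This is a sketch that mirrors the order of checks in review_action above (not a replacement for it):

```python
def mini_review(amount, reason, auto_limit=1000.0, run_budget=2000.0, spent=0.0):
    # condensed sketch of review_action's order of checks for issue_refund
    if amount <= 0:
        return "block"           # invalid amount
    remaining = run_budget - spent
    if remaining <= 0:
        return "block"           # refund budget exhausted
    if amount > remaining:
        return "revise"          # cap to remaining run budget
    if not reason.strip():
        return "revise"          # supervisor fills in a policy reason
    if amount > auto_limit:
        return "escalate"        # human approval required
    return "approve"

# mini_review(1200.0, "customer request")  -> "escalate"  (above auto limit)
# mini_review(300.0, "customer request")   -> "approve"
# mini_review(2500.0, "customer request")  -> "revise"    (capped to run budget)
# mini_review(300.0, "")                   -> "revise"    (reason filled in)
```

Note that the budget cap and the missing-reason repair come before the escalation check, which is exactly how the real review_action behaves.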

llm.py — worker decisions + final synthesis

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


WORKER_SYSTEM_PROMPT = """
You are a support worker agent.
Return exactly one JSON object in one of these shapes:
1) {"kind":"tool","name":"<tool_name>","args":{...}}
2) {"kind":"final","answer":"<short final answer>"}

Rules:
- First, collect facts with get_refund_context.
- Then propose issue_refund when facts are sufficient.
- Use send_refund_email only after refund success is visible in history.
- Use only tools from available_tools.
- Do not output markdown or extra keys.
""".strip()

FINAL_SYSTEM_PROMPT = """
You are a customer support assistant.
Write a short final answer in English.
Include:
- final refund amount in USD
- whether human approval was required
- one reason based on policy
""".strip()

TOOL_CATALOG = [
    {
        "name": "get_refund_context",
        "description": "Get user profile, billing facts, and policy hints",
        "args": {"user_id": "integer"},
    },
    {
        "name": "issue_refund",
        "description": "Issue refund in USD",
        "args": {"user_id": "integer", "amount_usd": "number", "reason": "optional string"},
    },
    {
        "name": "send_refund_email",
        "description": "Send refund confirmation email",
        "args": {"user_id": "integer", "amount_usd": "number", "message": "string"},
    },
]


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def _build_state_summary(history: list[dict[str, Any]]) -> dict[str, Any]:
    tools_used = [
        step.get("action", {}).get("name")
        for step in history
        if isinstance(step, dict)
        and isinstance(step.get("action"), dict)
        and step.get("action", {}).get("kind") == "tool"
    ]

    supervisor_decisions = [
        step.get("supervisor", {}).get("decision")
        for step in history
        if isinstance(step, dict) and isinstance(step.get("supervisor"), dict)
    ]

    return {
        "steps_completed": len(history),
        "tools_used": [t for t in tools_used if t],
        "supervisor_decisions": [d for d in supervisor_decisions if d],
        "last_step": history[-1] if history else None,
    }


def _recent_step_summaries(history: list[dict[str, Any]], limit: int = 3) -> list[dict[str, Any]]:
    out: list[dict[str, Any]] = []
    for step in history[-limit:]:
        out.append(
            {
                "step": step.get("step"),
                "proposed_tool": step.get("action", {}).get("name"),
                "supervisor_decision": step.get("supervisor", {}).get("decision"),
                "executed_tool": step.get("executed_action", {}).get("name"),
                "executed_from": step.get("executed_from"),
                "observation_status": step.get("observation", {}).get("status"),
            }
        )
    return out


def decide_next_action(
    goal: str,
    history: list[dict[str, Any]],
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "state_summary": _build_state_summary(history),
        "recent_step_summaries": _recent_step_summaries(history, limit=3),
        "available_tools": TOOL_CATALOG,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": WORKER_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_final_answer(goal: str, history: list[dict[str, Any]]) -> str:
    payload = {
        "goal": goal,
        "history": history,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            messages=[
                {"role": "system", "content": FINAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = (completion.choices[0].message.content or "").strip()
    if not text:
        raise LLMEmpty("llm_empty")
    return text

What matters most here (in plain terms)

  • The worker makes decisions but has no right to execute an action directly.
  • The worker context receives supervisor decisions and compact summaries of the most recent steps instead of a bloated payload.
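The effect of the compact context can be sketched with a condensed summary over a sample history entry (the entry has the same shape as the history built in main.py; the summarizer is a simplified stand-in for _build_state_summary):

```python
def mini_state_summary(history):
    # condensed sketch of _build_state_summary: keep only tool names and
    # supervisor decisions instead of the full step payloads
    return {
        "steps_completed": len(history),
        "tools_used": [
            s["action"]["name"]
            for s in history
            if s.get("action", {}).get("kind") == "tool"
        ],
        "supervisor_decisions": [
            s["supervisor"]["decision"] for s in history if "supervisor" in s
        ],
    }

sample = [
    {
        "step": 1,
        "action": {"kind": "tool", "name": "get_refund_context", "args": {"user_id": 42}},
        "supervisor": {"decision": "approve", "reason": "read_only_context"},
        "observation": {"user": {"id": 42}},  # trimmed for the sketch
    }
]
summary = mini_state_summary(sample)
```

The worker prompt then carries only this summary plus a few recent step summaries, which keeps token usage roughly constant as the run grows.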

main.py — Worker -> Supervisor -> Execute -> Final

PYTHON
from __future__ import annotations

import json
import time
from typing import Any

from gateway import Budget, StopRun, ToolGateway, args_hash, validate_worker_action
from llm import LLMEmpty, LLMTimeout, compose_final_answer, decide_next_action
from supervisor import Decision, Policy, RuntimeState, review_action, simulate_human_approval
from tools import get_refund_context, issue_refund, send_refund_email

GOAL = (
    "User Anna (user_id=42) requests a refund for a recent annual plan charge of 1200 USD. "
    "Apply supervisor policy before any refund execution and provide a short final response."
)

BUDGET = Budget(max_steps=8, max_tool_calls=5, max_seconds=30)
POLICY = Policy(
    auto_refund_limit_usd=1000.0,
    max_refund_per_run_usd=2000.0,
)

TOOL_REGISTRY = {
    "get_refund_context": get_refund_context,
    "issue_refund": issue_refund,
    "send_refund_email": send_refund_email,
}

ALLOWED_TOOLS = {"get_refund_context", "issue_refund", "send_refund_email"}


def _decision_payload(decision: Decision) -> dict[str, Any]:
    payload = {
        "decision": decision.kind,
        "reason": decision.reason,
    }
    if decision.revised_action:
        payload["revised_action"] = decision.revised_action
    return payload


def run_supervised_flow(goal: str) -> dict[str, Any]:
    started = time.monotonic()
    history: list[dict[str, Any]] = []
    trace: list[dict[str, Any]] = []

    state = RuntimeState()
    gateway = ToolGateway(allow=ALLOWED_TOOLS, registry=TOOL_REGISTRY, budget=BUDGET)

    for step in range(1, BUDGET.max_steps + 1):
        if (time.monotonic() - started) > BUDGET.max_seconds:
            return {
                "status": "stopped",
                "stop_reason": "max_seconds",
                "trace": trace,
                "history": history,
            }

        try:
            raw_action = decide_next_action(goal=goal, history=history)
        except LLMTimeout:
            return {
                "status": "stopped",
                "stop_reason": "llm_timeout",
                "phase": "worker",
                "trace": trace,
                "history": history,
            }

        try:
            action = validate_worker_action(raw_action)
        except StopRun as exc:
            return {
                "status": "stopped",
                "stop_reason": exc.reason,
                "phase": "worker",
                "raw_action": raw_action,
                "trace": trace,
                "history": history,
            }

        decision = review_action(action=action, state=state, policy=POLICY)
        supervisor_info = _decision_payload(decision)

        action_to_execute = action
        human_info: dict[str, Any] | None = None
        executed_from = "original"

        if decision.kind == "block":
            trace_row = {
                "step": step,
                "tool": action.get("name", "final"),
                "supervisor_decision": "block",
                "executed_from": executed_from,
                "stop_reason": f"supervisor_block:{decision.reason}",
                "ok": False,
            }
            if action.get("kind") == "tool":
                trace_row["args_hash"] = args_hash(action.get("args", {}))
            trace.append(trace_row)
            return {
                "status": "stopped",
                "stop_reason": f"supervisor_block:{decision.reason}",
                "phase": "supervisor",
                "action": action,
                "trace": trace,
                "history": history,
            }

        if decision.kind == "revise" and decision.revised_action:
            action_to_execute = decision.revised_action
            executed_from = "supervisor_revised"

        if decision.kind == "escalate":
            human = simulate_human_approval(action)
            human_info = human
            if not human.get("approved"):
                trace.append(
                    {
                        "step": step,
                        "tool": action["name"],
                        "args_hash": args_hash(action["args"]),
                        "supervisor_decision": "escalate",
                        "executed_from": executed_from,
                        "human_approved": False,
                        "stop_reason": "human_rejected",
                        "ok": False,
                    }
                )
                return {
                    "status": "stopped",
                    "stop_reason": "human_rejected",
                    "phase": "human_approval",
                    "action": action,
                    "trace": trace,
                    "history": history,
                }
            revised = human.get("revised_action")
            if isinstance(revised, dict):
                action_to_execute = revised
                executed_from = "human_revised"

        if action_to_execute["kind"] == "final":
            trace.append(
                {
                    "step": step,
                    "tool": "final",
                    "supervisor_decision": decision.kind,
                    "executed_from": executed_from,
                    "ok": True,
                }
            )
            history.append(
                {
                    "step": step,
                    "action": action,
                    "supervisor": supervisor_info,
                    "executed_action": action_to_execute,
                    "executed_from": executed_from,
                    "observation": {"status": "final"},
                }
            )
            return {
                "status": "ok",
                "stop_reason": "success",
                "answer": action_to_execute["answer"],
                "trace": trace,
                "history": history,
            }

        tool_name = action_to_execute["name"]
        tool_args = action_to_execute["args"]

        try:
            observation = gateway.call(tool_name, tool_args)
            trace_row = {
                "step": step,
                "tool": tool_name,
                "args_hash": args_hash(tool_args),
                "supervisor_decision": decision.kind,
                "executed_from": executed_from,
                "ok": True,
            }
            if human_info is not None:
                trace_row["human_approved"] = bool(human_info.get("approved"))
            trace.append(trace_row)
        except StopRun as exc:
            trace_row = {
                "step": step,
                "tool": tool_name,
                "args_hash": args_hash(tool_args),
                "supervisor_decision": decision.kind,
                "executed_from": executed_from,
                "ok": False,
                "stop_reason": exc.reason,
            }
            if human_info is not None:
                trace_row["human_approved"] = bool(human_info.get("approved"))
            trace.append(trace_row)
            return {
                "status": "stopped",
                "stop_reason": exc.reason,
                "phase": "execution",
                "action": action_to_execute,
                "trace": trace,
                "history": history,
            }

        if tool_name == "get_refund_context" and "error" not in observation:
            state.has_context = True

        if tool_name == "issue_refund" and observation.get("status") == "ok":
            amount = float(observation.get("refund", {}).get("amount_usd", 0.0))
            state.executed_refund_total_usd += amount
            state.refund_executed = True

        history_entry = {
            "step": step,
            "action": action,
            "supervisor": supervisor_info,
            "executed_action": action_to_execute,
            "executed_from": executed_from,
            "observation": observation,
        }
        if human_info is not None:
            history_entry["human_approval"] = human_info
        history.append(history_entry)

    try:
        answer = compose_final_answer(goal=goal, history=history)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "finalize",
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "phase": "finalize",
            "trace": trace,
            "history": history,
        }

    return {
        "status": "ok",
        "stop_reason": "success",
        "answer": answer,
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_supervised_flow(GOAL)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (in plain terms)

  • The worker cannot bypass the supervisor review: every tool call goes through a policy check.
  • escalate triggers human approval, and the action is executed only after that.
  • The trace shows the source of each executed action: original, supervisor_revised, or human_revised.
  • An early final without get_refund_context is blocked as supervisor_block:final_requires_context.
  • trace + history provide a complete audit: what the worker proposed, what the supervisor decided, and what was actually executed.
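The "final requires context" check from the list above can be reduced to a few lines. This is a simplified, self-contained sketch; the names (Decision, review_final, has_context) are illustrative and do not match the article's supervisor.py exactly.

```python
from dataclasses import dataclass


@dataclass
class Decision:
    kind: str          # "approve" | "block"
    reason: str = ""


def review_final(action: dict, has_context: bool) -> Decision:
    """Block a final answer if the worker never fetched the refund context."""
    if action.get("kind") == "final" and not has_context:
        return Decision("block", "final_requires_context")
    return Decision("approve", "final_with_context")


print(review_final({"kind": "final", "answer": "done"}, has_context=False).reason)
# -> final_requires_context
```

The point of the check is that the supervisor gates even non-tool actions: a final answer is an action like any other and must pass policy review.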

requirements.txt

TEXT
openai==2.21.0

Example output

Below is an example of a valid run in which the worker proposes a partial refund of 1000 USD right after gathering context, and the supervisor returns approve.

JSON
{
  "status": "ok",
  "stop_reason": "success",
  "answer": "A partial refund of 1000 USD has been approved and processed for Anna as per supervisor policy, and a confirmation email has been sent.",
  "trace": [
    {
      "step": 1,
      "tool": "get_refund_context",
      "args_hash": "feaa769a39ae",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 2,
      "tool": "issue_refund",
      "args_hash": "36ffe69cb606",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 3,
      "tool": "send_refund_email",
      "args_hash": "ff6ec44bb2fa",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 4,
      "tool": "final",
      "supervisor_decision": "approve",
      "executed_from": "original",
      "ok": true
    }
  ],
  "history": [
    {"step": 1, "action": {"kind": "tool", "name": "get_refund_context", "args": {"user_id": 42}}, "supervisor": {"decision": "approve", "reason": "read_only_context"}, "executed_action": {"kind": "tool", "name": "get_refund_context", "args": {"user_id": 42}}, "executed_from": "original", "observation": {...}},
    {"step": 2, "action": {"kind": "tool", "name": "issue_refund", "args": {"user_id": 42, "amount_usd": 1000.0, "reason": "..."}}, "supervisor": {"decision": "approve", "reason": "refund_within_auto_limit"}, "executed_action": {"kind": "tool", "name": "issue_refund", "args": {"user_id": 42, "amount_usd": 1000.0, "reason": "..."}}, "executed_from": "original", "observation": {...}},
    {"step": 3, "action": {"kind": "tool", "name": "send_refund_email", "args": {"user_id": 42, "amount_usd": 1000.0, "message": "..."}}, "supervisor": {"decision": "approve", "reason": "email_after_refund"}, "executed_action": {"kind": "tool", "name": "send_refund_email", "args": {"user_id": 42, "amount_usd": 1000.0, "message": "..."}}, "executed_from": "original", "observation": {...}},
    {"step": 4, "action": {"kind": "final", "answer": "..."}, "supervisor": {"decision": "approve", "reason": "final_with_context"}, "executed_action": {"kind": "final", "answer": "..."}, "executed_from": "original", "observation": {"status": "final"}}
  ]
}

This is a shortened example: history has been deliberately reduced to key fields for readability.


What this example does NOT show

  • No real integrated human-in-the-loop UI/queue.
  • No role-based access and no multi-tenant isolation.
  • No complex retry/backoff policies for the LLM.
  • No full retry/backoff orchestration for write tools; the loop guard here is deliberately conservative.
  • Optional args (for example issue_refund.reason) are filled in via supervisor revise within this demo's contract.
  • No token/cost budgets (cost guardrails).
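For the last omission, a token/cost budget guard could be as small as the sketch below. The CostBudget class, the per-token prices, and the stop-reason string are illustrative assumptions, not part of the article's code.

```python
class BudgetExceeded(Exception):
    """Raised when the run's spend cap is exhausted."""


class CostBudget:
    def __init__(self, max_usd: float):
        self.max_usd = max_usd
        self.spent_usd = 0.0

    def charge(self, prompt_tokens: int, completion_tokens: int,
               usd_per_1k_prompt: float = 0.005,
               usd_per_1k_completion: float = 0.015) -> None:
        # Accumulate estimated cost per LLM call, then enforce the cap.
        self.spent_usd += (prompt_tokens / 1000) * usd_per_1k_prompt
        self.spent_usd += (completion_tokens / 1000) * usd_per_1k_completion
        if self.spent_usd > self.max_usd:
            raise BudgetExceeded(f"cost_budget_exceeded:{self.spent_usd:.4f}")


budget = CostBudget(max_usd=0.01)
budget.charge(prompt_tokens=1000, completion_tokens=200)  # 0.005 + 0.003 USD
print(round(budget.spent_usd, 4))  # -> 0.008
```

In the main loop, BudgetExceeded would be caught the same way as StopRun and surfaced as an additional stop_reason.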

Typical stop_reason values

  • success — the worker completed the scenario and returned the final answer
  • invalid_action:* — the worker returned invalid action JSON
  • invalid_action:bad_arg_type:* — a value in the tool args has an invalid type (for example, amount_usd is not a number)
  • supervisor_block:* — the supervisor blocked the action per policy
  • human_rejected — the escalation to a human was rejected
  • max_tool_calls — the tool-call limit is exhausted
  • max_seconds — the run's time budget was exceeded
  • llm_timeout — the LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_empty — the final LLM response is empty
  • tool_denied:* — the tool is not in the execution allowlist
  • tool_missing:* — the tool is missing from the registry
  • tool_bad_args:* — invalid arguments for the tool
  • loop_detected:per_tool_limit — the per_tool_limit for a tool was exceeded (protects against tool spam even with varying args)
  • loop_detected:signature_repeat — the same tool+args was repeated beyond the allowed repeat limit

What to try next

  • Lower POLICY.auto_refund_limit_usd to 500 and watch escalate fire more often.
  • Switch simulate_human_approval to rejection mode and verify human_rejected.
  • Remove send_refund_email from ALLOWED_TOOLS and verify tool_denied:*.
  • Remove reason from issue_refund and observe how the supervisor returns revise with an automatically added reason.
  • Add a risk_score field to supervisor decisions and emit it in the trace for alerting.
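For the last suggestion, here is one way a risk_score could be computed and attached to a trace row. The scoring heuristic (amount-based, plus a baseline for write tools) is an assumption for illustration, not part of the article's code.

```python
def risk_score(tool: str, args: dict) -> float:
    """Toy heuristic: scale with refund amount, add a baseline for write tools."""
    score = 0.0
    if tool == "issue_refund":
        score += min(float(args.get("amount_usd", 0)) / 5000.0, 1.0)
    if tool in ("issue_refund", "send_refund_email"):
        score += 0.2  # write tools carry baseline risk
    return round(min(score, 1.0), 2)


trace_row = {
    "step": 2,
    "tool": "issue_refund",
    "risk_score": risk_score("issue_refund", {"user_id": 42, "amount_usd": 1000.0}),
}
print(trace_row["risk_score"])  # -> 0.4
```

With the score in the trace, an alerting rule (for example, risk_score above 0.8) can page a human even when the supervisor decision was approve.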
⏱️ 18 min read · Updated Mar 2026 · Difficulty: ★★☆
Integrated: Production Control · OnceOnly
Guardrails for tool-calling agents
Ship this pattern with governance:
  • Budgets (steps / spend caps)
  • Tool permissions (allowlist / blocklist)
  • Kill switch & incident stop
  • Idempotency & dedupe
  • Audit logs & traceability
Integrated note: OnceOnly is a control layer for production agent systems.
Author

This documentation is curated and maintained by engineers who operate AI agents in production.

The content is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

Patterns and recommendations are based on post-mortems, failure modes, and operational incidents in production systems, including the development and operation of governance infrastructure for agents at OnceOnly.