Reflection Agent — Python (complete implementation with an LLM)

A runnable, production-style reflection-agent example in Python with a single review pass, patch-only revision, a policy/execution boundary, and explicit stop reasons.
On this page
  1. Core of the pattern (in short)
  2. What this example shows
  3. Architecture
  4. Project structure
  5. Running the example
  6. Task
  7. Solution
  8. Code
  9. context.py — deterministic production context
  10. gateway.py — policy/execution boundary + patch guardrails
  11. llm.py — draft/review/revise LLM calls
  12. main.py — orchestrating the reflection flow
  13. Example output
  14. Typical stop_reason values
  15. What is NOT shown here
  16. What you can try next

Core of the pattern (in short)

A reflection agent is a pattern in which the agent performs a controlled self-review after producing a draft answer, then either releases the answer or executes exactly one patch-only revision.

The LLM decides how the draft and the review are worded, while the execution layer controls whether they may be acted on within the policy and runtime boundaries.
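The control flow of this pattern can be sketched in a few lines (a simplified illustration only; `reflect_once` is not part of the example code, and the runnable version with budgets and guardrails lives in main.py):

```python
# Minimal sketch of the reflection control flow: one draft, one review,
# at most one revision, and a controlled stop on escalation.
def reflect_once(draft_fn, review_fn, revise_fn):
    draft = draft_fn()
    review = review_fn(draft)            # structured decision, not free text
    if review["decision"] == "approve":
        return draft
    if review["decision"] == "revise":   # exactly one patch-only revision
        return revise_fn(draft, review["fix_plan"])
    raise RuntimeError("policy_escalation")

answer = reflect_once(
    lambda: "draft text",
    lambda d: {"decision": "approve"},
    lambda d, plan: d,
)
```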


What this example shows

  • production-like flow: Draft -> Review -> Revise (optional) -> Finalize
  • one review pass and at most one revision (no endless loop)
  • a policy boundary for the review decision (approve | revise | escalate)
  • an execution boundary that enforces the runtime allowlist for decisions
  • patch guardrails: no_new_facts, checks for critical claims/tokens, edit-size control
  • controlled budgets (max_seconds, length limits, issue/fix limits)
  • explicit stop_reason, trace, and history for auditing

Architecture

  1. The LLM produces a draft answer.
  2. The LLM review returns a structured decision (approve/revise/escalate).
  3. The gateway validates the review contract against the policy.
  4. The gateway enforces the execution allowlist (the runtime may deviate from the policy).
  5. On revise, exactly one patch-only revision is executed.
  6. The gateway checks the patch (no_new_facts, critical tokens/claims, fix_plan hints, similarity) and finalizes the answer.

Key contract: the LLM proposes changes, but final execution authority rests with the execution layer.


Project structure

TEXT
examples/
└── agent-patterns/
    └── reflection-agent/
        └── python/
            ├── main.py          # Draft -> Review -> Revise -> Finalize flow
            ├── llm.py           # draft/review/revise LLM calls
            ├── gateway.py       # policy+execution validation, patch guards
            ├── context.py       # deterministic incident context
            ├── requirements.txt
            └── README.md

Running the example

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/reflection-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option via export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py

Option via .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows it is easier to use set variables, or optionally python-dotenv to load .env automatically.
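If you want the .env behavior without an extra dependency, a minimal stdlib-only loader looks like this (a sketch; `load_env_file` is hypothetical, and python-dotenv is the more robust choice):

```python
import os

def load_env_file(path: str = ".env") -> None:
    """Minimal .env loader: KEY=VALUE lines; blanks and '#' comments are skipped."""
    with open(path, encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            # do not overwrite variables that are already exported
            os.environ.setdefault(key.strip(), value.strip())
```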


Task

Imagine a production scenario for incident communication:

"Create a customer-facing update about a P1 payment issue in the US region, without risky commitments and with clear next actions."

A single draft often looks good but may contain:

  • overconfident wording
  • unclear boundaries of responsibility
  • phrasing that sounds like a guarantee

Reflection adds a controlled review before anything is sent.

Solution

In this example:

  • the draft is generated from deterministic context
  • the review returns structured JSON (no free text)
  • revise is allowed at most once
  • the gateway blocks the patch if new numbers/facts appear or the edit is too large
  • escalate ends the run in a controlled way (status=stopped), with no hidden auto-rewrites
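The number guard can be illustrated with a simplified version of the gateway's check (a sketch only; the real gateway additionally checks incident ids, severity labels, regions, and restricted claims):

```python
import re

# Same token pattern as gateway.py uses for the no_new_facts guard.
NUMBER_TOKEN_RE = re.compile(r"\b\d+(?:\.\d+)?%?\b")

def new_number_tokens(allowed_text: str, revised: str) -> set[str]:
    """Numbers present in the revision but not in context + original draft."""
    return set(NUMBER_TOKEN_RE.findall(revised.lower())) - set(
        NUMBER_TOKEN_RE.findall(allowed_text.lower())
    )

allowed = "affected_checkout_pct 27 failed_payment_rate 0.034 eta_minutes 45"
assert new_number_tokens(allowed, "about 27 percent of checkouts affected") == set()
assert new_number_tokens(allowed, "now only 12 percent affected") == {"12"}
```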

Code

context.py — deterministic production context

PYTHON
from __future__ import annotations

from typing import Any


def build_incident_context(*, report_date: str, region: str) -> dict[str, Any]:
    return {
        "report_date": report_date,
        "region": region,
        "incident": {
            "incident_id": "inc_payments_20260305",
            "severity": "P1",
            "status": "degraded",
            "affected_checkout_pct": 27,
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "eta_minutes": 45,
        },
        "policy_hints": {
            "avoid_absolute_guarantees": True,
            "required_sections": ["current_status", "customer_impact", "next_actions"],
        },
        "approved_actions": [
            "monitor payment failures every 15 minutes",
            "publish customer update via status page",
            "prepare support macro with workaround guidance",
        ],
    }

What matters most here (in plain terms)

  • The context is fixed and repeatable, which is convenient for tests and debugging.
  • The LLM does not invent source data; it only phrases the answer based on this context.
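Because the context is deterministic, you can pin it in snapshot tests. `ctx_fingerprint` below is a hypothetical helper in the spirit of the example's `text_hash`, not part of the code:

```python
import hashlib
import json

def ctx_fingerprint(ctx: dict) -> str:
    # stable serialization -> short hash, like text_hash in gateway.py
    raw = json.dumps(ctx, sort_keys=True, ensure_ascii=True)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]

ctx = {"report_date": "2026-03-05", "region": "US"}
# identical inputs always produce the identical fingerprint
assert ctx_fingerprint(ctx) == ctx_fingerprint(dict(ctx))
```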

gateway.py — policy/execution boundary + patch guardrails

PYTHON
from __future__ import annotations

import hashlib
import json
import re
from dataclasses import dataclass
from difflib import SequenceMatcher
from typing import Any


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 30
    max_draft_chars: int = 900
    max_review_issues: int = 4
    max_fix_items: int = 4
    max_answer_chars: int = 900
    min_patch_similarity: float = 0.45


NUMBER_TOKEN_RE = re.compile(r"\b\d+(?:\.\d+)?%?\b")
INCIDENT_ID_RE = re.compile(r"\binc_[a-z0-9_]+\b", re.IGNORECASE)
SEVERITY_RE = re.compile(r"\bp[0-5]\b", re.IGNORECASE)
REGION_RE = re.compile(r"\b(us|eu|uk|ua|apac|global|emea|latam)\b", re.IGNORECASE)
QUOTED_PHRASE_RE = re.compile(r"['\"]([^'\"]{3,120})['\"]")

RESTRICTED_CLAIMS_RE = [
    re.compile(r"\bresolved\b", re.IGNORECASE),
    re.compile(r"\bfully[-\s]+recovered\b", re.IGNORECASE),
    re.compile(r"\bincident\s+closed\b", re.IGNORECASE),
    re.compile(r"\ball payments (?:are|is)\s+stable\b", re.IGNORECASE),
]


def _stable_json(value: Any) -> str:
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(v) for v in value) + "]"
    if isinstance(value, dict):
        parts = []
        for key in sorted(value):
            parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
        return "{" + ",".join(parts) + "}"
    return json.dumps(str(value), ensure_ascii=True)


def _normalize_space(text: str) -> str:
    return " ".join((text or "").strip().split())


def text_hash(text: str) -> str:
    normalized = _normalize_space(text)
    raw = _stable_json(normalized)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]


def _extract_number_tokens(text: str) -> set[str]:
    normalized = _normalize_space(text).lower()
    return set(NUMBER_TOKEN_RE.findall(normalized))


def _extract_incident_ids(text: str) -> set[str]:
    normalized = _normalize_space(text).lower()
    return set(INCIDENT_ID_RE.findall(normalized))


def _extract_severity_labels(text: str) -> set[str]:
    normalized = _normalize_space(text).upper()
    return {match.upper() for match in SEVERITY_RE.findall(normalized)}


def _extract_regions(text: str) -> set[str]:
    normalized = _normalize_space(text).upper()
    return {value.upper() for value in REGION_RE.findall(normalized)}


def _extract_fix_plan_phrase_rules(fix_plan: list[str]) -> dict[str, list[str]]:
    must_include: list[str] = []
    must_remove: list[str] = []

    def _append_unique(target: list[str], value: str) -> None:
        if value and value not in target:
            target.append(value)

    for item in fix_plan:
        item_norm = _normalize_space(item).lower()
        quoted = [_normalize_space(match).lower() for match in QUOTED_PHRASE_RE.findall(item)]
        quoted = [value for value in quoted if value]
        if not quoted:
            continue

        is_replace = "replace" in item_norm
        is_modify = any(word in item_norm for word in ("modify", "change", "update", "rewrite"))
        has_with = " with " in f" {item_norm} "
        has_example_marker = any(
            marker in item_norm for marker in ("such as", "for example", "e.g.", "e.g")
        )

        if is_replace or is_modify:
            _append_unique(must_remove, quoted[0])

            # Enforce phrase add only for strict replace-with instructions, not modify/example hints.
            if is_replace and len(quoted) >= 2 and has_with and not has_example_marker:
                _append_unique(must_include, quoted[1])
            continue

        for phrase in quoted:
            _append_unique(must_include, phrase)

    return {
        "must_include": must_include,
        "must_remove": must_remove,
    }


def _context_claim_text(value: Any) -> str:
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, (bool, int, float)):
        return str(value)
    if isinstance(value, list):
        return " ".join(_context_claim_text(item) for item in value)
    if isinstance(value, dict):
        parts: list[str] = []
        for key, item in value.items():
            parts.append(str(key))
            parts.append(_context_claim_text(item))
        return " ".join(parts)
    return str(value)


def _is_high_risk_issue(issue_type: str) -> bool:
    return issue_type in {"legal_risk", "policy_violation"}


def validate_draft(draft: Any, *, max_chars: int) -> str:
    if not isinstance(draft, str) or not draft.strip():
        raise StopRun("invalid_draft:empty")
    normalized = draft.strip()
    if len(normalized) > max_chars:
        raise StopRun("invalid_draft:too_long")
    return normalized


def validate_review(
    raw: Any,
    *,
    allowed_decisions_policy: set[str],
    allowed_issue_types_policy: set[str],
    max_review_issues: int,
    max_fix_items: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_review:not_object")

    decision = raw.get("decision")
    if not isinstance(decision, str) or not decision.strip():
        raise StopRun("invalid_review:decision")
    decision = decision.strip()
    if decision not in allowed_decisions_policy:
        raise StopRun(f"review_decision_not_allowed_policy:{decision}")

    issues_raw = raw.get("issues", [])
    if not isinstance(issues_raw, list):
        raise StopRun("invalid_review:issues")
    if len(issues_raw) > max_review_issues:
        raise StopRun("invalid_review:too_many_issues")

    issues: list[dict[str, str]] = []
    for item in issues_raw:
        if not isinstance(item, dict):
            raise StopRun("invalid_review:issue_item")

        issue_type = item.get("type")
        note = item.get("note")

        if not isinstance(issue_type, str) or not issue_type.strip():
            raise StopRun("invalid_review:issue_type")
        issue_type = issue_type.strip()
        if issue_type not in allowed_issue_types_policy:
            raise StopRun(f"review_issue_not_allowed_policy:{issue_type}")

        if not isinstance(note, str) or not note.strip():
            raise StopRun("invalid_review:issue_note")

        issues.append({"type": issue_type, "note": note.strip()})

    fix_plan_raw = raw.get("fix_plan", [])
    if not isinstance(fix_plan_raw, list):
        raise StopRun("invalid_review:fix_plan")
    if len(fix_plan_raw) > max_fix_items:
        raise StopRun("invalid_review:too_many_fix_items")

    fix_plan: list[str] = []
    for item in fix_plan_raw:
        if not isinstance(item, str) or not item.strip():
            raise StopRun("invalid_review:fix_item")
        fix_plan.append(item.strip())

    reason = raw.get("reason", "")
    if reason is None:
        reason = ""
    if not isinstance(reason, str):
        raise StopRun("invalid_review:reason")
    reason = reason.strip()

    if decision == "approve":
        if issues and any(_is_high_risk_issue(issue["type"]) for issue in issues):
            raise StopRun("invalid_review:approve_with_high_risk_issue")
        return {
            "decision": "approve",
            "issues": issues,
            "fix_plan": [],
            "reason": reason,
            "high_risk": False,
        }

    if decision == "revise":
        if not issues:
            raise StopRun("invalid_review:revise_without_issues")
        if not fix_plan:
            raise StopRun("invalid_review:revise_without_fix_plan")
        if any(_is_high_risk_issue(issue["type"]) for issue in issues):
            raise StopRun("invalid_review:high_risk_requires_escalate")
        return {
            "decision": "revise",
            "issues": issues,
            "fix_plan": fix_plan,
            "reason": reason,
            "high_risk": False,
        }

    if decision == "escalate":
        if not reason:
            raise StopRun("invalid_review:escalate_reason_required")
        return {
            "decision": "escalate",
            "issues": issues,
            "fix_plan": [],
            "reason": reason,
            "high_risk": True,
        }

    raise StopRun("invalid_review:unknown_decision")


class ReflectionGateway:
    def __init__(
        self,
        *,
        allow_execution_decisions: set[str],
        budget: Budget,
    ):
        self.allow_execution_decisions = set(allow_execution_decisions)
        self.budget = budget

    def enforce_execution_decision(self, decision: str) -> None:
        if decision not in self.allow_execution_decisions:
            raise StopRun(f"review_decision_denied_execution:{decision}")

    def validate_revision(
        self,
        *,
        original: str,
        revised: str,
        context: dict[str, Any],
        fix_plan: list[str] | None = None,
    ) -> dict[str, Any]:
        if not isinstance(revised, str) or not revised.strip():
            raise StopRun("invalid_revised:empty")

        revised_clean = revised.strip()
        if len(revised_clean) > self.budget.max_answer_chars:
            raise StopRun("invalid_revised:too_long")

        normalized_original = _normalize_space(original)
        normalized_revised = _normalize_space(revised_clean)
        if normalized_original == normalized_revised:
            raise StopRun("invalid_revised:no_changes")

        similarity = SequenceMatcher(a=normalized_original, b=normalized_revised).ratio()
        if similarity < self.budget.min_patch_similarity:
            raise StopRun("patch_violation:too_large_edit")

        allowed_text_tokens = _stable_json(context) + " " + original
        allowed_text_claims = _normalize_space(_context_claim_text(context) + " " + original)
        revised_numbers = _extract_number_tokens(revised_clean)
        allowed_numbers = _extract_number_tokens(allowed_text_tokens)
        if revised_numbers - allowed_numbers:
            raise StopRun("patch_violation:no_new_facts")

        revised_ids = _extract_incident_ids(revised_clean)
        allowed_ids = _extract_incident_ids(allowed_text_tokens)
        if revised_ids - allowed_ids:
            raise StopRun("patch_violation:new_incident_id")

        revised_severity = _extract_severity_labels(revised_clean)
        allowed_severity = _extract_severity_labels(allowed_text_tokens)
        if revised_severity - allowed_severity:
            raise StopRun("patch_violation:new_severity_label")

        revised_regions = _extract_regions(revised_clean)
        allowed_regions = _extract_regions(allowed_text_tokens)
        if revised_regions - allowed_regions:
            raise StopRun("patch_violation:new_region")

        for claim_re in RESTRICTED_CLAIMS_RE:
            if claim_re.search(revised_clean) and not claim_re.search(allowed_text_claims):
                raise StopRun("patch_violation:restricted_claims")

        phrase_rules = _extract_fix_plan_phrase_rules(fix_plan or [])
        must_include = phrase_rules["must_include"]
        must_remove = phrase_rules["must_remove"]
        if must_include or must_remove:
            revised_lower = _normalize_space(revised_clean).lower()
            missing = [phrase for phrase in must_include if phrase not in revised_lower]
            if missing:
                raise StopRun("patch_violation:fix_plan_not_applied")
            still_present = [phrase for phrase in must_remove if phrase in revised_lower]
            if still_present:
                raise StopRun("patch_violation:fix_plan_not_applied")

        return {
            "answer": revised_clean,
            "patch_similarity": round(similarity, 3),
            "fix_plan_quoted_checks": len(must_include) + len(must_remove),
        }

    def validate_final(self, answer: str) -> str:
        if not isinstance(answer, str) or not answer.strip():
            raise StopRun("invalid_answer:empty")

        cleaned = answer.strip()
        if len(cleaned) > self.budget.max_answer_chars:
            raise StopRun("invalid_answer:too_long")
        return cleaned

What matters most here (in plain terms)

  • The gateway only enforces execution decisions coming from main.py.
  • The policy allowlist and the execution allowlist are separate: the runtime can be stricter.
  • After the revision, the gateway blocks new facts in numbers, critical tokens (incident_id/region/severity), and restricted claims.
  • Token checks use the stable JSON context, while claim checks use the plain-text context, which makes the regex checks more reliable.
  • If fix_plan contains quoted phrases, the gateway builds must_include/must_remove rules and checks them against the revision.
  • For replace/modify/change/update/rewrite instructions, the first quoted phrase is checked as must_remove.
  • For replace "A" with "B", phrase B is only checked as must_include when it is not an example (such as/for example/e.g.).
  • fix_plan_quoted_checks counts only these enforced rules and can therefore be smaller than the number of fix_plan items.
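The quoted-phrase rules can be seen in action with a simplified re-implementation (a sketch; the real `_extract_fix_plan_phrase_rules` also handles modify/change/update/rewrite and example markers such as "e.g."):

```python
import re

QUOTED_PHRASE_RE = re.compile(r"['\"]([^'\"]{3,120})['\"]")

def phrase_rules(fix_plan: list[str]) -> tuple[list[str], list[str]]:
    """Simplified: replace -> remove first phrase, include second; else include all."""
    must_include: list[str] = []
    must_remove: list[str] = []
    for item in fix_plan:
        quoted = [q.strip().lower() for q in QUOTED_PHRASE_RE.findall(item)]
        if not quoted:
            continue
        if "replace" in item.lower():
            must_remove.append(quoted[0])
            if len(quoted) >= 2 and " with " in item.lower():
                must_include.append(quoted[1])
        else:
            must_include.extend(quoted)
    return must_include, must_remove

inc, rem = phrase_rules([
    'Replace "will be resolved" with "we expect recovery"',
    'Keep the phrase "next actions" in the update',
])
assert rem == ["will be resolved"]
assert inc == ["we expect recovery", "next actions"]
```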

llm.py — draft/review/revise LLM calls

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


DRAFT_SYSTEM_PROMPT = """
You are an operations communications writer.
Return exactly one JSON object:
{
  "draft": "short customer-safe incident update"
}

Rules:
- Use only facts from provided incident_context.
- Include current status, customer impact, and next actions.
- Avoid absolute guarantees and overconfident claims.
- Keep draft concise and actionable.
- Do not output markdown or extra keys.
""".strip()

REVIEW_SYSTEM_PROMPT = """
You are a reflection reviewer.
Return exactly one JSON object:
{
  "decision": "approve|revise|escalate",
  "issues": [{"type":"overconfidence","note":"..."}],
  "fix_plan": ["patch instruction"],
  "reason": "for escalate only"
}

Rules:
- Review exactly once.
- decision=approve: fix_plan must be empty.
- decision=revise: provide 1-4 concrete patch-only instructions.
- For enforceable instructions, include quoted target phrases in fix_plan.
- decision=escalate: use only for high-risk or policy-unsafe content.
- Do not add new facts in fix_plan.
- Use only issue types from allowed_issue_types.
- Do not output markdown or extra keys.
""".strip()

REVISE_SYSTEM_PROMPT = """
You are an editor applying one controlled patch.
Return exactly one JSON object:
{
  "revised_answer": "updated answer"
}

Rules:
- Edit only what is needed to satisfy fix_plan.
- Keep scope and intent of original draft.
- Do not introduce new facts or numbers.
- Keep answer concise and customer-safe.
- Do not output markdown or extra keys.
""".strip()


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def _chat_json(*, system_prompt: str, payload: dict[str, Any]) -> dict[str, Any]:
    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")
    return data


def generate_draft(*, goal: str, incident_context: dict[str, Any]) -> str:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
    }
    data = _chat_json(system_prompt=DRAFT_SYSTEM_PROMPT, payload=payload)

    draft = data.get("draft")
    if not isinstance(draft, str):
        raise LLMInvalid("llm_invalid_schema")

    draft = draft.strip()
    if not draft:
        raise LLMEmpty("llm_empty")
    return draft


def review_draft(
    *,
    goal: str,
    incident_context: dict[str, Any],
    draft: str,
    allowed_issue_types: list[str],
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "draft": draft,
        "allowed_issue_types": allowed_issue_types,
    }
    return _chat_json(system_prompt=REVIEW_SYSTEM_PROMPT, payload=payload)


def revise_once(
    *,
    goal: str,
    incident_context: dict[str, Any],
    draft: str,
    fix_plan: list[str],
) -> str:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "draft": draft,
        "fix_plan": fix_plan,
    }
    data = _chat_json(system_prompt=REVISE_SYSTEM_PROMPT, payload=payload)

    revised = data.get("revised_answer")
    if not isinstance(revised, str):
        raise LLMInvalid("llm_invalid_schema")

    revised = revised.strip()
    if not revised:
        raise LLMEmpty("llm_empty")
    return revised

What matters most here (in plain terms)

  • Every stage returns a JSON contract, not free text.
  • llm_invalid_json and llm_invalid_schema are kept separate for clean diagnostics.
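The value of separating the two failure modes shows up in a condensed sketch (simplified from `_chat_json` + `generate_draft`): a parse error points at transport or decoding problems, while a schema error points at prompting problems.

```python
import json

class LLMInvalid(Exception):
    pass

def parse_draft(text: str) -> str:
    # stage 1: is it JSON at all? -> llm_invalid_json
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc
    # stage 2: does it match the contract? -> llm_invalid_schema
    draft = data.get("draft") if isinstance(data, dict) else None
    if not isinstance(draft, str) or not draft.strip():
        raise LLMInvalid("llm_invalid_schema")
    return draft.strip()

for text, expected in [
    ("not json at all", "llm_invalid_json"),
    ('{"wrong_key": 1}', "llm_invalid_schema"),
]:
    try:
        parse_draft(text)
        raise AssertionError("expected LLMInvalid")
    except LLMInvalid as exc:
        assert exc.args[0] == expected
```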

main.py — orchestrating the reflection flow

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from context import build_incident_context
from gateway import Budget, ReflectionGateway, StopRun, text_hash, validate_draft, validate_review
from llm import LLMEmpty, LLMInvalid, LLMTimeout, generate_draft, review_draft, revise_once

GOAL = (
    "Draft a customer-facing payment incident update for US enterprise customers. "
    "Keep it accurate, avoid overconfident language, and include next actions."
)
INCIDENT_CONTEXT = build_incident_context(report_date="2026-03-05", region="US")

BUDGET = Budget(
    max_seconds=30,
    max_draft_chars=900,
    max_review_issues=4,
    max_fix_items=4,
    max_answer_chars=900,
    min_patch_similarity=0.45,
)

ALLOWED_REVIEW_DECISIONS_POLICY = {"approve", "revise", "escalate"}
AUTO_REVISION_ENABLED = True
ALLOWED_REVIEW_DECISIONS_EXECUTION = (
    ALLOWED_REVIEW_DECISIONS_POLICY if AUTO_REVISION_ENABLED else {"approve", "escalate"}
)

ALLOWED_ISSUE_TYPES_POLICY = {
    "overconfidence",
    "missing_uncertainty",
    "contradiction",
    "scope_leak",
    "policy_violation",
    "legal_risk",
}


def run_reflection_agent(*, goal: str, incident_context: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = ReflectionGateway(
        allow_execution_decisions=ALLOWED_REVIEW_DECISIONS_EXECUTION,
        budget=BUDGET,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    try:
        draft_raw = generate_draft(goal=goal, incident_context=incident_context)
        draft = validate_draft(draft_raw, max_chars=BUDGET.max_draft_chars)
    except LLMTimeout:
        return stopped("llm_timeout", phase="draft")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="draft")
    except LLMEmpty:
        return stopped("llm_empty", phase="draft")
    except StopRun as exc:
        return stopped(exc.reason, phase="draft")

    trace.append(
        {
            "step": 1,
            "phase": "draft",
            "draft_hash": text_hash(draft),
            "chars": len(draft),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 1,
            "action": "draft_once",
            "draft": draft,
        }
    )

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return stopped("max_seconds", phase="review")

    try:
        raw_review = review_draft(
            goal=goal,
            incident_context=incident_context,
            draft=draft,
            allowed_issue_types=sorted(ALLOWED_ISSUE_TYPES_POLICY),
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="review")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="review")

    try:
        review = validate_review(
            raw_review,
            allowed_decisions_policy=ALLOWED_REVIEW_DECISIONS_POLICY,
            allowed_issue_types_policy=ALLOWED_ISSUE_TYPES_POLICY,
            max_review_issues=BUDGET.max_review_issues,
            max_fix_items=BUDGET.max_fix_items,
        )
        gateway.enforce_execution_decision(review["decision"])
    except StopRun as exc:
        return stopped(exc.reason, phase="review", raw_review=raw_review)

    trace.append(
        {
            "step": 2,
            "phase": "review",
            "decision": review["decision"],
            "issues": len(review["issues"]),
            "fix_items": len(review["fix_plan"]),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 2,
            "action": "review_once",
            "review": review,
        }
    )

    if review["decision"] == "escalate":
        escalation_reason = str(review.get("reason", "")).strip()
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "policy_escalation",
            "escalation_reason": escalation_reason[:120],
            "phase": "review",
            "review": review,
            "trace": trace,
            "history": history,
        }

    final_answer = draft
    revised = False

    if review["decision"] == "revise":
        if (time.monotonic() - started) > BUDGET.max_seconds:
            return stopped("max_seconds", phase="revise")

        try:
            revised_raw = revise_once(
                goal=goal,
                incident_context=incident_context,
                draft=draft,
                fix_plan=review["fix_plan"],
            )
            revised_payload = gateway.validate_revision(
                original=draft,
                revised=revised_raw,
                context=incident_context,
                fix_plan=review["fix_plan"],
            )
        except LLMTimeout:
            return stopped("llm_timeout", phase="revise")
        except LLMInvalid as exc:
            return stopped(exc.args[0], phase="revise")
        except LLMEmpty:
            return stopped("llm_empty", phase="revise")
        except StopRun as exc:
            return stopped(exc.reason, phase="revise")

        final_answer = revised_payload["answer"]
        revised = True

        trace.append(
            {
                "step": 3,
                "phase": "revise",
                "patch_similarity": revised_payload["patch_similarity"],
                "fix_plan_quoted_checks": revised_payload["fix_plan_quoted_checks"],
                "revised_hash": text_hash(final_answer),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 3,
                "action": "revise_once",
                "fix_plan": review["fix_plan"],
                "revised_answer": final_answer,
            }
        )

    try:
        final_answer = gateway.validate_final(final_answer)
    except StopRun as exc:
        return stopped(exc.reason, phase="finalize")

    trace.append(
        {
            "step": 4 if revised else 3,
            "phase": "finalize",
            "final_hash": text_hash(final_answer),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 4 if revised else 3,
            "action": "finalize",
            "status": "final",
        }
    )

    return {
        "run_id": run_id,
        "status": "ok",
        "stop_reason": "success",
        "outcome": "revised_once" if revised else "approved_direct",
        "answer": final_answer,
        "review_decision": review["decision"],
        "issues": review["issues"],
        "fix_plan": review["fix_plan"],
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_reflection_agent(goal=GOAL, incident_context=INCIDENT_CONTEXT)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (in plain terms)

  • There is a clear separation: the policy allowlist defines what is conceptually permitted; the execution allowlist defines what the runtime actually allows.
  • The run ends predictably: either success or a controlled stop_reason.
  • trace and history make every review/revise step traceable.
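The policy/execution split boils down to a subset relationship (mirroring the ALLOWED_REVIEW_DECISIONS_* constants in main.py): the runtime allowlist may shrink the policy allowlist, never extend it.

```python
# With auto-revision disabled, the runtime drops "revise" but stays
# within the policy allowlist.
POLICY = {"approve", "revise", "escalate"}
AUTO_REVISION_ENABLED = False
EXECUTION = POLICY if AUTO_REVISION_ENABLED else {"approve", "escalate"}

assert EXECUTION <= POLICY                 # execution is always a subset
assert "revise" in POLICY and "revise" not in EXECUTION
```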

Example output

JSON
{
  "run_id": "f67eaad4-1c3e-4160-9d8b-1e2e9af42c82",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "revised_once",
  "answer": "We are currently experiencing payment service degradation affecting ~27% of US enterprise checkouts, with failed payments at 3.4% and 5 chargeback alerts. We expect recovery in approximately 45 minutes, but this may change. Next actions: monitor failures every 15 minutes, publish status page updates every 15 minutes, and support users with workaround macros.",
  "review_decision": "revise",
  "issues": [
    {
      "type": "overconfidence",
      "note": "ETA wording sounded too certain and could be read as a guarantee."
    }
  ],
  "fix_plan": [
    "Add uncertainty wording for ETA (for example: 'recovery in approximately 45 minutes, but this may change').",
    "State explicitly that status page updates will be published every 15 minutes.",
    "Keep a clear 'Next actions' section with monitoring, updates, and support macros."
  ],
  "trace": [
    {
      "step": 1,
      "phase": "draft",
      "draft_hash": "f4b3f386c80a",
      "chars": 547,
      "ok": true
    },
    {
      "step": 2,
      "phase": "review",
      "decision": "revise",
      "issues": 1,
      "fix_items": 3,
      "ok": true
    },
    {
      "step": 3,
      "phase": "revise",
      "patch_similarity": 0.564,
      "fix_plan_quoted_checks": 1,
      "revised_hash": "2a5ac4952ae0",
      "ok": true
    },
    {
      "step": 4,
      "phase": "finalize",
      "final_hash": "2a5ac4952ae0",
      "ok": true
    }
  ],
  "history": [{...}]
}

Typical stop_reason values

  • success — the run completed correctly
  • llm_timeout — the LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_empty — empty LLM response during draft/revise
  • llm_invalid_json — the LLM returned invalid JSON
  • llm_invalid_schema — the JSON does not match the contract
  • invalid_draft:* — the draft failed basic validation
  • invalid_review:* — the review failed the policy-layer contract
  • review_decision_not_allowed_policy:* — the review decision is outside the policy allowlist
  • review_decision_denied_execution:* — the runtime rejected the execution decision
  • patch_violation:no_new_facts — the revision added new numeric facts
  • patch_violation:new_incident_id|new_severity_label|new_region — the revision added new critical identifiers
  • patch_violation:restricted_claims — the revision added restricted claims (resolved, fully recovered, etc.) not backed by context
  • patch_violation:fix_plan_not_applied — the revision did not apply the quoted hints from fix_plan
  • patch_violation:too_large_edit — the revision exceeds the patch-only scope
  • policy_escalation — the review returned escalation as the final decision; details in escalation_reason
  • max_seconds — the overall run time budget was exceeded
  • invalid_answer:* — the final answer failed validation
What is NOT shown here

  • persistent storage for trace/history (the example runs everything within a single run)
  • retry/backoff for LLM calls
  • a human-in-the-loop queue for escalate
  • automatic patch-diff rendering for a UI
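If you want to add the missing retry/backoff yourself, a minimal sketch with exponential backoff and jitter could look like this (`with_retries` is hypothetical and intentionally not part of the example; adapt before production use):

```python
import random
import time

def with_retries(call, *, attempts: int = 3, base_delay: float = 0.5):
    """Retry a callable on TimeoutError with exponential backoff + jitter."""
    for attempt in range(attempts):
        try:
            return call()
        except TimeoutError:
            if attempt == attempts - 1:
                raise                     # budget exhausted: surface the error
            time.sleep(base_delay * (2 ** attempt) * (1 + random.random()))

calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 2:
        raise TimeoutError
    return "ok"

assert with_retries(flaky, base_delay=0.0) == "ok"
assert calls["n"] == 2
```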

What you can try next

  1. Disable AUTO_REVISION_ENABLED and observe review_decision_denied_execution:revise.
  2. Add strict validation of allowed changes via fix_plan (a line-level patch contract).
  3. Route policy_escalation into an external queue for manual review.
⏱️ 16 min read · Updated Mar 2026 · Difficulty: ★★☆
Author

This documentation is curated and maintained by engineers who run AI agents in production.

The content is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The patterns and recommendations are based on post-mortems, failure modes, and operational incidents in production systems, including the development and operation of governance infrastructure for agents at OnceOnly.