Reflection Agent — Python (повна реалізація з LLM)

Production-style runnable приклад Reflection агента на Python з одноразовим review, patch-only revise, policy/execution boundary і явними stop reasons.
На цій сторінці
  1. Суть патерна (коротко)
  2. Що демонструє цей приклад
  3. Архітектура
  4. Структура проєкту
  5. Як запустити
  6. Задача
  7. Рішення
  8. Код
  9. context.py — детермінований продакшен-контекст
  10. gateway.py — policy/execution boundary + patch guardrails
  11. llm.py — draft/review/revise виклики LLM
  12. main.py — оркестрація Reflection flow
  13. Приклад виводу
  14. Типові stop_reason
  15. Що тут НЕ показано
  16. Що спробувати далі

Суть патерна (коротко)

Reflection Agent — це патерн, у якому після чернетки відповіді агент робить один керований self-review, а потім або затверджує відповідь, або робить одну patch-only ревізію.

LLM вирішує, як сформулювати чернетку й review, а execution-layer контролює, чи можна це виконати в межах policy та runtime-обмежень.


Що демонструє цей приклад

  • production-like flow: Draft -> Review -> Revise (optional) -> Finalize
  • один review-прохід і максимум одна ревізія (без нескінченного loop)
  • policy boundary для review-рішення (approve | revise | escalate)
  • execution boundary, який enforce-ить runtime allowlist рішень
  • patch guardrails: no_new_facts, критичні claims/token checks, контроль розміру редагування
  • керовані бюджети (max_seconds, ліміти довжини, ліміти issue/fix)
  • явні stop_reason, trace, history для аудиту

Архітектура

  1. LLM генерує чернетку відповіді.
  2. LLM-review повертає структуроване рішення (approve/revise/escalate).
  3. Gateway валідовує review-контракт за policy.
  4. Gateway enforce-ить execution allowlist (runtime може відрізнятися від policy).
  5. Якщо revise, виконується одна patch-only ревізія.
  6. Gateway перевіряє patch (no_new_facts, критичні токени/claims, fix_plan hints, similarity) і фіналізує відповідь.

Ключовий контракт: LLM пропонує зміни, але фінальне право виконання належить execution-layer.


Структура проєкту

TEXT
examples/
└── agent-patterns/
    └── reflection-agent/
        └── python/
            ├── main.py          # Draft -> Review -> Revise -> Finalize flow
            ├── llm.py           # draft/review/revise LLM calls
            ├── gateway.py       # policy+execution validation, patch guards
            ├── context.py       # deterministic incident context
            ├── requirements.txt
            └── README.md

Як запустити

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/reflection-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Потрібен Python 3.11+.

Варіант через export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Варіант через .env (опційно)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

Це shell-варіант (macOS/Linux). На Windows простіше використовувати set змінних або, за бажанням, python-dotenv, щоб підвантажувати .env автоматично.


Задача

Уяви продакшен-кейс для incident communication:

"Підготуй customer-facing оновлення про P1 проблему з платежами в US, без ризикових обіцянок і з чіткими next actions."

Одна чернетка часто виглядає нормально, але може містити:

  • overconfident формулювання
  • нечітку межу відповідальності
  • фрази, які звучать як гарантія

Reflection додає один контрольований review перед відправкою.

Рішення

У цьому прикладі:

  • draft створюється на основі детермінованого контексту
  • review повертає структурований JSON (не вільний текст)
  • revise дозволено максимум один раз
  • gateway блокує patch, якщо з’явились нові числа/факти або edit надто великий
  • escalate завершує run контрольовано (status=stopped), без прихованих автопереписувань

Код

context.py — детермінований продакшен-контекст

PYTHON
from __future__ import annotations

from typing import Any


def build_incident_context(*, report_date: str, region: str) -> dict[str, Any]:
    return {
        "report_date": report_date,
        "region": region,
        "incident": {
            "incident_id": "inc_payments_20260305",
            "severity": "P1",
            "status": "degraded",
            "affected_checkout_pct": 27,
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "eta_minutes": 45,
        },
        "policy_hints": {
            "avoid_absolute_guarantees": True,
            "required_sections": ["current_status", "customer_impact", "next_actions"],
        },
        "approved_actions": [
            "monitor payment failures every 15 minutes",
            "publish customer update via status page",
            "prepare support macro with workaround guidance",
        ],
    }

Що тут найважливіше (простими словами)

  • Контекст фіксований і повторюваний: це зручно для тестів і дебагу.
  • LLM не вигадує source data, а тільки формулює відповідь поверх цього контексту.

gateway.py — policy/execution boundary + patch guardrails

PYTHON
from __future__ import annotations

import hashlib
import json
import re
from dataclasses import dataclass
from difflib import SequenceMatcher
from typing import Any


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 30
    max_draft_chars: int = 900
    max_review_issues: int = 4
    max_fix_items: int = 4
    max_answer_chars: int = 900
    min_patch_similarity: float = 0.45


NUMBER_TOKEN_RE = re.compile(r"\b\d+(?:\.\d+)?%?\b")
INCIDENT_ID_RE = re.compile(r"\binc_[a-z0-9_]+\b", re.IGNORECASE)
SEVERITY_RE = re.compile(r"\bp[0-5]\b", re.IGNORECASE)
REGION_RE = re.compile(r"\b(us|eu|uk|ua|apac|global|emea|latam)\b", re.IGNORECASE)
QUOTED_PHRASE_RE = re.compile(r"['\"]([^'\"]{3,120})['\"]")

RESTRICTED_CLAIMS_RE = [
    re.compile(r"\bresolved\b", re.IGNORECASE),
    re.compile(r"\bfully[-\s]+recovered\b", re.IGNORECASE),
    re.compile(r"\bincident\s+closed\b", re.IGNORECASE),
    re.compile(r"\ball payments (?:are|is)\s+stable\b", re.IGNORECASE),
]


def _stable_json(value: Any) -> str:
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(v) for v in value) + "]"
    if isinstance(value, dict):
        parts = []
        for key in sorted(value):
            parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
        return "{" + ",".join(parts) + "}"
    return json.dumps(str(value), ensure_ascii=True)


def _normalize_space(text: str) -> str:
    return " ".join((text or "").strip().split())


def text_hash(text: str) -> str:
    normalized = _normalize_space(text)
    raw = _stable_json(normalized)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]


def _extract_number_tokens(text: str) -> set[str]:
    normalized = _normalize_space(text).lower()
    return set(NUMBER_TOKEN_RE.findall(normalized))


def _extract_incident_ids(text: str) -> set[str]:
    normalized = _normalize_space(text).lower()
    return set(INCIDENT_ID_RE.findall(normalized))


def _extract_severity_labels(text: str) -> set[str]:
    normalized = _normalize_space(text).upper()
    return {match.upper() for match in SEVERITY_RE.findall(normalized)}


def _extract_regions(text: str) -> set[str]:
    normalized = _normalize_space(text).upper()
    return {value.upper() for value in REGION_RE.findall(normalized)}


def _extract_fix_plan_phrase_rules(fix_plan: list[str]) -> dict[str, list[str]]:
    must_include: list[str] = []
    must_remove: list[str] = []

    def _append_unique(target: list[str], value: str) -> None:
        if value and value not in target:
            target.append(value)

    for item in fix_plan:
        item_norm = _normalize_space(item).lower()
        quoted = [_normalize_space(match).lower() for match in QUOTED_PHRASE_RE.findall(item)]
        quoted = [value for value in quoted if value]
        if not quoted:
            continue

        is_replace = "replace" in item_norm
        is_modify = any(word in item_norm for word in ("modify", "change", "update", "rewrite"))
        has_with = " with " in f" {item_norm} "
        has_example_marker = any(
            marker in item_norm for marker in ("such as", "for example", "e.g.", "e.g")
        )

        if is_replace or is_modify:
            _append_unique(must_remove, quoted[0])

            # Enforce phrase add only for strict replace-with instructions, not modify/example hints.
            if is_replace and len(quoted) >= 2 and has_with and not has_example_marker:
                _append_unique(must_include, quoted[1])
            continue

        for phrase in quoted:
            _append_unique(must_include, phrase)

    return {
        "must_include": must_include,
        "must_remove": must_remove,
    }


def _context_claim_text(value: Any) -> str:
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, (bool, int, float)):
        return str(value)
    if isinstance(value, list):
        return " ".join(_context_claim_text(item) for item in value)
    if isinstance(value, dict):
        parts: list[str] = []
        for key, item in value.items():
            parts.append(str(key))
            parts.append(_context_claim_text(item))
        return " ".join(parts)
    return str(value)


def _is_high_risk_issue(issue_type: str) -> bool:
    return issue_type in {"legal_risk", "policy_violation"}


def validate_draft(draft: Any, *, max_chars: int) -> str:
    if not isinstance(draft, str) or not draft.strip():
        raise StopRun("invalid_draft:empty")
    normalized = draft.strip()
    if len(normalized) > max_chars:
        raise StopRun("invalid_draft:too_long")
    return normalized


def validate_review(
    raw: Any,
    *,
    allowed_decisions_policy: set[str],
    allowed_issue_types_policy: set[str],
    max_review_issues: int,
    max_fix_items: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_review:not_object")

    decision = raw.get("decision")
    if not isinstance(decision, str) or not decision.strip():
        raise StopRun("invalid_review:decision")
    decision = decision.strip()
    if decision not in allowed_decisions_policy:
        raise StopRun(f"review_decision_not_allowed_policy:{decision}")

    issues_raw = raw.get("issues", [])
    if not isinstance(issues_raw, list):
        raise StopRun("invalid_review:issues")
    if len(issues_raw) > max_review_issues:
        raise StopRun("invalid_review:too_many_issues")

    issues: list[dict[str, str]] = []
    for item in issues_raw:
        if not isinstance(item, dict):
            raise StopRun("invalid_review:issue_item")

        issue_type = item.get("type")
        note = item.get("note")

        if not isinstance(issue_type, str) or not issue_type.strip():
            raise StopRun("invalid_review:issue_type")
        issue_type = issue_type.strip()
        if issue_type not in allowed_issue_types_policy:
            raise StopRun(f"review_issue_not_allowed_policy:{issue_type}")

        if not isinstance(note, str) or not note.strip():
            raise StopRun("invalid_review:issue_note")

        issues.append({"type": issue_type, "note": note.strip()})

    fix_plan_raw = raw.get("fix_plan", [])
    if not isinstance(fix_plan_raw, list):
        raise StopRun("invalid_review:fix_plan")
    if len(fix_plan_raw) > max_fix_items:
        raise StopRun("invalid_review:too_many_fix_items")

    fix_plan: list[str] = []
    for item in fix_plan_raw:
        if not isinstance(item, str) or not item.strip():
            raise StopRun("invalid_review:fix_item")
        fix_plan.append(item.strip())

    reason = raw.get("reason", "")
    if reason is None:
        reason = ""
    if not isinstance(reason, str):
        raise StopRun("invalid_review:reason")
    reason = reason.strip()

    if decision == "approve":
        if issues and any(_is_high_risk_issue(issue["type"]) for issue in issues):
            raise StopRun("invalid_review:approve_with_high_risk_issue")
        return {
            "decision": "approve",
            "issues": issues,
            "fix_plan": [],
            "reason": reason,
            "high_risk": False,
        }

    if decision == "revise":
        if not issues:
            raise StopRun("invalid_review:revise_without_issues")
        if not fix_plan:
            raise StopRun("invalid_review:revise_without_fix_plan")
        if any(_is_high_risk_issue(issue["type"]) for issue in issues):
            raise StopRun("invalid_review:high_risk_requires_escalate")
        return {
            "decision": "revise",
            "issues": issues,
            "fix_plan": fix_plan,
            "reason": reason,
            "high_risk": False,
        }

    if decision == "escalate":
        if not reason:
            raise StopRun("invalid_review:escalate_reason_required")
        return {
            "decision": "escalate",
            "issues": issues,
            "fix_plan": [],
            "reason": reason,
            "high_risk": True,
        }

    raise StopRun("invalid_review:unknown_decision")


class ReflectionGateway:
    def __init__(
        self,
        *,
        allow_execution_decisions: set[str],
        budget: Budget,
    ):
        self.allow_execution_decisions = set(allow_execution_decisions)
        self.budget = budget

    def enforce_execution_decision(self, decision: str) -> None:
        if decision not in self.allow_execution_decisions:
            raise StopRun(f"review_decision_denied_execution:{decision}")

    def validate_revision(
        self,
        *,
        original: str,
        revised: str,
        context: dict[str, Any],
        fix_plan: list[str] | None = None,
    ) -> dict[str, Any]:
        if not isinstance(revised, str) or not revised.strip():
            raise StopRun("invalid_revised:empty")

        revised_clean = revised.strip()
        if len(revised_clean) > self.budget.max_answer_chars:
            raise StopRun("invalid_revised:too_long")

        normalized_original = _normalize_space(original)
        normalized_revised = _normalize_space(revised_clean)
        if normalized_original == normalized_revised:
            raise StopRun("invalid_revised:no_changes")

        similarity = SequenceMatcher(a=normalized_original, b=normalized_revised).ratio()
        if similarity < self.budget.min_patch_similarity:
            raise StopRun("patch_violation:too_large_edit")

        allowed_text_tokens = _stable_json(context) + " " + original
        allowed_text_claims = _normalize_space(_context_claim_text(context) + " " + original)
        revised_numbers = _extract_number_tokens(revised_clean)
        allowed_numbers = _extract_number_tokens(allowed_text_tokens)
        if revised_numbers - allowed_numbers:
            raise StopRun("patch_violation:no_new_facts")

        revised_ids = _extract_incident_ids(revised_clean)
        allowed_ids = _extract_incident_ids(allowed_text_tokens)
        if revised_ids - allowed_ids:
            raise StopRun("patch_violation:new_incident_id")

        revised_severity = _extract_severity_labels(revised_clean)
        allowed_severity = _extract_severity_labels(allowed_text_tokens)
        if revised_severity - allowed_severity:
            raise StopRun("patch_violation:new_severity_label")

        revised_regions = _extract_regions(revised_clean)
        allowed_regions = _extract_regions(allowed_text_tokens)
        if revised_regions - allowed_regions:
            raise StopRun("patch_violation:new_region")

        for claim_re in RESTRICTED_CLAIMS_RE:
            if claim_re.search(revised_clean) and not claim_re.search(allowed_text_claims):
                raise StopRun("patch_violation:restricted_claims")

        phrase_rules = _extract_fix_plan_phrase_rules(fix_plan or [])
        must_include = phrase_rules["must_include"]
        must_remove = phrase_rules["must_remove"]
        if must_include or must_remove:
            revised_lower = _normalize_space(revised_clean).lower()
            missing = [phrase for phrase in must_include if phrase not in revised_lower]
            if missing:
                raise StopRun("patch_violation:fix_plan_not_applied")
            still_present = [phrase for phrase in must_remove if phrase in revised_lower]
            if still_present:
                raise StopRun("patch_violation:fix_plan_not_applied")

        return {
            "answer": revised_clean,
            "patch_similarity": round(similarity, 3),
            "fix_plan_quoted_checks": len(must_include) + len(must_remove),
        }

    def validate_final(self, answer: str) -> str:
        if not isinstance(answer, str) or not answer.strip():
            raise StopRun("invalid_answer:empty")

        cleaned = answer.strip()
        if len(cleaned) > self.budget.max_answer_chars:
            raise StopRun("invalid_answer:too_long")
        return cleaned

Що тут найважливіше (простими словами)

  • Gateway тільки enforce-ить execution-рішення, які передає main.py.
  • Policy allowlist і execution allowlist розділені: runtime може бути суворішим.
  • Після ревізії gateway блокує нові факти за числами, критичними токенами (incident_id/region/severity) і restricted claims.
  • Для token-checks використовується стабільний JSON-контекст, а для claim-checks — plain текст контексту, щоб regex-перевірки були надійніші.
  • Якщо в fix_plan є quoted phrases, gateway формує правила must_include/must_remove і перевіряє їх у ревізії.
  • Для інструкцій replace/modify/change/update/rewrite перша quoted фраза перевіряється як must_remove.
  • Для replace 'A' with 'B' фраза B перевіряється як must_include лише коли це не приклад (such as/for example/e.g.).
  • fix_plan_quoted_checks рахує тільки ці enforce-правила, тому може бути меншим за кількість пунктів fix_plan.

llm.py — draft/review/revise виклики LLM

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


DRAFT_SYSTEM_PROMPT = """
You are an operations communications writer.
Return exactly one JSON object:
{
  "draft": "short customer-safe incident update"
}

Rules:
- Use only facts from provided incident_context.
- Include current status, customer impact, and next actions.
- Avoid absolute guarantees and overconfident claims.
- Keep draft concise and actionable.
- Do not output markdown or extra keys.
""".strip()

REVIEW_SYSTEM_PROMPT = """
You are a reflection reviewer.
Return exactly one JSON object:
{
  "decision": "approve|revise|escalate",
  "issues": [{"type":"overconfidence","note":"..."}],
  "fix_plan": ["patch instruction"],
  "reason": "for escalate only"
}

Rules:
- Review exactly once.
- decision=approve: fix_plan must be empty.
- decision=revise: provide 1-4 concrete patch-only instructions.
- For enforceable instructions, include quoted target phrases in fix_plan.
- decision=escalate: use only for high-risk or policy-unsafe content.
- Do not add new facts in fix_plan.
- Use only issue types from allowed_issue_types.
- Do not output markdown or extra keys.
""".strip()

REVISE_SYSTEM_PROMPT = """
You are an editor applying one controlled patch.
Return exactly one JSON object:
{
  "revised_answer": "updated answer"
}

Rules:
- Edit only what is needed to satisfy fix_plan.
- Keep scope and intent of original draft.
- Do not introduce new facts or numbers.
- Keep answer concise and customer-safe.
- Do not output markdown or extra keys.
""".strip()


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def _chat_json(*, system_prompt: str, payload: dict[str, Any]) -> dict[str, Any]:
    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")
    return data


def generate_draft(*, goal: str, incident_context: dict[str, Any]) -> str:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
    }
    data = _chat_json(system_prompt=DRAFT_SYSTEM_PROMPT, payload=payload)

    draft = data.get("draft")
    if not isinstance(draft, str):
        raise LLMInvalid("llm_invalid_schema")

    draft = draft.strip()
    if not draft:
        raise LLMEmpty("llm_empty")
    return draft


def review_draft(
    *,
    goal: str,
    incident_context: dict[str, Any],
    draft: str,
    allowed_issue_types: list[str],
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "draft": draft,
        "allowed_issue_types": allowed_issue_types,
    }
    return _chat_json(system_prompt=REVIEW_SYSTEM_PROMPT, payload=payload)


def revise_once(
    *,
    goal: str,
    incident_context: dict[str, Any],
    draft: str,
    fix_plan: list[str],
) -> str:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "draft": draft,
        "fix_plan": fix_plan,
    }
    data = _chat_json(system_prompt=REVISE_SYSTEM_PROMPT, payload=payload)

    revised = data.get("revised_answer")
    if not isinstance(revised, str):
        raise LLMInvalid("llm_invalid_schema")

    revised = revised.strip()
    if not revised:
        raise LLMEmpty("llm_empty")
    return revised

Що тут найважливіше (простими словами)

  • Кожен етап повертає JSON-контракт, не вільний текст.
  • llm_invalid_json і llm_invalid_schema відділені для чистої діагностики.

main.py — оркестрація Reflection flow

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from context import build_incident_context
from gateway import Budget, ReflectionGateway, StopRun, text_hash, validate_draft, validate_review
from llm import LLMEmpty, LLMInvalid, LLMTimeout, generate_draft, review_draft, revise_once

GOAL = (
    "Draft a customer-facing payment incident update for US enterprise customers. "
    "Keep it accurate, avoid overconfident language, and include next actions."
)
INCIDENT_CONTEXT = build_incident_context(report_date="2026-03-05", region="US")

BUDGET = Budget(
    max_seconds=30,
    max_draft_chars=900,
    max_review_issues=4,
    max_fix_items=4,
    max_answer_chars=900,
    min_patch_similarity=0.45,
)

ALLOWED_REVIEW_DECISIONS_POLICY = {"approve", "revise", "escalate"}
AUTO_REVISION_ENABLED = True
ALLOWED_REVIEW_DECISIONS_EXECUTION = (
    ALLOWED_REVIEW_DECISIONS_POLICY if AUTO_REVISION_ENABLED else {"approve", "escalate"}
)

ALLOWED_ISSUE_TYPES_POLICY = {
    "overconfidence",
    "missing_uncertainty",
    "contradiction",
    "scope_leak",
    "policy_violation",
    "legal_risk",
}


def run_reflection_agent(*, goal: str, incident_context: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = ReflectionGateway(
        allow_execution_decisions=ALLOWED_REVIEW_DECISIONS_EXECUTION,
        budget=BUDGET,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    try:
        draft_raw = generate_draft(goal=goal, incident_context=incident_context)
        draft = validate_draft(draft_raw, max_chars=BUDGET.max_draft_chars)
    except LLMTimeout:
        return stopped("llm_timeout", phase="draft")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="draft")
    except LLMEmpty:
        return stopped("llm_empty", phase="draft")
    except StopRun as exc:
        return stopped(exc.reason, phase="draft")

    trace.append(
        {
            "step": 1,
            "phase": "draft",
            "draft_hash": text_hash(draft),
            "chars": len(draft),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 1,
            "action": "draft_once",
            "draft": draft,
        }
    )

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return stopped("max_seconds", phase="review")

    try:
        raw_review = review_draft(
            goal=goal,
            incident_context=incident_context,
            draft=draft,
            allowed_issue_types=sorted(ALLOWED_ISSUE_TYPES_POLICY),
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="review")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="review")

    try:
        review = validate_review(
            raw_review,
            allowed_decisions_policy=ALLOWED_REVIEW_DECISIONS_POLICY,
            allowed_issue_types_policy=ALLOWED_ISSUE_TYPES_POLICY,
            max_review_issues=BUDGET.max_review_issues,
            max_fix_items=BUDGET.max_fix_items,
        )
        gateway.enforce_execution_decision(review["decision"])
    except StopRun as exc:
        return stopped(exc.reason, phase="review", raw_review=raw_review)

    trace.append(
        {
            "step": 2,
            "phase": "review",
            "decision": review["decision"],
            "issues": len(review["issues"]),
            "fix_items": len(review["fix_plan"]),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 2,
            "action": "review_once",
            "review": review,
        }
    )

    if review["decision"] == "escalate":
        escalation_reason = str(review.get("reason", "")).strip()
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "policy_escalation",
            "escalation_reason": escalation_reason[:120],
            "phase": "review",
            "review": review,
            "trace": trace,
            "history": history,
        }

    final_answer = draft
    revised = False

    if review["decision"] == "revise":
        if (time.monotonic() - started) > BUDGET.max_seconds:
            return stopped("max_seconds", phase="revise")

        try:
            revised_raw = revise_once(
                goal=goal,
                incident_context=incident_context,
                draft=draft,
                fix_plan=review["fix_plan"],
            )
            revised_payload = gateway.validate_revision(
                original=draft,
                revised=revised_raw,
                context=incident_context,
                fix_plan=review["fix_plan"],
            )
        except LLMTimeout:
            return stopped("llm_timeout", phase="revise")
        except LLMInvalid as exc:
            return stopped(exc.args[0], phase="revise")
        except LLMEmpty:
            return stopped("llm_empty", phase="revise")
        except StopRun as exc:
            return stopped(exc.reason, phase="revise")

        final_answer = revised_payload["answer"]
        revised = True

        trace.append(
            {
                "step": 3,
                "phase": "revise",
                "patch_similarity": revised_payload["patch_similarity"],
                "fix_plan_quoted_checks": revised_payload["fix_plan_quoted_checks"],
                "revised_hash": text_hash(final_answer),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 3,
                "action": "revise_once",
                "fix_plan": review["fix_plan"],
                "revised_answer": final_answer,
            }
        )

    try:
        final_answer = gateway.validate_final(final_answer)
    except StopRun as exc:
        return stopped(exc.reason, phase="finalize")

    trace.append(
        {
            "step": 4 if revised else 3,
            "phase": "finalize",
            "final_hash": text_hash(final_answer),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 4 if revised else 3,
            "action": "finalize",
            "status": "final",
        }
    )

    return {
        "run_id": run_id,
        "status": "ok",
        "stop_reason": "success",
        "outcome": "revised_once" if revised else "approved_direct",
        "answer": final_answer,
        "review_decision": review["decision"],
        "issues": review["issues"],
        "fix_plan": review["fix_plan"],
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_reflection_agent(goal=GOAL, incident_context=INCIDENT_CONTEXT)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

Що тут найважливіше (простими словами)

  • Є чітке розділення: policy allowlist визначає, що допустимо концептуально, execution allowlist — що реально дозволено runtime-ом.
  • Run завершується передбачувано: або success, або контрольований stop_reason.
  • trace і history дозволяють зрозуміти кожен крок review/revise.

Приклад виводу

JSON
{
  "run_id": "f67eaad4-1c3e-4160-9d8b-1e2e9af42c82",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "revised_once",
  "answer": "We are currently experiencing payment service degradation affecting ~27% of US enterprise checkouts, with failed payments at 3.4% and 5 chargeback alerts. We expect recovery in approximately 45 minutes, but this may change. Next actions: monitor failures every 15 minutes, publish status page updates every 15 minutes, and support users with workaround macros.",
  "review_decision": "revise",
  "issues": [
    {
      "type": "overconfidence",
      "note": "ETA wording sounded too certain and could be read as a guarantee."
    }
  ],
  "fix_plan": [
    "Add uncertainty wording for ETA (for example: 'recovery in approximately 45 minutes, but this may change').",
    "State explicitly that status page updates will be published every 15 minutes.",
    "Keep a clear 'Next actions' section with monitoring, updates, and support macros."
  ],
  "trace": [
    {
      "step": 1,
      "phase": "draft",
      "draft_hash": "f4b3f386c80a",
      "chars": 547,
      "ok": true
    },
    {
      "step": 2,
      "phase": "review",
      "decision": "revise",
      "issues": 1,
      "fix_items": 3,
      "ok": true
    },
    {
      "step": 3,
      "phase": "revise",
      "patch_similarity": 0.564,
      "fix_plan_quoted_checks": 1,
      "revised_hash": "2a5ac4952ae0",
      "ok": true
    },
    {
      "step": 4,
      "phase": "finalize",
      "final_hash": "2a5ac4952ae0",
      "ok": true
    }
  ],
  "history": [{...}]
}

Типові stop_reason

  • success — run завершено коректно
  • llm_timeout — LLM не відповів у межах OPENAI_TIMEOUT_SECONDS
  • llm_empty — порожня відповідь від LLM у draft/revise
  • llm_invalid_json — LLM повернув невалідний JSON
  • llm_invalid_schema — JSON не відповідає контракту
  • invalid_draft:* — чернетка не пройшла базову валідацію
  • invalid_review:* — review не пройшов контракт policy-layer
  • review_decision_not_allowed_policy:* — рішення review за межами policy allowlist
  • review_decision_denied_execution:* — runtime заборонив execution рішення
  • patch_violation:no_new_facts — ревізія додала нові числові факти
  • patch_violation:new_incident_id|new_severity_label|new_region — ревізія додала нові критичні ідентифікатори
  • patch_violation:restricted_claims — ревізія додала заборонені claims (resolved, fully recovered тощо) поза контекстом
  • patch_violation:fix_plan_not_applied — ревізія не застосувала quoted hints із fix_plan
  • patch_violation:too_large_edit — ревізія вийшла за межі patch-only
  • policy_escalation — review повернув ескалацію як фінальне рішення; деталь у escalation_reason
  • max_seconds — перевищено загальний time budget run
  • invalid_answer:* — фінальна відповідь не пройшла валідацію

Що тут НЕ показано

  • persisted storage для trace/history (у прикладі все в одному run)
  • retry/backoff для LLM-викликів
  • human-in-the-loop queue для escalate
  • автоматичний diff-рендер patch для UI

Що спробувати далі

  1. Вимкнути AUTO_REVISION_ENABLED і подивитися review_decision_denied_execution:revise.
  2. Додати strict перевірку allowed changes за fix_plan (line-level patch contract).
  3. Вивести policy_escalation у зовнішню queue для ручного review.
⏱️ 16 хв читанняОновлено Бер, 2026Складність: ★★☆
Інтегровано: продакшен-контрольOnceOnly
Додай guardrails до агентів з tool-calling
Зашип цей патерн з governance:
  • Бюджетами (кроки / ліміти витрат)
  • Дозволами на інструменти (allowlist / blocklist)
  • Kill switch та аварійна зупинка
  • Ідемпотентність і dedupe
  • Audit logs та трасування
Інтегрована згадка: OnceOnly — контрольний шар для продакшен агент-систем.
Автор

Цю документацію курують і підтримують інженери, які запускають AI-агентів у продакшені.

Контент створено з допомогою AI, із людською редакторською відповідальністю за точність, ясність і продакшн-релевантність.

Патерни та рекомендації базуються на постмортемах, режимах відмов і операційних інцидентах у розгорнутих системах, зокрема під час розробки та експлуатації governance-інфраструктури для агентів у OnceOnly.