Reflection Agent — Python (complete implementation with an LLM)

A runnable, production-style Reflection agent example in Python, with a single review pass, a patch-only revise step, a policy/execution boundary, and explicit stop reasons.
On this page
  1. Essence of the pattern (brief)
  2. What this example demonstrates
  3. Architecture
  4. Project structure
  5. Running the project
  6. Task
  7. Solution
  8. Code
  9. context.py — deterministic production context
  10. gateway.py — policy/execution boundary + patch guardrails
  11. llm.py — draft/review/revise LLM calls
  12. main.py — Reflection flow orchestration
  13. Example output
  14. Typical stop_reason values
  15. What is NOT shown
  16. What you can try next

Essence of the pattern (brief)

Reflection Agent is a pattern where, after producing a draft answer, the agent performs a controlled self-review and then either approves the answer or makes a single patch-only revision.

The LLM decides how to word the draft and the review, while the execution layer decides whether the result can be executed within policy and runtime limits.


What this example demonstrates

  • a production-like flow: Draft -> Review -> Revise (optional) -> Finalize
  • a single review pass and at most one revision (no endless loop)
  • a policy boundary for the review decision (approve | revise | escalate)
  • an execution boundary that enforces the runtime allowlist of decisions
  • patch guardrails: no_new_facts, checks on critical claims/tokens, edit-size control
  • controlled budgets (max_seconds, length limits, issue/fix limits)
  • explicit stop_reason, trace, and history for auditing

Architecture

  1. The LLM generates a draft answer.
  2. The review LLM call returns a structured decision (approve/revise/escalate).
  3. The gateway validates the review contract against the policy.
  4. The gateway enforces the execution allowlist (the runtime allowlist may differ from the policy allowlist).
  5. On revise, a single patch-only revision is executed.
  6. The gateway checks the patch (no_new_facts, critical tokens/claims, fix_plan hints, similarity) and finalizes the answer.

Key contract: the LLM proposes changes, but final execution authority belongs to the execution layer.
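
The same flow, condensed into a minimal sketch (assumptions: OPENAI_API_KEY is set and the example's modules are importable; main.py below adds budgets, tracing, and full error handling, which this sketch omits):

PYTHON
# Condensed Reflection flow: draft -> review -> (optional) single patch-only revision -> finalize.
from context import build_incident_context
from gateway import Budget, ReflectionGateway, validate_draft, validate_review
from llm import generate_draft, review_draft, revise_once

ctx = build_incident_context(report_date="2026-03-05", region="US")
gateway = ReflectionGateway(
    allow_execution_decisions={"approve", "revise", "escalate"},
    budget=Budget(),
)
goal = "Draft a customer-facing payment incident update."
issue_types = {
    "overconfidence", "missing_uncertainty", "contradiction",
    "scope_leak", "policy_violation", "legal_risk",
}

draft = validate_draft(generate_draft(goal=goal, incident_context=ctx), max_chars=900)  # step 1
review = validate_review(  # steps 2-3: policy boundary for the review contract
    review_draft(goal=goal, incident_context=ctx, draft=draft, allowed_issue_types=sorted(issue_types)),
    allowed_decisions_policy={"approve", "revise", "escalate"},
    allowed_issue_types_policy=issue_types,
    max_review_issues=4,
    max_fix_items=4,
)
gateway.enforce_execution_decision(review["decision"])  # step 4: execution boundary

if review["decision"] == "escalate":
    raise SystemExit(f"policy_escalation: {review['reason']}")  # controlled stop, no hidden rewrite

answer = draft
if review["decision"] == "revise":  # step 5: at most one patch-only revision
    revised = revise_once(goal=goal, incident_context=ctx, draft=draft, fix_plan=review["fix_plan"])
    answer = gateway.validate_revision(
        original=draft, revised=revised, context=ctx, fix_plan=review["fix_plan"]
    )["answer"]  # step 6: patch guardrails

print(gateway.validate_final(answer))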


Project structure

TEXT
examples/
└── agent-patterns/
    └── reflection-agent/
        └── python/
            ├── main.py          # Draft -> Review -> Revise -> Finalize flow
            ├── llm.py           # draft/review/revise LLM calls
            ├── gateway.py       # policy+execution validation, patch guards
            ├── context.py       # deterministic incident context
            ├── requirements.txt
            └── README.md

Running the project

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/reflection-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option via export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Option via .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows, it is simpler to use set variables or, if preferred, python-dotenv to load .env automatically.
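
If you go the python-dotenv route, a minimal wrapper could look like this (sketch only: python-dotenv is not in this example's requirements.txt, so you would install it separately, and the wrapper file name is just an assumption):

PYTHON
# run_with_dotenv.py (hypothetical helper): load .env before importing main,
# so llm.py sees OPENAI_API_KEY / OPENAI_MODEL / OPENAI_TIMEOUT_SECONDS at import time.
from dotenv import load_dotenv

load_dotenv()  # reads .env from the current directory if present

import main  # noqa: E402  (imported after load_dotenv on purpose)

main.main()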


Task

Imagine a production case for incident communication:

"Prepare a customer-facing update about a P1 payment issue in the US, without risky promises and with clear next actions."

A single draft often looks fine, but it can contain:

  • overconfident wording
  • a blurry responsibility boundary
  • phrasing that reads like a guarantee

Reflection adds a controlled review before the update is sent.

Solution

In this example:

  • the draft is produced from a deterministic context
  • the review returns structured JSON (not free text)
  • revise is allowed at most once
  • the gateway blocks the patch if new numbers/facts appear or if the edit is too large
  • escalate ends the run in a controlled way (status=stopped), with no hidden auto-rewrite

Code

context.py — deterministic production context

PYTHON
from __future__ import annotations

from typing import Any


def build_incident_context(*, report_date: str, region: str) -> dict[str, Any]:
    return {
        "report_date": report_date,
        "region": region,
        "incident": {
            "incident_id": "inc_payments_20260305",
            "severity": "P1",
            "status": "degraded",
            "affected_checkout_pct": 27,
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "eta_minutes": 45,
        },
        "policy_hints": {
            "avoid_absolute_guarantees": True,
            "required_sections": ["current_status", "customer_impact", "next_actions"],
        },
        "approved_actions": [
            "monitor payment failures every 15 minutes",
            "publish customer update via status page",
            "prepare support macro with workaround guidance",
        ],
    }

What matters most here (in plain terms)

  • The context is fixed and repeatable: convenient for tests and debugging.
  • The LLM does not invent source data; it only phrases the answer from this context.
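
A quick way to see the determinism claim in practice (minimal sketch; assumes you run it from the example directory so context.py is importable):

PYTHON
# Two calls with the same arguments produce byte-identical structures.
import json

from context import build_incident_context

a = build_incident_context(report_date="2026-03-05", region="US")
b = build_incident_context(report_date="2026-03-05", region="US")

assert json.dumps(a, sort_keys=True) == json.dumps(b, sort_keys=True)
print(a["incident"]["incident_id"])  # inc_payments_20260305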

gateway.py — policy/execution boundary + patch guardrails

PYTHON
from __future__ import annotations

import hashlib
import json
import re
from dataclasses import dataclass
from difflib import SequenceMatcher
from typing import Any


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 30
    max_draft_chars: int = 900
    max_review_issues: int = 4
    max_fix_items: int = 4
    max_answer_chars: int = 900
    min_patch_similarity: float = 0.45


NUMBER_TOKEN_RE = re.compile(r"\b\d+(?:\.\d+)?%?\b")
INCIDENT_ID_RE = re.compile(r"\binc_[a-z0-9_]+\b", re.IGNORECASE)
SEVERITY_RE = re.compile(r"\bp[0-5]\b", re.IGNORECASE)
REGION_RE = re.compile(r"\b(us|eu|uk|ua|apac|global|emea|latam)\b", re.IGNORECASE)
QUOTED_PHRASE_RE = re.compile(r"['\"]([^'\"]{3,120})['\"]")

RESTRICTED_CLAIMS_RE = [
    re.compile(r"\bresolved\b", re.IGNORECASE),
    re.compile(r"\bfully[-\s]+recovered\b", re.IGNORECASE),
    re.compile(r"\bincident\s+closed\b", re.IGNORECASE),
    re.compile(r"\ball payments (?:are|is)\s+stable\b", re.IGNORECASE),
]


def _stable_json(value: Any) -> str:
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(v) for v in value) + "]"
    if isinstance(value, dict):
        parts = []
        for key in sorted(value):
            parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
        return "{" + ",".join(parts) + "}"
    return json.dumps(str(value), ensure_ascii=True)


def _normalize_space(text: str) -> str:
    return " ".join((text or "").strip().split())


def text_hash(text: str) -> str:
    normalized = _normalize_space(text)
    raw = _stable_json(normalized)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]


def _extract_number_tokens(text: str) -> set[str]:
    normalized = _normalize_space(text).lower()
    return set(NUMBER_TOKEN_RE.findall(normalized))


def _extract_incident_ids(text: str) -> set[str]:
    normalized = _normalize_space(text).lower()
    return set(INCIDENT_ID_RE.findall(normalized))


def _extract_severity_labels(text: str) -> set[str]:
    normalized = _normalize_space(text).upper()
    return {match.upper() for match in SEVERITY_RE.findall(normalized)}


def _extract_regions(text: str) -> set[str]:
    normalized = _normalize_space(text).upper()
    return {value.upper() for value in REGION_RE.findall(normalized)}


def _extract_fix_plan_phrase_rules(fix_plan: list[str]) -> dict[str, list[str]]:
    must_include: list[str] = []
    must_remove: list[str] = []

    def _append_unique(target: list[str], value: str) -> None:
        if value and value not in target:
            target.append(value)

    for item in fix_plan:
        item_norm = _normalize_space(item).lower()
        quoted = [_normalize_space(match).lower() for match in QUOTED_PHRASE_RE.findall(item)]
        quoted = [value for value in quoted if value]
        if not quoted:
            continue

        is_replace = "replace" in item_norm
        is_modify = any(word in item_norm for word in ("modify", "change", "update", "rewrite"))
        has_with = " with " in f" {item_norm} "
        has_example_marker = any(
            marker in item_norm for marker in ("such as", "for example", "e.g.", "e.g")
        )

        if is_replace or is_modify:
            _append_unique(must_remove, quoted[0])

            # Enforce phrase add only for strict replace-with instructions, not modify/example hints.
            if is_replace and len(quoted) >= 2 and has_with and not has_example_marker:
                _append_unique(must_include, quoted[1])
            continue

        for phrase in quoted:
            _append_unique(must_include, phrase)

    return {
        "must_include": must_include,
        "must_remove": must_remove,
    }


def _context_claim_text(value: Any) -> str:
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, (bool, int, float)):
        return str(value)
    if isinstance(value, list):
        return " ".join(_context_claim_text(item) for item in value)
    if isinstance(value, dict):
        parts: list[str] = []
        for key, item in value.items():
            parts.append(str(key))
            parts.append(_context_claim_text(item))
        return " ".join(parts)
    return str(value)


def _is_high_risk_issue(issue_type: str) -> bool:
    return issue_type in {"legal_risk", "policy_violation"}


def validate_draft(draft: Any, *, max_chars: int) -> str:
    if not isinstance(draft, str) or not draft.strip():
        raise StopRun("invalid_draft:empty")
    normalized = draft.strip()
    if len(normalized) > max_chars:
        raise StopRun("invalid_draft:too_long")
    return normalized


def validate_review(
    raw: Any,
    *,
    allowed_decisions_policy: set[str],
    allowed_issue_types_policy: set[str],
    max_review_issues: int,
    max_fix_items: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_review:not_object")

    decision = raw.get("decision")
    if not isinstance(decision, str) or not decision.strip():
        raise StopRun("invalid_review:decision")
    decision = decision.strip()
    if decision not in allowed_decisions_policy:
        raise StopRun(f"review_decision_not_allowed_policy:{decision}")

    issues_raw = raw.get("issues", [])
    if not isinstance(issues_raw, list):
        raise StopRun("invalid_review:issues")
    if len(issues_raw) > max_review_issues:
        raise StopRun("invalid_review:too_many_issues")

    issues: list[dict[str, str]] = []
    for item in issues_raw:
        if not isinstance(item, dict):
            raise StopRun("invalid_review:issue_item")

        issue_type = item.get("type")
        note = item.get("note")

        if not isinstance(issue_type, str) or not issue_type.strip():
            raise StopRun("invalid_review:issue_type")
        issue_type = issue_type.strip()
        if issue_type not in allowed_issue_types_policy:
            raise StopRun(f"review_issue_not_allowed_policy:{issue_type}")

        if not isinstance(note, str) or not note.strip():
            raise StopRun("invalid_review:issue_note")

        issues.append({"type": issue_type, "note": note.strip()})

    fix_plan_raw = raw.get("fix_plan", [])
    if not isinstance(fix_plan_raw, list):
        raise StopRun("invalid_review:fix_plan")
    if len(fix_plan_raw) > max_fix_items:
        raise StopRun("invalid_review:too_many_fix_items")

    fix_plan: list[str] = []
    for item in fix_plan_raw:
        if not isinstance(item, str) or not item.strip():
            raise StopRun("invalid_review:fix_item")
        fix_plan.append(item.strip())

    reason = raw.get("reason", "")
    if reason is None:
        reason = ""
    if not isinstance(reason, str):
        raise StopRun("invalid_review:reason")
    reason = reason.strip()

    if decision == "approve":
        if issues and any(_is_high_risk_issue(issue["type"]) for issue in issues):
            raise StopRun("invalid_review:approve_with_high_risk_issue")
        return {
            "decision": "approve",
            "issues": issues,
            "fix_plan": [],
            "reason": reason,
            "high_risk": False,
        }

    if decision == "revise":
        if not issues:
            raise StopRun("invalid_review:revise_without_issues")
        if not fix_plan:
            raise StopRun("invalid_review:revise_without_fix_plan")
        if any(_is_high_risk_issue(issue["type"]) for issue in issues):
            raise StopRun("invalid_review:high_risk_requires_escalate")
        return {
            "decision": "revise",
            "issues": issues,
            "fix_plan": fix_plan,
            "reason": reason,
            "high_risk": False,
        }

    if decision == "escalate":
        if not reason:
            raise StopRun("invalid_review:escalate_reason_required")
        return {
            "decision": "escalate",
            "issues": issues,
            "fix_plan": [],
            "reason": reason,
            "high_risk": True,
        }

    raise StopRun("invalid_review:unknown_decision")


class ReflectionGateway:
    def __init__(
        self,
        *,
        allow_execution_decisions: set[str],
        budget: Budget,
    ):
        self.allow_execution_decisions = set(allow_execution_decisions)
        self.budget = budget

    def enforce_execution_decision(self, decision: str) -> None:
        if decision not in self.allow_execution_decisions:
            raise StopRun(f"review_decision_denied_execution:{decision}")

    def validate_revision(
        self,
        *,
        original: str,
        revised: str,
        context: dict[str, Any],
        fix_plan: list[str] | None = None,
    ) -> dict[str, Any]:
        if not isinstance(revised, str) or not revised.strip():
            raise StopRun("invalid_revised:empty")

        revised_clean = revised.strip()
        if len(revised_clean) > self.budget.max_answer_chars:
            raise StopRun("invalid_revised:too_long")

        normalized_original = _normalize_space(original)
        normalized_revised = _normalize_space(revised_clean)
        if normalized_original == normalized_revised:
            raise StopRun("invalid_revised:no_changes")

        similarity = SequenceMatcher(a=normalized_original, b=normalized_revised).ratio()
        if similarity < self.budget.min_patch_similarity:
            raise StopRun("patch_violation:too_large_edit")

        allowed_text_tokens = _stable_json(context) + " " + original
        allowed_text_claims = _normalize_space(_context_claim_text(context) + " " + original)
        revised_numbers = _extract_number_tokens(revised_clean)
        allowed_numbers = _extract_number_tokens(allowed_text_tokens)
        if revised_numbers - allowed_numbers:
            raise StopRun("patch_violation:no_new_facts")

        revised_ids = _extract_incident_ids(revised_clean)
        allowed_ids = _extract_incident_ids(allowed_text_tokens)
        if revised_ids - allowed_ids:
            raise StopRun("patch_violation:new_incident_id")

        revised_severity = _extract_severity_labels(revised_clean)
        allowed_severity = _extract_severity_labels(allowed_text_tokens)
        if revised_severity - allowed_severity:
            raise StopRun("patch_violation:new_severity_label")

        revised_regions = _extract_regions(revised_clean)
        allowed_regions = _extract_regions(allowed_text_tokens)
        if revised_regions - allowed_regions:
            raise StopRun("patch_violation:new_region")

        for claim_re in RESTRICTED_CLAIMS_RE:
            if claim_re.search(revised_clean) and not claim_re.search(allowed_text_claims):
                raise StopRun("patch_violation:restricted_claims")

        phrase_rules = _extract_fix_plan_phrase_rules(fix_plan or [])
        must_include = phrase_rules["must_include"]
        must_remove = phrase_rules["must_remove"]
        if must_include or must_remove:
            revised_lower = _normalize_space(revised_clean).lower()
            missing = [phrase for phrase in must_include if phrase not in revised_lower]
            if missing:
                raise StopRun("patch_violation:fix_plan_not_applied")
            still_present = [phrase for phrase in must_remove if phrase in revised_lower]
            if still_present:
                raise StopRun("patch_violation:fix_plan_not_applied")

        return {
            "answer": revised_clean,
            "patch_similarity": round(similarity, 3),
            "fix_plan_quoted_checks": len(must_include) + len(must_remove),
        }

    def validate_final(self, answer: str) -> str:
        if not isinstance(answer, str) or not answer.strip():
            raise StopRun("invalid_answer:empty")

        cleaned = answer.strip()
        if len(cleaned) > self.budget.max_answer_chars:
            raise StopRun("invalid_answer:too_long")
        return cleaned

What matters most here (in plain terms)

  • The gateway enforces only the execution decisions passed in from main.py.
  • The policy allowlist and the execution allowlist are separate: the runtime can be stricter.
  • After a revision, the gateway blocks new facts via number tokens, critical tokens (incident_id/region/severity), and restricted claims.
  • Token checks use a stable JSON rendering of the context; claim checks use plain context text, which makes the regex checks more reliable.
  • If fix_plan contains quoted phrases, the gateway builds must_include/must_remove rules and verifies them against the revision.
  • For replace/modify/change/update/rewrite instructions, the first quoted phrase is checked as must_remove.
  • For replace "A" with "B" instructions, phrase B is checked as must_include only when it is not an example (such as/for example/e.g.).
  • fix_plan_quoted_checks counts only these enforced rules, so it can be lower than the number of fix_plan items.
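
You can exercise these guardrails directly, without any LLM call (minimal sketch; assumes gateway.py and context.py from this example are importable). The revised text below invents a new number (31%), so the gateway rejects it:

PYTHON
from context import build_incident_context
from gateway import Budget, ReflectionGateway, StopRun

ctx = build_incident_context(report_date="2026-03-05", region="US")
gateway = ReflectionGateway(
    allow_execution_decisions={"approve", "revise", "escalate"},
    budget=Budget(),
)

original = (
    "Payment failures currently affect about 27% of US checkouts. "
    "ETA to recovery is 45 minutes. Next actions: monitor failures and publish status page updates."
)
revised = (
    "Payment failures currently affect about 31% of US checkouts. "  # 31% is not in the context or the draft
    "ETA to recovery is 45 minutes. Next actions: monitor failures and publish status page updates."
)

try:
    gateway.validate_revision(original=original, revised=revised, context=ctx, fix_plan=[])
except StopRun as exc:
    print(exc.reason)  # patch_violation:no_new_facts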

llm.py — draft/review/revise LLM calls

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


DRAFT_SYSTEM_PROMPT = """
You are an operations communications writer.
Return exactly one JSON object:
{
  "draft": "short customer-safe incident update"
}

Rules:
- Use only facts from provided incident_context.
- Include current status, customer impact, and next actions.
- Avoid absolute guarantees and overconfident claims.
- Keep draft concise and actionable.
- Do not output markdown or extra keys.
""".strip()

REVIEW_SYSTEM_PROMPT = """
You are a reflection reviewer.
Return exactly one JSON object:
{
  "decision": "approve|revise|escalate",
  "issues": [{"type":"overconfidence","note":"..."}],
  "fix_plan": ["patch instruction"],
  "reason": "for escalate only"
}

Rules:
- Review exactly once.
- decision=approve: fix_plan must be empty.
- decision=revise: provide 1-4 concrete patch-only instructions.
- For enforceable instructions, include quoted target phrases in fix_plan.
- decision=escalate: use only for high-risk or policy-unsafe content.
- Do not add new facts in fix_plan.
- Use only issue types from allowed_issue_types.
- Do not output markdown or extra keys.
""".strip()

REVISE_SYSTEM_PROMPT = """
You are an editor applying one controlled patch.
Return exactly one JSON object:
{
  "revised_answer": "updated answer"
}

Rules:
- Edit only what is needed to satisfy fix_plan.
- Keep scope and intent of original draft.
- Do not introduce new facts or numbers.
- Keep answer concise and customer-safe.
- Do not output markdown or extra keys.
""".strip()


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def _chat_json(*, system_prompt: str, payload: dict[str, Any]) -> dict[str, Any]:
    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")
    return data


def generate_draft(*, goal: str, incident_context: dict[str, Any]) -> str:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
    }
    data = _chat_json(system_prompt=DRAFT_SYSTEM_PROMPT, payload=payload)

    draft = data.get("draft")
    if not isinstance(draft, str):
        raise LLMInvalid("llm_invalid_schema")

    draft = draft.strip()
    if not draft:
        raise LLMEmpty("llm_empty")
    return draft


def review_draft(
    *,
    goal: str,
    incident_context: dict[str, Any],
    draft: str,
    allowed_issue_types: list[str],
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "draft": draft,
        "allowed_issue_types": allowed_issue_types,
    }
    return _chat_json(system_prompt=REVIEW_SYSTEM_PROMPT, payload=payload)


def revise_once(
    *,
    goal: str,
    incident_context: dict[str, Any],
    draft: str,
    fix_plan: list[str],
) -> str:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "draft": draft,
        "fix_plan": fix_plan,
    }
    data = _chat_json(system_prompt=REVISE_SYSTEM_PROMPT, payload=payload)

    revised = data.get("revised_answer")
    if not isinstance(revised, str):
        raise LLMInvalid("llm_invalid_schema")

    revised = revised.strip()
    if not revised:
        raise LLMEmpty("llm_empty")
    return revised

What matters most here (in plain terms)

  • Every step returns a JSON contract, not free text.
  • llm_invalid_json and llm_invalid_schema are kept separate for clean diagnostics.
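
The distinction can be illustrated without any API call (minimal sketch; the classify helper is only an illustration, not part of llm.py):

PYTHON
# Invalid JSON vs valid JSON with the wrong shape, mirroring how llm.py reports the two cases.
import json


def classify(text: str) -> str:
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        return "llm_invalid_json"      # not parseable at all
    if not isinstance(data, dict) or not isinstance(data.get("draft"), str):
        return "llm_invalid_schema"    # parseable, but not the expected contract
    return "ok"


print(classify("not json"))                          # llm_invalid_json
print(classify('{"unexpected": 1}'))                 # llm_invalid_schema
print(classify('{"draft": "Payments degraded"}'))    # ok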

main.py — Reflection flow orchestration

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from context import build_incident_context
from gateway import Budget, ReflectionGateway, StopRun, text_hash, validate_draft, validate_review
from llm import LLMEmpty, LLMInvalid, LLMTimeout, generate_draft, review_draft, revise_once

GOAL = (
    "Draft a customer-facing payment incident update for US enterprise customers. "
    "Keep it accurate, avoid overconfident language, and include next actions."
)
INCIDENT_CONTEXT = build_incident_context(report_date="2026-03-05", region="US")

BUDGET = Budget(
    max_seconds=30,
    max_draft_chars=900,
    max_review_issues=4,
    max_fix_items=4,
    max_answer_chars=900,
    min_patch_similarity=0.45,
)

ALLOWED_REVIEW_DECISIONS_POLICY = {"approve", "revise", "escalate"}
AUTO_REVISION_ENABLED = True
ALLOWED_REVIEW_DECISIONS_EXECUTION = (
    ALLOWED_REVIEW_DECISIONS_POLICY if AUTO_REVISION_ENABLED else {"approve", "escalate"}
)

ALLOWED_ISSUE_TYPES_POLICY = {
    "overconfidence",
    "missing_uncertainty",
    "contradiction",
    "scope_leak",
    "policy_violation",
    "legal_risk",
}


def run_reflection_agent(*, goal: str, incident_context: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = ReflectionGateway(
        allow_execution_decisions=ALLOWED_REVIEW_DECISIONS_EXECUTION,
        budget=BUDGET,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    try:
        draft_raw = generate_draft(goal=goal, incident_context=incident_context)
        draft = validate_draft(draft_raw, max_chars=BUDGET.max_draft_chars)
    except LLMTimeout:
        return stopped("llm_timeout", phase="draft")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="draft")
    except LLMEmpty:
        return stopped("llm_empty", phase="draft")
    except StopRun as exc:
        return stopped(exc.reason, phase="draft")

    trace.append(
        {
            "step": 1,
            "phase": "draft",
            "draft_hash": text_hash(draft),
            "chars": len(draft),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 1,
            "action": "draft_once",
            "draft": draft,
        }
    )

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return stopped("max_seconds", phase="review")

    try:
        raw_review = review_draft(
            goal=goal,
            incident_context=incident_context,
            draft=draft,
            allowed_issue_types=sorted(ALLOWED_ISSUE_TYPES_POLICY),
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="review")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="review")

    try:
        review = validate_review(
            raw_review,
            allowed_decisions_policy=ALLOWED_REVIEW_DECISIONS_POLICY,
            allowed_issue_types_policy=ALLOWED_ISSUE_TYPES_POLICY,
            max_review_issues=BUDGET.max_review_issues,
            max_fix_items=BUDGET.max_fix_items,
        )
        gateway.enforce_execution_decision(review["decision"])
    except StopRun as exc:
        return stopped(exc.reason, phase="review", raw_review=raw_review)

    trace.append(
        {
            "step": 2,
            "phase": "review",
            "decision": review["decision"],
            "issues": len(review["issues"]),
            "fix_items": len(review["fix_plan"]),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 2,
            "action": "review_once",
            "review": review,
        }
    )

    if review["decision"] == "escalate":
        escalation_reason = str(review.get("reason", "")).strip()
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "policy_escalation",
            "escalation_reason": escalation_reason[:120],
            "phase": "review",
            "review": review,
            "trace": trace,
            "history": history,
        }

    final_answer = draft
    revised = False

    if review["decision"] == "revise":
        if (time.monotonic() - started) > BUDGET.max_seconds:
            return stopped("max_seconds", phase="revise")

        try:
            revised_raw = revise_once(
                goal=goal,
                incident_context=incident_context,
                draft=draft,
                fix_plan=review["fix_plan"],
            )
            revised_payload = gateway.validate_revision(
                original=draft,
                revised=revised_raw,
                context=incident_context,
                fix_plan=review["fix_plan"],
            )
        except LLMTimeout:
            return stopped("llm_timeout", phase="revise")
        except LLMInvalid as exc:
            return stopped(exc.args[0], phase="revise")
        except LLMEmpty:
            return stopped("llm_empty", phase="revise")
        except StopRun as exc:
            return stopped(exc.reason, phase="revise")

        final_answer = revised_payload["answer"]
        revised = True

        trace.append(
            {
                "step": 3,
                "phase": "revise",
                "patch_similarity": revised_payload["patch_similarity"],
                "fix_plan_quoted_checks": revised_payload["fix_plan_quoted_checks"],
                "revised_hash": text_hash(final_answer),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 3,
                "action": "revise_once",
                "fix_plan": review["fix_plan"],
                "revised_answer": final_answer,
            }
        )

    try:
        final_answer = gateway.validate_final(final_answer)
    except StopRun as exc:
        return stopped(exc.reason, phase="finalize")

    trace.append(
        {
            "step": 4 if revised else 3,
            "phase": "finalize",
            "final_hash": text_hash(final_answer),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 4 if revised else 3,
            "action": "finalize",
            "status": "final",
        }
    )

    return {
        "run_id": run_id,
        "status": "ok",
        "stop_reason": "success",
        "outcome": "revised_once" if revised else "approved_direct",
        "answer": final_answer,
        "review_decision": review["decision"],
        "issues": review["issues"],
        "fix_plan": review["fix_plan"],
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_reflection_agent(goal=GOAL, incident_context=INCIDENT_CONTEXT)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (in plain terms)

  • There is a clean separation: the policy allowlist defines what is conceptually allowed, while the execution allowlist defines what the runtime actually permits.
  • The run always ends predictably: either success or a controlled stop_reason.
  • trace and history make every review/revise step easy to follow and audit.

Example output

JSON
{
  "run_id": "f67eaad4-1c3e-4160-9d8b-1e2e9af42c82",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "revised_once",
  "answer": "We are currently experiencing payment service degradation affecting ~27% of US enterprise checkouts, with failed payments at 3.4% and 5 chargeback alerts. We expect recovery in approximately 45 minutes, but this may change. Next actions: monitor failures every 15 minutes, publish status page updates every 15 minutes, and support users with workaround macros.",
  "review_decision": "revise",
  "issues": [
    {
      "type": "overconfidence",
      "note": "ETA wording sounded too certain and could be read as a guarantee."
    }
  ],
  "fix_plan": [
    "Add uncertainty wording for ETA (for example: 'recovery in approximately 45 minutes, but this may change').",
    "State explicitly that status page updates will be published every 15 minutes.",
    "Keep a clear 'Next actions' section with monitoring, updates, and support macros."
  ],
  "trace": [
    {
      "step": 1,
      "phase": "draft",
      "draft_hash": "f4b3f386c80a",
      "chars": 547,
      "ok": true
    },
    {
      "step": 2,
      "phase": "review",
      "decision": "revise",
      "issues": 1,
      "fix_items": 3,
      "ok": true
    },
    {
      "step": 3,
      "phase": "revise",
      "patch_similarity": 0.564,
      "fix_plan_quoted_checks": 1,
      "revised_hash": "2a5ac4952ae0",
      "ok": true
    },
    {
      "step": 4,
      "phase": "finalize",
      "final_hash": "2a5ac4952ae0",
      "ok": true
    }
  ],
  "history": [{...}]
}

Typical stop_reason values

  • success — the run completed correctly
  • llm_timeout — the LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_empty — empty LLM response during draft/revise
  • llm_invalid_json — the LLM returned invalid JSON
  • llm_invalid_schema — the JSON does not match the contract
  • invalid_draft:* — the draft failed basic validation
  • invalid_review:* — the review failed the policy-layer contract
  • review_decision_not_allowed_policy:* — review decision outside the policy allowlist
  • review_decision_denied_execution:* — execution decision denied by the runtime
  • patch_violation:no_new_facts — the revision added new numeric facts
  • patch_violation:new_incident_id|new_severity_label|new_region — the revision added new critical identifiers
  • patch_violation:restricted_claims — the revision added forbidden claims (resolved, fully recovered, etc.) not backed by the context
  • patch_violation:fix_plan_not_applied — the revision did not apply the quoted hints from fix_plan
  • patch_violation:too_large_edit — the revision exceeded the patch-only edit limit
  • policy_escalation — the review returned escalate as the final decision; details are in escalation_reason
  • max_seconds — the run's total time budget was exceeded
  • invalid_answer:* — the final answer failed validation
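
A minimal sketch of how a caller might route on these values (the route names are an assumption, not part of this example):

PYTHON
from typing import Any


def route_result(result: dict[str, Any]) -> str:
    """Map a run result to a downstream route."""
    if result["status"] == "ok":
        return "publish"                  # ship result["answer"]
    reason = result["stop_reason"]
    if reason == "policy_escalation":
        return "human_review"             # hand off, including result["escalation_reason"]
    if reason in {"llm_timeout", "llm_empty", "max_seconds"}:
        return "retry_later"              # transient failures: safe to retry the whole run
    return "audit_only"                   # contract/patch violations: inspect, do not auto-retry


print(route_result({"status": "stopped", "stop_reason": "patch_violation:no_new_facts"}))  # audit_only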

What is NOT shown

  • persistent storage for trace/history (in this example, everything happens within a single run)
  • retry/backoff for LLM calls
  • a human-in-the-loop queue for escalate
  • automatic patch diff rendering for a UI

What you can try next

  1. Disable AUTO_REVISION_ENABLED and observe review_decision_denied_execution:revise (see the sketch after this list).
  2. Add a strict check of allowed changes via fix_plan (a line-level patch contract).
  3. Send policy_escalation to an external queue for manual review.
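
For the first item, the change is a single constant in main.py; the expected outcome below follows from the code above:

PYTHON
# In main.py:
AUTO_REVISION_ENABLED = False
# The execution allowlist becomes {"approve", "escalate"}. If the reviewer returns decision="revise",
# gateway.enforce_execution_decision raises StopRun and the run ends with a payload like:
# {"status": "stopped", "stop_reason": "review_decision_denied_execution:revise", "phase": "review", ...}
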
Embedded: production control — OnceOnly
Add guardrails to tool-calling agents
Ship this pattern with governance:
  • Budgets (step / cost caps)
  • Tool permissions (allowlist / blocklist)
  • Kill switch & incident stop
  • Idempotency & deduplication
  • Audit logs & traceability
Embedded mention: OnceOnly is a control layer for agent systems in production.
Author

This documentation is curated and maintained by engineers who deploy AI agents in production.

The content is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The patterns and recommendations draw on post-mortems, failure modes, and operational incidents in deployed systems, including the development and operation of governance infrastructure for agents at OnceOnly.