Pattern essence (brief)
Reflection Agent is a pattern in which, after producing a draft answer, the agent performs a controlled self-review and then either approves the answer or makes a single patch-only revision.
The LLM decides how to word the draft and the review, while the execution layer controls whether the result may be executed within policy and runtime limits.
What this example demonstrates
- production-like flow: Draft -> Review -> Revise (optional) -> Finalize, with one review pass and at most one revision (no infinite loop)
- policy boundary for the review decision (approve | revise | escalate)
- execution boundary that enforces the runtime allowlist of decisions
- patch guardrails: no_new_facts, checks on critical claims/tokens, edit-size control
- controlled budgets (max_seconds, length limits, issue/fix limits)
- explicit stop_reason, trace, and history for audit
Architecture
- The LLM generates a draft answer.
- The review LLM call returns a structured decision (approve/revise/escalate).
- The gateway validates the review contract against the policy.
- The gateway enforces the execution allowlist (the runtime may be stricter than the policy).
- On revise, a single patch-only revision is executed.
- The gateway checks the patch (no_new_facts, critical tokens/claims, fix_plan hints, similarity) and finalizes the answer.
Key contract: the LLM proposes changes, but final execution authority belongs to the execution layer.
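The policy/execution split at the heart of this contract can be sketched in a few lines. The names StopRun and the allowlists mirror the gateway code shown later, but this fragment is a simplified illustration, not the full implementation.

```python
# Minimal sketch: the policy allowlist says what is conceptually permitted;
# the execution allowlist says what this runtime will actually act on.
POLICY_ALLOWED = {"approve", "revise", "escalate"}
EXECUTION_ALLOWED = {"approve", "escalate"}  # e.g. auto-revision disabled

class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason

def enforce(decision: str) -> str:
    # Policy check first: is this decision ever acceptable?
    if decision not in POLICY_ALLOWED:
        raise StopRun(f"review_decision_not_allowed_policy:{decision}")
    # Execution check second: will this runtime act on it right now?
    if decision not in EXECUTION_ALLOWED:
        raise StopRun(f"review_decision_denied_execution:{decision}")
    return decision

print(enforce("approve"))  # approve
try:
    enforce("revise")
except StopRun as exc:
    print(exc.reason)  # review_decision_denied_execution:revise
```

Note that "revise" passes policy but is denied by execution: the two allowlists fail with different stop reasons, which keeps audits unambiguous.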
Project structure
examples/
└── agent-patterns/
└── reflection-agent/
└── python/
├── main.py # Draft -> Review -> Revise -> Finalize flow
├── llm.py # draft/review/revise LLM calls
├── gateway.py # policy+execution validation, patch guards
├── context.py # deterministic incident context
├── requirements.txt
└── README.md
Running the project
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/reflection-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ is required.
Option via export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Option via .env (optional)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
This is the shell variant (macOS/Linux). On Windows, it is simpler to set the variables with set or, if preferred, to use python-dotenv to load .env automatically.
Task
Imagine a production case for incident communication:
"Prepare a customer-facing update on a P1 payment issue in the US, with no risky promises and with clear next actions."
A single draft often looks correct, but may contain:
- overconfident wording
- a blurry responsibility boundary
- phrasing that reads like a guarantee
Reflection adds a controlled review before anything is sent.
Solution
In this example:
- the draft is built from a deterministic context
- the review returns structured JSON (not free text)
- revise is allowed at most once
- the gateway blocks the patch if new numbers/facts appear or if the edit is too large
- escalate ends the run in a controlled way (status=stopped), with no hidden auto-rewrite
Code
context.py — deterministic production context
from __future__ import annotations
from typing import Any
def build_incident_context(*, report_date: str, region: str) -> dict[str, Any]:
return {
"report_date": report_date,
"region": region,
"incident": {
"incident_id": "inc_payments_20260305",
"severity": "P1",
"status": "degraded",
"affected_checkout_pct": 27,
"failed_payment_rate": 0.034,
"chargeback_alerts": 5,
"eta_minutes": 45,
},
"policy_hints": {
"avoid_absolute_guarantees": True,
"required_sections": ["current_status", "customer_impact", "next_actions"],
},
"approved_actions": [
"monitor payment failures every 15 minutes",
"publish customer update via status page",
"prepare support macro with workaround guidance",
],
}
What matters most here (in plain terms)
- The context is fixed and repeatable: convenient for tests and debugging.
- The LLM does not invent source data; it only phrases the answer from this context.
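Repeatability is easy to verify: the same arguments always produce the same structure. A trivial sketch (build_incident_context here is a reduced stand-in for the real function in context.py):

```python
import json

# Reduced stand-in for context.build_incident_context, used for illustration.
def build_incident_context(*, report_date: str, region: str) -> dict:
    return {
        "report_date": report_date,
        "region": region,
        "incident": {"severity": "P1", "eta_minutes": 45},
    }

a = build_incident_context(report_date="2026-03-05", region="US")
b = build_incident_context(report_date="2026-03-05", region="US")
# Same inputs, same output: suitable for golden tests and hash-based traces.
assert a == b
assert json.dumps(a, sort_keys=True) == json.dumps(b, sort_keys=True)
```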
gateway.py — policy/execution boundary + patch guardrails
from __future__ import annotations
import hashlib
import json
import re
from dataclasses import dataclass
from difflib import SequenceMatcher
from typing import Any
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_seconds: int = 30
max_draft_chars: int = 900
max_review_issues: int = 4
max_fix_items: int = 4
max_answer_chars: int = 900
min_patch_similarity: float = 0.45
NUMBER_TOKEN_RE = re.compile(r"\b\d+(?:\.\d+)?%?\b")
INCIDENT_ID_RE = re.compile(r"\binc_[a-z0-9_]+\b", re.IGNORECASE)
SEVERITY_RE = re.compile(r"\bp[0-5]\b", re.IGNORECASE)
REGION_RE = re.compile(r"\b(us|eu|uk|ua|apac|global|emea|latam)\b", re.IGNORECASE)
QUOTED_PHRASE_RE = re.compile(r"['\"]([^'\"]{3,120})['\"]")
RESTRICTED_CLAIMS_RE = [
re.compile(r"\bresolved\b", re.IGNORECASE),
re.compile(r"\bfully[-\s]+recovered\b", re.IGNORECASE),
re.compile(r"\bincident\s+closed\b", re.IGNORECASE),
re.compile(r"\ball payments (?:are|is)\s+stable\b", re.IGNORECASE),
]
def _stable_json(value: Any) -> str:
if value is None or isinstance(value, (bool, int, float, str)):
return json.dumps(value, ensure_ascii=True, sort_keys=True)
if isinstance(value, list):
return "[" + ",".join(_stable_json(v) for v in value) + "]"
if isinstance(value, dict):
parts = []
for key in sorted(value):
parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
return "{" + ",".join(parts) + "}"
return json.dumps(str(value), ensure_ascii=True)
def _normalize_space(text: str) -> str:
return " ".join((text or "").strip().split())
def text_hash(text: str) -> str:
normalized = _normalize_space(text)
raw = _stable_json(normalized)
return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]
def _extract_number_tokens(text: str) -> set[str]:
normalized = _normalize_space(text).lower()
return set(NUMBER_TOKEN_RE.findall(normalized))
def _extract_incident_ids(text: str) -> set[str]:
normalized = _normalize_space(text).lower()
return set(INCIDENT_ID_RE.findall(normalized))
def _extract_severity_labels(text: str) -> set[str]:
normalized = _normalize_space(text).upper()
return {match.upper() for match in SEVERITY_RE.findall(normalized)}
def _extract_regions(text: str) -> set[str]:
normalized = _normalize_space(text).upper()
return {value.upper() for value in REGION_RE.findall(normalized)}
def _extract_fix_plan_phrase_rules(fix_plan: list[str]) -> dict[str, list[str]]:
must_include: list[str] = []
must_remove: list[str] = []
def _append_unique(target: list[str], value: str) -> None:
if value and value not in target:
target.append(value)
for item in fix_plan:
item_norm = _normalize_space(item).lower()
quoted = [_normalize_space(match).lower() for match in QUOTED_PHRASE_RE.findall(item)]
quoted = [value for value in quoted if value]
if not quoted:
continue
is_replace = "replace" in item_norm
is_modify = any(word in item_norm for word in ("modify", "change", "update", "rewrite"))
has_with = " with " in f" {item_norm} "
has_example_marker = any(
marker in item_norm for marker in ("such as", "for example", "e.g.", "e.g")
)
if is_replace or is_modify:
_append_unique(must_remove, quoted[0])
# Enforce phrase add only for strict replace-with instructions, not modify/example hints.
if is_replace and len(quoted) >= 2 and has_with and not has_example_marker:
_append_unique(must_include, quoted[1])
continue
for phrase in quoted:
_append_unique(must_include, phrase)
return {
"must_include": must_include,
"must_remove": must_remove,
}
def _context_claim_text(value: Any) -> str:
if value is None:
return ""
if isinstance(value, str):
return value
if isinstance(value, (bool, int, float)):
return str(value)
if isinstance(value, list):
return " ".join(_context_claim_text(item) for item in value)
if isinstance(value, dict):
parts: list[str] = []
for key, item in value.items():
parts.append(str(key))
parts.append(_context_claim_text(item))
return " ".join(parts)
return str(value)
def _is_high_risk_issue(issue_type: str) -> bool:
return issue_type in {"legal_risk", "policy_violation"}
def validate_draft(draft: Any, *, max_chars: int) -> str:
if not isinstance(draft, str) or not draft.strip():
raise StopRun("invalid_draft:empty")
normalized = draft.strip()
if len(normalized) > max_chars:
raise StopRun("invalid_draft:too_long")
return normalized
def validate_review(
raw: Any,
*,
allowed_decisions_policy: set[str],
allowed_issue_types_policy: set[str],
max_review_issues: int,
max_fix_items: int,
) -> dict[str, Any]:
if not isinstance(raw, dict):
raise StopRun("invalid_review:not_object")
decision = raw.get("decision")
if not isinstance(decision, str) or not decision.strip():
raise StopRun("invalid_review:decision")
decision = decision.strip()
if decision not in allowed_decisions_policy:
raise StopRun(f"review_decision_not_allowed_policy:{decision}")
issues_raw = raw.get("issues", [])
if not isinstance(issues_raw, list):
raise StopRun("invalid_review:issues")
if len(issues_raw) > max_review_issues:
raise StopRun("invalid_review:too_many_issues")
issues: list[dict[str, str]] = []
for item in issues_raw:
if not isinstance(item, dict):
raise StopRun("invalid_review:issue_item")
issue_type = item.get("type")
note = item.get("note")
if not isinstance(issue_type, str) or not issue_type.strip():
raise StopRun("invalid_review:issue_type")
issue_type = issue_type.strip()
if issue_type not in allowed_issue_types_policy:
raise StopRun(f"review_issue_not_allowed_policy:{issue_type}")
if not isinstance(note, str) or not note.strip():
raise StopRun("invalid_review:issue_note")
issues.append({"type": issue_type, "note": note.strip()})
fix_plan_raw = raw.get("fix_plan", [])
if not isinstance(fix_plan_raw, list):
raise StopRun("invalid_review:fix_plan")
if len(fix_plan_raw) > max_fix_items:
raise StopRun("invalid_review:too_many_fix_items")
fix_plan: list[str] = []
for item in fix_plan_raw:
if not isinstance(item, str) or not item.strip():
raise StopRun("invalid_review:fix_item")
fix_plan.append(item.strip())
reason = raw.get("reason", "")
if reason is None:
reason = ""
if not isinstance(reason, str):
raise StopRun("invalid_review:reason")
reason = reason.strip()
if decision == "approve":
if issues and any(_is_high_risk_issue(issue["type"]) for issue in issues):
raise StopRun("invalid_review:approve_with_high_risk_issue")
return {
"decision": "approve",
"issues": issues,
"fix_plan": [],
"reason": reason,
"high_risk": False,
}
if decision == "revise":
if not issues:
raise StopRun("invalid_review:revise_without_issues")
if not fix_plan:
raise StopRun("invalid_review:revise_without_fix_plan")
if any(_is_high_risk_issue(issue["type"]) for issue in issues):
raise StopRun("invalid_review:high_risk_requires_escalate")
return {
"decision": "revise",
"issues": issues,
"fix_plan": fix_plan,
"reason": reason,
"high_risk": False,
}
if decision == "escalate":
if not reason:
raise StopRun("invalid_review:escalate_reason_required")
return {
"decision": "escalate",
"issues": issues,
"fix_plan": [],
"reason": reason,
"high_risk": True,
}
raise StopRun("invalid_review:unknown_decision")
class ReflectionGateway:
def __init__(
self,
*,
allow_execution_decisions: set[str],
budget: Budget,
):
self.allow_execution_decisions = set(allow_execution_decisions)
self.budget = budget
def enforce_execution_decision(self, decision: str) -> None:
if decision not in self.allow_execution_decisions:
raise StopRun(f"review_decision_denied_execution:{decision}")
def validate_revision(
self,
*,
original: str,
revised: str,
context: dict[str, Any],
fix_plan: list[str] | None = None,
) -> dict[str, Any]:
if not isinstance(revised, str) or not revised.strip():
raise StopRun("invalid_revised:empty")
revised_clean = revised.strip()
if len(revised_clean) > self.budget.max_answer_chars:
raise StopRun("invalid_revised:too_long")
normalized_original = _normalize_space(original)
normalized_revised = _normalize_space(revised_clean)
if normalized_original == normalized_revised:
raise StopRun("invalid_revised:no_changes")
similarity = SequenceMatcher(a=normalized_original, b=normalized_revised).ratio()
if similarity < self.budget.min_patch_similarity:
raise StopRun("patch_violation:too_large_edit")
allowed_text_tokens = _stable_json(context) + " " + original
allowed_text_claims = _normalize_space(_context_claim_text(context) + " " + original)
revised_numbers = _extract_number_tokens(revised_clean)
allowed_numbers = _extract_number_tokens(allowed_text_tokens)
if revised_numbers - allowed_numbers:
raise StopRun("patch_violation:no_new_facts")
revised_ids = _extract_incident_ids(revised_clean)
allowed_ids = _extract_incident_ids(allowed_text_tokens)
if revised_ids - allowed_ids:
raise StopRun("patch_violation:new_incident_id")
revised_severity = _extract_severity_labels(revised_clean)
allowed_severity = _extract_severity_labels(allowed_text_tokens)
if revised_severity - allowed_severity:
raise StopRun("patch_violation:new_severity_label")
revised_regions = _extract_regions(revised_clean)
allowed_regions = _extract_regions(allowed_text_tokens)
if revised_regions - allowed_regions:
raise StopRun("patch_violation:new_region")
for claim_re in RESTRICTED_CLAIMS_RE:
if claim_re.search(revised_clean) and not claim_re.search(allowed_text_claims):
raise StopRun("patch_violation:restricted_claims")
phrase_rules = _extract_fix_plan_phrase_rules(fix_plan or [])
must_include = phrase_rules["must_include"]
must_remove = phrase_rules["must_remove"]
if must_include or must_remove:
revised_lower = _normalize_space(revised_clean).lower()
missing = [phrase for phrase in must_include if phrase not in revised_lower]
if missing:
raise StopRun("patch_violation:fix_plan_not_applied")
still_present = [phrase for phrase in must_remove if phrase in revised_lower]
if still_present:
raise StopRun("patch_violation:fix_plan_not_applied")
return {
"answer": revised_clean,
"patch_similarity": round(similarity, 3),
"fix_plan_quoted_checks": len(must_include) + len(must_remove),
}
def validate_final(self, answer: str) -> str:
if not isinstance(answer, str) or not answer.strip():
raise StopRun("invalid_answer:empty")
cleaned = answer.strip()
if len(cleaned) > self.budget.max_answer_chars:
raise StopRun("invalid_answer:too_long")
return cleaned
What matters most here (in plain terms)
- The gateway enforces only the execution decisions passed in from main.py.
- The policy allowlist and the execution allowlist are separate: the runtime may be stricter.
- After a revision, the gateway blocks new facts via numbers, critical tokens (incident_id/region/severity), and restricted claims.
- For token checks, a stable JSON rendering of the context is used; for claim checks, plain context text, so the regex checks are more reliable.
- If fix_plan contains quoted phrases, the gateway builds must_include/must_remove rules and verifies them against the revision.
- For replace/modify/change/update/rewrite instructions, the first quoted phrase is checked as must_remove.
- For replace "A" with "B", phrase B is checked as must_include only when it is not an example (such as/for example/e.g.).
- fix_plan_quoted_checks counts only these enforced rules; it can therefore be lower than the number of fix_plan items.
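The two core patch guards can be reproduced in isolation. This is a simplified sketch of the too_large_edit and no_new_facts checks, using the same regex and SequenceMatcher ideas as gateway.py but not the file itself:

```python
import re
from difflib import SequenceMatcher

NUMBER_TOKEN_RE = re.compile(r"\b\d+(?:\.\d+)?%?\b")

def check_patch(original: str, revised: str, context_text: str,
                min_similarity: float = 0.45) -> str:
    # Edit-size guard: a patch-only revision must stay close to the draft.
    similarity = SequenceMatcher(a=original, b=revised).ratio()
    if similarity < min_similarity:
        return "patch_violation:too_large_edit"
    # no_new_facts guard: every number in the revision must already exist
    # in the context or in the original draft.
    allowed = set(NUMBER_TOKEN_RE.findall((context_text + " " + original).lower()))
    revised_numbers = set(NUMBER_TOKEN_RE.findall(revised.lower()))
    if revised_numbers - allowed:
        return "patch_violation:no_new_facts"
    return "ok"

original = "Checkout is degraded for 27 percent of users; ETA 45 minutes."
context = "affected_checkout_pct: 27, eta_minutes: 45"
# Hedging the ETA keeps all numbers grounded: passes.
print(check_patch(original,
    "Checkout is degraded for 27 percent of users; ETA approximately 45 minutes.",
    context))  # ok
# "30" does not appear in context or draft: blocked.
print(check_patch(original,
    "Checkout is degraded for 30 percent of users; ETA 45 minutes.",
    context))  # patch_violation:no_new_facts
```

The same shape extends to incident IDs, severity labels, and regions: extract tokens from the revision, subtract the allowed set, and fail on any remainder.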
llm.py — draft/review/revise LLM calls
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
class LLMInvalid(Exception):
pass
DRAFT_SYSTEM_PROMPT = """
You are an operations communications writer.
Return exactly one JSON object:
{
"draft": "short customer-safe incident update"
}
Rules:
- Use only facts from provided incident_context.
- Include current status, customer impact, and next actions.
- Avoid absolute guarantees and overconfident claims.
- Keep draft concise and actionable.
- Do not output markdown or extra keys.
""".strip()
REVIEW_SYSTEM_PROMPT = """
You are a reflection reviewer.
Return exactly one JSON object:
{
"decision": "approve|revise|escalate",
"issues": [{"type":"overconfidence","note":"..."}],
"fix_plan": ["patch instruction"],
"reason": "for escalate only"
}
Rules:
- Review exactly once.
- decision=approve: fix_plan must be empty.
- decision=revise: provide 1-4 concrete patch-only instructions.
- For enforceable instructions, include quoted target phrases in fix_plan.
- decision=escalate: use only for high-risk or policy-unsafe content.
- Do not add new facts in fix_plan.
- Use only issue types from allowed_issue_types.
- Do not output markdown or extra keys.
""".strip()
REVISE_SYSTEM_PROMPT = """
You are an editor applying one controlled patch.
Return exactly one JSON object:
{
"revised_answer": "updated answer"
}
Rules:
- Edit only what is needed to satisfy fix_plan.
- Keep scope and intent of original draft.
- Do not introduce new facts or numbers.
- Keep answer concise and customer-safe.
- Do not output markdown or extra keys.
""".strip()
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def _chat_json(*, system_prompt: str, payload: dict[str, Any]) -> dict[str, Any]:
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
data = json.loads(text)
except json.JSONDecodeError as exc:
raise LLMInvalid("llm_invalid_json") from exc
if not isinstance(data, dict):
raise LLMInvalid("llm_invalid_json")
return data
def generate_draft(*, goal: str, incident_context: dict[str, Any]) -> str:
payload = {
"goal": goal,
"incident_context": incident_context,
}
data = _chat_json(system_prompt=DRAFT_SYSTEM_PROMPT, payload=payload)
draft = data.get("draft")
if not isinstance(draft, str):
raise LLMInvalid("llm_invalid_schema")
draft = draft.strip()
if not draft:
raise LLMEmpty("llm_empty")
return draft
def review_draft(
*,
goal: str,
incident_context: dict[str, Any],
draft: str,
allowed_issue_types: list[str],
) -> dict[str, Any]:
payload = {
"goal": goal,
"incident_context": incident_context,
"draft": draft,
"allowed_issue_types": allowed_issue_types,
}
return _chat_json(system_prompt=REVIEW_SYSTEM_PROMPT, payload=payload)
def revise_once(
*,
goal: str,
incident_context: dict[str, Any],
draft: str,
fix_plan: list[str],
) -> str:
payload = {
"goal": goal,
"incident_context": incident_context,
"draft": draft,
"fix_plan": fix_plan,
}
data = _chat_json(system_prompt=REVISE_SYSTEM_PROMPT, payload=payload)
revised = data.get("revised_answer")
if not isinstance(revised, str):
raise LLMInvalid("llm_invalid_schema")
revised = revised.strip()
if not revised:
raise LLMEmpty("llm_empty")
return revised
What matters most here (in plain terms)
- Each step returns a JSON contract, not free text.
- llm_invalid_json and llm_invalid_schema are kept separate for clean diagnostics.
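The two failure modes can be shown with a tiny standalone parser: llm_invalid_json means the text is not JSON at all; llm_invalid_schema means it is valid JSON that violates the contract. A sketch of the same two-stage check used in llm.py:

```python
import json

class LLMInvalid(Exception):
    pass

def parse_draft(text: str) -> str:
    # Stage 1: is it JSON at all?
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc
    # Stage 2: does it match the expected contract?
    if not isinstance(data, dict) or not isinstance(data.get("draft"), str):
        raise LLMInvalid("llm_invalid_schema")
    return data["draft"]

for text in ['not json', '{"answer": "wrong key"}', '{"draft": "ok"}']:
    try:
        print(parse_draft(text))
    except LLMInvalid as exc:
        print(exc.args[0])
# Prints: llm_invalid_json, llm_invalid_schema, ok
```

Separating the two codes tells you immediately whether to tune the response_format settings (parse failures) or the prompt contract (schema failures).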
main.py — Reflection flow orchestration
from __future__ import annotations
import json
import time
import uuid
from typing import Any
from context import build_incident_context
from gateway import Budget, ReflectionGateway, StopRun, text_hash, validate_draft, validate_review
from llm import LLMEmpty, LLMInvalid, LLMTimeout, generate_draft, review_draft, revise_once
GOAL = (
"Draft a customer-facing payment incident update for US enterprise customers. "
"Keep it accurate, avoid overconfident language, and include next actions."
)
INCIDENT_CONTEXT = build_incident_context(report_date="2026-03-05", region="US")
BUDGET = Budget(
max_seconds=30,
max_draft_chars=900,
max_review_issues=4,
max_fix_items=4,
max_answer_chars=900,
min_patch_similarity=0.45,
)
ALLOWED_REVIEW_DECISIONS_POLICY = {"approve", "revise", "escalate"}
AUTO_REVISION_ENABLED = True
ALLOWED_REVIEW_DECISIONS_EXECUTION = (
ALLOWED_REVIEW_DECISIONS_POLICY if AUTO_REVISION_ENABLED else {"approve", "escalate"}
)
ALLOWED_ISSUE_TYPES_POLICY = {
"overconfidence",
"missing_uncertainty",
"contradiction",
"scope_leak",
"policy_violation",
"legal_risk",
}
def run_reflection_agent(*, goal: str, incident_context: dict[str, Any]) -> dict[str, Any]:
run_id = str(uuid.uuid4())
started = time.monotonic()
trace: list[dict[str, Any]] = []
history: list[dict[str, Any]] = []
gateway = ReflectionGateway(
allow_execution_decisions=ALLOWED_REVIEW_DECISIONS_EXECUTION,
budget=BUDGET,
)
def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
payload = {
"run_id": run_id,
"status": "stopped",
"stop_reason": stop_reason,
"phase": phase,
"trace": trace,
"history": history,
}
payload.update(extra)
return payload
try:
draft_raw = generate_draft(goal=goal, incident_context=incident_context)
draft = validate_draft(draft_raw, max_chars=BUDGET.max_draft_chars)
except LLMTimeout:
return stopped("llm_timeout", phase="draft")
except LLMInvalid as exc:
return stopped(exc.args[0], phase="draft")
except LLMEmpty:
return stopped("llm_empty", phase="draft")
except StopRun as exc:
return stopped(exc.reason, phase="draft")
trace.append(
{
"step": 1,
"phase": "draft",
"draft_hash": text_hash(draft),
"chars": len(draft),
"ok": True,
}
)
history.append(
{
"step": 1,
"action": "draft_once",
"draft": draft,
}
)
if (time.monotonic() - started) > BUDGET.max_seconds:
return stopped("max_seconds", phase="review")
try:
raw_review = review_draft(
goal=goal,
incident_context=incident_context,
draft=draft,
allowed_issue_types=sorted(ALLOWED_ISSUE_TYPES_POLICY),
)
except LLMTimeout:
return stopped("llm_timeout", phase="review")
except LLMInvalid as exc:
return stopped(exc.args[0], phase="review")
try:
review = validate_review(
raw_review,
allowed_decisions_policy=ALLOWED_REVIEW_DECISIONS_POLICY,
allowed_issue_types_policy=ALLOWED_ISSUE_TYPES_POLICY,
max_review_issues=BUDGET.max_review_issues,
max_fix_items=BUDGET.max_fix_items,
)
gateway.enforce_execution_decision(review["decision"])
except StopRun as exc:
return stopped(exc.reason, phase="review", raw_review=raw_review)
trace.append(
{
"step": 2,
"phase": "review",
"decision": review["decision"],
"issues": len(review["issues"]),
"fix_items": len(review["fix_plan"]),
"ok": True,
}
)
history.append(
{
"step": 2,
"action": "review_once",
"review": review,
}
)
if review["decision"] == "escalate":
escalation_reason = str(review.get("reason", "")).strip()
return {
"run_id": run_id,
"status": "stopped",
"stop_reason": "policy_escalation",
"escalation_reason": escalation_reason[:120],
"phase": "review",
"review": review,
"trace": trace,
"history": history,
}
final_answer = draft
revised = False
if review["decision"] == "revise":
if (time.monotonic() - started) > BUDGET.max_seconds:
return stopped("max_seconds", phase="revise")
try:
revised_raw = revise_once(
goal=goal,
incident_context=incident_context,
draft=draft,
fix_plan=review["fix_plan"],
)
revised_payload = gateway.validate_revision(
original=draft,
revised=revised_raw,
context=incident_context,
fix_plan=review["fix_plan"],
)
except LLMTimeout:
return stopped("llm_timeout", phase="revise")
except LLMInvalid as exc:
return stopped(exc.args[0], phase="revise")
except LLMEmpty:
return stopped("llm_empty", phase="revise")
except StopRun as exc:
return stopped(exc.reason, phase="revise")
final_answer = revised_payload["answer"]
revised = True
trace.append(
{
"step": 3,
"phase": "revise",
"patch_similarity": revised_payload["patch_similarity"],
"fix_plan_quoted_checks": revised_payload["fix_plan_quoted_checks"],
"revised_hash": text_hash(final_answer),
"ok": True,
}
)
history.append(
{
"step": 3,
"action": "revise_once",
"fix_plan": review["fix_plan"],
"revised_answer": final_answer,
}
)
try:
final_answer = gateway.validate_final(final_answer)
except StopRun as exc:
return stopped(exc.reason, phase="finalize")
trace.append(
{
"step": 4 if revised else 3,
"phase": "finalize",
"final_hash": text_hash(final_answer),
"ok": True,
}
)
history.append(
{
"step": 4 if revised else 3,
"action": "finalize",
"status": "final",
}
)
return {
"run_id": run_id,
"status": "ok",
"stop_reason": "success",
"outcome": "revised_once" if revised else "approved_direct",
"answer": final_answer,
"review_decision": review["decision"],
"issues": review["issues"],
"fix_plan": review["fix_plan"],
"trace": trace,
"history": history,
}
def main() -> None:
result = run_reflection_agent(goal=GOAL, incident_context=INCIDENT_CONTEXT)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
What matters most here (in plain terms)
- There is a clean separation: the policy allowlist defines what is conceptually permitted, while the execution allowlist defines what the runtime actually allows.
- The run ends predictably: either success or a controlled stop_reason.
- trace and history make every review/revise step understandable.
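The hashes in trace make steps comparable without storing full text. Here is the same idea as text_hash in gateway.py, reproduced standalone: whitespace-normalized input yields a stable 12-character fingerprint.

```python
import hashlib
import json

def text_hash(text: str) -> str:
    # Normalize whitespace so cosmetic reflows do not change the fingerprint.
    normalized = " ".join((text or "").strip().split())
    raw = json.dumps(normalized, ensure_ascii=True, sort_keys=True)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]

a = text_hash("Payments are  degraded\nin US.")
b = text_hash("Payments are degraded in US.")
print(a == b)  # True: only whitespace differs
# In the example output, revised_hash == final_hash proves the finalize
# step did not silently alter the revised answer.
```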
Example output
{
"run_id": "f67eaad4-1c3e-4160-9d8b-1e2e9af42c82",
"status": "ok",
"stop_reason": "success",
"outcome": "revised_once",
"answer": "We are currently experiencing payment service degradation affecting ~27% of US enterprise checkouts, with failed payments at 3.4% and 5 chargeback alerts. We expect recovery in approximately 45 minutes, but this may change. Next actions: monitor failures every 15 minutes, publish status page updates every 15 minutes, and support users with workaround macros.",
"review_decision": "revise",
"issues": [
{
"type": "overconfidence",
"note": "ETA wording sounded too certain and could be read as a guarantee."
}
],
"fix_plan": [
"Add uncertainty wording for ETA (for example: 'recovery in approximately 45 minutes, but this may change').",
"State explicitly that status page updates will be published every 15 minutes.",
"Keep a clear 'Next actions' section with monitoring, updates, and support macros."
],
"trace": [
{
"step": 1,
"phase": "draft",
"draft_hash": "f4b3f386c80a",
"chars": 547,
"ok": true
},
{
"step": 2,
"phase": "review",
"decision": "revise",
"issues": 1,
"fix_items": 3,
"ok": true
},
{
"step": 3,
"phase": "revise",
"patch_similarity": 0.564,
"fix_plan_quoted_checks": 1,
"revised_hash": "2a5ac4952ae0",
"ok": true
},
{
"step": 4,
"phase": "finalize",
"final_hash": "2a5ac4952ae0",
"ok": true
}
],
"history": [{...}]
}
Typical stop_reason values
- success — the run completed correctly
- llm_timeout — the LLM did not respond within OPENAI_TIMEOUT_SECONDS
- llm_empty — empty LLM response during draft/revise
- llm_invalid_json — the LLM returned invalid JSON
- llm_invalid_schema — the JSON does not match the contract
- invalid_draft:* — the draft failed basic validation
- invalid_review:* — the review failed the policy-layer contract
- review_decision_not_allowed_policy:* — review decision outside the policy allowlist
- review_decision_denied_execution:* — execution decision denied by the runtime
- patch_violation:no_new_facts — the revision added new numeric facts
- patch_violation:new_incident_id|new_severity_label|new_region — the revision added new critical identifiers
- patch_violation:restricted_claims — the revision added forbidden claims (resolved, fully recovered, etc.) not present in the context
- patch_violation:fix_plan_not_applied — the revision did not apply the quoted hints from fix_plan
- patch_violation:too_large_edit — the revision exceeds the patch-only limit
- policy_escalation — the review returned escalation as its final decision; details in escalation_reason
- max_seconds — total run time budget exceeded
- invalid_answer:* — the final answer failed validation
What is NOT shown
- persistent storage for trace/history (in this example, everything happens within a single run)
- retry/backoff for LLM calls
- a human-in-the-loop queue for escalate
- automatic patch-diff rendering for a UI
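A retry/backoff wrapper for the LLM calls, listed above as not shown, could look like this sketch. The LLMTimeout name matches llm.py; the flaky helper is purely illustrative.

```python
import time

class LLMTimeout(Exception):
    pass

def with_retries(call, *, attempts: int = 3, base_delay: float = 0.5):
    # Retry only transient failures (timeouts); re-raise everything else.
    for attempt in range(attempts):
        try:
            return call()
        except LLMTimeout:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))  # exponential backoff

# Illustrative flaky call: fails twice, then succeeds.
state = {"calls": 0}
def flaky():
    state["calls"] += 1
    if state["calls"] < 3:
        raise LLMTimeout("llm_timeout")
    return "draft text"

print(with_retries(flaky, base_delay=0.05))  # draft text
```

In the real flow you would wrap each of generate_draft, review_draft, and revise_once, and still convert a final LLMTimeout into the llm_timeout stop_reason so budgets stay authoritative.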
What you can try next
- Disable AUTO_REVISION_ENABLED and observe review_decision_denied_execution:revise.
- Add a strict check of allowed changes via fix_plan (line-level patch contract).
- Send policy_escalation to an external queue for manual review.