Essence of the pattern (brief)
Reflection Agent is a pattern in which, after producing a draft answer, the agent performs a controlled self-review and then either approves the answer or applies a single patch-only revision.
The LLM decides how to formulate the draft and the review; the execution layer controls whether that can run within the policy and runtime constraints.
What this example demonstrates
- production-like flow: Draft -> Review -> Revise (optional) -> Finalize, with one review pass and at most one revision (no infinite loop)
- policy boundary for the review decision (approve | revise | escalate)
- execution boundary that enforces the runtime allowlist of decisions
- patch guardrails: no_new_facts, checks on critical claims/tokens, and edit-size control
- controlled budgets (max_seconds, length limits, issue/fix limits)
- explicit stop_reason, trace, and history for auditing
Architecture
- The LLM generates a draft answer.
- The LLM review returns a structured decision (approve/revise/escalate).
- The gateway validates the review contract against policy.
- The gateway enforces the execution allowlist (runtime may differ from policy).
- On revise, a single patch-only revision is executed.
- The gateway verifies the patch (no_new_facts, critical tokens/claims, fix_plan hints, similarity) and finalizes the answer.
Key contract: the LLM proposes changes, but final execution authority belongs to the execution layer.
Project structure
examples/
└── agent-patterns/
└── reflection-agent/
└── python/
├── main.py # Draft -> Review -> Revise -> Finalize flow
├── llm.py # draft/review/revise LLM calls
├── gateway.py # policy+execution validation, patch guards
├── context.py # deterministic incident context
├── requirements.txt
└── README.md
How to run
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/reflection-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ is required.
Option using export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Option using .env (optional)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
This is the shell variant (macOS/Linux). On Windows it is easier to set the variables with set or, if you prefer, to use python-dotenv to load .env automatically.
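For reference, the Windows cmd.exe equivalent might look like this (a sketch; in PowerShell use `$env:OPENAI_API_KEY = "sk-..."` instead):

```
:: cmd.exe — set the variables for the current session only
set OPENAI_API_KEY=sk-...
set OPENAI_MODEL=gpt-4.1-mini
set OPENAI_TIMEOUT_SECONDS=60
python main.py
```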
Task
Imagine a production case for incident communication:
"Prepare a customer-facing update about a P1 payments issue in US, without risky promises and with clear next actions."
A single draft often looks fine, but it may contain:
- overly confident wording
- an unclear boundary of responsibility
- phrases that sound like guarantees
Reflection adds a controlled review before sending.
Solution
In this example:
- the draft is built from deterministic context
- the review returns structured JSON (not free text)
- revise is allowed at most once
- the gateway blocks the patch if new numbers/facts appear or if the edit is too large
- escalate ends the run in a controlled way (status=stopped), with no hidden automatic rewrites
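The control flow above can be sketched as a simplified skeleton (illustrative only; the real implementation in main.py below adds budgets, gateway validation, tracing, and error handling):

```python
def reflection_flow(draft_fn, review_fn, revise_fn, finalize_fn):
    # One review pass and at most one patch-only revision: no loop.
    draft = draft_fn()
    review = review_fn(draft)  # {"decision": "approve|revise|escalate", ...}
    if review["decision"] == "escalate":
        return {"status": "stopped", "stop_reason": "policy_escalation"}
    answer = draft
    if review["decision"] == "revise":
        answer = revise_fn(draft, review["fix_plan"])  # single revision
    return {"status": "ok", "answer": finalize_fn(answer)}

result = reflection_flow(
    lambda: "draft text",
    lambda d: {"decision": "revise", "fix_plan": ["soften the ETA wording"]},
    lambda d, fix_plan: d + " (revised)",
    lambda a: a.strip(),
)
# result == {"status": "ok", "answer": "draft text (revised)"}
```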
Code
context.py — deterministic production context
from __future__ import annotations
from typing import Any
def build_incident_context(*, report_date: str, region: str) -> dict[str, Any]:
return {
"report_date": report_date,
"region": region,
"incident": {
"incident_id": "inc_payments_20260305",
"severity": "P1",
"status": "degraded",
"affected_checkout_pct": 27,
"failed_payment_rate": 0.034,
"chargeback_alerts": 5,
"eta_minutes": 45,
},
"policy_hints": {
"avoid_absolute_guarantees": True,
"required_sections": ["current_status", "customer_impact", "next_actions"],
},
"approved_actions": [
"monitor payment failures every 15 minutes",
"publish customer update via status page",
"prepare support macro with workaround guidance",
],
}
The most important parts here (in plain terms)
- The context is fixed and repeatable: useful for tests and debugging.
- The LLM does not invent source data; it only formulates the answer on top of this context.
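One way to check that repeatability in a test is a stable hash over a canonical JSON rendering (a sketch; `stable_hash` is illustrative and mirrors the `text_hash` idea used in gateway.py):

```python
import hashlib
import json

def stable_hash(value) -> str:
    # Canonical JSON (sorted keys) makes the hash independent of dict insertion order.
    raw = json.dumps(value, ensure_ascii=True, sort_keys=True)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]

ctx_a = {"region": "US", "incident": {"severity": "P1", "eta_minutes": 45}}
ctx_b = {"incident": {"eta_minutes": 45, "severity": "P1"}, "region": "US"}

# Same content, different construction order -> same hash.
assert stable_hash(ctx_a) == stable_hash(ctx_b)
```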
gateway.py — policy/execution boundary + patch guardrails
from __future__ import annotations
import hashlib
import json
import re
from dataclasses import dataclass
from difflib import SequenceMatcher
from typing import Any
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_seconds: int = 30
max_draft_chars: int = 900
max_review_issues: int = 4
max_fix_items: int = 4
max_answer_chars: int = 900
min_patch_similarity: float = 0.45
NUMBER_TOKEN_RE = re.compile(r"\b\d+(?:\.\d+)?%?\b")
INCIDENT_ID_RE = re.compile(r"\binc_[a-z0-9_]+\b", re.IGNORECASE)
SEVERITY_RE = re.compile(r"\bp[0-5]\b", re.IGNORECASE)
REGION_RE = re.compile(r"\b(us|eu|uk|ua|apac|global|emea|latam)\b", re.IGNORECASE)
QUOTED_PHRASE_RE = re.compile(r"['\"]([^'\"]{3,120})['\"]")
RESTRICTED_CLAIMS_RE = [
re.compile(r"\bresolved\b", re.IGNORECASE),
re.compile(r"\bfully[-\s]+recovered\b", re.IGNORECASE),
re.compile(r"\bincident\s+closed\b", re.IGNORECASE),
re.compile(r"\ball payments (?:are|is)\s+stable\b", re.IGNORECASE),
]
def _stable_json(value: Any) -> str:
if value is None or isinstance(value, (bool, int, float, str)):
return json.dumps(value, ensure_ascii=True, sort_keys=True)
if isinstance(value, list):
return "[" + ",".join(_stable_json(v) for v in value) + "]"
if isinstance(value, dict):
parts = []
for key in sorted(value):
parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
return "{" + ",".join(parts) + "}"
return json.dumps(str(value), ensure_ascii=True)
def _normalize_space(text: str) -> str:
return " ".join((text or "").strip().split())
def text_hash(text: str) -> str:
normalized = _normalize_space(text)
raw = _stable_json(normalized)
return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]
def _extract_number_tokens(text: str) -> set[str]:
normalized = _normalize_space(text).lower()
return set(NUMBER_TOKEN_RE.findall(normalized))
def _extract_incident_ids(text: str) -> set[str]:
normalized = _normalize_space(text).lower()
return set(INCIDENT_ID_RE.findall(normalized))
def _extract_severity_labels(text: str) -> set[str]:
normalized = _normalize_space(text).upper()
return {match.upper() for match in SEVERITY_RE.findall(normalized)}
def _extract_regions(text: str) -> set[str]:
normalized = _normalize_space(text).upper()
return {value.upper() for value in REGION_RE.findall(normalized)}
def _extract_fix_plan_phrase_rules(fix_plan: list[str]) -> dict[str, list[str]]:
must_include: list[str] = []
must_remove: list[str] = []
def _append_unique(target: list[str], value: str) -> None:
if value and value not in target:
target.append(value)
for item in fix_plan:
item_norm = _normalize_space(item).lower()
quoted = [_normalize_space(match).lower() for match in QUOTED_PHRASE_RE.findall(item)]
quoted = [value for value in quoted if value]
if not quoted:
continue
is_replace = "replace" in item_norm
is_modify = any(word in item_norm for word in ("modify", "change", "update", "rewrite"))
has_with = " with " in f" {item_norm} "
has_example_marker = any(
marker in item_norm for marker in ("such as", "for example", "e.g.", "e.g")
)
if is_replace or is_modify:
_append_unique(must_remove, quoted[0])
# Enforce phrase add only for strict replace-with instructions, not modify/example hints.
if is_replace and len(quoted) >= 2 and has_with and not has_example_marker:
_append_unique(must_include, quoted[1])
continue
for phrase in quoted:
_append_unique(must_include, phrase)
return {
"must_include": must_include,
"must_remove": must_remove,
}
def _context_claim_text(value: Any) -> str:
if value is None:
return ""
if isinstance(value, str):
return value
if isinstance(value, (bool, int, float)):
return str(value)
if isinstance(value, list):
return " ".join(_context_claim_text(item) for item in value)
if isinstance(value, dict):
parts: list[str] = []
for key, item in value.items():
parts.append(str(key))
parts.append(_context_claim_text(item))
return " ".join(parts)
return str(value)
def _is_high_risk_issue(issue_type: str) -> bool:
return issue_type in {"legal_risk", "policy_violation"}
def validate_draft(draft: Any, *, max_chars: int) -> str:
if not isinstance(draft, str) or not draft.strip():
raise StopRun("invalid_draft:empty")
normalized = draft.strip()
if len(normalized) > max_chars:
raise StopRun("invalid_draft:too_long")
return normalized
def validate_review(
raw: Any,
*,
allowed_decisions_policy: set[str],
allowed_issue_types_policy: set[str],
max_review_issues: int,
max_fix_items: int,
) -> dict[str, Any]:
if not isinstance(raw, dict):
raise StopRun("invalid_review:not_object")
decision = raw.get("decision")
if not isinstance(decision, str) or not decision.strip():
raise StopRun("invalid_review:decision")
decision = decision.strip()
if decision not in allowed_decisions_policy:
raise StopRun(f"review_decision_not_allowed_policy:{decision}")
issues_raw = raw.get("issues", [])
if not isinstance(issues_raw, list):
raise StopRun("invalid_review:issues")
if len(issues_raw) > max_review_issues:
raise StopRun("invalid_review:too_many_issues")
issues: list[dict[str, str]] = []
for item in issues_raw:
if not isinstance(item, dict):
raise StopRun("invalid_review:issue_item")
issue_type = item.get("type")
note = item.get("note")
if not isinstance(issue_type, str) or not issue_type.strip():
raise StopRun("invalid_review:issue_type")
issue_type = issue_type.strip()
if issue_type not in allowed_issue_types_policy:
raise StopRun(f"review_issue_not_allowed_policy:{issue_type}")
if not isinstance(note, str) or not note.strip():
raise StopRun("invalid_review:issue_note")
issues.append({"type": issue_type, "note": note.strip()})
fix_plan_raw = raw.get("fix_plan", [])
if not isinstance(fix_plan_raw, list):
raise StopRun("invalid_review:fix_plan")
if len(fix_plan_raw) > max_fix_items:
raise StopRun("invalid_review:too_many_fix_items")
fix_plan: list[str] = []
for item in fix_plan_raw:
if not isinstance(item, str) or not item.strip():
raise StopRun("invalid_review:fix_item")
fix_plan.append(item.strip())
reason = raw.get("reason", "")
if reason is None:
reason = ""
if not isinstance(reason, str):
raise StopRun("invalid_review:reason")
reason = reason.strip()
if decision == "approve":
if issues and any(_is_high_risk_issue(issue["type"]) for issue in issues):
raise StopRun("invalid_review:approve_with_high_risk_issue")
return {
"decision": "approve",
"issues": issues,
"fix_plan": [],
"reason": reason,
"high_risk": False,
}
if decision == "revise":
if not issues:
raise StopRun("invalid_review:revise_without_issues")
if not fix_plan:
raise StopRun("invalid_review:revise_without_fix_plan")
if any(_is_high_risk_issue(issue["type"]) for issue in issues):
raise StopRun("invalid_review:high_risk_requires_escalate")
return {
"decision": "revise",
"issues": issues,
"fix_plan": fix_plan,
"reason": reason,
"high_risk": False,
}
if decision == "escalate":
if not reason:
raise StopRun("invalid_review:escalate_reason_required")
return {
"decision": "escalate",
"issues": issues,
"fix_plan": [],
"reason": reason,
"high_risk": True,
}
raise StopRun("invalid_review:unknown_decision")
class ReflectionGateway:
def __init__(
self,
*,
allow_execution_decisions: set[str],
budget: Budget,
):
self.allow_execution_decisions = set(allow_execution_decisions)
self.budget = budget
def enforce_execution_decision(self, decision: str) -> None:
if decision not in self.allow_execution_decisions:
raise StopRun(f"review_decision_denied_execution:{decision}")
def validate_revision(
self,
*,
original: str,
revised: str,
context: dict[str, Any],
fix_plan: list[str] | None = None,
) -> dict[str, Any]:
if not isinstance(revised, str) or not revised.strip():
raise StopRun("invalid_revised:empty")
revised_clean = revised.strip()
if len(revised_clean) > self.budget.max_answer_chars:
raise StopRun("invalid_revised:too_long")
normalized_original = _normalize_space(original)
normalized_revised = _normalize_space(revised_clean)
if normalized_original == normalized_revised:
raise StopRun("invalid_revised:no_changes")
similarity = SequenceMatcher(a=normalized_original, b=normalized_revised).ratio()
if similarity < self.budget.min_patch_similarity:
raise StopRun("patch_violation:too_large_edit")
allowed_text_tokens = _stable_json(context) + " " + original
allowed_text_claims = _normalize_space(_context_claim_text(context) + " " + original)
revised_numbers = _extract_number_tokens(revised_clean)
allowed_numbers = _extract_number_tokens(allowed_text_tokens)
if revised_numbers - allowed_numbers:
raise StopRun("patch_violation:no_new_facts")
revised_ids = _extract_incident_ids(revised_clean)
allowed_ids = _extract_incident_ids(allowed_text_tokens)
if revised_ids - allowed_ids:
raise StopRun("patch_violation:new_incident_id")
revised_severity = _extract_severity_labels(revised_clean)
allowed_severity = _extract_severity_labels(allowed_text_tokens)
if revised_severity - allowed_severity:
raise StopRun("patch_violation:new_severity_label")
revised_regions = _extract_regions(revised_clean)
allowed_regions = _extract_regions(allowed_text_tokens)
if revised_regions - allowed_regions:
raise StopRun("patch_violation:new_region")
for claim_re in RESTRICTED_CLAIMS_RE:
if claim_re.search(revised_clean) and not claim_re.search(allowed_text_claims):
raise StopRun("patch_violation:restricted_claims")
phrase_rules = _extract_fix_plan_phrase_rules(fix_plan or [])
must_include = phrase_rules["must_include"]
must_remove = phrase_rules["must_remove"]
if must_include or must_remove:
revised_lower = _normalize_space(revised_clean).lower()
missing = [phrase for phrase in must_include if phrase not in revised_lower]
if missing:
raise StopRun("patch_violation:fix_plan_not_applied")
still_present = [phrase for phrase in must_remove if phrase in revised_lower]
if still_present:
raise StopRun("patch_violation:fix_plan_not_applied")
return {
"answer": revised_clean,
"patch_similarity": round(similarity, 3),
"fix_plan_quoted_checks": len(must_include) + len(must_remove),
}
def validate_final(self, answer: str) -> str:
if not isinstance(answer, str) or not answer.strip():
raise StopRun("invalid_answer:empty")
cleaned = answer.strip()
if len(cleaned) > self.budget.max_answer_chars:
raise StopRun("invalid_answer:too_long")
return cleaned
The most important parts here (in plain terms)
- The gateway only enforces the execution decisions it receives from main.py.
- The policy allowlist and the execution allowlist are separate: runtime can be stricter.
- After the revision, the gateway blocks new facts via numbers, critical tokens (incident_id/region/severity), and restricted claims.
- Token checks use a stable JSON rendering of the context, while claim checks use the context's plain text, so the regex checks are more reliable.
- If fix_plan contains quoted phrases, the gateway builds must_include/must_remove rules and verifies them against the revision.
- For replace/modify/change/update/rewrite instructions, the first quoted phrase is checked as must_remove.
- For replace "A" with "B", phrase B is checked as must_include only when it is not an example (such as/for example/e.g.).
- fix_plan_quoted_checks counts only these enforced rules, so it can be lower than the number of items in fix_plan.
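The quoted-phrase rule can be illustrated with a simplified standalone re-implementation (a sketch; the full `_extract_fix_plan_phrase_rules` in gateway.py also handles modify-style verbs and example markers):

```python
import re

QUOTED = re.compile(r"['\"]([^'\"]{3,120})['\"]")

def phrase_rules(fix_plan: list[str]) -> dict[str, list[str]]:
    # "replace 'A' with 'B'" -> A must disappear, B must appear;
    # any other quoted phrase is simply required in the revision.
    must_include, must_remove = [], []
    for item in fix_plan:
        quoted = [q.strip().lower() for q in QUOTED.findall(item)]
        if not quoted:
            continue
        if "replace" in item.lower():
            must_remove.append(quoted[0])
            if len(quoted) >= 2 and " with " in item.lower():
                must_include.append(quoted[1])
        else:
            must_include.extend(quoted)
    return {"must_include": must_include, "must_remove": must_remove}

rules = phrase_rules(['replace "will be resolved" with "should recover"'])
# rules == {"must_include": ["should recover"], "must_remove": ["will be resolved"]}
```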
llm.py — draft/review/revise LLM calls
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
class LLMInvalid(Exception):
pass
DRAFT_SYSTEM_PROMPT = """
You are an operations communications writer.
Return exactly one JSON object:
{
"draft": "short customer-safe incident update"
}
Rules:
- Use only facts from provided incident_context.
- Include current status, customer impact, and next actions.
- Avoid absolute guarantees and overconfident claims.
- Keep draft concise and actionable.
- Do not output markdown or extra keys.
""".strip()
REVIEW_SYSTEM_PROMPT = """
You are a reflection reviewer.
Return exactly one JSON object:
{
"decision": "approve|revise|escalate",
"issues": [{"type":"overconfidence","note":"..."}],
"fix_plan": ["patch instruction"],
"reason": "for escalate only"
}
Rules:
- Review exactly once.
- decision=approve: fix_plan must be empty.
- decision=revise: provide 1-4 concrete patch-only instructions.
- For enforceable instructions, include quoted target phrases in fix_plan.
- decision=escalate: use only for high-risk or policy-unsafe content.
- Do not add new facts in fix_plan.
- Use only issue types from allowed_issue_types.
- Do not output markdown or extra keys.
""".strip()
REVISE_SYSTEM_PROMPT = """
You are an editor applying one controlled patch.
Return exactly one JSON object:
{
"revised_answer": "updated answer"
}
Rules:
- Edit only what is needed to satisfy fix_plan.
- Keep scope and intent of original draft.
- Do not introduce new facts or numbers.
- Keep answer concise and customer-safe.
- Do not output markdown or extra keys.
""".strip()
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def _chat_json(*, system_prompt: str, payload: dict[str, Any]) -> dict[str, Any]:
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
data = json.loads(text)
except json.JSONDecodeError as exc:
raise LLMInvalid("llm_invalid_json") from exc
if not isinstance(data, dict):
raise LLMInvalid("llm_invalid_json")
return data
def generate_draft(*, goal: str, incident_context: dict[str, Any]) -> str:
payload = {
"goal": goal,
"incident_context": incident_context,
}
data = _chat_json(system_prompt=DRAFT_SYSTEM_PROMPT, payload=payload)
draft = data.get("draft")
if not isinstance(draft, str):
raise LLMInvalid("llm_invalid_schema")
draft = draft.strip()
if not draft:
raise LLMEmpty("llm_empty")
return draft
def review_draft(
*,
goal: str,
incident_context: dict[str, Any],
draft: str,
allowed_issue_types: list[str],
) -> dict[str, Any]:
payload = {
"goal": goal,
"incident_context": incident_context,
"draft": draft,
"allowed_issue_types": allowed_issue_types,
}
return _chat_json(system_prompt=REVIEW_SYSTEM_PROMPT, payload=payload)
def revise_once(
*,
goal: str,
incident_context: dict[str, Any],
draft: str,
fix_plan: list[str],
) -> str:
payload = {
"goal": goal,
"incident_context": incident_context,
"draft": draft,
"fix_plan": fix_plan,
}
data = _chat_json(system_prompt=REVISE_SYSTEM_PROMPT, payload=payload)
revised = data.get("revised_answer")
if not isinstance(revised, str):
raise LLMInvalid("llm_invalid_schema")
revised = revised.strip()
if not revised:
raise LLMEmpty("llm_empty")
return revised
The most important parts here (in plain terms)
- Each stage returns a JSON contract, not free text.
- llm_invalid_json and llm_invalid_schema are kept separate for clean diagnostics.
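The error split can be shown standalone (a sketch; the exception and reason names mirror llm.py, but this parser is simplified):

```python
import json

class LLMInvalid(Exception):
    pass

def parse_draft(text: str) -> str:
    # Transport-level failure: the payload is not JSON at all.
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc
    # Contract-level failure: valid JSON, but the wrong shape.
    draft = data.get("draft") if isinstance(data, dict) else None
    if not isinstance(draft, str) or not draft.strip():
        raise LLMInvalid("llm_invalid_schema")
    return draft.strip()
```

Keeping the two reasons distinct tells you whether to fix prompting/decoding (invalid JSON) or the schema contract (invalid schema).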
main.py — orchestration of the Reflection flow
from __future__ import annotations
import json
import time
import uuid
from typing import Any
from context import build_incident_context
from gateway import Budget, ReflectionGateway, StopRun, text_hash, validate_draft, validate_review
from llm import LLMEmpty, LLMInvalid, LLMTimeout, generate_draft, review_draft, revise_once
GOAL = (
"Draft a customer-facing payment incident update for US enterprise customers. "
"Keep it accurate, avoid overconfident language, and include next actions."
)
INCIDENT_CONTEXT = build_incident_context(report_date="2026-03-05", region="US")
BUDGET = Budget(
max_seconds=30,
max_draft_chars=900,
max_review_issues=4,
max_fix_items=4,
max_answer_chars=900,
min_patch_similarity=0.45,
)
ALLOWED_REVIEW_DECISIONS_POLICY = {"approve", "revise", "escalate"}
AUTO_REVISION_ENABLED = True
ALLOWED_REVIEW_DECISIONS_EXECUTION = (
ALLOWED_REVIEW_DECISIONS_POLICY if AUTO_REVISION_ENABLED else {"approve", "escalate"}
)
ALLOWED_ISSUE_TYPES_POLICY = {
"overconfidence",
"missing_uncertainty",
"contradiction",
"scope_leak",
"policy_violation",
"legal_risk",
}
def run_reflection_agent(*, goal: str, incident_context: dict[str, Any]) -> dict[str, Any]:
run_id = str(uuid.uuid4())
started = time.monotonic()
trace: list[dict[str, Any]] = []
history: list[dict[str, Any]] = []
gateway = ReflectionGateway(
allow_execution_decisions=ALLOWED_REVIEW_DECISIONS_EXECUTION,
budget=BUDGET,
)
def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
payload = {
"run_id": run_id,
"status": "stopped",
"stop_reason": stop_reason,
"phase": phase,
"trace": trace,
"history": history,
}
payload.update(extra)
return payload
try:
draft_raw = generate_draft(goal=goal, incident_context=incident_context)
draft = validate_draft(draft_raw, max_chars=BUDGET.max_draft_chars)
except LLMTimeout:
return stopped("llm_timeout", phase="draft")
except LLMInvalid as exc:
return stopped(exc.args[0], phase="draft")
except LLMEmpty:
return stopped("llm_empty", phase="draft")
except StopRun as exc:
return stopped(exc.reason, phase="draft")
trace.append(
{
"step": 1,
"phase": "draft",
"draft_hash": text_hash(draft),
"chars": len(draft),
"ok": True,
}
)
history.append(
{
"step": 1,
"action": "draft_once",
"draft": draft,
}
)
if (time.monotonic() - started) > BUDGET.max_seconds:
return stopped("max_seconds", phase="review")
try:
raw_review = review_draft(
goal=goal,
incident_context=incident_context,
draft=draft,
allowed_issue_types=sorted(ALLOWED_ISSUE_TYPES_POLICY),
)
except LLMTimeout:
return stopped("llm_timeout", phase="review")
except LLMInvalid as exc:
return stopped(exc.args[0], phase="review")
try:
review = validate_review(
raw_review,
allowed_decisions_policy=ALLOWED_REVIEW_DECISIONS_POLICY,
allowed_issue_types_policy=ALLOWED_ISSUE_TYPES_POLICY,
max_review_issues=BUDGET.max_review_issues,
max_fix_items=BUDGET.max_fix_items,
)
gateway.enforce_execution_decision(review["decision"])
except StopRun as exc:
return stopped(exc.reason, phase="review", raw_review=raw_review)
trace.append(
{
"step": 2,
"phase": "review",
"decision": review["decision"],
"issues": len(review["issues"]),
"fix_items": len(review["fix_plan"]),
"ok": True,
}
)
history.append(
{
"step": 2,
"action": "review_once",
"review": review,
}
)
if review["decision"] == "escalate":
escalation_reason = str(review.get("reason", "")).strip()
return {
"run_id": run_id,
"status": "stopped",
"stop_reason": "policy_escalation",
"escalation_reason": escalation_reason[:120],
"phase": "review",
"review": review,
"trace": trace,
"history": history,
}
final_answer = draft
revised = False
if review["decision"] == "revise":
if (time.monotonic() - started) > BUDGET.max_seconds:
return stopped("max_seconds", phase="revise")
try:
revised_raw = revise_once(
goal=goal,
incident_context=incident_context,
draft=draft,
fix_plan=review["fix_plan"],
)
revised_payload = gateway.validate_revision(
original=draft,
revised=revised_raw,
context=incident_context,
fix_plan=review["fix_plan"],
)
except LLMTimeout:
return stopped("llm_timeout", phase="revise")
except LLMInvalid as exc:
return stopped(exc.args[0], phase="revise")
except LLMEmpty:
return stopped("llm_empty", phase="revise")
except StopRun as exc:
return stopped(exc.reason, phase="revise")
final_answer = revised_payload["answer"]
revised = True
trace.append(
{
"step": 3,
"phase": "revise",
"patch_similarity": revised_payload["patch_similarity"],
"fix_plan_quoted_checks": revised_payload["fix_plan_quoted_checks"],
"revised_hash": text_hash(final_answer),
"ok": True,
}
)
history.append(
{
"step": 3,
"action": "revise_once",
"fix_plan": review["fix_plan"],
"revised_answer": final_answer,
}
)
try:
final_answer = gateway.validate_final(final_answer)
except StopRun as exc:
return stopped(exc.reason, phase="finalize")
trace.append(
{
"step": 4 if revised else 3,
"phase": "finalize",
"final_hash": text_hash(final_answer),
"ok": True,
}
)
history.append(
{
"step": 4 if revised else 3,
"action": "finalize",
"status": "final",
}
)
return {
"run_id": run_id,
"status": "ok",
"stop_reason": "success",
"outcome": "revised_once" if revised else "approved_direct",
"answer": final_answer,
"review_decision": review["decision"],
"issues": review["issues"],
"fix_plan": review["fix_plan"],
"trace": trace,
"history": history,
}
def main() -> None:
result = run_reflection_agent(goal=GOAL, incident_context=INCIDENT_CONTEXT)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
The most important parts here (in plain terms)
- There is a clear separation: the policy allowlist defines what is conceptually allowed; the execution allowlist defines what runtime actually permits.
- The run ends predictably: success or a controlled stop_reason.
- trace and history make every review/revise step auditable.
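A caller consuming the run result can branch on that contract (a hypothetical consumer sketch; the field names match the result dict built in main.py):

```python
def handle_result(result: dict) -> str:
    # Every run ends in exactly one of these branches: no undefined outcomes.
    if result["status"] == "ok":
        return f"deliver ({result['outcome']}): {result['answer']}"
    if result["stop_reason"] == "policy_escalation":
        return "route to human review: " + result.get("escalation_reason", "")
    return f"stopped in phase {result['phase']}: {result['stop_reason']}"
```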
Example output
{
"run_id": "f67eaad4-1c3e-4160-9d8b-1e2e9af42c82",
"status": "ok",
"stop_reason": "success",
"outcome": "revised_once",
"answer": "We are currently experiencing payment service degradation affecting ~27% of US enterprise checkouts, with failed payments at 3.4% and 5 chargeback alerts. We expect recovery in approximately 45 minutes, but this may change. Next actions: monitor failures every 15 minutes, publish status page updates every 15 minutes, and support users with workaround macros.",
"review_decision": "revise",
"issues": [
{
"type": "overconfidence",
"note": "ETA wording sounded too certain and could be read as a guarantee."
}
],
"fix_plan": [
"Add uncertainty wording for ETA (for example: 'recovery in approximately 45 minutes, but this may change').",
"State explicitly that status page updates will be published every 15 minutes.",
"Keep a clear 'Next actions' section with monitoring, updates, and support macros."
],
"trace": [
{
"step": 1,
"phase": "draft",
"draft_hash": "f4b3f386c80a",
"chars": 547,
"ok": true
},
{
"step": 2,
"phase": "review",
"decision": "revise",
"issues": 1,
"fix_items": 3,
"ok": true
},
{
"step": 3,
"phase": "revise",
"patch_similarity": 0.564,
"fix_plan_quoted_checks": 1,
"revised_hash": "2a5ac4952ae0",
"ok": true
},
{
"step": 4,
"phase": "finalize",
"final_hash": "2a5ac4952ae0",
"ok": true
}
],
"history": [{...}]
}
Typical stop_reason values
- success — the run finished correctly
- llm_timeout — the LLM did not respond within OPENAI_TIMEOUT_SECONDS
- llm_empty — empty LLM response in draft/revise
- llm_invalid_json — the LLM returned invalid JSON
- llm_invalid_schema — the JSON does not satisfy the contract
- invalid_draft:* — the draft failed basic validation
- invalid_review:* — the review failed the policy-layer contract
- review_decision_not_allowed_policy:* — the review decision is outside the policy allowlist
- review_decision_denied_execution:* — runtime denied the execution decision
- patch_violation:no_new_facts — the revision added new numeric facts
- patch_violation:new_incident_id|new_severity_label|new_region — the revision added new critical identifiers
- patch_violation:restricted_claims — the revision added restricted claims (resolved, fully recovered, etc.) not backed by context
- patch_violation:fix_plan_not_applied — the revision did not apply quoted fix_plan hints
- patch_violation:too_large_edit — the revision exceeded the patch-only limit
- policy_escalation — the review returned escalation as the final decision; details in escalation_reason
- max_seconds — the run's total time budget was exceeded
- invalid_answer:* — the final answer failed validation
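For monitoring, these reasons can be grouped into coarse operational classes (a hypothetical mapping; the bucket names are illustrative, not part of the example):

```python
def classify_stop_reason(reason: str) -> str:
    # Coarse buckets: transient failures vs. contract errors vs. guardrail stops.
    if reason == "success":
        return "ok"
    if reason in {"llm_timeout", "llm_empty"}:
        return "retryable"
    if reason.startswith(("llm_invalid", "invalid_")):
        return "contract_error"
    if reason.startswith(("patch_violation", "review_decision")):
        return "guardrail_block"
    if reason == "policy_escalation":
        return "needs_human"
    if reason == "max_seconds":
        return "budget_exceeded"
    return "other"
```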
What is NOT shown
- persistent storage for trace/history (in the example everything happens within a single run)
- retry/backoff for LLM calls
- a human-in-the-loop queue for escalate
- automatic patch-diff rendering for a UI
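For instance, retry/backoff for the LLM calls (deliberately omitted above) could be layered on as a thin wrapper (a sketch with hypothetical parameters; TimeoutError stands in for LLMTimeout from llm.py):

```python
import time

def with_retries(call, *, attempts: int = 3, base_delay: float = 0.5):
    # Retry transient failures with exponential backoff; re-raise after the last attempt.
    last_exc = None
    for attempt in range(attempts):
        try:
            return call()
        except TimeoutError as exc:  # stand-in for LLMTimeout
            last_exc = exc
            if attempt < attempts - 1:
                time.sleep(base_delay * (2 ** attempt))  # 0.5s, 1s, 2s, ...
    raise last_exc
```

Usage would be e.g. `with_retries(lambda: generate_draft(goal=goal, incident_context=ctx))`, leaving the single-pass review/revise logic untouched.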
What to try next
- Disable AUTO_REVISION_ENABLED and observe review_decision_denied_execution:revise.
- Add strict verification of the changes allowed by fix_plan (a line-level patch contract).
- Send policy_escalation to an external queue for manual review.