Pattern essence (brief)
Orchestrator Agent is a pattern in which the agent does not perform the domain work itself but coordinates several specialized executors.
The LLM decides which subtasks to launch (what to do), while the policy layer controls how to launch them safely (what is allowed, how long to wait, when to retry, which limits to apply).
What this example demonstrates
- a separate Plan stage that builds the set of subtasks
- parallel Dispatch of subtasks (ThreadPoolExecutor)
- a policy boundary between the orchestration decision (LLM) and worker execution
- strict plan validation (`kind`, `tasks`, task shape)
- separate allowlists for the policy and execution layers
- a per-subtask timeout + `retry_once` for transient timeouts
- a partial-results policy: a non-critical task may fail, but the run continues
- explicit `stop_reason` values for debugging and production monitoring
Architecture
- The LLM receives the goal and returns an orchestration plan as JSON (`kind="plan"`, `tasks`).
- The policy boundary validates the plan and rejects disallowed workers/arguments.
- `OrchestratorGateway` launches the subtasks in parallel with timeout and retry.
- The results (successful and failed) are collected into a unified `history`/`trace`.
- If a critical task does not complete, the run stops.
- If the minimum required facts are present, the LLM composes a short final operations report.

The LLM returns intent (a plan) that is treated as untrusted input: the policy boundary validates it first and only then (if allowed) launches workers.
Project structure
examples/
└── agent-patterns/
└── orchestrator-agent/
└── python/
├── main.py # Plan -> Dispatch (parallel) -> Aggregate -> Finalize
├── llm.py # planner + final synthesis
├── gateway.py # policy boundary: validation + timeout/retry + budgets
├── workers.py # deterministic specialists (sales/payments/inventory)
└── requirements.txt
How to run
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/orchestrator-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ is required.
Option with export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Option with .env (optional)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
This is the shell variant (macOS/Linux). On Windows it is easier to set environment variables with set or, if you prefer, to use python-dotenv to load .env automatically.
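If you want to avoid a shell-specific step entirely, a few lines of stdlib Python can stand in for python-dotenv. This is a minimal sketch (the `load_env_file` helper is not part of the example), assuming plain `KEY=value` lines with no quoting or variable interpolation:

```python
import os
from pathlib import Path


def load_env_file(path: str = ".env") -> None:
    """Load simple KEY=value pairs into os.environ (no quoting, no interpolation)."""
    env_path = Path(path)
    if not env_path.exists():
        return  # silently skip a missing file, like most dotenv loaders
    for line in env_path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # ignore blanks, comments, and malformed lines
        key, _, value = line.partition("=")
        # setdefault: real environment variables win over .env values
        os.environ.setdefault(key.strip(), value.strip())
```

Call `load_env_file()` before importing `llm.py` so `OPENAI_API_KEY` is visible when the client is created.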
Task
Imagine a real production case:
"Prepare a morning operations report for the US market for 2026-02-26: sales, payment incidents, and inventory risks."
The agent should not do everything in a single flow. It should:
- split the task into independent subtasks
- run the subtasks in parallel
- relaunch a timed-out task once
- produce a single short final brief
Solution
Here the Orchestrator works as a coordinator:
- the LLM generates a plan with tasks for specialized workers
- the policy layer reviews the plan and blocks disallowed tasks
- the gateway runs workers in parallel with timeout/retry
- the results are collected into an aggregate model
- the short final text is generated only after the facts are gathered

This is neither ReAct nor Routing:
- not ReAct, because the focus is not on a sequential `Think -> Act` loop of a single agent
- not Routing, because several executors are needed at once here, not a single target
Code
workers.py — specialized executors
from __future__ import annotations
import time
from typing import Any
SALES_METRICS = {
"2026-02-26:US": {
"gross_sales_usd": 182_450.0,
"orders": 4_820,
"aov_usd": 37.85,
}
}
PAYMENT_ALERTS = {
"2026-02-26:US": {
"failed_payment_rate": 0.023,
"chargeback_alerts": 3,
"gateway_incident": "none",
}
}
INVENTORY_ALERTS = {
"2026-02-26:US": {
"low_stock_skus": ["SKU-4411", "SKU-8820"],
"out_of_stock_skus": ["SKU-9033"],
"restock_eta_days": 2,
}
}
_ATTEMPT_STATE: dict[str, int] = {}
def _key(report_date: str, region: str) -> str:
return f"{report_date}:{region.upper()}"
def sales_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
time.sleep(0.4)
metrics = SALES_METRICS.get(_key(report_date, region))
if not metrics:
return {"status": "done", "worker": "sales_worker", "result": {"warning": "sales_data_missing"}}
return {"status": "done", "worker": "sales_worker", "result": metrics}
def payments_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
# Simulate a transient timeout on the first attempt within one request_id.
attempt_key = f"{request_id}:{_key(report_date, region)}:payments"
_ATTEMPT_STATE[attempt_key] = _ATTEMPT_STATE.get(attempt_key, 0) + 1
attempt = _ATTEMPT_STATE[attempt_key]
if attempt == 1:
time.sleep(2.6) # Exceeds gateway timeout on purpose.
else:
time.sleep(0.3)
alerts = PAYMENT_ALERTS.get(_key(report_date, region))
if not alerts:
return {"status": "done", "worker": "payments_worker", "result": {"warning": "payments_data_missing"}}
return {"status": "done", "worker": "payments_worker", "result": alerts}
def inventory_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
time.sleep(0.5)
alerts = INVENTORY_ALERTS.get(_key(report_date, region))
if not alerts:
return {"status": "done", "worker": "inventory_worker", "result": {"warning": "inventory_data_missing"}}
return {"status": "done", "worker": "inventory_worker", "result": alerts}
What matters most here (in simple words)
- Each worker is responsible only for its slice of the domain.
- `payments_worker` intentionally times out on the first attempt to demonstrate the retry policy.
- Workers make no orchestration decisions: they only return facts.
Contract between the layers in this example:
- The LLM may only propose a plan (`kind="plan"`, `tasks`).
- The Gateway validates the plan and rejects dangerous/invalid tasks.
- Unknown keys in tasks are ignored during normalization as long as the required fields are present.
- The Gateway enforces the runtime policy: allowlist, timeout, retry, dispatch budget, deadline.
- Workers execute only their domain function and return a structured `dict`.
- The Orchestrator decides whether the run can finalize (for example, failed critical task -> stop).
gateway.py — policy boundary (the most important layer)
from __future__ import annotations
import hashlib
import json
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed
from dataclasses import dataclass
from typing import Any, Callable
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_tasks: int = 4
max_parallel: int = 3
max_retries_per_task: int = 1
max_dispatches: int = 8
task_timeout_seconds: float = 2.0
max_seconds: int = 25
def args_hash(args: dict[str, Any]) -> str:
stable = json.dumps(args, ensure_ascii=True, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(stable.encode("utf-8")).hexdigest()[:12]
def validate_orchestration_plan(
raw_plan: dict[str, Any], *, allowed_workers: set[str], max_tasks: int
) -> list[dict[str, Any]]:
if not isinstance(raw_plan, dict):
raise StopRun("invalid_plan:non_json")
if raw_plan.get("kind") != "plan":
raise StopRun("invalid_plan:kind")
tasks = raw_plan.get("tasks")
if not isinstance(tasks, list):
raise StopRun("invalid_plan:tasks")
if not (1 <= len(tasks) <= max_tasks):
raise StopRun("invalid_plan:max_tasks")
normalized: list[dict[str, Any]] = []
seen_ids: set[str] = set()
required_keys = {"id", "worker", "args", "critical"}
for task in tasks:
if not isinstance(task, dict):
raise StopRun("invalid_plan:task_shape")
if not required_keys.issubset(task.keys()):
raise StopRun("invalid_plan:missing_keys")
# Ignore unknown keys and keep only contract fields.
task_id = task["id"]
worker = task["worker"]
args = task["args"]
critical = task["critical"]
if not isinstance(task_id, str) or not task_id.strip():
raise StopRun("invalid_plan:task_id")
if task_id in seen_ids:
raise StopRun("invalid_plan:duplicate_task_id")
seen_ids.add(task_id)
if not isinstance(worker, str) or not worker.strip():
raise StopRun("invalid_plan:worker")
if worker not in allowed_workers:
raise StopRun(f"invalid_plan:worker_not_allowed:{worker}")
if not isinstance(args, dict):
raise StopRun("invalid_plan:args")
if not isinstance(critical, bool):
raise StopRun("invalid_plan:critical")
normalized.append(
{
"id": task_id.strip(),
"worker": worker.strip(),
"args": dict(args),
"critical": critical,
}
)
return normalized
class OrchestratorGateway:
def __init__(
self,
*,
allow: set[str],
registry: dict[str, Callable[..., dict[str, Any]]],
budget: Budget,
) -> None:
self.allow = allow
self.registry = registry
self.budget = budget
self.dispatches = 0
self._lock = threading.Lock()
def _consume_dispatch_budget(self) -> None:
with self._lock:
self.dispatches += 1
if self.dispatches > self.budget.max_dispatches:
raise StopRun("max_dispatches")
def _call_once(
self, worker_name: str, args: dict[str, Any], *, deadline_monotonic: float
) -> dict[str, Any]:
if worker_name not in self.allow:
raise StopRun(f"worker_denied:{worker_name}")
fn = self.registry.get(worker_name)
if fn is None:
raise StopRun(f"worker_missing:{worker_name}")
remaining = deadline_monotonic - time.monotonic()
if remaining <= 0:
raise StopRun("max_seconds")
task_timeout = min(self.budget.task_timeout_seconds, max(0.01, remaining))
with ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(fn, **args)
try:
result = future.result(timeout=task_timeout)
except TimeoutError as exc:
raise StopRun("task_timeout") from exc
except TypeError as exc:
raise StopRun(f"worker_bad_args:{worker_name}") from exc
if not isinstance(result, dict):
raise StopRun(f"worker_bad_result:{worker_name}")
return result
def _run_task_with_retry(
self, task: dict[str, Any], request_id: str, deadline_monotonic: float
) -> dict[str, Any]:
task_id = task["id"]
worker_name = task["worker"]
semantic_args = dict(task["args"])
semantic_hash = args_hash(semantic_args)
base_args = dict(semantic_args)
base_args["request_id"] = request_id
attempts_total = self.budget.max_retries_per_task + 1
last_reason = "unknown"
for attempt in range(1, attempts_total + 1):
try:
self._consume_dispatch_budget()
observation = self._call_once(
worker_name, base_args, deadline_monotonic=deadline_monotonic
)
return {
"task_id": task_id,
"worker": worker_name,
"critical": task["critical"],
"status": "done",
"attempts_used": attempt,
"retried": attempt > 1,
"args_hash": semantic_hash,
"observation": observation,
}
except StopRun as exc:
last_reason = exc.reason
if exc.reason == "task_timeout" and attempt < attempts_total:
continue
return {
"task_id": task_id,
"worker": worker_name,
"critical": task["critical"],
"status": "failed",
"attempts_used": attempt,
"retried": attempt > 1,
"args_hash": semantic_hash,
"stop_reason": last_reason,
}
return {
"task_id": task_id,
"worker": worker_name,
"critical": task["critical"],
"status": "failed",
"attempts_used": attempts_total,
"retried": True,
"args_hash": semantic_hash,
"stop_reason": last_reason,
}
def dispatch_parallel(
self,
tasks: list[dict[str, Any]],
*,
request_id: str,
deadline_monotonic: float,
) -> list[dict[str, Any]]:
if not tasks:
return []
indexed_tasks = list(enumerate(tasks))
output: list[tuple[int, dict[str, Any]]] = []
parallelism = min(self.budget.max_parallel, len(tasks))
with ThreadPoolExecutor(max_workers=parallelism) as pool:
future_to_idx = {
pool.submit(
self._run_task_with_retry, task, request_id, deadline_monotonic
): idx
for idx, task in indexed_tasks
}
remaining = deadline_monotonic - time.monotonic()
if remaining <= 0:
raise StopRun("max_seconds")
try:
for future in as_completed(future_to_idx, timeout=max(0.01, remaining)):
idx = future_to_idx[future]
output.append((idx, future.result()))
except TimeoutError as exc:
raise StopRun("max_seconds") from exc
output.sort(key=lambda item: item[0])
return [item[1] for item in output]
What matters most here (in simple words)
- The gateway is where the execution policy is enforced: allowlist, timeout, retry, dispatch budget.
- `validate_orchestration_plan(...)` verifies the required field contract and ignores extra keys.
- `dispatch_parallel(...)` caps parallelism and also enforces the global deadline (`max_seconds`) inside the gateway.
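One detail worth calling out is `args_hash`: it canonicalizes the arguments (sorted keys, compact separators) before hashing, so semantically equal tasks get the same fingerprint regardless of key order. A standalone sketch of the same function:

```python
import hashlib
import json
from typing import Any


def args_hash(args: dict[str, Any]) -> str:
    """Stable 12-hex-char fingerprint: sorted keys + compact JSON separators."""
    stable = json.dumps(args, ensure_ascii=True, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(stable.encode("utf-8")).hexdigest()[:12]


# Key order does not change the fingerprint:
a = args_hash({"region": "US", "report_date": "2026-02-26"})
b = args_hash({"report_date": "2026-02-26", "region": "US"})
# a == b
```

This is what makes `args_hash` usable in the trace for deduplication and audit: two plans that request the same work are visibly identical, without logging the raw arguments.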
llm.py — planning and final synthesis
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
PLAN_SYSTEM_PROMPT = """
You are an orchestration planner.
Return only one JSON object in this exact shape:
{
"kind": "plan",
"tasks": [
{"id":"t1","worker":"sales_worker","args":{"report_date":"YYYY-MM-DD","region":"US"},"critical":true}
]
}
Rules:
- Create 2 to 4 independent tasks.
- Use only workers from available_workers.
- Keep args minimal and valid for the selected worker.
- Mark task as critical=true only if final brief is impossible without it.
- Do not output markdown or extra keys.
""".strip()
FINAL_SYSTEM_PROMPT = """
You are an operations assistant.
Write a short final briefing in English for an e-commerce operations manager.
Include:
- headline health status (green/yellow/red)
- key sales metrics
- payment risk note
- inventory risk note
- one concrete next action
Use only evidence from orchestrator output.
""".strip()
WORKER_CATALOG = [
{
"name": "sales_worker",
"description": "Provides sales KPIs for a date and region",
"args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
},
{
"name": "payments_worker",
"description": "Provides payment failure and chargeback signals",
"args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
},
{
"name": "inventory_worker",
"description": "Provides low-stock and out-of-stock risk signals",
"args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
},
]
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def create_plan(
*,
goal: str,
report_date: str,
region: str,
max_tasks: int,
) -> dict[str, Any]:
payload = {
"goal": goal,
"report_date": report_date,
"region": region,
"max_tasks": max_tasks,
"available_workers": WORKER_CATALOG,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": PLAN_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
return json.loads(text)
except json.JSONDecodeError:
return {"kind": "invalid", "raw": text}
def compose_final_answer(*, goal: str, aggregate: dict[str, Any]) -> str:
payload = {
"goal": goal,
"aggregate": aggregate,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
messages=[
{"role": "system", "content": FINAL_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or ""
text = text.strip()
if not text:
raise LLMEmpty("llm_empty")
return text
What matters most here (in simple words)
- The LLM returns only a plan/answer; it never launches workers directly.
- The planner receives `max_tasks` and `available_workers`, so the selection is already constrained at the intent stage.
- On non-JSON output or a timeout, explicit signals are returned for a controlled stop.
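The "explicit signal" for non-JSON output is simply a fallback object that downstream validation will reject (`invalid_plan:kind`), instead of an exception bubbling up from the parse. A minimal sketch of that step (the `parse_plan` name is illustrative; `create_plan` inlines the same idea):

```python
import json
from typing import Any


def parse_plan(text: str) -> dict[str, Any]:
    """Turn raw model output into a plan dict, or an explicit invalid marker."""
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Keep the raw text for debugging instead of raising here;
        # the policy boundary decides what to do with an invalid plan.
        return {"kind": "invalid", "raw": text}
    # Non-object JSON (lists, strings, numbers) also fails the plan contract.
    return parsed if isinstance(parsed, dict) else {"kind": "invalid", "raw": text}
```

Deferring the rejection to the validator keeps a single choke point for all bad plans, whether they are malformed JSON or well-formed JSON with the wrong shape.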
main.py — Plan -> Dispatch (parallel) -> Aggregate -> Finalize
from __future__ import annotations
import json
import time
import uuid
from typing import Any
from gateway import Budget, OrchestratorGateway, StopRun, validate_orchestration_plan
from llm import LLMEmpty, LLMTimeout, compose_final_answer, create_plan
from workers import inventory_worker, payments_worker, sales_worker
REPORT_DATE = "2026-02-26"
REGION = "US"
GOAL = (
"Prepare a morning operations report for e-commerce region US on 2026-02-26. "
"Include sales, payment risk, and inventory risk with one concrete action."
)
BUDGET = Budget(
max_tasks=4,
max_parallel=3,
max_retries_per_task=1,
max_dispatches=8,
task_timeout_seconds=2.0,
max_seconds=25,
)
WORKER_REGISTRY = {
"sales_worker": sales_worker,
"payments_worker": payments_worker,
"inventory_worker": inventory_worker,
}
ALLOWED_WORKERS_POLICY = {"sales_worker", "payments_worker", "inventory_worker"}
INVENTORY_RUNTIME_ENABLED = True
ALLOWED_WORKERS_EXECUTION = (
{"sales_worker", "payments_worker", "inventory_worker"}
if INVENTORY_RUNTIME_ENABLED
else {"sales_worker", "payments_worker"}
)
# Set INVENTORY_RUNTIME_ENABLED=False to get worker_denied:inventory_worker in trace.
def aggregate_results(task_results: list[dict[str, Any]]) -> dict[str, Any]:
done_by_worker: dict[str, dict[str, Any]] = {}
failed: list[dict[str, Any]] = []
for item in task_results:
if item["status"] == "done":
done_by_worker[item["worker"]] = item["observation"]
else:
failed.append(
{
"task_id": item["task_id"],
"worker": item["worker"],
"critical": item["critical"],
"stop_reason": item["stop_reason"],
}
)
sales = done_by_worker.get("sales_worker", {}).get("result", {})
payments = done_by_worker.get("payments_worker", {}).get("result", {})
inventory = done_by_worker.get("inventory_worker", {}).get("result", {})
health = "green"
if payments.get("failed_payment_rate", 0) >= 0.03 or inventory.get("out_of_stock_skus"):
health = "yellow"
if payments.get("gateway_incident") not in (None, "none"):
health = "red"
return {
"report_date": REPORT_DATE,
"region": REGION,
"health": health,
"sales": sales,
"payments": payments,
"inventory": inventory,
"failed_tasks": failed,
}
def run_orchestrator(goal: str) -> dict[str, Any]:
started = time.monotonic()
deadline = started + BUDGET.max_seconds
request_id = uuid.uuid4().hex[:10]
trace: list[dict[str, Any]] = []
history: list[dict[str, Any]] = []
gateway = OrchestratorGateway(
allow=ALLOWED_WORKERS_EXECUTION,
registry=WORKER_REGISTRY,
budget=BUDGET,
)
try:
raw_plan = create_plan(
goal=goal,
report_date=REPORT_DATE,
region=REGION,
max_tasks=BUDGET.max_tasks,
)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "plan",
"trace": trace,
"history": history,
}
try:
tasks = validate_orchestration_plan(
raw_plan,
allowed_workers=ALLOWED_WORKERS_POLICY,
max_tasks=BUDGET.max_tasks,
)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": "plan",
"raw_plan": raw_plan,
"trace": trace,
"history": history,
}
if time.monotonic() > deadline:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"phase": "dispatch",
"plan": tasks,
"trace": trace,
"history": history,
}
try:
task_results = gateway.dispatch_parallel(
tasks,
request_id=request_id,
deadline_monotonic=deadline,
)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": "dispatch",
"plan": tasks,
"trace": trace,
"history": history,
}
for item in task_results:
trace.append(
{
"task_id": item["task_id"],
"worker": item["worker"],
"status": item["status"],
"attempts_used": item["attempts_used"],
"retried": item["retried"],
"args_hash": item["args_hash"],
"stop_reason": item.get("stop_reason"),
"critical": item["critical"],
}
)
history.append(item)
failed_critical = [
item
for item in task_results
if item["status"] == "failed" and item["critical"]
]
if failed_critical:
return {
"status": "stopped",
"stop_reason": "critical_task_failed",
"phase": "dispatch",
"plan": tasks,
"failed_critical": failed_critical,
"trace": trace,
"history": history,
}
aggregate = aggregate_results(task_results)
if time.monotonic() > deadline:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"phase": "finalize",
"aggregate": aggregate,
"trace": trace,
"history": history,
}
try:
answer = compose_final_answer(goal=goal, aggregate=aggregate)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "finalize",
"aggregate": aggregate,
"trace": trace,
"history": history,
}
except LLMEmpty:
return {
"status": "stopped",
"stop_reason": "llm_empty",
"phase": "finalize",
"aggregate": aggregate,
"trace": trace,
"history": history,
}
return {
"status": "ok",
"stop_reason": "success",
"answer": answer,
"plan": tasks,
"aggregate": aggregate,
"trace": trace,
"history": history,
}
def main() -> None:
result = run_orchestrator(GOAL)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
What matters most here (in simple words)
- `run_orchestrator(...)` manages the full lifecycle of the orchestration run.
- Parallel execution happens only in the gateway, never inside LLM decisions.
- `ALLOWED_WORKERS_POLICY` and `ALLOWED_WORKERS_EXECUTION` can differ (runtime feature flag/tenant gate).
- There is a clear policy: if a critical task fails, finalization does not run.
- `trace` and `history` provide a transparent audit: what ran, how many attempts, why it failed.
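The critical-task policy can be isolated into one predicate: finalize only when no failed result is marked critical. A small sketch (the `can_finalize` helper is illustrative), assuming result dicts shaped as the gateway emits them:

```python
from typing import Any


def can_finalize(task_results: list[dict[str, Any]]) -> bool:
    """Partial-results policy: non-critical failures are tolerated."""
    return not any(
        item["status"] == "failed" and item["critical"] for item in task_results
    )


results = [
    {"task_id": "t1", "status": "done", "critical": True},
    {"task_id": "t2", "status": "failed", "critical": False},  # tolerated
]
# can_finalize(results) is True; flipping t2 to critical=True would stop the run
```

This is the same check `run_orchestrator` performs before calling `compose_final_answer`, expressed as a reusable predicate.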
requirements.txt
openai==2.21.0
Example output
Because payments_worker simulates a timeout on the first attempt, a typical trace contains retried=true.
{
"status": "ok",
"stop_reason": "success",
"answer": "Morning Operations Report - US Region (2026-02-26): Health=yellow. Gross sales=$182,450, orders=4,820, AOV=$37.85. Payment failed rate=2.3% with 3 chargeback alerts. Inventory has one out-of-stock SKU (SKU-9033). Next action: prioritize immediate restock for SKU-9033.",
"plan": [
{
"id": "t1",
"worker": "sales_worker",
"args": {"report_date": "2026-02-26", "region": "US"},
"critical": true
},
{
"id": "t2",
"worker": "payments_worker",
"args": {"report_date": "2026-02-26", "region": "US"},
"critical": true
},
{
"id": "t3",
"worker": "inventory_worker",
"args": {"report_date": "2026-02-26", "region": "US"},
"critical": true
}
],
"aggregate": {
"report_date": "2026-02-26",
"region": "US",
"health": "yellow",
"sales": {"gross_sales_usd": 182450.0, "orders": 4820, "aov_usd": 37.85},
"payments": {"failed_payment_rate": 0.023, "chargeback_alerts": 3, "gateway_incident": "none"},
"inventory": {"low_stock_skus": ["SKU-4411", "SKU-8820"], "out_of_stock_skus": ["SKU-9033"], "restock_eta_days": 2}
},
"trace": [
{
"task_id": "t1",
"worker": "sales_worker",
"status": "done",
"attempts_used": 1,
"retried": false
},
{
"task_id": "t2",
"worker": "payments_worker",
"status": "done",
"attempts_used": 2,
"retried": true
},
{
"task_id": "t3",
"worker": "inventory_worker",
"status": "done",
"attempts_used": 1,
"retried": false
}
],
"history": [{...}]
}
This is an abbreviated example: history is shown in summarized form, and the wording of answer may vary slightly between runs.
If you set INVENTORY_RUNTIME_ENABLED=False, worker_denied:inventory_worker will appear in the trace - a clear illustration of the difference between the policy and execution allowlists.
Typical stop_reason values
- `success` - the plan was executed and the final answer generated
- `invalid_plan:*` - the LLM plan failed policy validation
- `invalid_plan:missing_keys` - a task is missing required contract fields
- `llm_timeout` - the LLM did not respond within `OPENAI_TIMEOUT_SECONDS`
- `llm_empty` - the LLM returned an empty final answer
- `max_seconds` - the run's total time budget was exceeded
- `max_dispatches` - the subtask dispatch limit (including retries) was exceeded
- `worker_denied:<name>` - the worker is not in the execution allowlist
- `worker_missing:<name>` - the worker is absent from the registry
- `worker_bad_args:<name>` - the task contains invalid arguments
- `worker_bad_result:<name>` - the worker returned data outside the contract
- `task_timeout` - the subtask did not finish within `task_timeout_seconds`
- `critical_task_failed` - at least one critical subtask failed
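For production monitoring, these values split cleanly by prefix. This hypothetical helper (`classify_stop_reason` is not part of the example) sketches one way to bucket them for dashboards and alerting:

```python
def classify_stop_reason(reason: str) -> str:
    """Bucket stop_reason values: ok, plan, llm, budget, worker, or critical."""
    if reason == "success":
        return "ok"
    if reason.startswith("invalid_plan:"):
        return "plan_rejected"        # the LLM produced a bad plan
    if reason in {"llm_timeout", "llm_empty"}:
        return "llm_failure"          # model availability / output issue
    if reason in {"max_seconds", "max_dispatches", "task_timeout"}:
        return "budget_exhausted"     # time or dispatch limits hit
    if reason.startswith(("worker_denied:", "worker_missing:", "worker_bad_")):
        return "worker_contract"      # execution-side contract violation
    if reason == "critical_task_failed":
        return "critical_failure"
    return "other"
```

Prefix-based classification works here because the gateway encodes the offending worker name after the colon, so the bucket stays stable while the detail remains available in the raw reason.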
What is NOT shown here
- No auth/PII handling or tenant isolation.
- No persistent queue/job runner (RabbitMQ/SQS/Kafka).
- No circuit breaker or elaborate backoff policies.
- No token/cost budgets (cost guardrails).
What to try next
- Make `payments_worker` non-critical (`critical=false`) and watch the run return a partial result.
- Set `INVENTORY_RUNTIME_ENABLED=False` and verify that `worker_denied:inventory_worker` appears.
- Reduce `task_timeout_seconds` to `0.2` and see how the number of timeouts/retries grows.
- Add a fourth worker (`fraud_worker`) and compare run time with `max_parallel=2` versus `max_parallel=4`.