Orchestrator Agent — Python (simple production-style LLM example)

A runnable Orchestrator agent example in Python, written in a production style, with a subtask plan, parallel dispatch, a timeout/retry policy, and final aggregation.
On this page
  1. Pattern essence (brief)
  2. What this example demonstrates
  3. Architecture
  4. Project structure
  5. How to run
  6. Task
  7. Solution
  8. Code
  9. workers.py — specialized executors
  10. gateway.py — policy boundary (the most important layer)
  11. llm.py — planning and final synthesis
  12. main.py — Plan -> Dispatch (parallel) -> Aggregate -> Finalize
  13. requirements.txt
  14. Example output
  15. Typical stop_reason values
  16. What is NOT shown here
  17. What to try next

Pattern essence (brief)

Orchestrator Agent is a pattern in which the agent does not perform the domain work itself but coordinates several specialized executors.

The LLM decides which subtasks to launch (what to do), while the policy layer controls how to launch them safely (what is allowed, how long to wait, when to retry, which limits to apply).


What this example demonstrates

  • a separate Plan stage that builds the set of subtasks
  • parallel Dispatch of subtasks (ThreadPoolExecutor)
  • a policy boundary between the orchestration decision (LLM) and worker execution
  • strict plan validation (kind, tasks, task shape)
  • separate allowlists for the policy and execution layers
  • a per-subtask timeout + retry_once for transient timeouts
  • a partial-results policy: a non-critical task may fail while the run continues
  • explicit stop_reason values for debugging and production monitoring

Architecture

  1. The LLM receives the goal and returns an orchestration plan as JSON (kind="plan", tasks).
  2. The policy boundary validates the plan and rejects disallowed workers/arguments.
  3. OrchestratorGateway launches subtasks in parallel with timeout and retry.
  4. The results (successful and failed) are collected into a unified history/trace.
  5. If a critical task does not complete, the run stops.
  6. If the minimum required facts are present, the LLM composes a short final operations report.

The LLM returns an intent (the plan), which is treated as untrusted input: the policy boundary validates it first and only then (if allowed) launches workers.
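As a compact, hypothetical sketch (the helper names here are invented for illustration and are not part of the example code), the flow above reduces to plan -> validate -> parallel dispatch -> aggregate:

```python
from concurrent.futures import ThreadPoolExecutor

ALLOWED_WORKERS = {"sales", "payments"}  # assumption: a toy allowlist

def validate(plan: dict) -> list[dict]:
    # Policy boundary: the plan is untrusted input; keep only allowlisted workers.
    if plan.get("kind") != "plan":
        raise ValueError("invalid_plan:kind")
    return [t for t in plan["tasks"] if t["worker"] in ALLOWED_WORKERS]

def run_worker(task: dict) -> dict:
    # Stand-in for a real specialized executor.
    return {"task_id": task["id"], "status": "done"}

plan = {"kind": "plan", "tasks": [
    {"id": "t1", "worker": "sales"},
    {"id": "t2", "worker": "payments"},
    {"id": "t3", "worker": "refunds"},  # not allowlisted -> dropped
]}

tasks = validate(plan)
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(run_worker, tasks))  # parallel dispatch, order preserved
print(results)
```

The real implementation adds timeouts, retries, budgets, and critical-task handling, but the separation of stages is the same.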


Project structure

TEXT
examples/
└── agent-patterns/
    └── orchestrator-agent/
        └── python/
            ├── main.py           # Plan -> Dispatch (parallel) -> Aggregate -> Finalize
            ├── llm.py            # planner + final synthesis
            ├── gateway.py        # policy boundary: validation + timeout/retry + budgets
            ├── workers.py        # deterministic specialists (sales/payments/inventory)
            └── requirements.txt

How to run

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/orchestrator-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option with export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Option with .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows it is easier to set environment variables with set or, if you prefer, to use python-dotenv to load .env automatically.


Task

Imagine a realistic production case:

"Prepare a morning operations report for the US market for 2026-02-26: sales, payment incidents, and inventory risks."

The agent should not do everything in a single flow. It should:

  • split the task into independent subtasks
  • run the subtasks in parallel
  • retry a timed-out task once
  • produce a single short final brief

Solution

Here the Orchestrator works as a coordinator:

  • the LLM generates a plan with tasks for specialized workers
  • the policy layer reviews the plan and blocks disallowed tasks
  • the gateway runs workers in parallel with timeout/retry
  • the results are collected into an aggregate model
  • the short final text is generated only after the facts have been collected

This is neither ReAct nor Routing:

  • it is not ReAct, because the focus is not a sequential Think -> Act loop of a single agent
  • it is not Routing, because several executors are needed at once, not a single target

Code

workers.py — specialized executors

PYTHON
from __future__ import annotations

import time
from typing import Any

SALES_METRICS = {
    "2026-02-26:US": {
        "gross_sales_usd": 182_450.0,
        "orders": 4_820,
        "aov_usd": 37.85,
    }
}

PAYMENT_ALERTS = {
    "2026-02-26:US": {
        "failed_payment_rate": 0.023,
        "chargeback_alerts": 3,
        "gateway_incident": "none",
    }
}

INVENTORY_ALERTS = {
    "2026-02-26:US": {
        "low_stock_skus": ["SKU-4411", "SKU-8820"],
        "out_of_stock_skus": ["SKU-9033"],
        "restock_eta_days": 2,
    }
}

_ATTEMPT_STATE: dict[str, int] = {}


def _key(report_date: str, region: str) -> str:
    return f"{report_date}:{region.upper()}"


def sales_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    time.sleep(0.4)
    metrics = SALES_METRICS.get(_key(report_date, region))
    if not metrics:
        return {"status": "done", "worker": "sales_worker", "result": {"warning": "sales_data_missing"}}
    return {"status": "done", "worker": "sales_worker", "result": metrics}


def payments_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    # Simulate a transient timeout on the first attempt within one request_id.
    attempt_key = f"{request_id}:{_key(report_date, region)}:payments"
    _ATTEMPT_STATE[attempt_key] = _ATTEMPT_STATE.get(attempt_key, 0) + 1
    attempt = _ATTEMPT_STATE[attempt_key]

    if attempt == 1:
        time.sleep(2.6)  # Exceeds gateway timeout on purpose.
    else:
        time.sleep(0.3)

    alerts = PAYMENT_ALERTS.get(_key(report_date, region))
    if not alerts:
        return {"status": "done", "worker": "payments_worker", "result": {"warning": "payments_data_missing"}}
    return {"status": "done", "worker": "payments_worker", "result": alerts}


def inventory_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    time.sleep(0.5)
    alerts = INVENTORY_ALERTS.get(_key(report_date, region))
    if not alerts:
        return {"status": "done", "worker": "inventory_worker", "result": {"warning": "inventory_data_missing"}}
    return {"status": "done", "worker": "inventory_worker", "result": alerts}

What matters most here (in simple terms)

  • Each worker is responsible only for its own slice of the domain.
  • payments_worker deliberately times out on the first attempt to demonstrate the retry policy.
  • Workers make no orchestration decisions: they only return facts.

The contract between layers in this example:

  • The LLM may only propose a plan (kind="plan", tasks).
  • The gateway validates the plan and rejects dangerous/invalid tasks.
  • Unknown keys in tasks are ignored during normalization as long as the required fields are present.
  • The gateway enforces the runtime policy: allowlist, timeout, retry, dispatch budget, deadline.
  • Workers execute only their domain function and return a structured dict.
  • The Orchestrator decides whether the run can be finalized (for example, failed critical task -> stop).
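The "unknown keys are ignored" rule can be exercised in isolation. This is a minimal sketch mirroring the normalization step in gateway.py, not the full validator:

```python
REQUIRED_KEYS = {"id", "worker", "args", "critical"}

def normalize_task(task: dict) -> dict:
    # Contract check: all required fields must be present...
    if not REQUIRED_KEYS.issubset(task):
        raise ValueError("invalid_plan:missing_keys")
    # ...and anything outside the contract is silently dropped.
    return {key: task[key] for key in REQUIRED_KEYS}

task = {
    "id": "t1",
    "worker": "sales_worker",
    "args": {"report_date": "2026-02-26", "region": "US"},
    "critical": True,
    "priority": "high",  # unknown key -> ignored
}
clean = normalize_task(task)
print(sorted(clean))  # ['args', 'critical', 'id', 'worker']
```

Dropping extra keys (instead of rejecting them) keeps the contract stable even when the LLM decorates tasks with fields it invented.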

gateway.py — policy boundary (the most important layer)

PYTHON
from __future__ import annotations

import hashlib
import json
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed
from dataclasses import dataclass
from typing import Any, Callable


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_tasks: int = 4
    max_parallel: int = 3
    max_retries_per_task: int = 1
    max_dispatches: int = 8
    task_timeout_seconds: float = 2.0
    max_seconds: int = 25


def args_hash(args: dict[str, Any]) -> str:
    stable = json.dumps(args, ensure_ascii=True, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(stable.encode("utf-8")).hexdigest()[:12]


def validate_orchestration_plan(
    raw_plan: dict[str, Any], *, allowed_workers: set[str], max_tasks: int
) -> list[dict[str, Any]]:
    if not isinstance(raw_plan, dict):
        raise StopRun("invalid_plan:non_json")
    if raw_plan.get("kind") != "plan":
        raise StopRun("invalid_plan:kind")

    tasks = raw_plan.get("tasks")
    if not isinstance(tasks, list):
        raise StopRun("invalid_plan:tasks")
    if not (1 <= len(tasks) <= max_tasks):
        raise StopRun("invalid_plan:max_tasks")

    normalized: list[dict[str, Any]] = []
    seen_ids: set[str] = set()
    required_keys = {"id", "worker", "args", "critical"}

    for task in tasks:
        if not isinstance(task, dict):
            raise StopRun("invalid_plan:task_shape")
        if not required_keys.issubset(task.keys()):
            raise StopRun("invalid_plan:missing_keys")

        # Ignore unknown keys and keep only contract fields.
        task_id = task["id"]
        worker = task["worker"]
        args = task["args"]
        critical = task["critical"]

        if not isinstance(task_id, str) or not task_id.strip():
            raise StopRun("invalid_plan:task_id")
        if task_id in seen_ids:
            raise StopRun("invalid_plan:duplicate_task_id")
        seen_ids.add(task_id)

        if not isinstance(worker, str) or not worker.strip():
            raise StopRun("invalid_plan:worker")
        if worker not in allowed_workers:
            raise StopRun(f"invalid_plan:worker_not_allowed:{worker}")

        if not isinstance(args, dict):
            raise StopRun("invalid_plan:args")
        if not isinstance(critical, bool):
            raise StopRun("invalid_plan:critical")

        normalized.append(
            {
                "id": task_id.strip(),
                "worker": worker.strip(),
                "args": dict(args),
                "critical": critical,
            }
        )

    return normalized


class OrchestratorGateway:
    def __init__(
        self,
        *,
        allow: set[str],
        registry: dict[str, Callable[..., dict[str, Any]]],
        budget: Budget,
    ) -> None:
        self.allow = allow
        self.registry = registry
        self.budget = budget
        self.dispatches = 0
        self._lock = threading.Lock()

    def _consume_dispatch_budget(self) -> None:
        with self._lock:
            self.dispatches += 1
            if self.dispatches > self.budget.max_dispatches:
                raise StopRun("max_dispatches")

    def _call_once(
        self, worker_name: str, args: dict[str, Any], *, deadline_monotonic: float
    ) -> dict[str, Any]:
        if worker_name not in self.allow:
            raise StopRun(f"worker_denied:{worker_name}")
        fn = self.registry.get(worker_name)
        if fn is None:
            raise StopRun(f"worker_missing:{worker_name}")

        remaining = deadline_monotonic - time.monotonic()
        if remaining <= 0:
            raise StopRun("max_seconds")
        task_timeout = min(self.budget.task_timeout_seconds, max(0.01, remaining))

        # A `with` block would call shutdown(wait=True) on exit and block on a
        # timed-out worker until it finishes, defeating the timeout; shut down
        # without waiting instead (the stray thread finishes in the background).
        pool = ThreadPoolExecutor(max_workers=1)
        future = pool.submit(fn, **args)
        try:
            result = future.result(timeout=task_timeout)
        except TimeoutError as exc:
            raise StopRun("task_timeout") from exc
        except TypeError as exc:
            raise StopRun(f"worker_bad_args:{worker_name}") from exc
        finally:
            pool.shutdown(wait=False)

        if not isinstance(result, dict):
            raise StopRun(f"worker_bad_result:{worker_name}")
        return result

    def _run_task_with_retry(
        self, task: dict[str, Any], request_id: str, deadline_monotonic: float
    ) -> dict[str, Any]:
        task_id = task["id"]
        worker_name = task["worker"]
        semantic_args = dict(task["args"])
        semantic_hash = args_hash(semantic_args)
        base_args = dict(semantic_args)
        base_args["request_id"] = request_id

        attempts_total = self.budget.max_retries_per_task + 1
        last_reason = "unknown"

        for attempt in range(1, attempts_total + 1):
            try:
                self._consume_dispatch_budget()
                observation = self._call_once(
                    worker_name, base_args, deadline_monotonic=deadline_monotonic
                )
                return {
                    "task_id": task_id,
                    "worker": worker_name,
                    "critical": task["critical"],
                    "status": "done",
                    "attempts_used": attempt,
                    "retried": attempt > 1,
                    "args_hash": semantic_hash,
                    "observation": observation,
                }
            except StopRun as exc:
                last_reason = exc.reason
                if exc.reason == "task_timeout" and attempt < attempts_total:
                    continue
                return {
                    "task_id": task_id,
                    "worker": worker_name,
                    "critical": task["critical"],
                    "status": "failed",
                    "attempts_used": attempt,
                    "retried": attempt > 1,
                    "args_hash": semantic_hash,
                    "stop_reason": last_reason,
                }

        return {
            "task_id": task_id,
            "worker": worker_name,
            "critical": task["critical"],
            "status": "failed",
            "attempts_used": attempts_total,
            "retried": True,
            "args_hash": semantic_hash,
            "stop_reason": last_reason,
        }

    def dispatch_parallel(
        self,
        tasks: list[dict[str, Any]],
        *,
        request_id: str,
        deadline_monotonic: float,
    ) -> list[dict[str, Any]]:
        if not tasks:
            return []

        indexed_tasks = list(enumerate(tasks))
        output: list[tuple[int, dict[str, Any]]] = []
        parallelism = min(self.budget.max_parallel, len(tasks))

        with ThreadPoolExecutor(max_workers=parallelism) as pool:
            future_to_idx = {
                pool.submit(
                    self._run_task_with_retry, task, request_id, deadline_monotonic
                ): idx
                for idx, task in indexed_tasks
            }
            remaining = deadline_monotonic - time.monotonic()
            if remaining <= 0:
                raise StopRun("max_seconds")
            try:
                for future in as_completed(future_to_idx, timeout=max(0.01, remaining)):
                    idx = future_to_idx[future]
                    output.append((idx, future.result()))
            except TimeoutError as exc:
                raise StopRun("max_seconds") from exc

        output.sort(key=lambda item: item[0])
        return [item[1] for item in output]

What matters most here (in simple terms)

  • The gateway is where the execution policy is implemented: allowlist, timeout, retry, dispatch budget.
  • validate_orchestration_plan(...) checks the required field contract and ignores extra keys.
  • dispatch_parallel(...) caps parallelism and also enforces the global deadline (max_seconds) inside the gateway.
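The timeout + retry_once behavior can be tried standalone. The hypothetical flaky_worker below (not part of the example code) exceeds the timeout only on its first attempt:

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

ATTEMPTS = {"n": 0}

def flaky_worker() -> str:
    # Hypothetical worker: slow on the first call, fast afterwards.
    ATTEMPTS["n"] += 1
    if ATTEMPTS["n"] == 1:
        time.sleep(0.5)  # exceeds the 0.2 s timeout below
    return "done"

def call_with_retry(fn, *, timeout: float, retries: int = 1) -> dict:
    for attempt in range(1, retries + 2):
        pool = ThreadPoolExecutor(max_workers=1)
        try:
            result = pool.submit(fn).result(timeout=timeout)
            return {"status": "done", "attempts_used": attempt, "result": result}
        except FutureTimeout:
            if attempt > retries:
                return {"status": "failed", "stop_reason": "task_timeout",
                        "attempts_used": attempt}
        finally:
            pool.shutdown(wait=False)  # never block on a timed-out thread

outcome = call_with_retry(flaky_worker, timeout=0.2)
print(outcome)  # first attempt times out, the retry succeeds
```

This is the same shape as _run_task_with_retry: a bounded attempt loop where only a transient timeout triggers a retry.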

llm.py — planning and final synthesis

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


PLAN_SYSTEM_PROMPT = """
You are an orchestration planner.
Return only one JSON object in this exact shape:
{
  "kind": "plan",
  "tasks": [
    {"id":"t1","worker":"sales_worker","args":{"report_date":"YYYY-MM-DD","region":"US"},"critical":true}
  ]
}

Rules:
- Create 2 to 4 independent tasks.
- Use only workers from available_workers.
- Keep args minimal and valid for the selected worker.
- Mark task as critical=true only if final brief is impossible without it.
- Do not output markdown or extra keys.
""".strip()

FINAL_SYSTEM_PROMPT = """
You are an operations assistant.
Write a short final briefing in English for an e-commerce operations manager.
Include:
- headline health status (green/yellow/red)
- key sales metrics
- payment risk note
- inventory risk note
- one concrete next action
Use only evidence from orchestrator output.
""".strip()

WORKER_CATALOG = [
    {
        "name": "sales_worker",
        "description": "Provides sales KPIs for a date and region",
        "args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
    },
    {
        "name": "payments_worker",
        "description": "Provides payment failure and chargeback signals",
        "args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
    },
    {
        "name": "inventory_worker",
        "description": "Provides low-stock and out-of-stock risk signals",
        "args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
    },
]


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def create_plan(
    *,
    goal: str,
    report_date: str,
    region: str,
    max_tasks: int,
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "report_date": report_date,
        "region": region,
        "max_tasks": max_tasks,
        "available_workers": WORKER_CATALOG,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": PLAN_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_final_answer(*, goal: str, aggregate: dict[str, Any]) -> str:
    payload = {
        "goal": goal,
        "aggregate": aggregate,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            messages=[
                {"role": "system", "content": FINAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or ""
    text = text.strip()
    if not text:
        raise LLMEmpty("llm_empty")
    return text

What matters most here (in simple terms)

  • The LLM returns only a plan/answer; it never launches workers directly.
  • The planner receives max_tasks and available_workers, so the selection is already constrained at the intent stage.
  • On non-JSON output or a timeout, explicit signals are returned so the run can stop in a controlled way.
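The non-JSON fallback mirrors the tail of create_plan and can be exercised on its own (the isinstance guard here is an addition for this sketch; in the example the non-dict case is caught later by validate_orchestration_plan):

```python
import json

def parse_plan(text: str) -> dict:
    # A malformed model reply becomes an explicit "invalid" signal
    # instead of an unhandled exception; the policy layer then rejects
    # it with an invalid_plan stop_reason.
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}
    return parsed if isinstance(parsed, dict) else {"kind": "invalid", "raw": text}

print(parse_plan('{"kind": "plan", "tasks": []}')["kind"])  # plan
print(parse_plan("Sure, here is your plan:")["kind"])       # invalid
```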

main.py — Plan -> Dispatch (parallel) -> Aggregate -> Finalize

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from gateway import Budget, OrchestratorGateway, StopRun, validate_orchestration_plan
from llm import LLMEmpty, LLMTimeout, compose_final_answer, create_plan
from workers import inventory_worker, payments_worker, sales_worker

REPORT_DATE = "2026-02-26"
REGION = "US"
GOAL = (
    "Prepare a morning operations report for e-commerce region US on 2026-02-26. "
    "Include sales, payment risk, and inventory risk with one concrete action."
)

BUDGET = Budget(
    max_tasks=4,
    max_parallel=3,
    max_retries_per_task=1,
    max_dispatches=8,
    task_timeout_seconds=2.0,
    max_seconds=25,
)

WORKER_REGISTRY = {
    "sales_worker": sales_worker,
    "payments_worker": payments_worker,
    "inventory_worker": inventory_worker,
}

ALLOWED_WORKERS_POLICY = {"sales_worker", "payments_worker", "inventory_worker"}
INVENTORY_RUNTIME_ENABLED = True
ALLOWED_WORKERS_EXECUTION = (
    {"sales_worker", "payments_worker", "inventory_worker"}
    if INVENTORY_RUNTIME_ENABLED
    else {"sales_worker", "payments_worker"}
)
# Set INVENTORY_RUNTIME_ENABLED=False to get worker_denied:inventory_worker in trace.


def aggregate_results(task_results: list[dict[str, Any]]) -> dict[str, Any]:
    done_by_worker: dict[str, dict[str, Any]] = {}
    failed: list[dict[str, Any]] = []

    for item in task_results:
        if item["status"] == "done":
            done_by_worker[item["worker"]] = item["observation"]
        else:
            failed.append(
                {
                    "task_id": item["task_id"],
                    "worker": item["worker"],
                    "critical": item["critical"],
                    "stop_reason": item["stop_reason"],
                }
            )

    sales = done_by_worker.get("sales_worker", {}).get("result", {})
    payments = done_by_worker.get("payments_worker", {}).get("result", {})
    inventory = done_by_worker.get("inventory_worker", {}).get("result", {})

    health = "green"
    if payments.get("failed_payment_rate", 0) >= 0.03 or inventory.get("out_of_stock_skus"):
        health = "yellow"
    if payments.get("gateway_incident") not in (None, "none"):
        health = "red"

    return {
        "report_date": REPORT_DATE,
        "region": REGION,
        "health": health,
        "sales": sales,
        "payments": payments,
        "inventory": inventory,
        "failed_tasks": failed,
    }


def run_orchestrator(goal: str) -> dict[str, Any]:
    started = time.monotonic()
    deadline = started + BUDGET.max_seconds
    request_id = uuid.uuid4().hex[:10]

    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = OrchestratorGateway(
        allow=ALLOWED_WORKERS_EXECUTION,
        registry=WORKER_REGISTRY,
        budget=BUDGET,
    )

    try:
        raw_plan = create_plan(
            goal=goal,
            report_date=REPORT_DATE,
            region=REGION,
            max_tasks=BUDGET.max_tasks,
        )
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        tasks = validate_orchestration_plan(
            raw_plan,
            allowed_workers=ALLOWED_WORKERS_POLICY,
            max_tasks=BUDGET.max_tasks,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "phase": "plan",
            "raw_plan": raw_plan,
            "trace": trace,
            "history": history,
        }

    if time.monotonic() > deadline:
        return {
            "status": "stopped",
            "stop_reason": "max_seconds",
            "phase": "dispatch",
            "plan": tasks,
            "trace": trace,
            "history": history,
        }

    try:
        task_results = gateway.dispatch_parallel(
            tasks,
            request_id=request_id,
            deadline_monotonic=deadline,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "phase": "dispatch",
            "plan": tasks,
            "trace": trace,
            "history": history,
        }

    for item in task_results:
        trace.append(
            {
                "task_id": item["task_id"],
                "worker": item["worker"],
                "status": item["status"],
                "attempts_used": item["attempts_used"],
                "retried": item["retried"],
                "args_hash": item["args_hash"],
                "stop_reason": item.get("stop_reason"),
                "critical": item["critical"],
            }
        )
        history.append(item)

    failed_critical = [
        item
        for item in task_results
        if item["status"] == "failed" and item["critical"]
    ]
    if failed_critical:
        return {
            "status": "stopped",
            "stop_reason": "critical_task_failed",
            "phase": "dispatch",
            "plan": tasks,
            "failed_critical": failed_critical,
            "trace": trace,
            "history": history,
        }

    aggregate = aggregate_results(task_results)

    if time.monotonic() > deadline:
        return {
            "status": "stopped",
            "stop_reason": "max_seconds",
            "phase": "finalize",
            "aggregate": aggregate,
            "trace": trace,
            "history": history,
        }

    try:
        answer = compose_final_answer(goal=goal, aggregate=aggregate)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "finalize",
            "aggregate": aggregate,
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "phase": "finalize",
            "aggregate": aggregate,
            "trace": trace,
            "history": history,
        }

    return {
        "status": "ok",
        "stop_reason": "success",
        "answer": answer,
        "plan": tasks,
        "aggregate": aggregate,
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_orchestrator(GOAL)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (in simple terms)

  • run_orchestrator(...) manages the full lifecycle of the orchestration run.
  • Parallel execution happens only in the gateway, never inside LLM decisions.
  • ALLOWED_WORKERS_POLICY and ALLOWED_WORKERS_EXECUTION may differ (runtime feature flag / tenant gate).
  • The policy is explicit: if a critical task fails, finalization does not run.
  • trace and history provide a transparent audit: what ran, how many attempts, and why something failed.

requirements.txt

TEXT
openai==2.21.0

Example output

Because payments_worker simulates a timeout on its first attempt, a typical trace contains retried=true.

JSON
{
  "status": "ok",
  "stop_reason": "success",
  "answer": "Morning Operations Report - US Region (2026-02-26): Health=yellow. Gross sales=$182,450, orders=4,820, AOV=$37.85. Payment failed rate=2.3% with 3 chargeback alerts. Inventory has one out-of-stock SKU (SKU-9033). Next action: prioritize immediate restock for SKU-9033.",
  "plan": [
    {
      "id": "t1",
      "worker": "sales_worker",
      "args": {"report_date": "2026-02-26", "region": "US"},
      "critical": true
    },
    {
      "id": "t2",
      "worker": "payments_worker",
      "args": {"report_date": "2026-02-26", "region": "US"},
      "critical": true
    },
    {
      "id": "t3",
      "worker": "inventory_worker",
      "args": {"report_date": "2026-02-26", "region": "US"},
      "critical": true
    }
  ],
  "aggregate": {
    "report_date": "2026-02-26",
    "region": "US",
    "health": "yellow",
    "sales": {"gross_sales_usd": 182450.0, "orders": 4820, "aov_usd": 37.85},
    "payments": {"failed_payment_rate": 0.023, "chargeback_alerts": 3, "gateway_incident": "none"},
    "inventory": {"low_stock_skus": ["SKU-4411", "SKU-8820"], "out_of_stock_skus": ["SKU-9033"], "restock_eta_days": 2}
  },
  "trace": [
    {
      "task_id": "t1",
      "worker": "sales_worker",
      "status": "done",
      "attempts_used": 1,
      "retried": false
    },
    {
      "task_id": "t2",
      "worker": "payments_worker",
      "status": "done",
      "attempts_used": 2,
      "retried": true
    },
    {
      "task_id": "t3",
      "worker": "inventory_worker",
      "status": "done",
      "attempts_used": 1,
      "retried": false
    }
  ],
  "history": [{...}]
}

This is an abbreviated example: history is shown in summarized form, and the wording of answer may vary slightly between runs.

If you set INVENTORY_RUNTIME_ENABLED=False, worker_denied:inventory_worker appears in the trace, a clear illustration of the difference between the policy and execution allowlists.
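The two-allowlist split can be shown in a few lines. This is a reduced sketch of the checks in validate_orchestration_plan and OrchestratorGateway, not the real code path:

```python
POLICY_ALLOW = {"sales_worker", "payments_worker", "inventory_worker"}
EXECUTION_ALLOW = {"sales_worker", "payments_worker"}  # inventory gated off at runtime

def dispatch(worker: str) -> str:
    if worker not in POLICY_ALLOW:
        # Plan-time rejection: the LLM proposed a worker that does not exist.
        return f"invalid_plan:worker_not_allowed:{worker}"
    if worker not in EXECUTION_ALLOW:
        # Execution-time denial: valid in the plan, disabled at runtime.
        return f"worker_denied:{worker}"
    return "dispatched"

print(dispatch("inventory_worker"))  # worker_denied:inventory_worker
print(dispatch("fraud_worker"))      # invalid_plan:worker_not_allowed:fraud_worker
```

Keeping the two checks separate means a feature flag or tenant gate can disable a worker without invalidating otherwise well-formed plans.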


Typical stop_reason values

  • success - the plan executed and the final answer was generated
  • invalid_plan:* - the LLM plan failed policy validation
  • invalid_plan:missing_keys - a task is missing required contract fields
  • llm_timeout - the LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_empty - the LLM returned an empty final answer
  • max_seconds - the total time budget for the run was exceeded
  • max_dispatches - the subtask dispatch limit (including retries) was exceeded
  • worker_denied:<name> - the worker is not in the execution allowlist
  • worker_missing:<name> - the worker is absent from the registry
  • worker_bad_args:<name> - the task contains invalid arguments
  • worker_bad_result:<name> - the worker returned data outside the contract
  • task_timeout - the subtask did not finish within task_timeout_seconds
  • critical_task_failed - at least one critical subtask failed

What is NOT shown here

  • No auth/PII handling or tenant isolation.
  • No persistent queue/job runner (RabbitMQ/SQS/Kafka).
  • No circuit breaker or advanced backoff policies.
  • No token/cost budgets (cost guardrails).

What to try next

  • Make payments_worker non-critical (critical=false) and watch the run return a partial result.
  • Set INVENTORY_RUNTIME_ENABLED=False and verify worker_denied:inventory_worker.
  • Lower task_timeout_seconds to 0.2 and observe how the number of timeouts/retries grows.
  • Add a fourth worker (fraud_worker) and compare the run time with max_parallel=2 vs max_parallel=4.
⏱️ 16 min read · Updated Mar 2026 · Difficulty: ★★☆
Built-in production control: OnceOnly
Guardrails for tool-calling agents
Take this pattern to production with governance:
  • Budgets (step / spend caps)
  • Tool permissions (allowlist / blocklist)
  • Kill switch and incident stop
  • Idempotency and dedupe
  • Audit logs and traceability
Integrated mention: OnceOnly is a control layer for agent systems in production.
Author

This documentation is curated and maintained by engineers who deploy AI agents in production.

The content is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The patterns and recommendations are based on post-mortems, failure modes, and operational incidents in deployed systems, including those encountered while building and operating agent governance infrastructure at OnceOnly.