Orchestrator Agent — Python (simple production example with an LLM)

A runnable, production-style Orchestrator agent example in Python, with a subtask plan, parallel dispatch, a timeout/retry policy, and final aggregation.
On this page
  1. Pattern essence (brief)
  2. What this example demonstrates
  3. Architecture
  4. Project structure
  5. Running the project
  6. Task
  7. Solution
  8. Code
  9. workers.py — specialized executors
  10. gateway.py — policy boundary (the most important layer)
  11. llm.py — planning and final synthesis
  12. main.py — Plan -> Dispatch (parallel) -> Aggregate -> Finalize
  13. requirements.txt
  14. Example output
  15. Common stop_reason values
  16. What is NOT shown here
  17. Things to try next

Pattern essence (brief)

Orchestrator Agent is a pattern in which the agent does not execute the business work itself; it coordinates several specialized executors.

The LLM decides which subtasks to launch (what to do), while the policy layer controls how to launch them safely (what is allowed, how long to wait, when to retry, which limits apply).


What this example demonstrates

  • a separate Plan step that builds the set of subtasks
  • parallel Dispatch of the subtasks (ThreadPoolExecutor)
  • a policy boundary between the orchestration decision (LLM) and worker execution
  • strict plan validation (kind, tasks, task shape)
  • separate allowlists for the policy layer and the execution layer
  • a per-subtask timeout plus a single retry (retry-once) for transient timeouts
  • a partial-results policy: a non-critical task may fail while the run continues
  • explicit stop_reason values for debugging and production monitoring

Architecture

  1. The LLM receives the goal and returns an orchestration plan as JSON (kind="plan", tasks).
  2. The policy boundary validates the plan and rejects disallowed workers/arguments.
  3. OrchestratorGateway runs the subtasks in parallel with timeout and retry.
  4. The results (successful and failed) are merged into a single history/trace.
  5. If a critical task does not complete, the run stops.
  6. If the minimal required facts are present, the LLM composes a short final operations report.

The LLM returns an intent (a plan), which is treated as untrusted input: the policy boundary validates it first, and only then (if allowed) are the workers launched. The condensed flow below illustrates this order.
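
A minimal sketch of that order, condensed from main.py and using the modules defined later on this page (error handling and the aggregate/finalize steps are omitted for brevity; step 1 calls the planner, so OPENAI_API_KEY must be set):

PYTHON
# Condensed control flow: Plan (LLM) -> validate (policy) -> Dispatch (gateway).
import time
import uuid

from gateway import Budget, OrchestratorGateway, validate_orchestration_plan
from llm import create_plan
from workers import inventory_worker, payments_worker, sales_worker

budget = Budget()
deadline = time.monotonic() + budget.max_seconds
allowed = {"sales_worker", "payments_worker", "inventory_worker"}

# 1. The LLM proposes a plan (untrusted intent).
raw_plan = create_plan(
    goal="Morning ops report for US on 2026-02-26",
    report_date="2026-02-26",
    region="US",
    max_tasks=budget.max_tasks,
)
# 2. The policy boundary validates the plan before anything runs.
tasks = validate_orchestration_plan(
    raw_plan, allowed_workers=allowed, max_tasks=budget.max_tasks
)
# 3. Only validated tasks reach the workers, in parallel, under timeout/retry.
gateway = OrchestratorGateway(
    allow=allowed,
    registry={
        "sales_worker": sales_worker,
        "payments_worker": payments_worker,
        "inventory_worker": inventory_worker,
    },
    budget=budget,
)
results = gateway.dispatch_parallel(
    tasks, request_id=uuid.uuid4().hex[:10], deadline_monotonic=deadline
)
# Steps 4-6 (aggregate, criticality check, finalize) follow as in main.py below.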


Project structure

TEXT
examples/
└── agent-patterns/
    └── orchestrator-agent/
        └── python/
            ├── main.py           # Plan -> Dispatch (parallel) -> Aggregate -> Finalize
            ├── llm.py            # planner + final synthesis
            ├── gateway.py        # policy boundary: validation + timeout/retry + budgets
            ├── workers.py        # deterministic specialists (sales/payments/inventory)
            └── requirements.txt

Running the project

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/orchestrator-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option via export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Option via .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows, it is simpler to set environment variables directly (shown below) or, if preferred, to use python-dotenv to load .env automatically.
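
For example, session-scoped variables in cmd.exe or PowerShell (a minimal Windows equivalent of the export variant above):

TEXT
rem cmd.exe (current session only)
set OPENAI_API_KEY=sk-...

# PowerShell (current session only)
$env:OPENAI_API_KEY = "sk-..."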


Task

Imagine a real production case:

"Prepare a morning operations report for the US market as of 2026-02-26: sales, payment incidents, and inventory risks."

The agent must not do everything in a single flow. It must:

  • split the task into independent subtasks
  • run the subtasks in parallel
  • retry a timed-out subtask once
  • produce a single short final brief

Solution

Here, the Orchestrator works as a coordinator:

  • the LLM generates a plan with tasks for specialized workers
  • the policy layer checks the plan and blocks disallowed tasks
  • the gateway executes the workers in parallel with timeout/retry
  • the results are collected into an aggregation model
  • the short final text is generated only after the facts have been collected

This is neither ReAct nor Routing:

  • not ReAct, because the focus is not on a single agent's sequential Think -> Act loop
  • not Routing, because several executors are needed at once here, not a single target

Code

workers.py — specialized executors

PYTHON
from __future__ import annotations

import time
from typing import Any

SALES_METRICS = {
    "2026-02-26:US": {
        "gross_sales_usd": 182_450.0,
        "orders": 4_820,
        "aov_usd": 37.85,
    }
}

PAYMENT_ALERTS = {
    "2026-02-26:US": {
        "failed_payment_rate": 0.023,
        "chargeback_alerts": 3,
        "gateway_incident": "none",
    }
}

INVENTORY_ALERTS = {
    "2026-02-26:US": {
        "low_stock_skus": ["SKU-4411", "SKU-8820"],
        "out_of_stock_skus": ["SKU-9033"],
        "restock_eta_days": 2,
    }
}

_ATTEMPT_STATE: dict[str, int] = {}


def _key(report_date: str, region: str) -> str:
    return f"{report_date}:{region.upper()}"


def sales_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    time.sleep(0.4)
    metrics = SALES_METRICS.get(_key(report_date, region))
    if not metrics:
        return {"status": "done", "worker": "sales_worker", "result": {"warning": "sales_data_missing"}}
    return {"status": "done", "worker": "sales_worker", "result": metrics}


def payments_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    # Simulate a transient timeout on the first attempt within one request_id.
    attempt_key = f"{request_id}:{_key(report_date, region)}:payments"
    _ATTEMPT_STATE[attempt_key] = _ATTEMPT_STATE.get(attempt_key, 0) + 1
    attempt = _ATTEMPT_STATE[attempt_key]

    if attempt == 1:
        time.sleep(2.6)  # Exceeds gateway timeout on purpose.
    else:
        time.sleep(0.3)

    alerts = PAYMENT_ALERTS.get(_key(report_date, region))
    if not alerts:
        return {"status": "done", "worker": "payments_worker", "result": {"warning": "payments_data_missing"}}
    return {"status": "done", "worker": "payments_worker", "result": alerts}


def inventory_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    time.sleep(0.5)
    alerts = INVENTORY_ALERTS.get(_key(report_date, region))
    if not alerts:
        return {"status": "done", "worker": "inventory_worker", "result": {"warning": "inventory_data_missing"}}
    return {"status": "done", "worker": "inventory_worker", "result": alerts}

What matters most here (in plain words)

  • Each worker covers only its own slice of the business.
  • payments_worker deliberately times out on the first attempt to demonstrate the retry policy.
  • Workers make no orchestration decisions: they only return facts, as the quick check below shows.
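
A quick way to exercise the worker contract in isolation (a sketch that imports workers.py from this example; the expected values come from the static demo data above):

PYTHON
# Each worker returns a structured dict: status, worker, result.
from workers import sales_worker

out = sales_worker(report_date="2026-02-26", region="US", request_id="demo")
print(out["status"], out["worker"], out["result"]["orders"])  # done sales_worker 4820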

The contract between layers in this example:

  • The LLM can only propose a plan (kind="plan", tasks).
  • The Gateway validates the plan and rejects dangerous/invalid tasks.
  • Unknown keys in tasks are ignored during normalization as long as the mandatory fields are present.
  • The Gateway enforces the runtime policy: allowlist, timeout, retry, dispatch budget, deadline.
  • Workers only perform their business function and return a structured dict.
  • The Orchestrator decides whether the run can be finalized (for example, failed critical task -> stop).
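
A small sketch of the rejection path, using validate_orchestration_plan from gateway.py below ("db_admin_worker" is a deliberately disallowed name invented for this illustration):

PYTHON
from gateway import StopRun, validate_orchestration_plan

plan = {
    "kind": "plan",
    "tasks": [
        # The shape is valid, but the worker is not on the allowlist.
        {"id": "t1", "worker": "db_admin_worker", "args": {}, "critical": True},
    ],
}
try:
    validate_orchestration_plan(plan, allowed_workers={"sales_worker"}, max_tasks=4)
except StopRun as exc:
    print(exc.reason)  # invalid_plan:worker_not_allowed:db_admin_worker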

gateway.py — policy boundary (the most important layer)

PYTHON
from __future__ import annotations

import hashlib
import json
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed
from dataclasses import dataclass
from typing import Any, Callable


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_tasks: int = 4
    max_parallel: int = 3
    max_retries_per_task: int = 1
    max_dispatches: int = 8
    task_timeout_seconds: float = 2.0
    max_seconds: int = 25


def args_hash(args: dict[str, Any]) -> str:
    stable = json.dumps(args, ensure_ascii=True, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(stable.encode("utf-8")).hexdigest()[:12]


def validate_orchestration_plan(
    raw_plan: dict[str, Any], *, allowed_workers: set[str], max_tasks: int
) -> list[dict[str, Any]]:
    if not isinstance(raw_plan, dict):
        raise StopRun("invalid_plan:non_json")
    if raw_plan.get("kind") != "plan":
        raise StopRun("invalid_plan:kind")

    tasks = raw_plan.get("tasks")
    if not isinstance(tasks, list):
        raise StopRun("invalid_plan:tasks")
    if not (1 <= len(tasks) <= max_tasks):
        raise StopRun("invalid_plan:max_tasks")

    normalized: list[dict[str, Any]] = []
    seen_ids: set[str] = set()
    required_keys = {"id", "worker", "args", "critical"}

    for task in tasks:
        if not isinstance(task, dict):
            raise StopRun("invalid_plan:task_shape")
        if not required_keys.issubset(task.keys()):
            raise StopRun("invalid_plan:missing_keys")

        # Ignore unknown keys and keep only contract fields.
        task_id = task["id"]
        worker = task["worker"]
        args = task["args"]
        critical = task["critical"]

        if not isinstance(task_id, str) or not task_id.strip():
            raise StopRun("invalid_plan:task_id")
        if task_id in seen_ids:
            raise StopRun("invalid_plan:duplicate_task_id")
        seen_ids.add(task_id)

        if not isinstance(worker, str) or not worker.strip():
            raise StopRun("invalid_plan:worker")
        if worker not in allowed_workers:
            raise StopRun(f"invalid_plan:worker_not_allowed:{worker}")

        if not isinstance(args, dict):
            raise StopRun("invalid_plan:args")
        if not isinstance(critical, bool):
            raise StopRun("invalid_plan:critical")

        normalized.append(
            {
                "id": task_id.strip(),
                "worker": worker.strip(),
                "args": dict(args),
                "critical": critical,
            }
        )

    return normalized


class OrchestratorGateway:
    def __init__(
        self,
        *,
        allow: set[str],
        registry: dict[str, Callable[..., dict[str, Any]]],
        budget: Budget,
    ) -> None:
        self.allow = allow
        self.registry = registry
        self.budget = budget
        self.dispatches = 0
        self._lock = threading.Lock()

    def _consume_dispatch_budget(self) -> None:
        with self._lock:
            self.dispatches += 1
            if self.dispatches > self.budget.max_dispatches:
                raise StopRun("max_dispatches")

    def _call_once(
        self, worker_name: str, args: dict[str, Any], *, deadline_monotonic: float
    ) -> dict[str, Any]:
        if worker_name not in self.allow:
            raise StopRun(f"worker_denied:{worker_name}")
        fn = self.registry.get(worker_name)
        if fn is None:
            raise StopRun(f"worker_missing:{worker_name}")

        remaining = deadline_monotonic - time.monotonic()
        if remaining <= 0:
            raise StopRun("max_seconds")
        task_timeout = min(self.budget.task_timeout_seconds, max(0.01, remaining))

        with ThreadPoolExecutor(max_workers=1) as pool:
            future = pool.submit(fn, **args)
            try:
                result = future.result(timeout=task_timeout)
            except TimeoutError as exc:
                raise StopRun("task_timeout") from exc
            except TypeError as exc:
                raise StopRun(f"worker_bad_args:{worker_name}") from exc

        if not isinstance(result, dict):
            raise StopRun(f"worker_bad_result:{worker_name}")
        return result

    def _run_task_with_retry(
        self, task: dict[str, Any], request_id: str, deadline_monotonic: float
    ) -> dict[str, Any]:
        task_id = task["id"]
        worker_name = task["worker"]
        semantic_args = dict(task["args"])
        semantic_hash = args_hash(semantic_args)
        base_args = dict(semantic_args)
        base_args["request_id"] = request_id

        attempts_total = self.budget.max_retries_per_task + 1
        last_reason = "unknown"

        for attempt in range(1, attempts_total + 1):
            try:
                self._consume_dispatch_budget()
                observation = self._call_once(
                    worker_name, base_args, deadline_monotonic=deadline_monotonic
                )
                return {
                    "task_id": task_id,
                    "worker": worker_name,
                    "critical": task["critical"],
                    "status": "done",
                    "attempts_used": attempt,
                    "retried": attempt > 1,
                    "args_hash": semantic_hash,
                    "observation": observation,
                }
            except StopRun as exc:
                last_reason = exc.reason
                if exc.reason == "task_timeout" and attempt < attempts_total:
                    continue
                return {
                    "task_id": task_id,
                    "worker": worker_name,
                    "critical": task["critical"],
                    "status": "failed",
                    "attempts_used": attempt,
                    "retried": attempt > 1,
                    "args_hash": semantic_hash,
                    "stop_reason": last_reason,
                }

        return {
            "task_id": task_id,
            "worker": worker_name,
            "critical": task["critical"],
            "status": "failed",
            "attempts_used": attempts_total,
            "retried": True,
            "args_hash": semantic_hash,
            "stop_reason": last_reason,
        }

    def dispatch_parallel(
        self,
        tasks: list[dict[str, Any]],
        *,
        request_id: str,
        deadline_monotonic: float,
    ) -> list[dict[str, Any]]:
        if not tasks:
            return []

        indexed_tasks = list(enumerate(tasks))
        output: list[tuple[int, dict[str, Any]]] = []
        parallelism = min(self.budget.max_parallel, len(tasks))

        with ThreadPoolExecutor(max_workers=parallelism) as pool:
            future_to_idx = {
                pool.submit(
                    self._run_task_with_retry, task, request_id, deadline_monotonic
                ): idx
                for idx, task in indexed_tasks
            }
            remaining = deadline_monotonic - time.monotonic()
            if remaining <= 0:
                raise StopRun("max_seconds")
            try:
                for future in as_completed(future_to_idx, timeout=max(0.01, remaining)):
                    idx = future_to_idx[future]
                    output.append((idx, future.result()))
            except TimeoutError as exc:
                raise StopRun("max_seconds") from exc

        output.sort(key=lambda item: item[0])
        return [item[1] for item in output]

What matters most here (in plain words)

  • The gateway is what implements the execution policy: allowlist, timeout, retry, dispatch budget.
  • validate_orchestration_plan(...) checks the mandatory field contract and ignores extra keys.
  • dispatch_parallel(...) preserves parallelism and also enforces the global deadline (max_seconds) inside the gateway.
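
To see the timeout-then-retry behavior in isolation, here is a standalone sketch against gateway.py; the flaky stub worker and its timings are invented for this demo:

PYTHON
import time

from gateway import Budget, OrchestratorGateway

_CALLS = {"n": 0}

def flaky_worker(request_id: str) -> dict:
    # First attempt exceeds the 0.5s task timeout; the retry returns quickly.
    _CALLS["n"] += 1
    time.sleep(1.0 if _CALLS["n"] == 1 else 0.05)
    return {"status": "done", "worker": "flaky_worker", "result": {"ok": True}}

gateway = OrchestratorGateway(
    allow={"flaky_worker"},
    registry={"flaky_worker": flaky_worker},
    budget=Budget(task_timeout_seconds=0.5, max_retries_per_task=1),
)
results = gateway.dispatch_parallel(
    [{"id": "t1", "worker": "flaky_worker", "args": {}, "critical": True}],
    request_id="demo",
    deadline_monotonic=time.monotonic() + 10.0,
)
print(results[0]["status"], results[0]["retried"])  # done True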

llm.py — planning and final synthesis

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


PLAN_SYSTEM_PROMPT = """
You are an orchestration planner.
Return only one JSON object in this exact shape:
{
  "kind": "plan",
  "tasks": [
    {"id":"t1","worker":"sales_worker","args":{"report_date":"YYYY-MM-DD","region":"US"},"critical":true}
  ]
}

Rules:
- Create 2 to 4 independent tasks.
- Use only workers from available_workers.
- Keep args minimal and valid for the selected worker.
- Mark task as critical=true only if final brief is impossible without it.
- Do not output markdown or extra keys.
""".strip()

FINAL_SYSTEM_PROMPT = """
You are an operations assistant.
Write a short final briefing in English for an e-commerce operations manager.
Include:
- headline health status (green/yellow/red)
- key sales metrics
- payment risk note
- inventory risk note
- one concrete next action
Use only evidence from orchestrator output.
""".strip()

WORKER_CATALOG = [
    {
        "name": "sales_worker",
        "description": "Provides sales KPIs for a date and region",
        "args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
    },
    {
        "name": "payments_worker",
        "description": "Provides payment failure and chargeback signals",
        "args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
    },
    {
        "name": "inventory_worker",
        "description": "Provides low-stock and out-of-stock risk signals",
        "args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
    },
]


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def create_plan(
    *,
    goal: str,
    report_date: str,
    region: str,
    max_tasks: int,
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "report_date": report_date,
        "region": region,
        "max_tasks": max_tasks,
        "available_workers": WORKER_CATALOG,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": PLAN_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_final_answer(*, goal: str, aggregate: dict[str, Any]) -> str:
    payload = {
        "goal": goal,
        "aggregate": aggregate,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            messages=[
                {"role": "system", "content": FINAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or ""
    text = text.strip()
    if not text:
        raise LLMEmpty("llm_empty")
    return text

What matters most here (in plain words)

  • The LLM only returns a plan/answer; it never launches workers directly.
  • The planner receives max_tasks and available_workers, so the choice is already constrained at the intent stage.
  • On non-JSON output or a timeout, explicit signals are returned so the run can stop in a controlled way.
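
The non-JSON path is worth tracing once: create_plan wraps unparseable output as kind="invalid", which the policy layer then rejects. A sketch, with no API call involved:

PYTHON
from gateway import StopRun, validate_orchestration_plan

# What create_plan returns when the model reply is not valid JSON.
raw_plan = {"kind": "invalid", "raw": "Sorry, I cannot produce a plan."}
try:
    validate_orchestration_plan(raw_plan, allowed_workers={"sales_worker"}, max_tasks=4)
except StopRun as exc:
    print(exc.reason)  # invalid_plan:kind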

main.py — Plan -> Dispatch (parallel) -> Aggregate -> Finalize

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from gateway import Budget, OrchestratorGateway, StopRun, validate_orchestration_plan
from llm import LLMEmpty, LLMTimeout, compose_final_answer, create_plan
from workers import inventory_worker, payments_worker, sales_worker

REPORT_DATE = "2026-02-26"
REGION = "US"
GOAL = (
    "Prepare a morning operations report for e-commerce region US on 2026-02-26. "
    "Include sales, payment risk, and inventory risk with one concrete action."
)

BUDGET = Budget(
    max_tasks=4,
    max_parallel=3,
    max_retries_per_task=1,
    max_dispatches=8,
    task_timeout_seconds=2.0,
    max_seconds=25,
)

WORKER_REGISTRY = {
    "sales_worker": sales_worker,
    "payments_worker": payments_worker,
    "inventory_worker": inventory_worker,
}

ALLOWED_WORKERS_POLICY = {"sales_worker", "payments_worker", "inventory_worker"}
INVENTORY_RUNTIME_ENABLED = True
ALLOWED_WORKERS_EXECUTION = (
    {"sales_worker", "payments_worker", "inventory_worker"}
    if INVENTORY_RUNTIME_ENABLED
    else {"sales_worker", "payments_worker"}
)
# Set INVENTORY_RUNTIME_ENABLED=False to get worker_denied:inventory_worker in trace.


def aggregate_results(task_results: list[dict[str, Any]]) -> dict[str, Any]:
    done_by_worker: dict[str, dict[str, Any]] = {}
    failed: list[dict[str, Any]] = []

    for item in task_results:
        if item["status"] == "done":
            done_by_worker[item["worker"]] = item["observation"]
        else:
            failed.append(
                {
                    "task_id": item["task_id"],
                    "worker": item["worker"],
                    "critical": item["critical"],
                    "stop_reason": item["stop_reason"],
                }
            )

    sales = done_by_worker.get("sales_worker", {}).get("result", {})
    payments = done_by_worker.get("payments_worker", {}).get("result", {})
    inventory = done_by_worker.get("inventory_worker", {}).get("result", {})

    health = "green"
    if payments.get("failed_payment_rate", 0) >= 0.03 or inventory.get("out_of_stock_skus"):
        health = "yellow"
    if payments.get("gateway_incident") not in (None, "none"):
        health = "red"

    return {
        "report_date": REPORT_DATE,
        "region": REGION,
        "health": health,
        "sales": sales,
        "payments": payments,
        "inventory": inventory,
        "failed_tasks": failed,
    }


def run_orchestrator(goal: str) -> dict[str, Any]:
    started = time.monotonic()
    deadline = started + BUDGET.max_seconds
    request_id = uuid.uuid4().hex[:10]

    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = OrchestratorGateway(
        allow=ALLOWED_WORKERS_EXECUTION,
        registry=WORKER_REGISTRY,
        budget=BUDGET,
    )

    try:
        raw_plan = create_plan(
            goal=goal,
            report_date=REPORT_DATE,
            region=REGION,
            max_tasks=BUDGET.max_tasks,
        )
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        tasks = validate_orchestration_plan(
            raw_plan,
            allowed_workers=ALLOWED_WORKERS_POLICY,
            max_tasks=BUDGET.max_tasks,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "phase": "plan",
            "raw_plan": raw_plan,
            "trace": trace,
            "history": history,
        }

    if time.monotonic() > deadline:
        return {
            "status": "stopped",
            "stop_reason": "max_seconds",
            "phase": "dispatch",
            "plan": tasks,
            "trace": trace,
            "history": history,
        }

    try:
        task_results = gateway.dispatch_parallel(
            tasks,
            request_id=request_id,
            deadline_monotonic=deadline,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "phase": "dispatch",
            "plan": tasks,
            "trace": trace,
            "history": history,
        }

    for item in task_results:
        trace.append(
            {
                "task_id": item["task_id"],
                "worker": item["worker"],
                "status": item["status"],
                "attempts_used": item["attempts_used"],
                "retried": item["retried"],
                "args_hash": item["args_hash"],
                "stop_reason": item.get("stop_reason"),
                "critical": item["critical"],
            }
        )
        history.append(item)

    failed_critical = [
        item
        for item in task_results
        if item["status"] == "failed" and item["critical"]
    ]
    if failed_critical:
        return {
            "status": "stopped",
            "stop_reason": "critical_task_failed",
            "phase": "dispatch",
            "plan": tasks,
            "failed_critical": failed_critical,
            "trace": trace,
            "history": history,
        }

    aggregate = aggregate_results(task_results)

    if time.monotonic() > deadline:
        return {
            "status": "stopped",
            "stop_reason": "max_seconds",
            "phase": "finalize",
            "aggregate": aggregate,
            "trace": trace,
            "history": history,
        }

    try:
        answer = compose_final_answer(goal=goal, aggregate=aggregate)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "finalize",
            "aggregate": aggregate,
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "phase": "finalize",
            "aggregate": aggregate,
            "trace": trace,
            "history": history,
        }

    return {
        "status": "ok",
        "stop_reason": "success",
        "answer": answer,
        "plan": tasks,
        "aggregate": aggregate,
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_orchestrator(GOAL)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (in plain words)

  • run_orchestrator(...) manages the full lifecycle of an orchestration run.
  • Parallel execution happens only inside the gateway, never in LLM decisions.
  • ALLOWED_WORKERS_POLICY and ALLOWED_WORKERS_EXECUTION may differ (runtime feature flag / tenant gate).
  • The policy is explicit: if a critical task fails, finalization never starts.
  • trace and history provide a transparent audit: what was launched, how many attempts were used, and why something failed.
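
Since every return path includes trace, a thin monitoring hook can sit on top of run_orchestrator. A minimal sketch (it performs a full run, so OPENAI_API_KEY must be set; the printed dict is an invented reporting format):

PYTHON
from main import GOAL, run_orchestrator

result = run_orchestrator(GOAL)
# The field names below match the trace entries built in run_orchestrator.
retried_tasks = [t["task_id"] for t in result["trace"] if t["retried"]]
failed_tasks = [t["task_id"] for t in result["trace"] if t["status"] == "failed"]
print({
    "run_status": result["status"],
    "stop_reason": result["stop_reason"],
    "retried": retried_tasks,
    "failed": failed_tasks,
})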

requirements.txt

TEXT
openai==2.21.0

Example output

Because payments_worker simulates a timeout on its first attempt, a typical trace contains retried=true.

JSON
{
  "status": "ok",
  "stop_reason": "success",
  "answer": "Morning Operations Report - US Region (2026-02-26): Health=yellow. Gross sales=$182,450, orders=4,820, AOV=$37.85. Payment failed rate=2.3% with 3 chargeback alerts. Inventory has one out-of-stock SKU (SKU-9033). Next action: prioritize immediate restock for SKU-9033.",
  "plan": [
    {
      "id": "t1",
      "worker": "sales_worker",
      "args": {"report_date": "2026-02-26", "region": "US"},
      "critical": true
    },
    {
      "id": "t2",
      "worker": "payments_worker",
      "args": {"report_date": "2026-02-26", "region": "US"},
      "critical": true
    },
    {
      "id": "t3",
      "worker": "inventory_worker",
      "args": {"report_date": "2026-02-26", "region": "US"},
      "critical": true
    }
  ],
  "aggregate": {
    "report_date": "2026-02-26",
    "region": "US",
    "health": "yellow",
    "sales": {"gross_sales_usd": 182450.0, "orders": 4820, "aov_usd": 37.85},
    "payments": {"failed_payment_rate": 0.023, "chargeback_alerts": 3, "gateway_incident": "none"},
    "inventory": {"low_stock_skus": ["SKU-4411", "SKU-8820"], "out_of_stock_skus": ["SKU-9033"], "restock_eta_days": 2}
  },
  "trace": [
    {
      "task_id": "t1",
      "worker": "sales_worker",
      "status": "done",
      "attempts_used": 1,
      "retried": false
    },
    {
      "task_id": "t2",
      "worker": "payments_worker",
      "status": "done",
      "attempts_used": 2,
      "retried": true
    },
    {
      "task_id": "t3",
      "worker": "inventory_worker",
      "status": "done",
      "attempts_used": 1,
      "retried": false
    }
  ],
  "history": [{...}]
}

This is an abridged example: history is shown in condensed form, and the exact wording of answer may vary slightly between runs.

If you set INVENTORY_RUNTIME_ENABLED=False, worker_denied:inventory_worker appears in the trace, a clear illustration of the difference between the policy allowlist and the execution allowlist.


Common stop_reason values

  • success - plan executed, final answer generated
  • invalid_plan:* - the LLM plan failed policy validation
  • invalid_plan:missing_keys - a task is missing mandatory contract fields
  • llm_timeout - the LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_empty - the LLM returned an empty final answer
  • max_seconds - the global time budget for the run was exceeded
  • max_dispatches - the subtask dispatch limit was exceeded (retries included)
  • worker_denied:<name> - the worker is not in the execution allowlist
  • worker_missing:<name> - the worker is missing from the registry
  • worker_bad_args:<name> - the task carried invalid arguments for the worker
  • worker_bad_result:<name> - the worker returned data outside the contract
  • task_timeout - a subtask exceeded task_timeout_seconds
  • critical_task_failed - at least one critical subtask failed
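
These values are flat strings, so they map cleanly onto alerting. A hypothetical severity mapping (the tiers and their names are assumptions for illustration, not part of the example):

PYTHON
# Hypothetical routing of stop_reason values to alert severities.
_SEVERITY = {
    "success": "info",
    "task_timeout": "warning",
    "llm_timeout": "warning",
    "llm_empty": "warning",
    "max_seconds": "warning",
    "max_dispatches": "warning",
    "critical_task_failed": "page",
}

def severity_for(stop_reason: str) -> str:
    # Parameterized reasons carry a detail suffix after the colon.
    if stop_reason.startswith("invalid_plan:"):
        return "warning"
    if stop_reason.startswith(("worker_denied:", "worker_missing:")):
        return "page"  # usually a config or deploy problem (assumed policy)
    return _SEVERITY.get(stop_reason, "warning")

print(severity_for("worker_denied:inventory_worker"))  # page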

What is NOT shown here

  • No auth/PII handling or tenant isolation.
  • No persistent queue/job runner (RabbitMQ/SQS/Kafka).
  • No circuit breaker or advanced backoff policies.
  • No token/cost budgets (cost guardrails).

Things to try next

  • Make payments_worker non-critical (critical=false) and watch the run return a partial result.
  • Set INVENTORY_RUNTIME_ENABLED=False and check for worker_denied:inventory_worker.
  • Lower task_timeout_seconds to 0.2 and watch timeouts and retries increase.
  • Add a fourth worker (fraud_worker) and compare run time with max_parallel=2 and max_parallel=4; a starting sketch follows below.
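
A hypothetical fraud_worker that follows the same contract as the workers above (its result fields and demo numbers are invented); to wire it in, register it in WORKER_REGISTRY and in both allowlists, and extend WORKER_CATALOG so the planner can select it:

PYTHON
import time
from typing import Any

def fraud_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    # Hypothetical specialist: same signature and return contract as the others.
    time.sleep(0.4)  # simulated lookup latency
    return {
        "status": "done",
        "worker": "fraud_worker",
        "result": {"suspicious_orders": 12, "blocked_accounts": 2},
    }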
Author

This documentation is curated and maintained by engineers who deploy AI agents in production.

The content is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The patterns and recommendations draw on post-mortems, failure modes, and operational incidents in deployed systems, including the development and operation of agent governance infrastructure at OnceOnly.