Pattern Essence (Brief)
Routing Agent is a pattern where the agent does not execute the task directly but chooses the best specialized executor for a given request type.
The LLM makes the routing decision, while execution is performed only by the execution layer, behind a policy boundary.
What this example demonstrates
- a separate Route stage before execution
- a policy boundary between the routing decision (LLM) and the workers (execution layer)
- strict validation of the route action (kind, target, args, allowed keys)
- an allowlist (deny by default) for routing
- fallback through needs_reroute with a limited number of attempts
- run budgets: max_route_attempts, max_delegations, max_seconds
- explicit stop_reason values for debugging, alerts, and production monitoring
- raw_route in the response if the LLM returned invalid route JSON
Architecture
- The LLM receives the goal and returns a route intent in JSON (kind="route", target, args).
- The policy boundary validates the route as untrusted input (including the required args.ticket).
- RouteGateway delegates the task to the selected worker (allowlist, budgets, loop detection).
- The observation is added to history and becomes evidence for the next route attempt (if a reroute is needed).
- If the previous attempt ended with needs_reroute, policy does not allow repeating the same target.
- When a worker returns done, a separate Finalize LLM step composes the final answer without calling workers.
The LLM returns an intent (route JSON) that is treated as untrusted input: the policy boundary validates it first and only then (if allowed) calls workers.
The allowlist is applied twice: in route validation (invalid_route:route_not_allowed:*) and in execution (route_denied:*).
This keeps routing controllable: the agent selects an executor, and execution goes through a controlled layer.
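The route-intent contract can be sketched in a few lines of Python (the ticket text is a made-up example):

```python
import json

# The exact shape the router LLM must emit; anything else (extra keys,
# markdown, free text) is rejected by the policy boundary before any
# worker is called.
raw = (
    '{"kind":"route","target":"billing_specialist",'
    '"args":{"ticket":"Refund for a subscription charged 10 days ago"}}'
)
intent = json.loads(raw)

assert intent["kind"] == "route"
assert set(intent) == {"kind", "target", "args"}  # no extra keys allowed
```

The same target string is then checked against both allowlists before any worker runs.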
Project structure
examples/
└── agent-patterns/
    └── routing-agent/
        └── python/
            ├── main.py            # Route -> Delegate -> (optional reroute) -> Finalize
            ├── llm.py             # router + final synthesis
            ├── gateway.py         # policy boundary: route validation + delegation control
            ├── tools.py           # deterministic specialists (billing/technical/sales)
            └── requirements.txt
How to run
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/routing-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ is required.
Option via export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Option via .env (optional)
cat > .env <<'EOF_ENV'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF_ENV
set -a
source .env
set +a
python main.py
This is the shell variant (macOS/Linux). On Windows it is easier to use environment set commands or, if desired, python-dotenv to load .env automatically.
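If you prefer to avoid extra dependencies, a stdlib-only sketch of the same .env loading is below (simplified parsing; python-dotenv covers more edge cases):

```python
import os

# Stdlib-only .env loader (simplified): KEY=VALUE lines, "#" comments,
# surrounding quotes stripped; existing environment variables are never
# overwritten.
def load_env_file(path: str = ".env") -> dict[str, str]:
    loaded: dict[str, str] = {}
    try:
        with open(path, encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue  # skip blanks, comments, malformed lines
                key, _, value = line.partition("=")
                key = key.strip()
                loaded[key] = value.strip().strip('"').strip("'")
                os.environ.setdefault(key, loaded[key])
    except FileNotFoundError:
        pass  # a missing .env is fine; fall back to the real environment
    return loaded
```

python-dotenv's load_dotenv() does the same job with more edge cases covered.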
Task
Imagine a user writes to support:
"I was charged for a subscription 10 days ago. Can I get a refund?"
The agent should not resolve this on its own. It must:
- understand the request type (billing/technical/sales)
- choose the right specialist
- delegate the task to a worker
- reroute if needed (needs_reroute)
- provide the final answer only after receiving the worker result
Solution
Here the agent does not "solve the issue itself". It only decides who should handle the request.
- the model decides where to route the request
- the system verifies that the route is allowed
- the specialist (worker) does the work
- if the route does not fit, the agent selects another one
- when a final result is available, the agent composes the final answer
- Not ReAct: you do not need many steps and tools here; you need one correct executor choice.
- Not Orchestrator: there are no parallel subtasks; there is a single domain route for delegation.
Code
tools.py – specialized workers
from __future__ import annotations
from typing import Any
USERS = {
42: {"id": 42, "name": "Anna", "country": "US", "tier": "pro"},
7: {"id": 7, "name": "Max", "country": "US", "tier": "free"},
}
BILLING = {
42: {
"currency": "USD",
"plan": "pro_monthly",
"price_usd": 49.0,
"days_since_first_payment": 10,
},
7: {
"currency": "USD",
"plan": "free",
"price_usd": 0.0,
"days_since_first_payment": 120,
},
}
def _extract_user_id(ticket: str) -> int:
if "user_id=7" in ticket:
return 7
return 42
def _contains_any(text: str, keywords: list[str]) -> bool:
lowered = text.lower()
return any(keyword in lowered for keyword in keywords)
def billing_specialist(ticket: str) -> dict[str, Any]:
if not _contains_any(ticket, ["refund", "charge", "billing", "invoice"]):
return {
"status": "needs_reroute",
"reason": "ticket_not_billing",
"domain": "billing",
}
user_id = _extract_user_id(ticket)
user = USERS.get(user_id)
billing = BILLING.get(user_id)
if not user or not billing:
return {"status": "done", "domain": "billing", "error": "user_not_found"}
is_refundable = (
billing["plan"] == "pro_monthly" and billing["days_since_first_payment"] <= 14
)
refund_amount = billing["price_usd"] if is_refundable else 0.0
return {
"status": "done",
"domain": "billing",
"result": {
"user_name": user["name"],
"plan": billing["plan"],
"currency": billing["currency"],
"refund_eligible": is_refundable,
"refund_amount_usd": refund_amount,
"reason": "Pro monthly subscriptions are refundable within 14 days.",
},
}
def technical_specialist(ticket: str) -> dict[str, Any]:
if not _contains_any(ticket, ["error", "bug", "incident", "api", "latency"]):
return {
"status": "needs_reroute",
"reason": "ticket_not_technical",
"domain": "technical",
}
return {
"status": "done",
"domain": "technical",
"result": {
"incident_id": "INC-4021",
"service": "public-api",
"state": "mitigated",
"next_update_in_minutes": 30,
},
}
def sales_specialist(ticket: str) -> dict[str, Any]:
if not _contains_any(ticket, ["price", "pricing", "quote", "plan", "discount"]):
return {
"status": "needs_reroute",
"reason": "ticket_not_sales",
"domain": "sales",
}
return {
"status": "done",
"domain": "sales",
"result": {
"recommended_plan": "team_plus",
"currency": "USD",
"monthly_price_usd": 199.0,
"reason": "Best fit for teams that need priority support and usage controls.",
},
}
What matters most here (plain words)
- Workers are a deterministic execution layer and contain no LLM logic.
- Router decides whom to call, but does not execute domain business logic itself.
- needs_reroute provides a safe signal for re-routing instead of a "fabricated" result.
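The worker contract can be sketched with a stripped-down specialist (not the full tools.py logic):

```python
from typing import Any

# A stripped-down specialist: either "done" with a result, or
# "needs_reroute" so the router can pick a different target.
def mini_billing_specialist(ticket: str) -> dict[str, Any]:
    if not any(word in ticket.lower() for word in ("refund", "charge", "billing")):
        return {"status": "needs_reroute", "reason": "ticket_not_billing"}
    return {"status": "done", "result": {"refund_eligible": True}}

assert mini_billing_specialist("API latency is high")["status"] == "needs_reroute"
assert mini_billing_specialist("Please refund my charge")["status"] == "done"
```

The key point: a wrong route produces a typed signal, never a fabricated domain answer.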
gateway.py – policy boundary (the most important layer)
from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_route_attempts: int = 3
max_delegations: int = 3
max_seconds: int = 60
def _stable_json(value: Any) -> str:
if value is None or isinstance(value, (bool, int, float, str)):
return json.dumps(value, ensure_ascii=True, sort_keys=True)
if isinstance(value, list):
return "[" + ",".join(_stable_json(item) for item in value) + "]"
if isinstance(value, dict):
parts = []
for key in sorted(value):
parts.append(
json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key])
)
return "{" + ",".join(parts) + "}"
return json.dumps(str(value), ensure_ascii=True)
def _normalize_for_hash(value: Any) -> Any:
if isinstance(value, str):
return " ".join(value.strip().split())
if isinstance(value, list):
return [_normalize_for_hash(item) for item in value]
if isinstance(value, dict):
return {str(key): _normalize_for_hash(value[key]) for key in sorted(value)}
return value
def _normalize_ticket(value: str) -> str:
return " ".join(value.strip().split())
def args_hash(args: dict[str, Any]) -> str:
normalized = _normalize_for_hash(args or {})
raw = _stable_json(normalized)
return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]
def validate_route_action(
action: Any,
*,
allowed_routes: set[str],
previous_target: str | None = None,
previous_status: str | None = None,
) -> dict[str, Any]:
if not isinstance(action, dict):
raise StopRun("invalid_route:not_object")
kind = action.get("kind")
if kind == "invalid":
raise StopRun("invalid_route:non_json")
if kind != "route":
raise StopRun("invalid_route:bad_kind")
allowed_keys = {"kind", "target", "args"}
if set(action.keys()) - allowed_keys:
raise StopRun("invalid_route:extra_keys")
target = action.get("target")
if not isinstance(target, str) or not target.strip():
raise StopRun("invalid_route:missing_target")
target = target.strip()
if target not in allowed_routes:
raise StopRun(f"invalid_route:route_not_allowed:{target}")
args = action.get("args", {})
if args is None:
args = {}
if not isinstance(args, dict):
raise StopRun("invalid_route:bad_args")
ticket = args.get("ticket")
if not isinstance(ticket, str) or not ticket.strip():
raise StopRun("invalid_route:missing_ticket")
ticket = _normalize_ticket(ticket)
normalized_args = {**args, "ticket": ticket}
if previous_status == "needs_reroute" and target == previous_target:
raise StopRun("invalid_route:repeat_target_after_reroute")
return {"kind": "route", "target": target, "args": normalized_args}
class RouteGateway:
def __init__(
self,
*,
allow: set[str],
registry: dict[str, Callable[..., dict[str, Any]]],
budget: Budget,
):
self.allow = set(allow)
self.registry = registry
self.budget = budget
self.delegations = 0
self.seen_routes: set[str] = set()
def call(self, target: str, args: dict[str, Any]) -> dict[str, Any]:
self.delegations += 1
if self.delegations > self.budget.max_delegations:
raise StopRun("max_delegations")
if target not in self.allow:
raise StopRun(f"route_denied:{target}")
worker = self.registry.get(target)
if worker is None:
raise StopRun(f"route_missing:{target}")
signature = f"{target}:{args_hash(args)}"
if signature in self.seen_routes:
raise StopRun("loop_detected")
self.seen_routes.add(signature)
try:
return worker(**args)
except TypeError as exc:
raise StopRun(f"route_bad_args:{target}") from exc
except Exception as exc:
raise StopRun(f"route_error:{target}") from exc
What matters most here (plain words)
- validate_route_action(...) is the governance/control layer for route decisions from the LLM.
- The route is treated as untrusted input and goes through strict validation (required ticket, ticket normalization, policy guard after reroute).
- RouteGateway.call(...) is the agent -> executor boundary: the router decides the route, and the gateway delegates safely to a worker.
- loop_detected catches exact repeats (target + args_hash), and args_hash normalizes whitespace in string arguments.
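The deny-by-default idea can be condensed into a few lines (a simplified stand-in for validate_route_action, returning reason strings instead of raising StopRun):

```python
# Deny-by-default route check: anything that is not an explicitly
# allowed, well-formed route is rejected with a structured reason.
ALLOWED = {"billing_specialist", "technical_specialist", "sales_specialist"}

def check_route(action: object) -> str:
    if not isinstance(action, dict) or action.get("kind") != "route":
        return "invalid_route:bad_kind"
    if set(action) - {"kind", "target", "args"}:
        return "invalid_route:extra_keys"
    if action.get("target") not in ALLOWED:
        return f"invalid_route:route_not_allowed:{action.get('target')}"
    return "ok"

assert check_route({"kind": "route", "target": "billing_specialist", "args": {}}) == "ok"
assert check_route({"kind": "route", "target": "shell_exec", "args": {}}).startswith(
    "invalid_route:route_not_allowed"
)
assert check_route("just text") == "invalid_route:bad_kind"
```

Note the order: shape first, keys next, allowlist last, so each rejection maps to a precise stop reason.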
llm.py – routing decision + final synthesis
The LLM sees only the catalog of available routes; if a route is not in the allowlist, the policy boundary stops the run.
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
ROUTER_SYSTEM_PROMPT = """
You are a routing decision engine.
Return only one JSON object in this exact shape:
{"kind":"route","target":"<route_name>","args":{"ticket":"..."}}
Rules:
- Choose exactly one target from available_routes.
- Never choose targets from forbidden_targets.
- Keep args minimal and valid for that target.
- If previous attempts failed with needs_reroute, choose a different target.
- Respect routing budgets and avoid unnecessary retries.
- Do not answer the user directly.
- Never output markdown or extra keys.
""".strip()
FINAL_SYSTEM_PROMPT = """
You are a support response assistant.
Write a short final answer in English for a US customer.
Use only evidence from delegated specialist observation.
Include: selected specialist, final decision, and one reason.
For billing refunds, include amount in USD when available.
""".strip()
ROUTE_CATALOG = [
{
"name": "billing_specialist",
"description": "Handle refunds, charges, invoices, and billing policy",
"args": {"ticket": "string"},
},
{
"name": "technical_specialist",
"description": "Handle errors, incidents, API issues, and outages",
"args": {"ticket": "string"},
},
{
"name": "sales_specialist",
"description": "Handle pricing, plan recommendations, and quotes",
"args": {"ticket": "string"},
},
]
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def _build_state_summary(history: list[dict[str, Any]]) -> dict[str, Any]:
routes_used = [
step.get("route", {}).get("target")
for step in history
if isinstance(step, dict)
and isinstance(step.get("route"), dict)
and step.get("route", {}).get("kind") == "route"
]
routes_used_unique = list(dict.fromkeys(route for route in routes_used if route))
last_route_target = routes_used[-1] if routes_used else None
last_observation = history[-1].get("observation") if history else None
last_observation_status = (
last_observation.get("status") if isinstance(last_observation, dict) else None
)
return {
"attempts_completed": len(history),
"routes_used_unique": routes_used_unique,
"last_route_target": last_route_target,
"last_observation_status": last_observation_status,
"last_observation": last_observation,
}
def decide_route(
goal: str,
history: list[dict[str, Any]],
*,
max_route_attempts: int,
remaining_attempts: int,
forbidden_targets: list[str],
) -> dict[str, Any]:
recent_history = history[-3:]
payload = {
"goal": goal,
"budgets": {
"max_route_attempts": max_route_attempts,
"remaining_attempts": remaining_attempts,
},
"forbidden_targets": forbidden_targets,
"state_summary": _build_state_summary(history),
"recent_history": recent_history,
"available_routes": ROUTE_CATALOG,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": ROUTER_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
return json.loads(text)
except json.JSONDecodeError:
return {"kind": "invalid", "raw": text}
def compose_final_answer(
goal: str, selected_route: str, history: list[dict[str, Any]]
) -> str:
payload = {
"goal": goal,
"selected_route": selected_route,
"history": history,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
messages=[
{"role": "system", "content": FINAL_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or ""
text = text.strip()
if not text:
raise LLMEmpty("llm_empty")
return text
What matters most here (plain words)
- decide_route(...) is the decision stage for choosing an executor.
- For production stability, the prompt uses state_summary + recent_history + budgets, not the full raw log.
- forbidden_targets gives the LLM an explicit prohibition against repeating the same target after needs_reroute.
- state_summary stabilizes routing via routes_used_unique, last_route_target, last_observation_status.
- timeout=LLM_TIMEOUT_SECONDS and LLMTimeout provide a controlled stop on network/model issues.
- An empty final response is not masked with fallback text: an explicit llm_empty is returned.
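The tolerant parsing step at the end of decide_route can be sketched on its own:

```python
import json
from typing import Any

# "Never crash on model output": invalid JSON becomes
# {"kind": "invalid", "raw": ...} so the policy boundary can stop the run
# with invalid_route:non_json while keeping raw_route for debugging.
def parse_route(text: str) -> dict[str, Any]:
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}

good = parse_route('{"kind":"route","target":"billing_specialist","args":{"ticket":"refund"}}')
bad = parse_route("Sure! I think billing is best.")  # free text, not JSON

assert good["kind"] == "route"
assert bad == {"kind": "invalid", "raw": "Sure! I think billing is best."}
```

Keeping the raw text means a misbehaving prompt can be diagnosed from the stop response alone.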
main.py – Route -> Delegate -> Finalize
from __future__ import annotations
import json
import time
from typing import Any
from gateway import Budget, RouteGateway, StopRun, args_hash, validate_route_action
from llm import LLMEmpty, LLMTimeout, compose_final_answer, decide_route
from tools import billing_specialist, sales_specialist, technical_specialist
GOAL = (
"User Anna (user_id=42) asks: Can I get a refund for my pro_monthly subscription "
"charged 10 days ago? Route to the correct specialist and provide a short final answer."
)
BUDGET = Budget(max_route_attempts=3, max_delegations=3, max_seconds=60)
ROUTE_REGISTRY = {
"billing_specialist": billing_specialist,
"technical_specialist": technical_specialist,
"sales_specialist": sales_specialist,
}
ALLOWED_ROUTE_TARGETS_POLICY = {
"billing_specialist",
"technical_specialist",
"sales_specialist",
}
ALLOWED_ROUTE_TARGETS_EXECUTION = {
"billing_specialist",
"technical_specialist",
"sales_specialist",
}
def run_routing(goal: str) -> dict[str, Any]:
started = time.monotonic()
trace: list[dict[str, Any]] = []
history: list[dict[str, Any]] = []
gateway = RouteGateway(
allow=ALLOWED_ROUTE_TARGETS_EXECUTION,
registry=ROUTE_REGISTRY,
budget=BUDGET,
)
for attempt in range(1, BUDGET.max_route_attempts + 1):
elapsed = time.monotonic() - started
if elapsed > BUDGET.max_seconds:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"trace": trace,
"history": history,
}
previous_step = history[-1] if history else None
previous_observation = (
previous_step.get("observation")
if isinstance(previous_step, dict)
else None
)
previous_route = previous_step.get("route") if isinstance(previous_step, dict) else None
previous_status = (
previous_observation.get("status")
if isinstance(previous_observation, dict)
else None
)
previous_target = (
previous_route.get("target")
if isinstance(previous_route, dict)
else None
)
forbidden_targets = (
[previous_target]
if previous_status == "needs_reroute" and isinstance(previous_target, str)
else []
)
try:
raw_route = decide_route(
goal=goal,
history=history,
max_route_attempts=BUDGET.max_route_attempts,
remaining_attempts=(BUDGET.max_route_attempts - attempt + 1),
forbidden_targets=forbidden_targets,
)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "route",
"trace": trace,
"history": history,
}
try:
route_action = validate_route_action(
raw_route,
allowed_routes=ALLOWED_ROUTE_TARGETS_POLICY,
previous_target=previous_target,
previous_status=previous_status,
)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": "route",
"raw_route": raw_route,
"trace": trace,
"history": history,
}
target = route_action["target"]
route_args = route_action["args"]
try:
observation = gateway.call(target, route_args)
trace.append(
{
"attempt": attempt,
"target": target,
"args_hash": args_hash(route_args),
"ok": True,
}
)
except StopRun as exc:
trace.append(
{
"attempt": attempt,
"target": target,
"args_hash": args_hash(route_args),
"ok": False,
"stop_reason": exc.reason,
}
)
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": "delegate",
"route": route_action,
"trace": trace,
"history": history,
}
history.append(
{
"attempt": attempt,
"route": route_action,
"observation": observation,
}
)
observation_status = observation.get("status")
if trace:
trace[-1]["observation_status"] = observation_status
if isinstance(observation, dict) and observation.get("domain"):
trace[-1]["domain"] = observation.get("domain")
if observation_status == "needs_reroute":
continue
if observation_status != "done":
return {
"status": "stopped",
"stop_reason": "route_bad_observation",
"phase": "delegate",
"route": route_action,
"expected_statuses": ["needs_reroute", "done"],
"received_status": observation_status,
"bad_observation": observation,
"trace": trace,
"history": history,
}
try:
answer = compose_final_answer(
goal=goal,
selected_route=target,
history=history,
)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "finalize",
"route": route_action,
"trace": trace,
"history": history,
}
except LLMEmpty:
return {
"status": "stopped",
"stop_reason": "llm_empty",
"phase": "finalize",
"route": route_action,
"trace": trace,
"history": history,
}
return {
"status": "ok",
"stop_reason": "success",
"selected_route": target,
"answer": answer,
"trace": trace,
"history": history,
}
return {
"status": "stopped",
"stop_reason": "max_route_attempts",
"trace": trace,
"history": history,
}
def main() -> None:
result = run_routing(GOAL)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
What matters most here (plain words)
- run_routing(...) controls the full lifecycle Route -> Delegate -> Finalize.
- The router (LLM) does not perform work; execution is done only by a worker through RouteGateway.
- If the route is invalid, raw_route is returned for debugging.
- If a reroute is needed, policy does not allow repeating the same target (invalid_route:repeat_target_after_reroute).
- For debugging, stop responses include phase (route/delegate/finalize).
- history transparently records route decisions and observations for each attempt.
requirements.txt
openai==2.21.0
Example output
Route target and route-attempt order may vary between runs, but policy gates and stop reasons remain stable.
{
"status": "ok",
"stop_reason": "success",
"selected_route": "billing_specialist",
"answer": "The billing specialist reviewed your request and confirmed that your pro_monthly subscription charged 10 days ago is eligible for a refund. You will receive a refund of $49.00 because pro monthly subscriptions are refundable within 14 days.",
"trace": [
{
"attempt": 1,
"target": "billing_specialist",
"args_hash": "5e89...",
"ok": true,
"observation_status": "done",
"domain": "billing"
}
],
"history": [{...}]
}
This is a shortened example: in a real run, trace may contain multiple route attempts.
history is the execution log: for each attempt, it stores route and observation.
args_hash is an arguments hash after string normalization (trim + collapse spaces), so loop detection more reliably catches semantically identical repeats.
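The normalization effect can be shown in a condensed sketch (mini_args_hash is a simplified stand-in for the real args_hash):

```python
import hashlib
import json

# Normalize whitespace in string values before hashing, so tickets that
# differ only in spacing collide on purpose and trigger loop_detected.
def mini_args_hash(args: dict) -> str:
    normalized = {
        key: " ".join(value.split()) if isinstance(value, str) else value
        for key, value in sorted(args.items())
    }
    raw = json.dumps(normalized, sort_keys=True, ensure_ascii=True)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]

a = mini_args_hash({"ticket": "refund  my   charge"})
b = mini_args_hash({"ticket": "refund my charge"})
assert a == b  # semantically identical repeats hash the same
```

Without this normalization, a reroute with one extra space would slip past loop detection.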
Typical stop_reason values
- success – route selected, worker completed, final answer generated
- invalid_route:* – route JSON from the LLM failed policy validation
- invalid_route:non_json – the LLM did not return valid route JSON
- invalid_route:missing_ticket – route args do not include the required ticket
- invalid_route:route_not_allowed:<target> – route is outside the allowlist policy
- invalid_route:repeat_target_after_reroute – after needs_reroute, the same target was selected again
- max_route_attempts – reroute attempt limit exceeded
- max_delegations – delegation call limit exhausted
- max_seconds – run time budget exceeded
- llm_timeout – the LLM did not respond within OPENAI_TIMEOUT_SECONDS
- llm_empty – the LLM returned an empty final response at the finalize stage
- route_denied:<target> – target is blocked by the execution allowlist
- route_missing:<target> – target is missing in ROUTE_REGISTRY
- route_bad_args:<target> – route contains invalid arguments
- route_bad_observation – worker returned an out-of-contract observation (result includes expected_statuses, received_status, bad_observation)
- loop_detected – exact repeat (target + args_hash)
For stopped runs, phase is also returned to quickly see where the stop happened.
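Because stop_reason values are prefix-structured (route_denied:<target>, invalid_route:*), a monitoring layer can group them by prefix. The severity mapping below is purely illustrative, not part of the example code:

```python
# Hypothetical alert routing by stop_reason prefix. The names and levels
# here are illustrative assumptions for a monitoring sketch.
SEVERITY = {
    "success": "info",
    "invalid_route": "warning",   # the model produced a bad route
    "route_denied": "critical",   # policy and execution allowlists disagree
    "llm_timeout": "warning",
    "loop_detected": "warning",
}

def classify(stop_reason: str) -> str:
    prefix = stop_reason.split(":", 1)[0]
    return SEVERITY.get(prefix, "error")

assert classify("invalid_route:route_not_allowed:shell_exec") == "warning"
assert classify("route_denied:billing_specialist") == "critical"
assert classify("max_delegations") == "error"
```

Prefix grouping keeps dashboards stable even when targets or argument details vary per run.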
What is NOT shown here
- No auth/PII handling or production access controls for personal data.
- No retry/backoff policies for LLM and execution layer.
- No token/cost budgets (cost guardrails).
- Workers here are deterministic learning mocks, not real external systems.
What to try next
- Remove billing_specialist from ALLOWED_ROUTE_TARGETS_POLICY and verify invalid_route:route_not_allowed:*.
- Remove billing_specialist only from ALLOWED_ROUTE_TARGETS_EXECUTION and verify route_denied:*.
- Add a non-existent target to the route JSON and verify route_missing:*.
- Change GOAL to a technical incident and verify routing to technical_specialist.
- Try a route without ticket in args and verify invalid_route:missing_ticket.
Full code on GitHub
The repository contains the full runnable version of this example: route decision, policy boundary, delegation, reroute fallback, and stop reasons.
View full code on GitHub →