Task Decomposition Agent β€” Python (full implementation with LLM)

Production-style runnable Task Decomposition agent example in Python with planning, policy boundary, sequential step execution, budgets, and stop reasons.
On this page
  1. Pattern Essence (Brief)
  2. What this example demonstrates
  3. Architecture
  4. Project structure
  5. How to run
  6. Task
  7. Solution
  8. Code
  9. tools.py β€” tools (source of facts)
  10. gateway.py β€” policy boundary (the most important layer)
  11. llm.py β€” planning + final synthesis
  12. main.py β€” Plan -> Execute -> Combine
  13. requirements.txt
  14. Example output
  15. Typical stop_reason values
  16. What is NOT shown here
  17. What to try next
  18. Full code on GitHub

Pattern Essence (Brief)

Task Decomposition Agent is a pattern where the agent first breaks a complex task into sequential steps, and only then executes them one by one.

The model is responsible for the plan and action order, while execution of each step goes through a controlled gateway with plan validation, allowlist, and run budgets.


What this example demonstrates

  • separate Plan stage before Execute
  • policy boundary between planning (LLM) and tools (execution layer)
  • strict plan validation (kind, step structure, allowed keys)
  • tool allowlist (deny by default)
  • separate run budgets: max_plan_steps (plan) and max_execute_steps (execution), plus max_tool_calls, max_seconds
  • explicit stop_reason values for debugging and monitoring
  • raw_plan in the response if the plan is invalid

Architecture

  1. LLM receives the goal and returns a JSON plan (kind="plan", steps).
  2. Policy boundary validates the plan and blocks invalid/unsafe forms.
  3. Each step is executed sequentially through ToolGateway (allowlist, budgets, loop detection).
  4. Observation from each step is added to history as a checkpoint for transparent execution.
  5. After all steps are executed, LLM performs final synthesis from history with a separate Combine call without tools.

LLM returns intent (plan), treated as untrusted input: policy boundary validates it first and then (if allowed) calls tools. Allowlist is applied twice: in plan validation (invalid_plan:tool_not_allowed:*) and during tool execution (tool_denied:*).

This keeps Task Decomposition controllable: the agent plans, and execution goes through a controlled layer.


Project structure

TEXT
examples/
└── agent-patterns/
    └── task-decomposition-agent/
        └── python/
            β”œβ”€β”€ main.py           # Plan -> Execute -> Combine
            β”œβ”€β”€ llm.py            # planner + final synthesis
            β”œβ”€β”€ gateway.py        # policy boundary: plan validation + tool execution control
            β”œβ”€β”€ tools.py          # deterministic tools (Anna/Max, US, USD)
            └── requirements.txt

How to run

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/task-decomposition-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option via export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Option via .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows it is easier to use environment set commands or, if desired, python-dotenv to load .env automatically.


Task

Imagine a manager asks:

"Prepare a short report for April 2026: sales, refunds, net revenue, and risks."

The agent should not invent an answer "from memory". It must:

  • first create a plan
  • execute steps in sequence
  • use data only from allowed tools
  • provide the final answer only after all steps

Solution

Here the agent works in a straightforward flow:

  • LLM first creates a plan with several steps
  • system checks that the plan is valid and allowed
  • tools execute steps and return facts
  • after that, LLM composes the final short summary
  • if a plan or step is invalid, the run stops with a reason

Code

tools.py β€” tools (source of facts)

PYTHON
from __future__ import annotations

from typing import Any

MANAGERS = {
    42: {"id": 42, "name": "Anna", "region": "US", "team": "Retail East"},
    7: {"id": 7, "name": "Max", "region": "US", "team": "Retail West"},
}

SALES_DATA = {
    "2026-04": [
        {"day": "2026-04-01", "gross_usd": 5200.0, "orders": 120},
        {"day": "2026-04-02", "gross_usd": 4890.0, "orders": 113},
        {"day": "2026-04-03", "gross_usd": 6105.0, "orders": 141},
        {"day": "2026-04-04", "gross_usd": 5580.0, "orders": 127},
        {"day": "2026-04-05", "gross_usd": 6420.0, "orders": 149},
    ]
}

REFUND_DATA = {
    "2026-04": [
        {"day": "2026-04-01", "refunds_usd": 140.0},
        {"day": "2026-04-02", "refunds_usd": 260.0},
        {"day": "2026-04-03", "refunds_usd": 210.0},
        {"day": "2026-04-04", "refunds_usd": 590.0},
        {"day": "2026-04-05", "refunds_usd": 170.0},
    ]
}


def get_manager_profile(manager_id: int) -> dict[str, Any]:
    manager = MANAGERS.get(manager_id)
    if not manager:
        return {"error": f"manager {manager_id} not found"}
    return {"manager": manager}


def fetch_sales_data(month: str) -> dict[str, Any]:
    rows = SALES_DATA.get(month)
    if not rows:
        return {"error": f"sales data for {month} not found"}
    return {"month": month, "currency": "USD", "daily_sales": rows}


def fetch_refund_data(month: str) -> dict[str, Any]:
    rows = REFUND_DATA.get(month)
    if not rows:
        return {"error": f"refund data for {month} not found"}
    return {"month": month, "currency": "USD", "daily_refunds": rows}


def calculate_monthly_kpis(month: str) -> dict[str, Any]:
    sales_rows = SALES_DATA.get(month)
    refund_rows = REFUND_DATA.get(month)
    if not sales_rows or not refund_rows:
        return {"error": f"kpi inputs for {month} not found"}

    gross_sales = sum(row["gross_usd"] for row in sales_rows)
    total_refunds = sum(row["refunds_usd"] for row in refund_rows)
    total_orders = sum(row["orders"] for row in sales_rows)
    net_sales = gross_sales - total_refunds
    refund_rate = (total_refunds / gross_sales) if gross_sales else 0.0

    top_day = max(sales_rows, key=lambda row: row["gross_usd"])["day"]

    return {
        "month": month,
        "currency": "USD",
        "gross_sales_usd": round(gross_sales, 2),
        "refunds_usd": round(total_refunds, 2),
        "net_sales_usd": round(net_sales, 2),
        "orders": total_orders,
        "refund_rate": round(refund_rate, 4),
        "top_sales_day": top_day,
    }


def detect_risk_signals(month: str) -> dict[str, Any]:
    refund_rows = REFUND_DATA.get(month)
    if not refund_rows:
        return {"error": f"refund data for {month} not found"}

    high_refund_day = max(refund_rows, key=lambda row: row["refunds_usd"])
    warnings: list[str] = []

    if high_refund_day["refunds_usd"] >= 500:
        warnings.append(
            f"Refund spike detected on {high_refund_day['day']}: {high_refund_day['refunds_usd']} USD"
        )

    if not warnings:
        warnings.append("No critical risk signals detected for this month.")

    return {
        "month": month,
        "currency": "USD",
        "risk_warnings": warnings,
        "peak_refund_day": high_refund_day,
    }

What matters most here (plain words)

  • Tools are deterministic and contain no LLM logic.
  • The agent only decides which steps to execute.
  • Business logic is executed by the execution layer (tools), not by LLM.

gateway.py β€” policy boundary (the most important layer)

PYTHON
from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_plan_steps: int = 6
    max_execute_steps: int = 8
    max_tool_calls: int = 8
    max_seconds: int = 60


def _stable_json(value: Any) -> str:
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(item) for item in value) + "]"
    if isinstance(value, dict):
        parts = []
        for key in sorted(value):
            parts.append(
                json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key])
            )
        return "{" + ",".join(parts) + "}"
    return json.dumps(str(value), ensure_ascii=True)


def args_hash(args: dict[str, Any]) -> str:
    raw = _stable_json(args or {})
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]


def validate_plan_action(
    action: Any, *, max_plan_steps: int, allowed_tools: set[str]
) -> list[dict[str, Any]]:
    if not isinstance(action, dict):
        raise StopRun("invalid_plan:not_object")

    kind = action.get("kind")
    if kind == "invalid":
        raise StopRun("invalid_plan:non_json")
    if kind != "plan":
        raise StopRun("invalid_plan:bad_kind")

    allowed_top_keys = {"kind", "steps"}
    if set(action.keys()) - allowed_top_keys:
        raise StopRun("invalid_plan:extra_keys")

    steps = action.get("steps")
    if not isinstance(steps, list) or not steps:
        raise StopRun("invalid_plan:missing_steps")
    if len(steps) < 3:
        raise StopRun("invalid_plan:min_steps")
    if len(steps) > max_plan_steps:
        raise StopRun("invalid_plan:max_steps")

    normalized: list[dict[str, Any]] = []
    seen_ids: set[str] = set()

    for index, step in enumerate(steps, start=1):
        if not isinstance(step, dict):
            raise StopRun(f"invalid_plan:step_{index}_not_object")

        allowed_step_keys = {"id", "title", "tool", "args"}
        if set(step.keys()) - allowed_step_keys:
            raise StopRun(f"invalid_plan:step_{index}_extra_keys")

        step_id = step.get("id")
        if not isinstance(step_id, str) or not step_id.strip():
            raise StopRun(f"invalid_plan:step_{index}_missing_id")
        if step_id in seen_ids:
            raise StopRun("invalid_plan:duplicate_step_id")
        seen_ids.add(step_id)

        title = step.get("title")
        if not isinstance(title, str) or not title.strip():
            raise StopRun(f"invalid_plan:step_{index}_missing_title")

        tool = step.get("tool")
        if not isinstance(tool, str) or not tool.strip():
            raise StopRun(f"invalid_plan:step_{index}_missing_tool")
        tool = tool.strip()
        if tool not in allowed_tools:
            raise StopRun(f"invalid_plan:tool_not_allowed:{tool}")

        args = step.get("args", {})
        if args is None:
            args = {}
        if not isinstance(args, dict):
            raise StopRun(f"invalid_plan:step_{index}_bad_args")

        normalized.append(
            {
                "id": step_id.strip(),
                "title": title.strip(),
                "tool": tool,
                "args": args,
            }
        )

    return normalized


class ToolGateway:
    def __init__(
        self,
        *,
        allow: set[str],
        registry: dict[str, Callable[..., dict[str, Any]]],
        budget: Budget,
    ):
        self.allow = set(allow)
        self.registry = registry
        self.budget = budget
        self.tool_calls = 0
        self.seen_calls: set[str] = set()

    def call(self, name: str, args: dict[str, Any]) -> dict[str, Any]:
        self.tool_calls += 1
        if self.tool_calls > self.budget.max_tool_calls:
            raise StopRun("max_tool_calls")

        if name not in self.allow:
            raise StopRun(f"tool_denied:{name}")

        tool = self.registry.get(name)
        if tool is None:
            raise StopRun(f"tool_missing:{name}")

        signature = f"{name}:{args_hash(args)}"
        if signature in self.seen_calls:
            raise StopRun("loop_detected")
        self.seen_calls.add(signature)

        try:
            return tool(**args)
        except TypeError as exc:
            raise StopRun(f"tool_bad_args:{name}") from exc
        except Exception as exc:
            raise StopRun(f"tool_error:{name}") from exc

What matters most here (plain words)

  • validate_plan_action(...) is the governance/control layer for the LLM plan.
  • The plan is treated as untrusted input and goes through strict validation.
  • ToolGateway.call(...) is the agent β‰  executor boundary: the agent plans, gateway executes safely.
  • loop_detected catches exact repeats (tool + args_hash).

llm.py β€” planning + final synthesis

LLM only sees the catalog of available tools; if a tool is not in allowlist, gateway stops the run.

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


PLAN_SYSTEM_PROMPT = """
You are a task decomposition planner.
Return only one JSON object in this exact shape:
{
  "kind": "plan",
  "steps": [
    {"id": "step_1", "title": "...", "tool": "...", "args": {...}}
  ]
}

Rules:
- Create 3 to 6 steps.
- Use only tools from available_tools.
- Keep args minimal and valid.
- Do not add extra keys.
- Do not output markdown.
""".strip()

FINAL_SYSTEM_PROMPT = """
You are a reporting assistant.
Write a short final summary in English for a US business audience.
Include: manager name, month, gross sales (USD), refunds (USD), net sales (USD), refund rate (%), and key risk note.
""".strip()

TOOL_CATALOG = [
    {
        "name": "get_manager_profile",
        "description": "Get manager profile by manager_id",
        "args": {"manager_id": "integer"},
    },
    {
        "name": "fetch_sales_data",
        "description": "Get daily gross sales for a month",
        "args": {"month": "string in YYYY-MM"},
    },
    {
        "name": "fetch_refund_data",
        "description": "Get daily refund values for a month",
        "args": {"month": "string in YYYY-MM"},
    },
    {
        "name": "calculate_monthly_kpis",
        "description": "Calculate gross/refunds/net/order KPIs for a month",
        "args": {"month": "string in YYYY-MM"},
    },
    {
        "name": "detect_risk_signals",
        "description": "Detect risk warnings for a month",
        "args": {"month": "string in YYYY-MM"},
    },
]


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def create_plan(goal: str, max_plan_steps: int) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "max_plan_steps": max_plan_steps,
        "available_tools": TOOL_CATALOG,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": PLAN_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_final_answer(goal: str, history: list[dict[str, Any]]) -> str:
    payload = {
        "goal": goal,
        "history": history,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            messages=[
                {"role": "system", "content": FINAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or ""
    text = text.strip()
    if not text:
        raise LLMEmpty("llm_empty")
    return text

What matters most here (plain words)

  • create_plan(...) is the decision stage for decomposition.
  • timeout=LLM_TIMEOUT_SECONDS + LLMTimeout provide controlled stopping on network/model issues.
  • Empty final output is not masked with fallback text: explicit llm_empty is returned.
  • If JSON is broken, {"kind":"invalid"...} is returned, and policy layer emits a readable stop_reason.

main.py β€” Plan -> Execute -> Combine

PYTHON
from __future__ import annotations

import json
import time
from typing import Any

from gateway import Budget, StopRun, ToolGateway, args_hash, validate_plan_action
from llm import LLMEmpty, LLMTimeout, compose_final_answer, create_plan
from tools import (
    calculate_monthly_kpis,
    detect_risk_signals,
    fetch_refund_data,
    fetch_sales_data,
    get_manager_profile,
)

GOAL = (
    "Prepare an April 2026 monthly sales summary for manager_id=42 in USD. "
    "Use step-by-step decomposition. Include gross sales, refunds, net sales, refund rate, and one risk note."
)

# max_execute_steps here limits plan length (number of planned steps), not runtime loop iterations.
BUDGET = Budget(max_plan_steps=6, max_execute_steps=8, max_tool_calls=8, max_seconds=60)

TOOL_REGISTRY = {
    "get_manager_profile": get_manager_profile,
    "fetch_sales_data": fetch_sales_data,
    "fetch_refund_data": fetch_refund_data,
    "calculate_monthly_kpis": calculate_monthly_kpis,
    "detect_risk_signals": detect_risk_signals,
}

ALLOWED_TOOLS = {
    "get_manager_profile",
    "fetch_sales_data",
    "fetch_refund_data",
    "calculate_monthly_kpis",
    "detect_risk_signals",
}


def run_task_decomposition(goal: str) -> dict[str, Any]:
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = ToolGateway(allow=ALLOWED_TOOLS, registry=TOOL_REGISTRY, budget=BUDGET)

    try:
        raw_plan = create_plan(goal=goal, max_plan_steps=BUDGET.max_plan_steps)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "llm_phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        steps = validate_plan_action(
            raw_plan,
            max_plan_steps=BUDGET.max_plan_steps,
            allowed_tools=ALLOWED_TOOLS,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "raw_plan": raw_plan,
            "trace": trace,
            "history": history,
        }

    if len(steps) > BUDGET.max_execute_steps:
        return {
            "status": "stopped",
            "stop_reason": "max_execute_steps",
            "plan": steps,
            "trace": trace,
            "history": history,
        }

    for step_no, step in enumerate(steps, start=1):
        elapsed = time.monotonic() - started
        if elapsed > BUDGET.max_seconds:
            return {
                "status": "stopped",
                "stop_reason": "max_seconds",
                "plan": steps,
                "trace": trace,
                "history": history,
            }

        tool_name = step["tool"]
        tool_args = step["args"]

        try:
            observation = gateway.call(tool_name, tool_args)
            trace.append(
                {
                    "step_no": step_no,
                    "step_id": step["id"],
                    "tool": tool_name,
                    "args_hash": args_hash(tool_args),
                    "ok": True,
                }
            )
        except StopRun as exc:
            trace.append(
                {
                    "step_no": step_no,
                    "step_id": step["id"],
                    "tool": tool_name,
                    "args_hash": args_hash(tool_args),
                    "ok": False,
                    "stop_reason": exc.reason,
                }
            )
            return {
                "status": "stopped",
                "stop_reason": exc.reason,
                "plan": steps,
                "trace": trace,
                "history": history,
            }

        history.append(
            {
                "step_no": step_no,
                "plan_step": step,
                "observation": observation,
            }
        )

    try:
        answer = compose_final_answer(goal=goal, history=history)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "llm_phase": "finalize",
            "plan": steps,
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "llm_phase": "finalize",
            "plan": steps,
            "trace": trace,
            "history": history,
        }

    return {
        "status": "ok",
        "stop_reason": "success",
        "answer": answer,
        "plan": steps,
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_task_decomposition(GOAL)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (plain words)

  • run_task_decomposition(...) controls Plan -> Execute -> Combine; business actions are executed only through ToolGateway.
  • For an invalid plan, raw_plan is returned for debugging.
  • In this version, max_execute_steps checks plan length before execution; runtime limits are then enforced by max_tool_calls and max_seconds.
  • history is a transparent step log: what was in the plan and which observation each tool returned.

requirements.txt

TEXT
openai==2.21.0

Example output

Step order in the plan may vary slightly between runs, but policy gates and stop reasons stay stable. Planner may reorder steps; what matters is that policy + allowlist work the same regardless of order.

JSON
{
  "status": "ok",
  "stop_reason": "success",
  "answer": "In April 2026, under manager Anna's leadership, gross sales were $28,195, refunds were $1,370, net sales were $26,825, and refund rate was 4.86%.",
  "plan": [
    {"id": "step_1", "tool": "fetch_sales_data", "args": {"month": "2026-04"}},
    {"id": "step_2", "tool": "fetch_refund_data", "args": {"month": "2026-04"}},
    {"id": "step_3", "tool": "calculate_monthly_kpis", "args": {"month": "2026-04"}},
    {"id": "step_4", "tool": "detect_risk_signals", "args": {"month": "2026-04"}},
    {"id": "step_5", "tool": "get_manager_profile", "args": {"manager_id": 42}}
  ],
  "trace": [
    {"step_no": 1, "step_id": "step_1", "tool": "fetch_sales_data", "args_hash": "...", "ok": true},
    {"step_no": 2, "step_id": "step_2", "tool": "fetch_refund_data", "args_hash": "...", "ok": true},
    {"step_no": 3, "step_id": "step_3", "tool": "calculate_monthly_kpis", "args_hash": "...", "ok": true},
    {"step_no": 4, "step_id": "step_4", "tool": "detect_risk_signals", "args_hash": "...", "ok": true},
    {"step_no": 5, "step_id": "step_5", "tool": "get_manager_profile", "args_hash": "...", "ok": true}
  ],
  "history": [{...}]
}

This is a shortened example: in a real run, plan and trace may contain more steps.

history is the execution log: for each step_no, it stores plan_step and observation.

args_hash hashes only arguments, so it can match across different tools when args are identical; loop detection additionally uses tool name.


Typical stop_reason values

  • success β€” plan executed and final answer generated
  • invalid_plan:* β€” LLM plan failed policy validation
  • invalid_plan:non_json β€” LLM did not return valid JSON plan
  • invalid_plan:min_steps β€” plan has fewer than 3 decomposition steps
  • invalid_plan:tool_not_allowed:<name> β€” plan contains a tool outside allowlist
  • max_execute_steps β€” plan is longer than allowed execution budget
  • max_tool_calls β€” tool call limit exhausted
  • max_seconds β€” run time budget exceeded
  • llm_timeout β€” LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_empty β€” LLM returned an empty final answer at finalize stage
  • tool_denied:<name> β€” tool is not in allowlist
  • tool_missing:<name> β€” tool is missing from registry
  • tool_bad_args:<name> β€” step contains invalid arguments
  • loop_detected β€” exact repeat (tool + args_hash)

What is NOT shown here

  • No auth/PII and production access controls for personal data.
  • No retry/backoff policies for LLM and tool layer.
  • No token/cost budgets (cost guardrails).
  • Tools here are deterministic learning mocks, not real external APIs.

What to try next

  • Remove detect_risk_signals from ALLOWED_TOOLS and verify tool_denied:*.
  • Add a non-existent tool to the plan and verify tool_missing:*.
  • Reduce max_plan_steps to 3 and observe how often you get invalid_plan:max_steps.
  • Change GOAL to manager_id=7 (Max) and compare final synthesis.
  • Add cost/token guardrails to Budget and to the final JSON result.

Full code on GitHub

The repository contains the full runnable version of this example: planning, policy boundary, sequential step execution, and stop reasons.

View full code on GitHub β†—
⏱️ 14 min read β€’ Updated Mar, 2026Difficulty: β˜…β˜…β˜†
Integrated: production controlOnceOnly
Add guardrails to tool-calling agents
Ship this pattern with governance:
  • Budgets (steps / spend caps)
  • Tool permissions (allowlist / blocklist)
  • Kill switch & incident stop
  • Idempotency & dedupe
  • Audit logs & traceability
Integrated mention: OnceOnly is a control layer for production agent systems.
Author

This documentation is curated and maintained by engineers who ship AI agents in production.

The content is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

Patterns and recommendations are grounded in post-mortems, failure modes, and operational incidents in deployed systems, including during the development and operation of governance infrastructure for agents at OnceOnly.