Pattern Essence (Brief)
Task Decomposition Agent is a pattern where the agent first breaks a complex task into sequential steps, and only then executes them one by one.
The model is responsible for the plan and action order, while execution of each step goes through a controlled gateway with plan validation, allowlist, and run budgets.
What this example demonstrates
- separate Plan stage before Execute
- policy boundary between planning (LLM) and tools (execution layer)
- strict plan validation (kind, step structure, allowed keys)
- tool allowlist (deny by default)
- separate run budgets: max_plan_steps (plan) and max_execute_steps (execution), plus max_tool_calls, max_seconds
- explicit stop_reason values for debugging and monitoring
- raw_plan in the response if the plan is invalid
Architecture
- LLM receives the goal and returns a JSON plan (kind="plan", steps).
- Policy boundary validates the plan and blocks invalid/unsafe forms.
- Each step is executed sequentially through ToolGateway (allowlist, budgets, loop detection).
- Observation from each step is added to history as a checkpoint for transparent execution.
- After all steps are executed, LLM performs final synthesis from history with a separate Combine call without tools.
The LLM returns only intent (a plan), which is treated as untrusted input: the policy boundary validates it first and only then, if allowed, calls tools.
Allowlist is applied twice: in plan validation (invalid_plan:tool_not_allowed:*) and during tool execution (tool_denied:*).
This keeps Task Decomposition controllable: the agent plans, and execution goes through a controlled layer.
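The two allowlist checks can be sketched in isolation. This is a simplified illustration, not the example's gateway.py: `check_plan_tools`, `call_tool`, and the `echo` / `delete_all` tools are hypothetical names, and stop reasons are raised as plain `ValueError` messages.

```python
from typing import Any, Callable


def check_plan_tools(steps: list[dict[str, Any]], allowed: set[str]) -> None:
    """Plan-time check: reject the whole plan before anything runs."""
    for step in steps:
        if step["tool"] not in allowed:
            raise ValueError(f"invalid_plan:tool_not_allowed:{step['tool']}")


def call_tool(
    name: str,
    args: dict[str, Any],
    allowed: set[str],
    registry: dict[str, Callable[..., Any]],
) -> Any:
    """Call-time check: enforced again, even if a plan was validated earlier."""
    if name not in allowed:
        raise ValueError(f"tool_denied:{name}")
    return registry[name](**args)


# Hypothetical tools, used only for this demo.
registry = {
    "echo": lambda text: {"text": text},
    "delete_all": lambda: {"done": True},
}
plan = [
    {"tool": "echo", "args": {"text": "hi"}},
    {"tool": "delete_all", "args": {}},
]

# Plan-time: the plan is rejected as a whole, so no step runs.
try:
    check_plan_tools(plan, allowed={"echo"})
    plan_error = None
except ValueError as exc:
    plan_error = str(exc)

# Call-time: a call that bypasses planning is still denied.
try:
    call_tool("delete_all", {}, allowed={"echo"}, registry=registry)
    call_error = None
except ValueError as exc:
    call_error = str(exc)

print(plan_error)
print(call_error)
```

The second check looks redundant when both layers share one set, but it keeps the gateway safe on its own if another code path ever calls it without validating a plan first.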
Project structure
examples/
└── agent-patterns/
    └── task-decomposition-agent/
        └── python/
            ├── main.py           # Plan -> Execute -> Combine
            ├── llm.py            # planner + final synthesis
            ├── gateway.py        # policy boundary: plan validation + tool execution control
            ├── tools.py          # deterministic tools (Anna/Max, US, USD)
            └── requirements.txt
How to run
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/task-decomposition-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ is required.
Option via export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Option via .env (optional)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
This is the shell variant (macOS/Linux). On Windows it is easier to use environment set commands or, if desired, python-dotenv to load .env automatically.
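If neither shell sourcing nor python-dotenv is convenient, a few lines of standard-library Python can load simple KEY=VALUE files. The sketch below is an assumption-level helper (`load_env_file` is a hypothetical name, not part of the example repository); it handles only plain `KEY=VALUE` lines, comments, and optional surrounding quotes, and it does not override variables that are already set.

```python
import os
import tempfile
from pathlib import Path


def load_env_file(path: str) -> dict[str, str]:
    """Load KEY=VALUE lines into os.environ without overriding existing values."""
    loaded: dict[str, str] = {}
    for line in Path(path).read_text(encoding="utf-8").splitlines():
        line = line.strip()
        # Skip blanks, comments, and lines without an assignment.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        value = value.strip().strip("'\"")  # drop optional surrounding quotes
        if key:
            os.environ.setdefault(key, value)
            loaded[key] = value
    return loaded


# Demo with a temporary file and placeholder variable names.
with tempfile.TemporaryDirectory() as tmp:
    env_path = os.path.join(tmp, ".env")
    Path(env_path).write_text(
        "# comment\nDEMO_MODEL=gpt-4.1-mini\nDEMO_TIMEOUT_SECONDS='60'\n",
        encoding="utf-8",
    )
    loaded = load_env_file(env_path)

print(loaded)
```

Calling such a helper at the top of main.py, before llm.py reads its settings, would work the same way on Windows, macOS, and Linux.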
Task
Imagine a manager asks:
"Prepare a short report for April 2026: sales, refunds, net revenue, and risks."
The agent should not invent an answer "from memory". It must:
- first create a plan
- execute steps in sequence
- use data only from allowed tools
- provide the final answer only after all steps
Solution
Here the agent works in a straightforward flow:
- LLM first creates a plan with several steps
- system checks that the plan is valid and allowed
- tools execute steps and return facts
- after that, LLM composes the final short summary
- if a plan or step is invalid, the run stops with a reason
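The flow above can be tried without an API key by replacing the LLM with stubs. The following is a minimal sketch under that assumption: `fake_planner`, the two toy tools, and the inline "combine" step are hypothetical stand-ins, not the code of this example.

```python
from typing import Any, Callable


def fake_planner(goal: str) -> dict[str, Any]:
    # Stand-in for the LLM: always returns the same fixed plan.
    return {
        "kind": "plan",
        "steps": [
            {"id": "s1", "tool": "get_total", "args": {"month": "2026-04"}},
            {"id": "s2", "tool": "get_refunds", "args": {"month": "2026-04"}},
        ],
    }


# Toy deterministic tools with made-up numbers.
TOOLS: dict[str, Callable[..., dict[str, Any]]] = {
    "get_total": lambda month: {"month": month, "total": 100.0},
    "get_refunds": lambda month: {"month": month, "refunds": 5.0},
}


def run(goal: str, allowed: set[str]) -> dict[str, Any]:
    plan = fake_planner(goal)  # 1. plan
    for step in plan["steps"]:  # 2. validate before anything executes
        if step["tool"] not in allowed:
            reason = f"invalid_plan:tool_not_allowed:{step['tool']}"
            return {"status": "stopped", "stop_reason": reason}
    history: list[dict[str, Any]] = []
    for step in plan["steps"]:  # 3. execute steps in order
        observation = TOOLS[step["tool"]](**step["args"])
        history.append({"step": step["id"], "observation": observation})
    # 4. combine: a stub replaces the final LLM synthesis call
    net = history[0]["observation"]["total"] - history[1]["observation"]["refunds"]
    return {
        "status": "ok",
        "stop_reason": "success",
        "answer": f"net={net}",
        "history": history,
    }


print(run("demo goal", allowed={"get_total", "get_refunds"})["answer"])
print(run("demo goal", allowed={"get_total"})["stop_reason"])
```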
Code
tools.py: tools (source of facts)
from __future__ import annotations

from typing import Any

MANAGERS = {
    42: {"id": 42, "name": "Anna", "region": "US", "team": "Retail East"},
    7: {"id": 7, "name": "Max", "region": "US", "team": "Retail West"},
}

SALES_DATA = {
    "2026-04": [
        {"day": "2026-04-01", "gross_usd": 5200.0, "orders": 120},
        {"day": "2026-04-02", "gross_usd": 4890.0, "orders": 113},
        {"day": "2026-04-03", "gross_usd": 6105.0, "orders": 141},
        {"day": "2026-04-04", "gross_usd": 5580.0, "orders": 127},
        {"day": "2026-04-05", "gross_usd": 6420.0, "orders": 149},
    ]
}

REFUND_DATA = {
    "2026-04": [
        {"day": "2026-04-01", "refunds_usd": 140.0},
        {"day": "2026-04-02", "refunds_usd": 260.0},
        {"day": "2026-04-03", "refunds_usd": 210.0},
        {"day": "2026-04-04", "refunds_usd": 590.0},
        {"day": "2026-04-05", "refunds_usd": 170.0},
    ]
}


def get_manager_profile(manager_id: int) -> dict[str, Any]:
    manager = MANAGERS.get(manager_id)
    if not manager:
        return {"error": f"manager {manager_id} not found"}
    return {"manager": manager}


def fetch_sales_data(month: str) -> dict[str, Any]:
    rows = SALES_DATA.get(month)
    if not rows:
        return {"error": f"sales data for {month} not found"}
    return {"month": month, "currency": "USD", "daily_sales": rows}


def fetch_refund_data(month: str) -> dict[str, Any]:
    rows = REFUND_DATA.get(month)
    if not rows:
        return {"error": f"refund data for {month} not found"}
    return {"month": month, "currency": "USD", "daily_refunds": rows}


def calculate_monthly_kpis(month: str) -> dict[str, Any]:
    sales_rows = SALES_DATA.get(month)
    refund_rows = REFUND_DATA.get(month)
    if not sales_rows or not refund_rows:
        return {"error": f"kpi inputs for {month} not found"}

    gross_sales = sum(row["gross_usd"] for row in sales_rows)
    total_refunds = sum(row["refunds_usd"] for row in refund_rows)
    total_orders = sum(row["orders"] for row in sales_rows)
    net_sales = gross_sales - total_refunds
    refund_rate = (total_refunds / gross_sales) if gross_sales else 0.0
    top_day = max(sales_rows, key=lambda row: row["gross_usd"])["day"]

    return {
        "month": month,
        "currency": "USD",
        "gross_sales_usd": round(gross_sales, 2),
        "refunds_usd": round(total_refunds, 2),
        "net_sales_usd": round(net_sales, 2),
        "orders": total_orders,
        "refund_rate": round(refund_rate, 4),
        "top_sales_day": top_day,
    }


def detect_risk_signals(month: str) -> dict[str, Any]:
    refund_rows = REFUND_DATA.get(month)
    if not refund_rows:
        return {"error": f"refund data for {month} not found"}

    high_refund_day = max(refund_rows, key=lambda row: row["refunds_usd"])
    warnings: list[str] = []
    if high_refund_day["refunds_usd"] >= 500:
        warnings.append(
            f"Refund spike detected on {high_refund_day['day']}: {high_refund_day['refunds_usd']} USD"
        )
    if not warnings:
        warnings.append("No critical risk signals detected for this month.")

    return {
        "month": month,
        "currency": "USD",
        "risk_warnings": warnings,
        "peak_refund_day": high_refund_day,
    }
What matters most here (plain words)
- Tools are deterministic and contain no LLM logic.
- The agent only decides which steps to execute.
- Business logic is executed by the execution layer (tools), not by LLM.
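Because the tools are deterministic, the KPI arithmetic can be checked by hand. The sketch below repeats the calculation on the same April 2026 figures from tools.py; the flat lists and variable names are a simplification for illustration, not the tool's actual structure.

```python
# Daily values copied from SALES_DATA and REFUND_DATA for 2026-04.
days = ["2026-04-01", "2026-04-02", "2026-04-03", "2026-04-04", "2026-04-05"]
gross_usd = [5200.0, 4890.0, 6105.0, 5580.0, 6420.0]
refunds_usd = [140.0, 260.0, 210.0, 590.0, 170.0]

gross_sales = sum(gross_usd)
total_refunds = sum(refunds_usd)
net_sales = gross_sales - total_refunds
refund_rate = round(total_refunds / gross_sales, 4)

# Same threshold as detect_risk_signals: a day with refunds >= 500 USD is a spike.
spike_days = [day for day, value in zip(days, refunds_usd) if value >= 500]

print(gross_sales, total_refunds, net_sales, refund_rate)
# 28195.0 1370.0 26825.0 0.0486
print(spike_days)
# ['2026-04-04']
```

A refund rate of 0.0486 corresponds to the 4.86% that the final summary should report.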
gateway.py: policy boundary (the most important layer)
from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_plan_steps: int = 6
    max_execute_steps: int = 8
    max_tool_calls: int = 8
    max_seconds: int = 60


def _stable_json(value: Any) -> str:
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(item) for item in value) + "]"
    if isinstance(value, dict):
        parts = []
        for key in sorted(value):
            parts.append(
                json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key])
            )
        return "{" + ",".join(parts) + "}"
    return json.dumps(str(value), ensure_ascii=True)


def args_hash(args: dict[str, Any]) -> str:
    raw = _stable_json(args or {})
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]


def validate_plan_action(
    action: Any, *, max_plan_steps: int, allowed_tools: set[str]
) -> list[dict[str, Any]]:
    if not isinstance(action, dict):
        raise StopRun("invalid_plan:not_object")

    kind = action.get("kind")
    if kind == "invalid":
        raise StopRun("invalid_plan:non_json")
    if kind != "plan":
        raise StopRun("invalid_plan:bad_kind")

    allowed_top_keys = {"kind", "steps"}
    if set(action.keys()) - allowed_top_keys:
        raise StopRun("invalid_plan:extra_keys")

    steps = action.get("steps")
    if not isinstance(steps, list) or not steps:
        raise StopRun("invalid_plan:missing_steps")
    if len(steps) < 3:
        raise StopRun("invalid_plan:min_steps")
    if len(steps) > max_plan_steps:
        raise StopRun("invalid_plan:max_steps")

    normalized: list[dict[str, Any]] = []
    seen_ids: set[str] = set()

    for index, step in enumerate(steps, start=1):
        if not isinstance(step, dict):
            raise StopRun(f"invalid_plan:step_{index}_not_object")

        allowed_step_keys = {"id", "title", "tool", "args"}
        if set(step.keys()) - allowed_step_keys:
            raise StopRun(f"invalid_plan:step_{index}_extra_keys")

        step_id = step.get("id")
        if not isinstance(step_id, str) or not step_id.strip():
            raise StopRun(f"invalid_plan:step_{index}_missing_id")
        if step_id in seen_ids:
            raise StopRun("invalid_plan:duplicate_step_id")
        seen_ids.add(step_id)

        title = step.get("title")
        if not isinstance(title, str) or not title.strip():
            raise StopRun(f"invalid_plan:step_{index}_missing_title")

        tool = step.get("tool")
        if not isinstance(tool, str) or not tool.strip():
            raise StopRun(f"invalid_plan:step_{index}_missing_tool")
        tool = tool.strip()
        if tool not in allowed_tools:
            raise StopRun(f"invalid_plan:tool_not_allowed:{tool}")

        args = step.get("args", {})
        if args is None:
            args = {}
        if not isinstance(args, dict):
            raise StopRun(f"invalid_plan:step_{index}_bad_args")

        normalized.append(
            {
                "id": step_id.strip(),
                "title": title.strip(),
                "tool": tool,
                "args": args,
            }
        )

    return normalized


class ToolGateway:
    def __init__(
        self,
        *,
        allow: set[str],
        registry: dict[str, Callable[..., dict[str, Any]]],
        budget: Budget,
    ):
        self.allow = set(allow)
        self.registry = registry
        self.budget = budget
        self.tool_calls = 0
        self.seen_calls: set[str] = set()

    def call(self, name: str, args: dict[str, Any]) -> dict[str, Any]:
        self.tool_calls += 1
        if self.tool_calls > self.budget.max_tool_calls:
            raise StopRun("max_tool_calls")

        if name not in self.allow:
            raise StopRun(f"tool_denied:{name}")

        tool = self.registry.get(name)
        if tool is None:
            raise StopRun(f"tool_missing:{name}")

        signature = f"{name}:{args_hash(args)}"
        if signature in self.seen_calls:
            raise StopRun("loop_detected")
        self.seen_calls.add(signature)

        try:
            return tool(**args)
        except TypeError as exc:
            raise StopRun(f"tool_bad_args:{name}") from exc
        except Exception as exc:
            raise StopRun(f"tool_error:{name}") from exc
What matters most here (plain words)
- validate_plan_action(...) is the governance/control layer for the LLM plan.
- The plan is treated as untrusted input and goes through strict validation.
- ToolGateway.call(...) is the agent -> executor boundary: the agent plans, gateway executes safely.
- loop_detected catches exact repeats (tool + args_hash).
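The loop check depends on a fingerprint that does not change with key order. The sketch below illustrates the idea with `json.dumps(sort_keys=True)` in place of the example's recursive `_stable_json`; `args_fingerprint` and `LoopGuard` are hypothetical names, and unlike the original this version assumes all argument values are JSON-serializable.

```python
import hashlib
import json
from typing import Any


def args_fingerprint(args: dict[str, Any]) -> str:
    # sort_keys makes the serialization independent of dict insertion order.
    raw = json.dumps(args, sort_keys=True, separators=(",", ":"), ensure_ascii=True)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]


class LoopGuard:
    """Remembers tool + args signatures and flags exact repeats."""

    def __init__(self) -> None:
        self.seen: set[str] = set()

    def check(self, tool: str, args: dict[str, Any]) -> bool:
        signature = f"{tool}:{args_fingerprint(args)}"
        if signature in self.seen:
            return False  # exact repeat: a gateway would stop with loop_detected
        self.seen.add(signature)
        return True


guard = LoopGuard()
first = guard.check("fetch_sales_data", {"month": "2026-04"})
other_tool = guard.check("fetch_refund_data", {"month": "2026-04"})  # same args, other tool
repeat = guard.check("fetch_sales_data", {"month": "2026-04"})

print(first, other_tool, repeat)
# True True False
```

Including the tool name in the signature is what lets two different tools receive identical arguments without tripping the guard.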
llm.py: planning + final synthesis
The LLM sees only the catalog of available tools; if the plan names a tool that is not in the allowlist, the policy boundary stops the run.
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


PLAN_SYSTEM_PROMPT = """
You are a task decomposition planner.
Return only one JSON object in this exact shape:
{
  "kind": "plan",
  "steps": [
    {"id": "step_1", "title": "...", "tool": "...", "args": {...}}
  ]
}

Rules:
- Create 3 to 6 steps.
- Use only tools from available_tools.
- Keep args minimal and valid.
- Do not add extra keys.
- Do not output markdown.
""".strip()

FINAL_SYSTEM_PROMPT = """
You are a reporting assistant.
Write a short final summary in English for a US business audience.
Include: manager name, month, gross sales (USD), refunds (USD), net sales (USD), refund rate (%), and key risk note.
""".strip()

TOOL_CATALOG = [
    {
        "name": "get_manager_profile",
        "description": "Get manager profile by manager_id",
        "args": {"manager_id": "integer"},
    },
    {
        "name": "fetch_sales_data",
        "description": "Get daily gross sales for a month",
        "args": {"month": "string in YYYY-MM"},
    },
    {
        "name": "fetch_refund_data",
        "description": "Get daily refund values for a month",
        "args": {"month": "string in YYYY-MM"},
    },
    {
        "name": "calculate_monthly_kpis",
        "description": "Calculate gross/refunds/net/order KPIs for a month",
        "args": {"month": "string in YYYY-MM"},
    },
    {
        "name": "detect_risk_signals",
        "description": "Detect risk warnings for a month",
        "args": {"month": "string in YYYY-MM"},
    },
]


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def create_plan(goal: str, max_plan_steps: int) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "max_plan_steps": max_plan_steps,
        "available_tools": TOOL_CATALOG,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": PLAN_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_final_answer(goal: str, history: list[dict[str, Any]]) -> str:
    payload = {
        "goal": goal,
        "history": history,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            messages=[
                {"role": "system", "content": FINAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or ""
    text = text.strip()
    if not text:
        raise LLMEmpty("llm_empty")
    return text
What matters most here (plain words)
- create_plan(...) is the decision stage for decomposition.
- timeout=LLM_TIMEOUT_SECONDS + LLMTimeout provide controlled stopping on network/model issues.
- Empty final output is not masked with fallback text: explicit llm_empty is returned.
- If JSON is broken, {"kind":"invalid"...} is returned, and policy layer emits a readable stop_reason.
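The parsing step at the end of create_plan can be exercised without calling the API. The sketch below isolates it in a hypothetical helper, `parse_plan_text`, to show which shapes reach the policy layer; the helper name and the sample strings are assumptions for illustration.

```python
import json
from typing import Any, Optional


def parse_plan_text(text: Optional[str]) -> Any:
    """Never raise on bad model output; return something the validator can reject."""
    try:
        return json.loads(text or "{}")
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


good = parse_plan_text('{"kind": "plan", "steps": []}')
broken = parse_plan_text("Sure! Here is the plan: ...")
empty = parse_plan_text(None)
not_object = parse_plan_text("[1, 2, 3]")

print(good["kind"])    # plan
print(broken["kind"])  # invalid
print(empty)           # {}
print(not_object)      # [1, 2, 3]
```

Following validate_plan_action in gateway.py, these four cases would then end as invalid_plan:missing_steps, invalid_plan:non_json, invalid_plan:bad_kind, and invalid_plan:not_object respectively, so every malformed response maps to a specific stop reason rather than an unhandled exception.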
main.py: Plan -> Execute -> Combine
from __future__ import annotations

import json
import time
from typing import Any

from gateway import Budget, StopRun, ToolGateway, args_hash, validate_plan_action
from llm import LLMEmpty, LLMTimeout, compose_final_answer, create_plan
from tools import (
    calculate_monthly_kpis,
    detect_risk_signals,
    fetch_refund_data,
    fetch_sales_data,
    get_manager_profile,
)

GOAL = (
    "Prepare an April 2026 monthly sales summary for manager_id=42 in USD. "
    "Use step-by-step decomposition. Include gross sales, refunds, net sales, refund rate, and one risk note."
)

# max_execute_steps here limits plan length (number of planned steps), not runtime loop iterations.
BUDGET = Budget(max_plan_steps=6, max_execute_steps=8, max_tool_calls=8, max_seconds=60)

TOOL_REGISTRY = {
    "get_manager_profile": get_manager_profile,
    "fetch_sales_data": fetch_sales_data,
    "fetch_refund_data": fetch_refund_data,
    "calculate_monthly_kpis": calculate_monthly_kpis,
    "detect_risk_signals": detect_risk_signals,
}

ALLOWED_TOOLS = {
    "get_manager_profile",
    "fetch_sales_data",
    "fetch_refund_data",
    "calculate_monthly_kpis",
    "detect_risk_signals",
}


def run_task_decomposition(goal: str) -> dict[str, Any]:
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = ToolGateway(allow=ALLOWED_TOOLS, registry=TOOL_REGISTRY, budget=BUDGET)

    try:
        raw_plan = create_plan(goal=goal, max_plan_steps=BUDGET.max_plan_steps)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "llm_phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        steps = validate_plan_action(
            raw_plan,
            max_plan_steps=BUDGET.max_plan_steps,
            allowed_tools=ALLOWED_TOOLS,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "raw_plan": raw_plan,
            "trace": trace,
            "history": history,
        }

    if len(steps) > BUDGET.max_execute_steps:
        return {
            "status": "stopped",
            "stop_reason": "max_execute_steps",
            "plan": steps,
            "trace": trace,
            "history": history,
        }

    for step_no, step in enumerate(steps, start=1):
        elapsed = time.monotonic() - started
        if elapsed > BUDGET.max_seconds:
            return {
                "status": "stopped",
                "stop_reason": "max_seconds",
                "plan": steps,
                "trace": trace,
                "history": history,
            }

        tool_name = step["tool"]
        tool_args = step["args"]

        try:
            observation = gateway.call(tool_name, tool_args)
            trace.append(
                {
                    "step_no": step_no,
                    "step_id": step["id"],
                    "tool": tool_name,
                    "args_hash": args_hash(tool_args),
                    "ok": True,
                }
            )
        except StopRun as exc:
            trace.append(
                {
                    "step_no": step_no,
                    "step_id": step["id"],
                    "tool": tool_name,
                    "args_hash": args_hash(tool_args),
                    "ok": False,
                    "stop_reason": exc.reason,
                }
            )
            return {
                "status": "stopped",
                "stop_reason": exc.reason,
                "plan": steps,
                "trace": trace,
                "history": history,
            }

        history.append(
            {
                "step_no": step_no,
                "plan_step": step,
                "observation": observation,
            }
        )

    try:
        answer = compose_final_answer(goal=goal, history=history)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "llm_phase": "finalize",
            "plan": steps,
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "llm_phase": "finalize",
            "plan": steps,
            "trace": trace,
            "history": history,
        }

    return {
        "status": "ok",
        "stop_reason": "success",
        "answer": answer,
        "plan": steps,
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_task_decomposition(GOAL)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()
What matters most here (plain words)
- run_task_decomposition(...) controls Plan -> Execute -> Combine; business actions are executed only through ToolGateway.
- For an invalid plan, raw_plan is returned for debugging.
- In this version, max_execute_steps checks plan length before execution; runtime limits are then enforced by max_tool_calls and max_seconds.
- history is a transparent step log: what was in the plan and which observation each tool returned.
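One consequence of how max_seconds is enforced is easy to miss: the elapsed time is compared before each step, so a step that is already running is never interrupted. The simulation below uses a fake clock instead of time.monotonic() to make that visible; `run_steps` and the step durations are hypothetical and exist only for this illustration.

```python
def run_steps(durations: list[float], max_seconds: float) -> tuple[int, str]:
    """Simulate the execute loop with a manually advanced clock."""
    now = 0.0  # fake clock
    started = now
    executed = 0
    for duration in durations:
        # Same comparison as the loop in main.py: checked before the step starts.
        if now - started > max_seconds:
            return executed, "max_seconds"
        now += duration  # the step runs to completion, however long it takes
        executed += 1
    return executed, "success"


print(run_steps([10, 10], max_seconds=60))
# (2, 'success')
print(run_steps([30, 30, 30, 30], max_seconds=60))
# (3, 'max_seconds')
print(run_steps([100], max_seconds=60))
# (1, 'success')
```

In the last case a single slow step overruns the budget without stopping the run, which suggests that a hard limit would also need per-tool timeouts.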
requirements.txt
openai==2.21.0
Example output
The planner may reorder steps slightly between runs, but the policy gates, allowlist, and stop reasons behave the same regardless of order.
{
  "status": "ok",
  "stop_reason": "success",
  "answer": "In April 2026, under manager Anna's leadership, gross sales were $28,195, refunds were $1,370, net sales were $26,825, and refund rate was 4.86%.",
  "plan": [
    {"id": "step_1", "tool": "fetch_sales_data", "args": {"month": "2026-04"}},
    {"id": "step_2", "tool": "fetch_refund_data", "args": {"month": "2026-04"}},
    {"id": "step_3", "tool": "calculate_monthly_kpis", "args": {"month": "2026-04"}},
    {"id": "step_4", "tool": "detect_risk_signals", "args": {"month": "2026-04"}},
    {"id": "step_5", "tool": "get_manager_profile", "args": {"manager_id": 42}}
  ],
  "trace": [
    {"step_no": 1, "step_id": "step_1", "tool": "fetch_sales_data", "args_hash": "...", "ok": true},
    {"step_no": 2, "step_id": "step_2", "tool": "fetch_refund_data", "args_hash": "...", "ok": true},
    {"step_no": 3, "step_id": "step_3", "tool": "calculate_monthly_kpis", "args_hash": "...", "ok": true},
    {"step_no": 4, "step_id": "step_4", "tool": "detect_risk_signals", "args_hash": "...", "ok": true},
    {"step_no": 5, "step_id": "step_5", "tool": "get_manager_profile", "args_hash": "...", "ok": true}
  ],
  "history": [{...}]
}
This is a shortened example: in a real run, plan and trace may contain more steps.
history is the execution log: for each step_no, it stores plan_step and observation.
args_hash hashes only arguments, so it can match across different tools when args are identical; loop detection additionally uses tool name.
Typical stop_reason values
- success - plan executed and final answer generated
- invalid_plan:* - LLM plan failed policy validation
- invalid_plan:non_json - LLM did not return a valid JSON plan
- invalid_plan:min_steps - plan has fewer than 3 decomposition steps
- invalid_plan:tool_not_allowed:<name> - plan contains a tool outside allowlist
- max_execute_steps - plan is longer than allowed execution budget
- max_tool_calls - tool call limit exhausted
- max_seconds - run time budget exceeded
- llm_timeout - LLM did not respond within OPENAI_TIMEOUT_SECONDS
- llm_empty - LLM returned an empty final answer at finalize stage
- tool_denied:<name> - tool is not in allowlist
- tool_missing:<name> - tool is missing from registry
- tool_bad_args:<name> - step contains invalid arguments
- loop_detected - exact repeat (tool + args_hash)
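Because every stop_reason is a colon-separated string, monitoring code can split it into a family and a detail with one call. The sketch below is one possible grouping; `classify_stop_reason` and the severity labels ("ok", "policy", "budget", "llm") are assumptions for illustration, not part of the example.

```python
def classify_stop_reason(stop_reason: str) -> dict[str, str]:
    # "invalid_plan:tool_not_allowed:x" -> family "invalid_plan", detail "tool_not_allowed:x"
    family, _, detail = stop_reason.partition(":")
    if family == "success":
        severity = "ok"
    elif family in {"invalid_plan", "tool_denied", "tool_missing", "tool_bad_args"}:
        severity = "policy"
    elif family in {"max_execute_steps", "max_tool_calls", "max_seconds", "loop_detected"}:
        severity = "budget"
    elif family in {"llm_timeout", "llm_empty"}:
        severity = "llm"
    else:
        severity = "unknown"
    return {"family": family, "detail": detail, "severity": severity}


print(classify_stop_reason("invalid_plan:tool_not_allowed:send_email"))
# {'family': 'invalid_plan', 'detail': 'tool_not_allowed:send_email', 'severity': 'policy'}
print(classify_stop_reason("max_seconds"))
# {'family': 'max_seconds', 'detail': '', 'severity': 'budget'}
```

Counting runs by family rather than by the full string keeps dashboards stable even when the detail part contains tool names.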
What is NOT shown here
- No auth/PII and production access controls for personal data.
- No retry/backoff policies for LLM and tool layer.
- No token/cost budgets (cost guardrails).
- Tools here are deterministic learning mocks, not real external APIs.
What to try next
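- Remove detect_risk_signals from ALLOWED_TOOLS and, if the plan still uses that tool, verify invalid_plan:tool_not_allowed:detect_risk_signals. Plan validation uses the same set, so it fires before the gateway; to see tool_denied:*, pass a narrower allow set to ToolGateway only.
- Add a tool name to ALLOWED_TOOLS and TOOL_CATALOG without registering it in TOOL_REGISTRY and verify tool_missing:*. A name that is missing from ALLOWED_TOOLS is rejected earlier as invalid_plan:tool_not_allowed:*.
- Reduce max_plan_steps to 3 and observe how often you get invalid_plan:max_steps.
- Change GOAL to manager_id=7 (Max) and compare final synthesis.
- Add cost/token guardrails to Budget and to the final JSON result.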
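For the last item, one possible starting point is a token counter next to the existing limits. The sketch below is an assumption throughout: `BudgetWithTokens`, `max_total_tokens`, `TokenMeter`, and the `max_total_tokens` stop reason are hypothetical names, and the token counts are hard-coded where real code would read them from the LLM response's usage data.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BudgetWithTokens:
    max_plan_steps: int = 6
    max_execute_steps: int = 8
    max_tool_calls: int = 8
    max_seconds: int = 60
    max_total_tokens: int = 4000  # hypothetical field, not in the example's Budget


class TokenMeter:
    """Accumulates token usage across LLM calls and enforces the budget."""

    def __init__(self, budget: BudgetWithTokens) -> None:
        self.budget = budget
        self.used = 0

    def add(self, prompt_tokens: int, completion_tokens: int) -> None:
        self.used += prompt_tokens + completion_tokens
        if self.used > self.budget.max_total_tokens:
            raise RuntimeError("max_total_tokens")


meter = TokenMeter(BudgetWithTokens(max_total_tokens=100))
meter.add(prompt_tokens=40, completion_tokens=20)  # plan call: 60 tokens used

try:
    meter.add(prompt_tokens=30, completion_tokens=20)  # final call: 110 > 100
    stop_reason = "success"
except RuntimeError as exc:
    stop_reason = str(exc)

print(meter.used, stop_reason)
# 110 max_total_tokens
```

Returning `meter.used` in the final JSON result alongside stop_reason would make cost visible for both successful and stopped runs.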
Full code on GitHub
The repository contains the full runnable version of this example: planning, policy boundary, sequential step execution, and stop reasons.