RAG Agent: Python (full implementation with an LLM)

A runnable, production-style RAG agent example in Python with retrieval intent, a policy boundary, grounded answers, a citation allowlist check, and a fallback that avoids hallucinations.
On this page
  1. Core of the pattern (short)
  2. What this example shows
  3. Architecture
  4. Project structure
  5. Run
  6. Task
  7. Solution
  8. Code
  9. kb.py: local knowledge base
  10. retriever.py: deterministic search and context packing
  11. gateway.py: policy boundary for retrieval
  12. llm.py: retrieval planning + grounded answer
  13. main.py: Plan -> Retrieve -> Ground -> Answer
  14. requirements.txt
  15. Example output
  16. Typical stop_reason values
  17. What is NOT shown here
  18. What to try next

Core of the pattern (short)

A RAG agent is a pattern in which the agent first retrieves relevant knowledge fragments and only then formulates an answer based on them.

The LLM decides what to search for (retrieval intent), and the policy/execution layer controls how safely the search runs (source allowlist, context limits, fallback).


What this example shows

  • a retrieval planning step (kind="retrieve") before answer generation
  • a policy boundary that validates the retrieval intent (query, top_k, sources)
  • an execution boundary for the runtime source allowlist
  • a deterministic retriever + context packing (min_score, max_chunks, max_chars)
  • a fallback that invents nothing when the grounded context is empty
  • answer generation based only on context, followed by citation validation
  • explicit stop_reason, trace, and history for production monitoring

Architecture

  1. The LLM returns a retrieval intent as JSON.
  2. The policy boundary validates the intent shape and the allowed sources.
  3. RetrievalGateway searches only in sources that are allowed at runtime.
  4. The context pack filters out weak fragments (min_chunk_score) and enforces size limits.
  5. If no relevant context is available, clarify/fallback is returned (no hallucinations).
  6. If context is available, the LLM produces a grounded answer and the system validates the citations.

Key contract: the LLM proposes the intent and the answer text, but the policy/execution layer defines what may be used and what counts as a valid result.

The policy allowlist defines what the model may request; the execution allowlist defines what the runtime is actually allowed to execute right now.
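
The difference between the two allowlists can be sketched in a few lines. This is a simplified illustration, not the code from gateway.py: the function name `check_sources` and the example set contents are assumptions, and only the stop-reason strings mirror the ones used in this example.

```python
from __future__ import annotations

# Hypothetical values for illustration; the real sets are defined in main.py.
POLICY_ALLOWLIST = {"support_policy", "security_policy", "billing_policy"}
EXECUTION_ALLOWLIST = {"support_policy", "billing_policy"}  # security disabled at runtime


def check_sources(requested: list[str]) -> str:
    """Return "ok" or the stop reason produced by the first failing layer."""
    # Layer 1 (policy): may the model ask for this source at all?
    for name in requested:
        if name not in POLICY_ALLOWLIST:
            return f"invalid_intent:source_not_allowed:{name}"
    # Layer 2 (execution): is the source enabled in this runtime right now?
    denied = sorted(set(requested) - EXECUTION_ALLOWLIST)
    if denied:
        return f"source_denied:{denied[0]}"
    return "ok"


print(check_sources(["support_policy"]))    # ok
print(check_sources(["operations_notes"]))  # invalid_intent:source_not_allowed:operations_notes
print(check_sources(["security_policy"]))   # source_denied:security_policy
```

The same source name can therefore fail in two different ways: a source outside the policy allowlist is a contract violation by the model, while a source outside the execution allowlist is a runtime decision.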

stop_reason is the technical run status, while outcome is the business result (grounded_answer or clarify).


Project structure

TEXT
examples/
└── agent-patterns/
    └── rag-agent/
        └── python/
            ├── main.py           # Plan -> Retrieve -> Ground -> Answer
            ├── llm.py            # retrieval planner + grounded answer composer
            ├── gateway.py        # policy boundary: intent validation + source allowlist
            ├── retriever.py      # deterministic ranking + context pack
            ├── kb.py             # local knowledge base (documents + metadata)
            └── requirements.txt

Run

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/rag-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option using export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py

Option using .env (optional):

BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows, it is easier to define the variables with set, or optionally to use python-dotenv to load .env automatically.
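
If you prefer not to add a dependency, loading a simple .env file can also be sketched with the standard library. The helpers `parse_env` and `apply_env` below are hypothetical names, not part of the example project, and they handle only plain KEY=VALUE lines (no multiline values, no variable expansion).

```python
from __future__ import annotations

import os


def parse_env(text: str) -> dict[str, str]:
    """Parse simple KEY=VALUE lines; skips blanks, comments, and lines without '='."""
    values: dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        if key:
            # Drop one pair of surrounding quotes, if present.
            value = value.strip()
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]
            values[key] = value
    return values


def apply_env(values: dict[str, str]) -> None:
    """Export parsed values without overriding variables that are already set."""
    for key, value in values.items():
        os.environ.setdefault(key, value)


sample = 'OPENAI_MODEL=gpt-4.1-mini\n# comment\nOPENAI_TIMEOUT_SECONDS="60"\n'
print(parse_env(sample))
# {'OPENAI_MODEL': 'gpt-4.1-mini', 'OPENAI_TIMEOUT_SECONDS': '60'}
```

In a real run you would read the file content (for example with `pathlib.Path(".env").read_text()`) and call `apply_env` before importing llm.py, because that module reads OPENAI_MODEL and OPENAI_TIMEOUT_SECONDS at import time.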


Task

Imagine a real support case:

"What SLA applies to enterprise plan and what is P1 first response target?"

The agent must not answer "from memory". It must:

  • find relevant policy documents
  • use only allowed sources
  • formulate a grounded answer with citations
  • return a fallback instead of inventing facts when the evidence is insufficient

Solution

In this example:

  • the LLM plans retrieval (query, top_k, optional sources)
  • the gateway validates the intent and enforces the execution allowlist
  • the retriever computes relevance and packs context within the budgets
  • the generate step runs only when there is enough context
  • the final answer goes through a citation allowlist check (every citation must point to a chunk that was actually selected)

Code

kb.py: local knowledge base

PYTHON
from __future__ import annotations

from typing import Any

KB_DOCUMENTS: list[dict[str, Any]] = [
    {
        "id": "doc_sla_enterprise_v3",
        "source": "support_policy",
        "title": "Support Policy",
        "section": "Enterprise SLA",
        "updated_at": "2026-01-15",
        "text": (
            "Enterprise plan includes 99.95% monthly uptime SLA. "
            "For P1 incidents, first response target is 15 minutes, 24/7. "
            "For P2 incidents, first response target is 1 hour."
        ),
    },
    {
        "id": "doc_sla_standard_v2",
        "source": "support_policy",
        "title": "Support Policy",
        "section": "Standard SLA",
        "updated_at": "2025-11-10",
        "text": (
            "Standard plan includes 99.5% monthly uptime SLA. "
            "For P1 incidents, first response target is 1 hour during business hours."
        ),
    },
    {
        "id": "doc_security_incident_v2",
        "source": "security_policy",
        "title": "Security Incident Playbook",
        "section": "Escalation",
        "updated_at": "2026-01-20",
        "text": (
            "For enterprise customers, security-related P1 incidents require immediate escalation "
            "to the on-call incident commander and customer success lead."
        ),
    },
    {
        "id": "doc_refund_policy_v4",
        "source": "billing_policy",
        "title": "Billing and Refund Policy",
        "section": "Refund Eligibility",
        "updated_at": "2025-12-01",
        "text": (
            "Annual enterprise subscriptions may receive a prorated refund within 14 days "
            "under approved exception flow."
        ),
    },
    {
        "id": "doc_onboarding_checklist_v1",
        "source": "operations_notes",
        "title": "Enterprise Onboarding Checklist",
        "section": "Launch Prep",
        "updated_at": "2025-09-02",
        "text": (
            "Checklist for onboarding includes SSO setup, domain verification, and success plan kickoff."
        ),
    },
]

What matters most here (in plain terms)

  • Knowledge is represented as structured documents with metadata (id, source, updated_at).
  • The set contains both relevant and irrelevant documents, to show how the retriever really behaves.
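
To see why the metadata matters, here is a small sketch that groups the documents by source and lists those that can never be retrieved. It uses a trimmed copy of the kb.py metadata (text omitted) so that it runs on its own; the names `DOCS` and `ALLOWED` are placeholders for this snippet, with `ALLOWED` matching the policy allowlist from main.py.

```python
from __future__ import annotations

from collections import Counter
from typing import Any

# Trimmed copy of the kb.py metadata (text omitted) so the snippet is standalone.
DOCS: list[dict[str, Any]] = [
    {"id": "doc_sla_enterprise_v3", "source": "support_policy", "updated_at": "2026-01-15"},
    {"id": "doc_sla_standard_v2", "source": "support_policy", "updated_at": "2025-11-10"},
    {"id": "doc_security_incident_v2", "source": "security_policy", "updated_at": "2026-01-20"},
    {"id": "doc_refund_policy_v4", "source": "billing_policy", "updated_at": "2025-12-01"},
    {"id": "doc_onboarding_checklist_v1", "source": "operations_notes", "updated_at": "2025-09-02"},
]

# Same three sources as ALLOWED_SOURCES_POLICY in main.py.
ALLOWED = {"support_policy", "security_policy", "billing_policy"}

per_source = Counter(doc["source"] for doc in DOCS)
unreachable = [doc["id"] for doc in DOCS if doc["source"] not in ALLOWED]

print(dict(per_source))
print(unreachable)  # ['doc_onboarding_checklist_v1']
```

The onboarding checklist lives in the knowledge base but belongs to operations_notes, which is in neither allowlist, so the agent can never cite it regardless of how well it matches a query.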

retriever.py: deterministic search and context packing

PYTHON
from __future__ import annotations

import re
from typing import Any

STOPWORDS = {
    "the",
    "and",
    "for",
    "with",
    "that",
    "this",
    "from",
    "into",
    "what",
    "which",
    "when",
    "where",
    "have",
    "has",
    "plan",
    "does",
}



def _tokenize(text: str) -> list[str]:
    tokens = re.findall(r"[a-zA-Z0-9_]+", text.lower())
    return [token for token in tokens if len(token) > 2 and token not in STOPWORDS]



def _score_document(query_tokens: list[str], doc_text: str) -> float:
    if not query_tokens:
        return 0.0

    haystack = doc_text.lower()
    overlap = sum(1 for token in query_tokens if token in haystack)
    base = overlap / len(query_tokens)

    # Boost documents that mention SLA or P1 response terms.
    # Note: this looks at the document text only, not at the query.
    phrase_boost = 0.0
    if "sla" in haystack:
        phrase_boost += 0.15
    if "p1" in haystack and "response" in haystack:
        phrase_boost += 0.1

    return round(min(base + phrase_boost, 1.0), 4)



def retrieve_candidates(
    *,
    query: str,
    documents: list[dict[str, Any]],
    top_k: int,
    allowed_sources: set[str],
) -> list[dict[str, Any]]:
    query_tokens = _tokenize(query)
    scored: list[dict[str, Any]] = []

    for doc in documents:
        if doc.get("source") not in allowed_sources:
            continue

        text = str(doc.get("text", ""))
        score = _score_document(query_tokens, text)
        if score <= 0:
            continue

        scored.append(
            {
                "doc_id": doc["id"],
                "source": doc["source"],
                "title": doc["title"],
                "section": doc["section"],
                "updated_at": doc["updated_at"],
                "score": score,
                "text": text,
            }
        )

    scored.sort(key=lambda item: item["score"], reverse=True)
    return scored[:top_k]



def build_context_pack(
    *,
    candidates: list[dict[str, Any]],
    min_score: float,
    max_chunks: int,
    max_chars: int,
) -> dict[str, Any]:
    selected: list[dict[str, Any]] = []
    total_chars = 0
    rejected_low_score = 0

    for item in candidates:
        if item["score"] < min_score:
            rejected_low_score += 1
            continue

        text = item["text"].strip()
        next_size = len(text)
        if len(selected) >= max_chunks:
            break
        if total_chars + next_size > max_chars:
            continue

        selected.append(item)
        total_chars += next_size

    return {
        "chunks": selected,
        "total_chars": total_chars,
        "rejected_low_score": rejected_low_score,
    }

What matters most here (in plain terms)

  • The search is deterministic and predictable (easy to test).
  • The context pack cuts off noise and enforces technical limits for stable generation.
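
A worked example makes the scoring easier to follow. The snippet below copies the tokenizer and scoring logic from retriever.py under shorter names (`tokenize`, `score`) so that it runs standalone, and applies them to the query from the example output further down.

```python
from __future__ import annotations

import re

# Same stopword list and logic as retriever.py, copied so the snippet is standalone.
STOPWORDS = {
    "the", "and", "for", "with", "that", "this", "from", "into",
    "what", "which", "when", "where", "have", "has", "plan", "does",
}


def tokenize(text: str) -> list[str]:
    tokens = re.findall(r"[a-zA-Z0-9_]+", text.lower())
    return [t for t in tokens if len(t) > 2 and t not in STOPWORDS]


def score(query_tokens: list[str], doc_text: str) -> float:
    if not query_tokens:
        return 0.0
    haystack = doc_text.lower()
    overlap = sum(1 for t in query_tokens if t in haystack)
    boost = 0.0
    if "sla" in haystack:
        boost += 0.15
    if "p1" in haystack and "response" in haystack:
        boost += 0.1
    return round(min(overlap / len(query_tokens) + boost, 1.0), 4)


query = "SLA for enterprise plan and P1 first response target"
standard_doc = (
    "Standard plan includes 99.5% monthly uptime SLA. "
    "For P1 incidents, first response target is 1 hour during business hours."
)
refund_doc = (
    "Annual enterprise subscriptions may receive a prorated refund within 14 days "
    "under approved exception flow."
)

tokens = tokenize(query)
print(tokens)                       # ['sla', 'enterprise', 'first', 'response', 'target']
print(score(tokens, standard_doc))  # 1.0  (4/5 overlap + 0.25 boost, capped at 1.0)
print(score(tokens, refund_doc))    # 0.2  (1/5 overlap, no boost)
```

Two details are worth noticing. The token "p1" is dropped by the length filter (len > 2), so P1 only influences the score through the boost. And the Standard SLA document reaches the same capped score as the Enterprise one, which is why both appear with score 1.0 in the example output; picking the right one is left to the grounded answer step.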

gateway.py: policy boundary for retrieval

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from retriever import build_context_pack, retrieve_candidates


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_query_chars: int = 240
    max_top_k: int = 6
    max_context_chunks: int = 3
    max_context_chars: int = 2200
    min_chunk_score: float = 0.2
    max_seconds: int = 20



def validate_retrieval_intent(
    raw: Any,
    *,
    allowed_sources_policy: set[str],
    max_top_k: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_intent:not_object")

    if raw.get("kind") != "retrieve":
        raise StopRun("invalid_intent:kind")

    query = raw.get("query")
    if not isinstance(query, str) or not query.strip():
        raise StopRun("invalid_intent:query")

    top_k = raw.get("top_k", 4)
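    # bool is a subclass of int, so reject it explicitly (top_k=True would otherwise pass).
    if isinstance(top_k, bool) or not isinstance(top_k, int) or not (1 <= top_k <= max_top_k):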
        raise StopRun("invalid_intent:top_k")

    sources_raw = raw.get("sources")
    normalized_sources: list[str] = []
    if sources_raw is not None:
        if not isinstance(sources_raw, list) or not sources_raw:
            raise StopRun("invalid_intent:sources")
        for source in sources_raw:
            if not isinstance(source, str) or not source.strip():
                raise StopRun("invalid_intent:source_item")
            source_name = source.strip()
            if source_name not in allowed_sources_policy:
                raise StopRun(f"invalid_intent:source_not_allowed:{source_name}")
            normalized_sources.append(source_name)

    # Ignore unknown keys and keep only contract fields.
    payload = {
        "kind": "retrieve",
        "query": query.strip(),
        "top_k": top_k,
    }
    if normalized_sources:
        payload["sources"] = normalized_sources
    return payload


class RetrievalGateway:
    def __init__(
        self,
        *,
        documents: list[dict[str, Any]],
        budget: Budget,
        allow_execution_sources: set[str],
    ):
        self.documents = documents
        self.budget = budget
        self.allow_execution_sources = set(allow_execution_sources)

    def run(self, intent: dict[str, Any]) -> dict[str, Any]:
        query = intent["query"]
        if len(query) > self.budget.max_query_chars:
            raise StopRun("invalid_intent:query_too_long")

        requested_sources = set(intent.get("sources") or self.allow_execution_sources)
        denied = sorted(requested_sources - self.allow_execution_sources)
        if denied:
            raise StopRun(f"source_denied:{denied[0]}")

        candidates = retrieve_candidates(
            query=query,
            documents=self.documents,
            top_k=intent["top_k"],
            allowed_sources=requested_sources,
        )

        context_pack = build_context_pack(
            candidates=candidates,
            min_score=self.budget.min_chunk_score,
            max_chunks=self.budget.max_context_chunks,
            max_chars=self.budget.max_context_chars,
        )

        return {
            "query": query,
            "requested_sources": sorted(requested_sources),
            "candidates": candidates,
            "context_chunks": context_pack["chunks"],
            "context_total_chars": context_pack["total_chars"],
            "rejected_low_score": context_pack["rejected_low_score"],
        }

What matters most here (in plain terms)

  • The gateway module validates the intent contract and blocks sources that are not allowed.
  • Unknown keys are ignored as long as the required fields are valid.
  • The RetrievalGateway class enforces only the execution allowlist that is passed in from main.py.
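
The behavior around unknown keys and rejected values can be tried out with a condensed stand-in for the validator. This is a simplified sketch, not a copy of validate_retrieval_intent: the name `validate_intent` is an assumption, and several branches (for example the type check on the sources list) are left out to keep it short.

```python
from __future__ import annotations

from typing import Any


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


def validate_intent(raw: Any, *, allowed: set[str], max_top_k: int) -> dict[str, Any]:
    """Simplified stand-in for validate_retrieval_intent (fewer checks than the original)."""
    if not isinstance(raw, dict) or raw.get("kind") != "retrieve":
        raise StopRun("invalid_intent:kind")
    query = raw.get("query")
    if not isinstance(query, str) or not query.strip():
        raise StopRun("invalid_intent:query")
    top_k = raw.get("top_k", 4)
    if not isinstance(top_k, int) or not (1 <= top_k <= max_top_k):
        raise StopRun("invalid_intent:top_k")
    sources = raw.get("sources") or []
    for name in sources:
        if name not in allowed:
            raise StopRun(f"invalid_intent:source_not_allowed:{name}")
    # Only contract fields are copied, so unknown keys are dropped silently.
    payload: dict[str, Any] = {"kind": "retrieve", "query": query.strip(), "top_k": top_k}
    if sources:
        payload["sources"] = list(sources)
    return payload


ALLOWED = {"support_policy", "security_policy", "billing_policy"}

ok = validate_intent(
    {"kind": "retrieve", "query": "  enterprise SLA  ", "top_k": 3, "debug": True},
    allowed=ALLOWED,
    max_top_k=6,
)
print(ok)  # {'kind': 'retrieve', 'query': 'enterprise SLA', 'top_k': 3}

try:
    validate_intent({"kind": "retrieve", "query": "x", "top_k": 50}, allowed=ALLOWED, max_top_k=6)
except StopRun as exc:
    print(exc.reason)  # invalid_intent:top_k
```

Rebuilding the payload from known fields, rather than passing the model's dict through, is what keeps extra keys such as "debug" from reaching the retriever.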

llm.py: retrieval planning + grounded answer

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


RETRIEVAL_SYSTEM_PROMPT = """
You are a retrieval planner for a RAG system.
Return exactly one JSON object in this shape:
{
  "kind": "retrieve",
  "query": "short retrieval query",
  "top_k": 4
}

Optional key:
- "sources": ["support_policy", "security_policy"]

Rules:
- Use only sources from available_sources.
- Keep query compact and factual.
- top_k must be between 1 and 6.
- Prefer omitting "sources" unless the question explicitly requires a specific policy domain.
- Do not output markdown or extra keys.
""".strip()

ANSWER_SYSTEM_PROMPT = """
You are a support assistant.
Return exactly one JSON object with this shape:
{
  "answer": "grounded answer in English",
  "citations": ["doc_id_1", "doc_id_2"]
}

Rules:
- Use only facts from provided context_chunks.
- Keep the answer concise and actionable.
- Include at least one citation.
- All citations must be doc_ids from context_chunks.
- Do not output markdown or extra keys.
""".strip()



def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)



def plan_retrieval_intent(*, question: str, available_sources: list[str]) -> dict[str, Any]:
    payload = {
        "question": question,
        "available_sources": available_sources,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": RETRIEVAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}



def compose_grounded_answer(
    *,
    question: str,
    context_chunks: list[dict[str, Any]],
) -> dict[str, Any]:
    payload = {
        "question": question,
        "context_chunks": [
            {
                "doc_id": item.get("doc_id"),
                "title": item.get("title"),
                "section": item.get("section"),
                "updated_at": item.get("updated_at"),
                "text": item.get("text"),
            }
            for item in context_chunks
        ],
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": ANSWER_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")

    answer = data.get("answer")
    citations = data.get("citations")

    if not isinstance(answer, str):
        raise LLMInvalid("llm_invalid_schema")
    if not answer.strip():
        raise LLMEmpty("llm_empty")

    if not isinstance(citations, list):
        raise LLMInvalid("llm_invalid_schema")

    normalized_citations: list[str] = []
    for item in citations:
        if not isinstance(item, str):
            raise LLMInvalid("llm_invalid_schema")
        value = item.strip()
        if value:
            normalized_citations.append(value)

    return {
        "answer": answer.strip(),
        "citations": normalized_citations,
    }

What matters most here (in plain terms)

  • The LLM plans retrieval and produces the grounded answer in two separate steps.
  • Both steps go through a JSON contract, not free text.
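
Because the contract is plain JSON, the parsing rules can be tested offline with canned replies, without calling the API. The helper `parse_answer` below is a hypothetical sketch that follows the same check order as compose_grounded_answer but raises ValueError instead of the LLMInvalid/LLMEmpty classes.

```python
from __future__ import annotations

import json
from typing import Any


def parse_answer(text: str) -> dict[str, Any]:
    """Parse a model reply against the answer contract; raise ValueError on violations."""
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise ValueError("llm_invalid_json") from exc
    if not isinstance(data, dict):
        raise ValueError("llm_invalid_json")

    answer = data.get("answer")
    if not isinstance(answer, str):
        raise ValueError("llm_invalid_schema")
    if not answer.strip():
        raise ValueError("llm_empty")

    citations = data.get("citations")
    if not isinstance(citations, list) or not all(isinstance(c, str) for c in citations):
        raise ValueError("llm_invalid_schema")

    return {
        "answer": answer.strip(),
        "citations": [c.strip() for c in citations if c.strip()],
    }


good = '{"answer": "P1 first response target is 15 minutes.", "citations": ["doc_sla_enterprise_v3"]}'
print(parse_answer(good)["citations"])  # ['doc_sla_enterprise_v3']

for bad in ("not json", '{"answer": "text"}', '{"answer": "  ", "citations": []}'):
    try:
        parse_answer(bad)
    except ValueError as exc:
        print(exc)  # llm_invalid_json, llm_invalid_schema, llm_empty (in that order)
```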

main.py: Plan -> Retrieve -> Ground -> Answer

PYTHON
from __future__ import annotations

import json
import time
from typing import Any

from gateway import Budget, RetrievalGateway, StopRun, validate_retrieval_intent
from kb import KB_DOCUMENTS
from llm import LLMEmpty, LLMInvalid, LLMTimeout, compose_grounded_answer, plan_retrieval_intent

QUESTION = "What SLA applies to enterprise plan and what is P1 first response target?"

BUDGET = Budget(
    max_query_chars=240,
    max_top_k=6,
    max_context_chunks=3,
    max_context_chars=2200,
    min_chunk_score=0.2,
    max_seconds=20,
)

ALLOWED_SOURCES_POLICY = {
    "support_policy",
    "security_policy",
    "billing_policy",
}

SECURITY_SOURCE_RUNTIME_ENABLED = True
ALLOWED_SOURCES_EXECUTION = (
    {"support_policy", "security_policy", "billing_policy"}
    if SECURITY_SOURCE_RUNTIME_ENABLED
    else {"support_policy", "billing_policy"}
)
# Set SECURITY_SOURCE_RUNTIME_ENABLED=False to observe source_denied:security_policy
# (only when the intent explicitly requests security_policy in "sources").



def _shorten(text: str, *, limit: int = 280) -> str:
    text = (text or "").strip()
    if len(text) <= limit:
        return text
    return text[: limit - 3].rstrip() + "..."



def _validate_citations_from_context(
    context_chunks: list[dict[str, Any]],
    citations: list[str],
) -> tuple[list[str], list[dict[str, Any]], list[str], list[str]]:
    by_id: dict[str, dict[str, Any]] = {
        str(chunk["doc_id"]): chunk
        for chunk in context_chunks
        if chunk.get("doc_id")
    }

    normalized: list[str] = []
    seen: set[str] = set()
    for citation in citations:
        value = str(citation).strip()
        if not value or value in seen:
            continue
        seen.add(value)
        normalized.append(value)

    invalid = sorted([doc_id for doc_id in normalized if doc_id not in by_id])

    valid_doc_ids: list[str] = []
    citation_details: list[dict[str, Any]] = []
    for doc_id in normalized:
        chunk = by_id.get(doc_id)
        if not chunk:
            continue
        valid_doc_ids.append(doc_id)
        citation_details.append(
            {
                "doc_id": chunk["doc_id"],
                "title": chunk["title"],
                "section": chunk["section"],
                "updated_at": chunk["updated_at"],
                "source": chunk["source"],
                "score": chunk["score"],
            }
        )

    return valid_doc_ids, citation_details, invalid, sorted(by_id.keys())



def run_rag(question: str) -> dict[str, Any]:
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = RetrievalGateway(
        documents=KB_DOCUMENTS,
        budget=BUDGET,
        allow_execution_sources=ALLOWED_SOURCES_EXECUTION,
    )

    try:
        raw_intent = plan_retrieval_intent(
            question=question,
            available_sources=sorted(ALLOWED_SOURCES_POLICY),
        )
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        intent = validate_retrieval_intent(
            raw_intent,
            allowed_sources_policy=ALLOWED_SOURCES_POLICY,
            max_top_k=BUDGET.max_top_k,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "phase": "plan",
            "raw_intent": raw_intent,
            "trace": trace,
            "history": history,
        }

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return {
            "status": "stopped",
            "stop_reason": "max_seconds",
            "phase": "retrieve",
            "trace": trace,
            "history": history,
        }

    try:
        retrieval = gateway.run(intent)
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "phase": "retrieve",
            "intent": intent,
            "trace": trace,
            "history": history,
        }

    trace.append(
        {
            "step": 1,
            "phase": "retrieve",
            "query": retrieval["query"],
            "requested_sources": retrieval["requested_sources"],
            "candidates": len(retrieval["candidates"]),
            "context_chunks": len(retrieval["context_chunks"]),
            "rejected_low_score": retrieval["rejected_low_score"],
            "ok": True,
        }
    )

    history.append(
        {
            "step": 1,
            "intent": intent,
            "retrieval": {
                "candidates": [
                    {
                        "doc_id": item["doc_id"],
                        "source": item["source"],
                        "score": item["score"],
                    }
                    for item in retrieval["candidates"]
                ],
                "context_chunks": [item["doc_id"] for item in retrieval["context_chunks"]],
            },
        }
    )

    if not retrieval["context_chunks"]:
        fallback_answer = (
            "I could not find enough grounded evidence in approved sources. "
            "Please clarify the plan (enterprise/standard) or provide a policy document link."
        )
        trace.append(
            {
                "step": 2,
                "phase": "fallback",
                "reason": "no_grounded_context",
                "ok": True,
            }
        )
        history.append(
            {
                "step": 2,
                "action": "fallback",
                "answer": fallback_answer,
            }
        )
        return {
            "status": "ok",
            "stop_reason": "success",
            "outcome": "clarify",
            "answer": fallback_answer,
            "citations": [],
            "citation_details": [],
            "trace": trace,
            "history": history,
        }

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return {
            "status": "stopped",
            "stop_reason": "max_seconds",
            "phase": "generate",
            "trace": trace,
            "history": history,
        }

    try:
        final = compose_grounded_answer(
            question=question,
            context_chunks=retrieval["context_chunks"],
        )
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "generate",
            "trace": trace,
            "history": history,
        }
    except LLMInvalid as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.args[0],
            "phase": "generate",
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "phase": "generate",
            "trace": trace,
            "history": history,
        }

    citations, citation_details, invalid_citations, context_doc_ids = _validate_citations_from_context(
        retrieval["context_chunks"],
        final["citations"],
    )
    if invalid_citations:
        return {
            "status": "stopped",
            "stop_reason": "invalid_answer:citations_out_of_context",
            "phase": "generate",
            "invalid_citations": invalid_citations,
            "context_doc_ids": context_doc_ids,
            "trace": trace,
            "history": history,
        }
    if len(citations) < 1:
        return {
            "status": "stopped",
            "stop_reason": "invalid_answer:missing_citations",
            "phase": "generate",
            "trace": trace,
            "history": history,
        }

    trace.append(
        {
            "step": 2,
            "phase": "generate",
            "citation_count": len(citations),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 2,
            "action": "compose_grounded_answer",
            "answer": _shorten(final["answer"]),
            "citations": citations,
        }
    )

    return {
        "status": "ok",
        "stop_reason": "success",
        "outcome": "grounded_answer",
        "answer": final["answer"],
        "citations": citations,
        "citation_details": citation_details,
        "trace": trace,
        "history": history,
    }



def main() -> None:
    result = run_rag(QUESTION)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (in plain terms)

  • ALLOWED_SOURCES_POLICY and ALLOWED_SOURCES_EXECUTION are defined in main.py.
  • The gateway enforces the execution allowlist and has no knowledge of the business policy context.
  • _validate_citations_from_context(...) returns 4 values: valid doc_ids, citation_details, invalid_citations, and context_doc_ids (for debugging policy stops).
  • When there is no reliable evidence, outcome="clarify" is returned instead of an invented answer.
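
The core of the citation allowlist check fits in a few lines. The function `split_citations` below is a reduced, hypothetical variant of _validate_citations_from_context that returns only the valid and invalid doc_ids and skips the citation details.

```python
from __future__ import annotations


def split_citations(context_ids: set[str], citations: list[str]) -> tuple[list[str], list[str]]:
    """Deduplicate citations, then split them into (valid, invalid) against the context."""
    seen: set[str] = set()
    valid: list[str] = []
    invalid: list[str] = []
    for raw in citations:
        doc_id = raw.strip()
        if not doc_id or doc_id in seen:
            continue
        seen.add(doc_id)
        (valid if doc_id in context_ids else invalid).append(doc_id)
    return valid, sorted(invalid)


context_ids = {"doc_sla_enterprise_v3", "doc_sla_standard_v2"}

print(split_citations(context_ids, ["doc_sla_enterprise_v3", "doc_sla_enterprise_v3"]))
# (['doc_sla_enterprise_v3'], [])
print(split_citations(context_ids, ["doc_sla_enterprise_v3", "doc_refund_policy_v4"]))
# (['doc_sla_enterprise_v3'], ['doc_refund_policy_v4'])
print(split_citations(context_ids, []))
# ([], [])
```

In run_rag, the second case corresponds to invalid_answer:citations_out_of_context and the third to invalid_answer:missing_citations; only the first lets the answer through.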

requirements.txt

TEXT
openai==2.21.0

Example output

Below is an example of a successful grounded run in which the agent answers only on the basis of retrieved documents.

JSON
{
  "status": "ok",
  "stop_reason": "success",
  "outcome": "grounded_answer",
  "answer": "The Enterprise plan includes a 99.95% monthly uptime SLA. For P1 incidents, the first response target is 15 minutes, available 24/7.",
  "citations": ["doc_sla_enterprise_v3"],
  "citation_details": [
    {
      "doc_id": "doc_sla_enterprise_v3",
      "title": "Support Policy",
      "section": "Enterprise SLA",
      "updated_at": "2026-01-15",
      "source": "support_policy",
      "score": 1.0
    }
  ],
  "trace": [
    {
      "step": 1,
      "phase": "retrieve",
      "query": "SLA for enterprise plan and P1 first response target",
      "requested_sources": ["support_policy"],
      "candidates": 2,
      "context_chunks": 2,
      "rejected_low_score": 0,
      "ok": true
    },
    {
      "step": 2,
      "phase": "generate",
      "citation_count": 1,
      "ok": true
    }
  ],
  "history": [
    {
      "step": 1,
      "intent": {"kind": "retrieve", "query": "SLA for enterprise plan and P1 first response target", "top_k": 4, "sources": ["support_policy"]},
      "retrieval": {
        "candidates": [
          {"doc_id": "doc_sla_enterprise_v3", "source": "support_policy", "score": 1.0},
          {"doc_id": "doc_sla_standard_v2", "source": "support_policy", "score": 1.0}
        ],
        "context_chunks": ["doc_sla_enterprise_v3", "doc_sla_standard_v2"]
      }
    },
    {
      "step": 2,
      "action": "compose_grounded_answer",
      "answer": "The Enterprise plan includes a 99.95% monthly uptime SLA. For P1 incidents, the first response target is 15 minutes, available 24/7.",
      "citations": ["doc_sla_enterprise_v3"]
    }
  ]
}

This is an abridged example: some nested fields are shown compactly on one line, without changing their meaning.


Typical stop_reason values

  • success - the run completed correctly; see outcome (grounded_answer or clarify)
  • invalid_intent:* - the retrieval intent from the LLM failed policy validation
  • source_denied:<name> - the source is not permitted by the execution allowlist
  • llm_timeout - the LLM did not respond within OPENAI_TIMEOUT_SECONDS, or the connection to the API failed
  • llm_empty - the generate step returned an empty answer
  • llm_invalid_json - the generate step returned invalid JSON
  • llm_invalid_schema - the JSON does not match the expected schema (answer/citations)
  • invalid_answer:missing_citations - the answer is not backed by any valid citation
  • invalid_answer:citations_out_of_context - the answer contains citations that are not among the retrieved context chunks
  • max_seconds - the total time budget for the run was exceeded
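
For monitoring, it can help to group these values into a few coarse buckets. The sketch below is one possible grouping; the function name `classify` and the bucket names are assumptions made for illustration and do not appear in the example code.

```python
from __future__ import annotations


def classify(stop_reason: str) -> str:
    """Map a stop_reason to a coarse monitoring bucket (bucket names are illustrative)."""
    if stop_reason == "success":
        return "ok"
    if stop_reason.startswith(("invalid_intent:", "source_denied:")):
        return "policy_stop"
    # Checked before the generic "llm_" prefix so that llm_timeout lands here.
    if stop_reason in {"llm_timeout", "max_seconds"}:
        return "timeout"
    if stop_reason.startswith(("llm_", "invalid_answer:")):
        return "model_output"
    return "unknown"


for reason in (
    "success",
    "source_denied:security_policy",
    "llm_timeout",
    "invalid_answer:missing_citations",
):
    print(reason, "->", classify(reason))
```

Note that success alone does not say whether the user received an answer: the outcome field still has to be checked to distinguish grounded_answer from clarify.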

What is NOT shown here

  • No vector index/embeddings and no hybrid search.
  • No multi-tenant auth/ACL at the document level.
  • No reranker model and no semantic deduplication.
  • No online index updates when the knowledge base changes.

What to try next

  1. Set SECURITY_SOURCE_RUNTIME_ENABLED=False and ask about security_policy to see source_denied:*.
  2. Raise min_chunk_score to see outcome="clarify" more often, without hallucinations.
  3. Add a post-check that compares key numbers in the answer with the text of the cited documents.
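
The numeric post-check from step 3 could start out as simply as the sketch below. The function `unsupported_numbers` is a hypothetical helper, and the approach is deliberately naive: it compares number strings literally, so "15" and "15.0" would count as different values.

```python
from __future__ import annotations

import re

# Integers and simple decimals such as 15 or 99.95.
NUMBER_RE = re.compile(r"\d+(?:\.\d+)?")


def unsupported_numbers(answer: str, cited_texts: list[str]) -> list[str]:
    """Return numbers that appear in the answer but in none of the cited texts."""
    supported: set[str] = set()
    for text in cited_texts:
        supported.update(NUMBER_RE.findall(text))
    return [n for n in NUMBER_RE.findall(answer) if n not in supported]


cited = [
    "Enterprise plan includes 99.95% monthly uptime SLA. "
    "For P1 incidents, first response target is 15 minutes, 24/7. "
    "For P2 incidents, first response target is 1 hour."
]

print(unsupported_numbers("Enterprise SLA is 99.95%; P1 first response is 15 minutes.", cited))
# []
print(unsupported_numbers("Enterprise SLA is 99.9%; P1 first response is 30 minutes.", cited))
# ['99.9', '30']
```

A non-empty result could be turned into an additional stop_reason in run_rag; how strict that check should be is a design decision this sketch does not settle.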
⏱️ 15 min read · Updated Mar 2026 · Difficulty: ★★☆
Author

This documentation is curated and maintained by engineers who run AI agents in production.

The content is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

Patterns and recommendations are based on post-mortems, failure modes, and operational incidents in production systems, including the development and operation of governance infrastructure for agents at OnceOnly.