RAG Agent — Python (повна реалізація з LLM)

Production-style runnable приклад RAG агента на Python з retrieval intent, policy boundary, grounded answer, citation allowlist check і fallback без галюцинацій.
На цій сторінці
  1. Суть патерна (коротко)
  2. Що демонструє цей приклад
  3. Архітектура
  4. Структура проєкту
  5. Як запустити
  6. Задача
  7. Рішення
  8. Код
  9. kb.py — локальна база знань
  10. retriever.py — детермінований пошук і пакування контексту
  11. gateway.py — policy boundary для retrieval
  12. llm.py — retrieval planning + grounded answer
  13. main.py — Plan -> Retrieve -> Ground -> Answer
  14. requirements.txt
  15. Приклад виводу
  16. Типові stop_reason
  17. Що тут НЕ показано
  18. Що спробувати далі

Суть патерна (коротко)

RAG Agent — це патерн, у якому агент спочатку знаходить релевантні фрагменти знань, а вже потім формує відповідь на їх основі.

LLM вирішує, що шукати (retrieval intent), а policy/execution layer контролює як шукати безпечно (allowlist джерел, ліміти контексту, fallback).


Що демонструє цей приклад

  • retrieval planning крок (kind="retrieve") перед генерацією відповіді
  • policy boundary для перевірки retrieval intent (query, top_k, sources)
  • execution boundary для runtime allowlist джерел
  • deterministic retriever + context packing (min_score, max_chunks, max_chars)
  • fallback без вигадування, якщо grounded context порожній
  • генерацію відповіді тільки на основі контексту + валідацію цитувань
  • явні stop_reason, trace, history для продакшен-моніторингу

Архітектура

  1. LLM повертає retrieval intent у JSON.
  2. Policy boundary валідовує shape intent і допустимі джерела.
  3. RetrievalGateway виконує пошук тільки в runtime-дозволених джерелах.
  4. Context pack відсікає слабкі фрагменти (min_chunk_score) і тримає ліміти розміру.
  5. Якщо релевантного контексту немає, повертається clarify/fallback (без галюцинацій).
  6. Якщо контекст є, LLM генерує grounded answer, а система перевіряє citations.

Ключовий контракт: LLM пропонує intent і текст відповіді, але policy/execution шар визначає, що можна використовувати та що вважається валідним результатом.

Policy allowlist визначає, що модель може запитати, а execution allowlist визначає, що runtime реально дозволяє виконати зараз.

stop_reason — технічний статус виконання run-а, а outcome — бізнес-результат (grounded_answer або clarify).


Структура проєкту

TEXT
examples/
└── agent-patterns/
    └── rag-agent/
        └── python/
            ├── main.py           # Plan -> Retrieve -> Ground -> Answer
            ├── llm.py            # retrieval planner + grounded answer composer
            ├── gateway.py        # policy boundary: intent validation + source allowlist
            ├── retriever.py      # deterministic ranking + context pack
            ├── kb.py             # local knowledge base (documents + metadata)
            └── requirements.txt

Як запустити

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/rag-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Потрібен Python 3.11+.

Варіант через export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Варіант через .env (опційно)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

Це shell-варіант (macOS/Linux). На Windows простіше використовувати set змінних або, за бажанням, python-dotenv, щоб підвантажувати .env автоматично.


Задача

Уяви реальний support-кейс:

"What SLA applies to enterprise plan and what is P1 first response target?"

Агент не має відповідати "з пам'яті". Він має:

  • знайти релевантні policy-документи
  • використати тільки дозволені джерела
  • сформувати grounded answer з citations
  • при нестачі фактів повернути fallback, а не вигадувати

Рішення

У цьому прикладі:

  • LLM планує retrieval (query, top_k, sources опційно)
  • gateway валідовує intent і enforce-ить execution allowlist
  • retriever рахує релевантність і пакує контекст у межах бюджетів
  • generate-крок відбувається тільки якщо є достатній контекст
  • фінальна відповідь проходить citation allowlist check (усі цитати мають посилатись на реально відібрані chunk-и)

Код

kb.py — локальна база знань

PYTHON
from __future__ import annotations

from typing import Any

KB_DOCUMENTS: list[dict[str, Any]] = [
    {
        "id": "doc_sla_enterprise_v3",
        "source": "support_policy",
        "title": "Support Policy",
        "section": "Enterprise SLA",
        "updated_at": "2026-01-15",
        "text": (
            "Enterprise plan includes 99.95% monthly uptime SLA. "
            "For P1 incidents, first response target is 15 minutes, 24/7. "
            "For P2 incidents, first response target is 1 hour."
        ),
    },
    {
        "id": "doc_sla_standard_v2",
        "source": "support_policy",
        "title": "Support Policy",
        "section": "Standard SLA",
        "updated_at": "2025-11-10",
        "text": (
            "Standard plan includes 99.5% monthly uptime SLA. "
            "For P1 incidents, first response target is 1 hour during business hours."
        ),
    },
    {
        "id": "doc_security_incident_v2",
        "source": "security_policy",
        "title": "Security Incident Playbook",
        "section": "Escalation",
        "updated_at": "2026-01-20",
        "text": (
            "For enterprise customers, security-related P1 incidents require immediate escalation "
            "to the on-call incident commander and customer success lead."
        ),
    },
    {
        "id": "doc_refund_policy_v4",
        "source": "billing_policy",
        "title": "Billing and Refund Policy",
        "section": "Refund Eligibility",
        "updated_at": "2025-12-01",
        "text": (
            "Annual enterprise subscriptions may receive a prorated refund within 14 days "
            "under approved exception flow."
        ),
    },
    {
        "id": "doc_onboarding_checklist_v1",
        "source": "operations_notes",
        "title": "Enterprise Onboarding Checklist",
        "section": "Launch Prep",
        "updated_at": "2025-09-02",
        "text": (
            "Checklist for onboarding includes SSO setup, domain verification, and success plan kickoff."
        ),
    },
]

Що тут найважливіше (простими словами)

  • Знання подані як структуровані документи з metadata (id, source, updated_at).
  • Є релевантні та нерелевантні документи, щоб показати справжню роботу retriever-а.

retriever.py — детермінований пошук і пакування контексту

PYTHON
from __future__ import annotations

import re
from typing import Any

STOPWORDS = {
    "the",
    "and",
    "for",
    "with",
    "that",
    "this",
    "from",
    "into",
    "what",
    "which",
    "when",
    "where",
    "have",
    "has",
    "plan",
    "does",
}



def _tokenize(text: str) -> list[str]:
    tokens = re.findall(r"[a-zA-Z0-9_]+", text.lower())
    return [token for token in tokens if len(token) > 2 and token not in STOPWORDS]



def _score_document(query_tokens: list[str], doc_text: str) -> float:
    if not query_tokens:
        return 0.0

    haystack = doc_text.lower()
    overlap = sum(1 for token in query_tokens if token in haystack)
    base = overlap / len(query_tokens)

    # Boost explicit SLA intent to prefer policy-grade docs.
    phrase_boost = 0.0
    if "sla" in haystack:
        phrase_boost += 0.15
    if "p1" in haystack and "response" in haystack:
        phrase_boost += 0.1

    return round(min(base + phrase_boost, 1.0), 4)



def retrieve_candidates(
    *,
    query: str,
    documents: list[dict[str, Any]],
    top_k: int,
    allowed_sources: set[str],
) -> list[dict[str, Any]]:
    query_tokens = _tokenize(query)
    scored: list[dict[str, Any]] = []

    for doc in documents:
        if doc.get("source") not in allowed_sources:
            continue

        text = str(doc.get("text", ""))
        score = _score_document(query_tokens, text)
        if score <= 0:
            continue

        scored.append(
            {
                "doc_id": doc["id"],
                "source": doc["source"],
                "title": doc["title"],
                "section": doc["section"],
                "updated_at": doc["updated_at"],
                "score": score,
                "text": text,
            }
        )

    scored.sort(key=lambda item: item["score"], reverse=True)
    return scored[:top_k]



def build_context_pack(
    *,
    candidates: list[dict[str, Any]],
    min_score: float,
    max_chunks: int,
    max_chars: int,
) -> dict[str, Any]:
    selected: list[dict[str, Any]] = []
    total_chars = 0
    rejected_low_score = 0

    for item in candidates:
        if item["score"] < min_score:
            rejected_low_score += 1
            continue

        text = item["text"].strip()
        next_size = len(text)
        if len(selected) >= max_chunks:
            break
        if total_chars + next_size > max_chars:
            continue

        selected.append(item)
        total_chars += next_size

    return {
        "chunks": selected,
        "total_chars": total_chars,
        "rejected_low_score": rejected_low_score,
    }

Що тут найважливіше (простими словами)

  • Пошук детермінований і прогнозований (легко тестувати).
  • Context pack обрізає шум і тримає технічні ліміти для стабільної генерації.

gateway.py — policy boundary для retrieval

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from retriever import build_context_pack, retrieve_candidates


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_query_chars: int = 240
    max_top_k: int = 6
    max_context_chunks: int = 3
    max_context_chars: int = 2200
    min_chunk_score: float = 0.2
    max_seconds: int = 20



def validate_retrieval_intent(
    raw: Any,
    *,
    allowed_sources_policy: set[str],
    max_top_k: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_intent:not_object")

    if raw.get("kind") != "retrieve":
        raise StopRun("invalid_intent:kind")

    query = raw.get("query")
    if not isinstance(query, str) or not query.strip():
        raise StopRun("invalid_intent:query")

    top_k = raw.get("top_k", 4)
    if not isinstance(top_k, int) or not (1 <= top_k <= max_top_k):
        raise StopRun("invalid_intent:top_k")

    sources_raw = raw.get("sources")
    normalized_sources: list[str] = []
    if sources_raw is not None:
        if not isinstance(sources_raw, list) or not sources_raw:
            raise StopRun("invalid_intent:sources")
        for source in sources_raw:
            if not isinstance(source, str) or not source.strip():
                raise StopRun("invalid_intent:source_item")
            source_name = source.strip()
            if source_name not in allowed_sources_policy:
                raise StopRun(f"invalid_intent:source_not_allowed:{source_name}")
            normalized_sources.append(source_name)

    # Ignore unknown keys and keep only contract fields.
    payload = {
        "kind": "retrieve",
        "query": query.strip(),
        "top_k": top_k,
    }
    if normalized_sources:
        payload["sources"] = normalized_sources
    return payload


class RetrievalGateway:
    def __init__(
        self,
        *,
        documents: list[dict[str, Any]],
        budget: Budget,
        allow_execution_sources: set[str],
    ):
        self.documents = documents
        self.budget = budget
        self.allow_execution_sources = set(allow_execution_sources)

    def run(self, intent: dict[str, Any]) -> dict[str, Any]:
        query = intent["query"]
        if len(query) > self.budget.max_query_chars:
            raise StopRun("invalid_intent:query_too_long")

        requested_sources = set(intent.get("sources") or self.allow_execution_sources)
        denied = sorted(requested_sources - self.allow_execution_sources)
        if denied:
            raise StopRun(f"source_denied:{denied[0]}")

        candidates = retrieve_candidates(
            query=query,
            documents=self.documents,
            top_k=intent["top_k"],
            allowed_sources=requested_sources,
        )

        context_pack = build_context_pack(
            candidates=candidates,
            min_score=self.budget.min_chunk_score,
            max_chunks=self.budget.max_context_chunks,
            max_chars=self.budget.max_context_chars,
        )

        return {
            "query": query,
            "requested_sources": sorted(requested_sources),
            "candidates": candidates,
            "context_chunks": context_pack["chunks"],
            "context_total_chars": context_pack["total_chars"],
            "rejected_low_score": context_pack["rejected_low_score"],
        }

Що тут найважливіше (простими словами)

  • Gateway валідовує intent-контракт і блокує недозволені джерела.
  • Unknown keys ігноруються, якщо required поля валідні.
  • Gateway тільки enforce-ить execution allowlist, який передається з main.py.

llm.py — retrieval planning + grounded answer

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


RETRIEVAL_SYSTEM_PROMPT = """
You are a retrieval planner for a RAG system.
Return exactly one JSON object in this shape:
{
  "kind": "retrieve",
  "query": "short retrieval query",
  "top_k": 4
}

Optional key:
- "sources": ["support_policy", "security_policy"]

Rules:
- Use only sources from available_sources.
- Keep query compact and factual.
- top_k must be between 1 and 6.
- Prefer omitting "sources" unless the question explicitly requires a specific policy domain.
- Do not output markdown or extra keys.
""".strip()

ANSWER_SYSTEM_PROMPT = """
You are a support assistant.
Return exactly one JSON object with this shape:
{
  "answer": "grounded answer in English",
  "citations": ["doc_id_1", "doc_id_2"]
}

Rules:
- Use only facts from provided context_chunks.
- Keep the answer concise and actionable.
- Include at least one citation.
- All citations must be doc_ids from context_chunks.
- Do not output markdown or extra keys.
""".strip()



def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)



def plan_retrieval_intent(*, question: str, available_sources: list[str]) -> dict[str, Any]:
    payload = {
        "question": question,
        "available_sources": available_sources,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": RETRIEVAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}



def compose_grounded_answer(
    *,
    question: str,
    context_chunks: list[dict[str, Any]],
) -> dict[str, Any]:
    payload = {
        "question": question,
        "context_chunks": [
            {
                "doc_id": item.get("doc_id"),
                "title": item.get("title"),
                "section": item.get("section"),
                "updated_at": item.get("updated_at"),
                "text": item.get("text"),
            }
            for item in context_chunks
        ],
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": ANSWER_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")

    answer = data.get("answer")
    citations = data.get("citations")

    if not isinstance(answer, str):
        raise LLMInvalid("llm_invalid_schema")
    if not answer.strip():
        raise LLMEmpty("llm_empty")

    if not isinstance(citations, list):
        raise LLMInvalid("llm_invalid_schema")

    normalized_citations: list[str] = []
    for item in citations:
        if not isinstance(item, str):
            raise LLMInvalid("llm_invalid_schema")
        value = item.strip()
        if value:
            normalized_citations.append(value)

    return {
        "answer": answer.strip(),
        "citations": normalized_citations,
    }

Що тут найважливіше (простими словами)

  • LLM окремо планує retrieval і окремо формує grounded answer.
  • Обидва кроки працюють через JSON-контракт, а не вільний текст.

main.py — Plan -> Retrieve -> Ground -> Answer

PYTHON
from __future__ import annotations

import json
import time
from typing import Any

from gateway import Budget, RetrievalGateway, StopRun, validate_retrieval_intent
from kb import KB_DOCUMENTS
from llm import LLMEmpty, LLMInvalid, LLMTimeout, compose_grounded_answer, plan_retrieval_intent

QUESTION = "What SLA applies to enterprise plan and what is P1 first response target?"

BUDGET = Budget(
    max_query_chars=240,
    max_top_k=6,
    max_context_chunks=3,
    max_context_chars=2200,
    min_chunk_score=0.2,
    max_seconds=20,
)

ALLOWED_SOURCES_POLICY = {
    "support_policy",
    "security_policy",
    "billing_policy",
}

SECURITY_SOURCE_RUNTIME_ENABLED = True
ALLOWED_SOURCES_EXECUTION = (
    {"support_policy", "security_policy", "billing_policy"}
    if SECURITY_SOURCE_RUNTIME_ENABLED
    else {"support_policy", "billing_policy"}
)
# Set SECURITY_SOURCE_RUNTIME_ENABLED=False to observe source_denied:security_policy.



def _shorten(text: str, *, limit: int = 280) -> str:
    text = (text or "").strip()
    if len(text) <= limit:
        return text
    return text[: limit - 3].rstrip() + "..."



def _validate_citations_from_context(
    context_chunks: list[dict[str, Any]],
    citations: list[str],
) -> tuple[list[str], list[dict[str, Any]], list[str], list[str]]:
    by_id: dict[str, dict[str, Any]] = {
        str(chunk["doc_id"]): chunk
        for chunk in context_chunks
        if chunk.get("doc_id")
    }

    normalized: list[str] = []
    seen: set[str] = set()
    for citation in citations:
        value = str(citation).strip()
        if not value or value in seen:
            continue
        seen.add(value)
        normalized.append(value)

    invalid = sorted([doc_id for doc_id in normalized if doc_id not in by_id])

    valid_doc_ids: list[str] = []
    citation_details: list[dict[str, Any]] = []
    for doc_id in normalized:
        chunk = by_id.get(doc_id)
        if not chunk:
            continue
        valid_doc_ids.append(doc_id)
        citation_details.append(
            {
                "doc_id": chunk["doc_id"],
                "title": chunk["title"],
                "section": chunk["section"],
                "updated_at": chunk["updated_at"],
                "source": chunk["source"],
                "score": chunk["score"],
            }
        )

    return valid_doc_ids, citation_details, invalid, sorted(by_id.keys())



def run_rag(question: str) -> dict[str, Any]:
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = RetrievalGateway(
        documents=KB_DOCUMENTS,
        budget=BUDGET,
        allow_execution_sources=ALLOWED_SOURCES_EXECUTION,
    )

    try:
        raw_intent = plan_retrieval_intent(
            question=question,
            available_sources=sorted(ALLOWED_SOURCES_POLICY),
        )
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        intent = validate_retrieval_intent(
            raw_intent,
            allowed_sources_policy=ALLOWED_SOURCES_POLICY,
            max_top_k=BUDGET.max_top_k,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "phase": "plan",
            "raw_intent": raw_intent,
            "trace": trace,
            "history": history,
        }

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return {
            "status": "stopped",
            "stop_reason": "max_seconds",
            "phase": "retrieve",
            "trace": trace,
            "history": history,
        }

    try:
        retrieval = gateway.run(intent)
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "phase": "retrieve",
            "intent": intent,
            "trace": trace,
            "history": history,
        }

    trace.append(
        {
            "step": 1,
            "phase": "retrieve",
            "query": retrieval["query"],
            "requested_sources": retrieval["requested_sources"],
            "candidates": len(retrieval["candidates"]),
            "context_chunks": len(retrieval["context_chunks"]),
            "rejected_low_score": retrieval["rejected_low_score"],
            "ok": True,
        }
    )

    history.append(
        {
            "step": 1,
            "intent": intent,
            "retrieval": {
                "candidates": [
                    {
                        "doc_id": item["doc_id"],
                        "source": item["source"],
                        "score": item["score"],
                    }
                    for item in retrieval["candidates"]
                ],
                "context_chunks": [item["doc_id"] for item in retrieval["context_chunks"]],
            },
        }
    )

    if not retrieval["context_chunks"]:
        fallback_answer = (
            "I could not find enough grounded evidence in approved sources. "
            "Please clarify the plan (enterprise/standard) or provide a policy document link."
        )
        trace.append(
            {
                "step": 2,
                "phase": "fallback",
                "reason": "no_grounded_context",
                "ok": True,
            }
        )
        history.append(
            {
                "step": 2,
                "action": "fallback",
                "answer": fallback_answer,
            }
        )
        return {
            "status": "ok",
            "stop_reason": "success",
            "outcome": "clarify",
            "answer": fallback_answer,
            "citations": [],
            "citation_details": [],
            "trace": trace,
            "history": history,
        }

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return {
            "status": "stopped",
            "stop_reason": "max_seconds",
            "phase": "generate",
            "trace": trace,
            "history": history,
        }

    try:
        final = compose_grounded_answer(
            question=question,
            context_chunks=retrieval["context_chunks"],
        )
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "generate",
            "trace": trace,
            "history": history,
        }
    except LLMInvalid as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.args[0],
            "phase": "generate",
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "phase": "generate",
            "trace": trace,
            "history": history,
        }

    citations, citation_details, invalid_citations, context_doc_ids = _validate_citations_from_context(
        retrieval["context_chunks"],
        final["citations"],
    )
    if invalid_citations:
        return {
            "status": "stopped",
            "stop_reason": "invalid_answer:citations_out_of_context",
            "phase": "generate",
            "invalid_citations": invalid_citations,
            "context_doc_ids": context_doc_ids,
            "trace": trace,
            "history": history,
        }
    if len(citations) < 1:
        return {
            "status": "stopped",
            "stop_reason": "invalid_answer:missing_citations",
            "phase": "generate",
            "trace": trace,
            "history": history,
        }

    trace.append(
        {
            "step": 2,
            "phase": "generate",
            "citation_count": len(citations),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 2,
            "action": "compose_grounded_answer",
            "answer": _shorten(final["answer"]),
            "citations": citations,
        }
    )

    return {
        "status": "ok",
        "stop_reason": "success",
        "outcome": "grounded_answer",
        "answer": final["answer"],
        "citations": citations,
        "citation_details": citation_details,
        "trace": trace,
        "history": history,
    }



def main() -> None:
    result = run_rag(QUESTION)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

Що тут найважливіше (простими словами)

  • ALLOWED_SOURCES_POLICY і ALLOWED_SOURCES_EXECUTION задаються в main.py.
  • Gateway enforce-ить execution allowlist і не знає policy-контексту бізнес-рівня.
  • _validate_citations_from_context(...) повертає 4 значення: валідні doc_id, citation_details, invalid_citations, context_doc_ids (для дебагу policy-стопу).
  • Якщо доказової бази немає, повертається outcome="clarify", а не вигадана відповідь.

requirements.txt

TEXT
openai==2.21.0

Приклад виводу

Нижче приклад успішного grounded-run, де агент відповідає тільки на основі знайдених документів.

JSON
{
  "status": "ok",
  "stop_reason": "success",
  "outcome": "grounded_answer",
  "answer": "The Enterprise plan includes a 99.95% monthly uptime SLA. For P1 incidents, the first response target is 15 minutes, available 24/7.",
  "citations": ["doc_sla_enterprise_v3"],
  "citation_details": [
    {
      "doc_id": "doc_sla_enterprise_v3",
      "title": "Support Policy",
      "section": "Enterprise SLA",
      "updated_at": "2026-01-15",
      "source": "support_policy",
      "score": 1.0
    }
  ],
  "trace": [
    {
      "step": 1,
      "phase": "retrieve",
      "query": "SLA for enterprise plan and P1 first response target",
      "requested_sources": ["support_policy"],
      "candidates": 2,
      "context_chunks": 2,
      "rejected_low_score": 0,
      "ok": true
    },
    {
      "step": 2,
      "phase": "generate",
      "citation_count": 1,
      "ok": true
    }
  ],
  "history": [
    {
      "step": 1,
      "intent": {"kind": "retrieve", "query": "SLA for enterprise plan and P1 first response target", "top_k": 4, "sources": ["support_policy"]},
      "retrieval": {
        "candidates": [
          {"doc_id": "doc_sla_enterprise_v3", "source": "support_policy", "score": 1.0},
          {"doc_id": "doc_sla_standard_v2", "source": "support_policy", "score": 1.0}
        ],
        "context_chunks": ["doc_sla_enterprise_v3", "doc_sla_standard_v2"]
      }
    },
    {
      "step": 2,
      "action": "compose_grounded_answer",
      "answer": "The Enterprise plan includes a 99.95% monthly uptime SLA. For P1 incidents, the first response target is 15 minutes, available 24/7.",
      "citations": ["doc_sla_enterprise_v3"]
    }
  ]
}

Це скорочений приклад: частину вкладених полів подано компактно в один рядок без зміни змісту.


Типові stop_reason

  • success — run завершено коректно; дивись outcome (grounded_answer або clarify)
  • invalid_intent:* — retrieval intent від LLM не пройшов policy validation
  • source_denied:<name> — джерело не дозволене execution allowlist-ом
  • llm_timeout — LLM не відповів у межах OPENAI_TIMEOUT_SECONDS
  • llm_empty — generate-крок повернув порожній answer
  • llm_invalid_json — generate-крок повернув невалідний JSON
  • llm_invalid_schema — JSON не відповідає очікуваному schema (answer/citations)
  • invalid_answer:missing_citations — відповідь не підтверджена жодною валідною цитатою
  • invalid_answer:citations_out_of_context — у відповіді є цитати, яких немає серед retrieval context chunks
  • max_seconds — перевищено загальний time budget run

Що тут НЕ показано

  • Немає векторного індексу/embeddings і гібридного search.
  • Немає multi-tenant auth/ACL на рівні документів.
  • Немає reranker-моделі та semantic deduplication.
  • Немає онлайн-оновлення індексу при змінах у knowledge base.

Що спробувати далі

  1. Постав SECURITY_SOURCE_RUNTIME_ENABLED=False і попроси security_policy, щоб побачити source_denied:*.
  2. Збільш min_chunk_score, щоб подивитись частіші outcome="clarify" без галюцинацій.
  3. Додай post-check, який порівнює ключові числа у відповіді з текстом цитованих документів.
⏱️ 15 хв читанняОновлено Бер, 2026Складність: ★★☆
Інтегровано: продакшен-контрольOnceOnly
Додай guardrails до агентів з tool-calling
Зашип цей патерн з governance:
  • Бюджетами (кроки / ліміти витрат)
  • Дозволами на інструменти (allowlist / blocklist)
  • Kill switch та аварійна зупинка
  • Ідемпотентність і dedupe
  • Audit logs та трасування
Інтегрована згадка: OnceOnly — контрольний шар для продакшен агент-систем.
Автор

Цю документацію курують і підтримують інженери, які запускають AI-агентів у продакшені.

Контент створено з допомогою AI, із людською редакторською відповідальністю за точність, ясність і продакшн-релевантність.

Патерни та рекомендації базуються на постмортемах, режимах відмов і операційних інцидентах у розгорнутих системах, зокрема під час розробки та експлуатації governance-інфраструктури для агентів у OnceOnly.