Memory-Augmented Agent — Python (complete implementation with an LLM)

A runnable, production-style example of a Memory-Augmented agent in Python, with a capture/store/retrieve/apply flow, policy vs execution allowlists, a TTL memory lifecycle, and explicit stop reasons.
On this page
  1. Pattern essence (brief)
  2. What this example demonstrates
  3. Architecture
  4. Project structure
  5. Running the project
  6. Task
  7. Solution
  8. Code
  9. memory_store.py — TTL memory and retrieval scoring
  10. gateway.py — policy/execution boundary for memory
  11. llm.py — extract, retrieve-intent, apply
  12. main.py — Session 1 Capture/Store -> Session 2 Retrieve/Apply
  13. Example output
  14. Typical stop_reason values
  15. What is NOT shown
  16. What to try next

Pattern essence (brief)

Memory-Augmented Agent is a pattern in which the agent stores important facts across sessions and reuses them in subsequent responses.

The LLM proposes what to memorize and what to retrieve, while the memory policy/execution layer controls what can actually be written or read.


What this example demonstrates

  • two phases (Session 1/2) in a single run; cross-process persistence is not shown
  • LLM-driven memory extraction with a JSON contract (items[])
  • a policy boundary for the memory write/retrieve contracts
  • an execution boundary (runtime allowlist) for memory keys and scopes
  • runtime trust-gating: execution can block sensitive keys even when policy allows them
  • in this demo, the policy allowlist hard-fails to surface drift and contract errors early
  • a memory TTL lifecycle and a bounded in-memory store
  • a final answer with used_memory_keys verified against the memory actually retrieved
  • explicit stop_reason, trace, and history for production monitoring

Architecture

  1. The LLM extracts memory candidates from the user message (items).
  2. The policy boundary validates the shape, the keys, ttl_days, and confidence.
  3. The execution boundary decides which records are actually written (runtime allowlist).
  4. In the next session, the LLM plans the retrieval intent (kind/query/top_k/scopes).
  5. The gateway executes the retrieval only within the scopes allowed at runtime.
  6. The LLM builds the answer from incident_context + memory_items.
  7. The system verifies that used_memory_keys references only the memory actually retrieved.

Key contract: the LLM may propose records and a retrieval intent, but the policy/execution layer defines what is valid and what actually gets executed.

The policy allowlist defines what the model may request; the execution allowlist defines what the runtime actually permits right now.
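The declared_tier case later in this example shows how the two layers interact. As a standalone sketch (simplified names, ValueError standing in for the StopRun exception used below): policy hard-fails on unknown keys, while execution quietly blocks keys it does not trust right now.

```python
# Standalone sketch: policy allowlist (hard fail) vs execution allowlist (soft block).
ALLOWED_KEYS_POLICY = {"language", "response_style", "update_channel", "declared_tier"}
ALLOWED_KEYS_EXECUTION = {"language", "response_style", "update_channel"}  # declared_tier untrusted

def gate_items(items: list[dict]) -> dict:
    written, blocked = [], []
    for item in items:
        if item["key"] not in ALLOWED_KEYS_POLICY:
            # Policy violation: contract drift, stop the run loudly.
            raise ValueError(f"memory_key_not_allowed_policy:{item['key']}")
        if item["key"] not in ALLOWED_KEYS_EXECUTION:
            # Policy allows it, but the runtime does not trust it right now.
            blocked.append({"key": item["key"], "reason": "key_denied_execution"})
            continue
        written.append(item)
    return {"written": written, "blocked": blocked}

result = gate_items([
    {"key": "language", "value": "english"},
    {"key": "declared_tier", "value": "enterprise"},
])
```

Here `language` is written, while `declared_tier` ends up in the blocked list with an explicit reason instead of failing the run.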


Project structure

TEXT
examples/
└── agent-patterns/
    └── memory-augmented-agent/
        └── python/
            ├── main.py           # Session1 capture/store -> Session2 retrieve/apply
            ├── llm.py            # extraction + retrieval planning + final response
            ├── gateway.py        # policy/execution boundary for memory operations
            ├── memory_store.py   # in-memory store with TTL and relevance scoring
            ├── requirements.txt
            └── README.md

Running the project

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/memory-augmented-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option 1: via export

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Option 2: via .env (optional)
BASH
cat > .env <<'ENVFILE'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
ENVFILE

set -a
source .env
set +a

python main.py

These are the shell variants (macOS/Linux). On Windows, it is simpler to set the variables directly or, if you prefer, to use python-dotenv to load .env automatically.
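For example, in PowerShell (the equivalent of the export variant above; the variables apply to the current session only):

```shell
# PowerShell: set the variables for the current session, then run the demo.
$env:OPENAI_API_KEY = "sk-..."
$env:OPENAI_MODEL = "gpt-4.1-mini"          # optional
$env:OPENAI_TIMEOUT_SECONDS = "60"          # optional

python main.py
```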


Task

Imagine an operations scenario for an incident assistant:

Session 1: the user defines stable preferences (language, response style, update channel).
Session 2: they ask for a short update on a payment incident.

In Session 2, the goal deliberately includes "update" and "next actions" to show how preference fields are applied in a structured update.

The agent must:

  • write only useful memory facts
  • not write keys that the runtime forbids
  • retrieve the relevant memory in the next session
  • apply it in the final answer

Solution

In this example:

  • the LLM runs extract_memory_candidates(...) and plan_retrieval_intent(...)
  • the gateway validates the contracts and enforces the execution allowlist
  • MemoryStore stores records with a TTL and returns the most relevant facts
  • ENABLE_PREFERENCE_BIAS is a runtime switch for this flow: not "always include", but a controlled bias; preference keys receive a small score bonus and can enter top_k even without token overlap
  • the final answer passes the allowlist check: used_memory_keys ⊆ retrieved_keys
  • in this demo, the response_style=concise check is format compliance (length/sentence count), not a semantic evaluation of tone
  • the result contains a full trace and a compact history
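The allowlist check on the final answer is a plain subset test; a standalone sketch (names are illustrative, not the exact helper used in main.py):

```python
# Standalone sketch: used_memory_keys must be a subset of what was actually retrieved.
def check_used_keys(used_memory_keys: list[str], retrieved_keys: set[str]) -> list[str]:
    # Returns the keys the model claimed to use but never actually received.
    return [key for key in used_memory_keys if key not in retrieved_keys]
```

A non-empty result means the model referenced memory it was never given, which should stop the run with an explicit reason.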

Code

memory_store.py — TTL memory and retrieval scoring

PYTHON
from __future__ import annotations

import re
import time
from dataclasses import dataclass
from typing import Any


DEFAULT_BOOST_KEYS = {"language", "response_style", "update_channel"}


def _tokenize(text: str) -> set[str]:
    return set(re.findall(r"[a-zA-Z0-9_]+", (text or "").lower()))


@dataclass
class MemoryRecord:
    user_id: int
    key: str
    value: str
    scope: str
    source: str
    confidence: float
    updated_at: float
    expires_at: float


class MemoryStore:
    def __init__(self, *, max_items: int):
        self.max_items = max_items
        self._records: dict[tuple[int, str, str], MemoryRecord] = {}

    def _evict_if_needed(self) -> None:
        if len(self._records) <= self.max_items:
            return
        oldest_key = min(self._records.items(), key=lambda item: item[1].updated_at)[0]
        self._records.pop(oldest_key, None)

    def upsert_items(
        self,
        *,
        user_id: int,
        items: list[dict[str, Any]],
        source: str,
    ) -> list[dict[str, Any]]:
        now = time.time()
        written: list[dict[str, Any]] = []

        for item in items:
            key = str(item["key"]).strip()
            value = str(item["value"]).strip()
            scope = str(item.get("scope", "user")).strip() or "user"
            ttl_days = float(item.get("ttl_days", 180))
            ttl_days = max(1.0, min(365.0, ttl_days))
            confidence = float(item.get("confidence", 0.8))
            confidence = max(0.0, min(1.0, confidence))
            expires_at = now + ttl_days * 86400.0

            record_key = (user_id, scope, key)
            existing = self._records.get(record_key)
            if existing and existing.value == value:
                # Stable value: refresh metadata without creating noisy rewrites.
                existing.source = source
                existing.confidence = confidence
                existing.updated_at = now
                existing.expires_at = expires_at

                written.append(
                    {
                        "key": key,
                        "value": value,
                        "scope": scope,
                        "source": source,
                        "confidence": round(confidence, 3),
                        "ttl_days": int(ttl_days),
                        "refreshed": True,
                    }
                )
                continue

            row = MemoryRecord(
                user_id=user_id,
                key=key,
                value=value,
                scope=scope,
                source=source,
                confidence=confidence,
                updated_at=now,
                expires_at=expires_at,
            )
            self._records[record_key] = row
            self._evict_if_needed()

            written.append(
                {
                    "key": key,
                    "value": value,
                    "scope": scope,
                    "source": source,
                    "confidence": round(confidence, 3),
                    "ttl_days": int(ttl_days),
                    "refreshed": False,
                }
            )

        return written

    def search(
        self,
        *,
        user_id: int,
        query: str,
        top_k: int,
        scopes: set[str],
        include_preference_keys: bool = False,
    ) -> list[dict[str, Any]]:
        now = time.time()
        query_tokens = _tokenize(query)
        if not query_tokens:
            return []

        hits: list[tuple[float, MemoryRecord]] = []
        for row in list(self._records.values()):
            if row.user_id != user_id:
                continue
            if row.scope not in scopes:
                continue
            if row.expires_at <= now:
                self._records.pop((row.user_id, row.scope, row.key), None)
                continue

            text_tokens = _tokenize(f"{row.key} {row.value}")
            overlap = len(query_tokens & text_tokens)
            if overlap == 0 and not (include_preference_keys and row.key in DEFAULT_BOOST_KEYS):
                continue

            score = overlap + (row.confidence * 0.3)

            if include_preference_keys and row.key in DEFAULT_BOOST_KEYS:
                score += 0.4

            if score <= 0:
                continue
            hits.append((score, row))

        hits.sort(key=lambda item: (item[0], item[1].updated_at), reverse=True)

        result: list[dict[str, Any]] = []
        for score, row in hits[:top_k]:
            result.append(
                {
                    "key": row.key,
                    "value": row.value,
                    "scope": row.scope,
                    "source": row.source,
                    "confidence": round(row.confidence, 3),
                    "score": round(score, 3),
                }
            )
        return result

    def dump_user_records(self, *, user_id: int) -> list[dict[str, Any]]:
        now = time.time()
        rows: list[MemoryRecord] = []

        for row in list(self._records.values()):
            if row.user_id != user_id:
                continue
            if row.expires_at <= now:
                self._records.pop((row.user_id, row.scope, row.key), None)
                continue
            rows.append(row)

        rows.sort(key=lambda item: item.updated_at, reverse=True)

        snapshot: list[dict[str, Any]] = []
        for row in rows:
            ttl_left_days = max(0.0, (row.expires_at - now) / 86400.0)
            snapshot.append(
                {
                    "key": row.key,
                    "value": row.value,
                    "scope": row.scope,
                    "source": row.source,
                    "confidence": round(row.confidence, 3),
                    "ttl_left_days": round(ttl_left_days, 1),
                }
            )
        return snapshot

What matters most here (in plain terms)

  • Memory is isolated by user_id and scope.
  • There is a lifecycle: TTL plus cleanup of expired records.
  • The preference-key boost is driven by runtime policy (include_preference_keys), not by the wording of the retrieval query.
  • search(...) returns relevant memory items, not the entire state.
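The read-time expiry used by search(...) and dump_user_records(...) reduces to this standalone sketch: expired records are dropped the next time the store is read, so no background sweeper is needed.

```python
import time

# Standalone sketch of read-time TTL cleanup, keyed like MemoryStore:
# (user_id, scope, key) -> record with an absolute expires_at timestamp.
now = time.time()
records = {
    (42, "user", "language"): {"value": "english", "expires_at": now + 180 * 86400},
    (42, "user", "stale_pref"): {"value": "old", "expires_at": now - 1},
}

# Any record whose expires_at is in the past is skipped (and, in the real
# store, popped from the dict) during the read.
live = {key: row for key, row in records.items() if row["expires_at"] > now}
```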

gateway.py — policy/execution boundary for memory

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from memory_store import MemoryStore


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_capture_items: int = 6
    max_retrieve_top_k: int = 6
    max_query_chars: int = 240
    max_answer_chars: int = 700
    max_value_chars: int = 120
    max_seconds: int = 25


def _is_number(value: Any) -> bool:
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def validate_memory_candidates(
    raw: Any,
    *,
    allowed_keys_policy: set[str],
    allowed_scopes_policy: set[str],
    max_items: int,
    max_value_chars: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_memory_candidates:not_object")

    items = raw.get("items")
    if not isinstance(items, list):
        raise StopRun("invalid_memory_candidates:items")

    normalized: list[dict[str, Any]] = []
    for item in items:
        if not isinstance(item, dict):
            raise StopRun("invalid_memory_candidates:item")

        required_keys = {"key", "value"}
        if not required_keys.issubset(item.keys()):
            raise StopRun("invalid_memory_candidates:missing_keys")

        key = item.get("key")
        value = item.get("value")
        scope = item.get("scope", "user")
        ttl_days = item.get("ttl_days", 180)
        confidence = item.get("confidence", 0.8)

        if not isinstance(key, str) or not key.strip():
            raise StopRun("invalid_memory_candidates:key")
        key = key.strip()
        if key not in allowed_keys_policy:
            raise StopRun(f"memory_key_not_allowed_policy:{key}")

        if not isinstance(value, str) or not value.strip():
            raise StopRun("invalid_memory_candidates:value")
        value = value.strip()
        if len(value) > max_value_chars:
            raise StopRun("invalid_memory_candidates:value_too_long")

        if not isinstance(scope, str) or not scope.strip():
            raise StopRun("invalid_memory_candidates:scope")
        scope = scope.strip()
        if scope not in allowed_scopes_policy:
            raise StopRun(f"memory_scope_not_allowed_policy:{scope}")

        if not _is_number(ttl_days):
            raise StopRun("invalid_memory_candidates:ttl_days")
        ttl_days = int(float(ttl_days))
        ttl_days = max(1, min(365, ttl_days))

        if not _is_number(confidence):
            raise StopRun("invalid_memory_candidates:confidence")
        confidence = float(confidence)
        confidence = max(0.0, min(1.0, confidence))

        normalized.append(
            {
                "key": key,
                "value": value,
                "scope": scope,
                "ttl_days": ttl_days,
                "confidence": round(confidence, 3),
            }
        )

    if len(normalized) > max_items:
        raise StopRun("invalid_memory_candidates:too_many_items")

    return {"items": normalized}


def validate_retrieval_intent(
    raw: Any,
    *,
    allowed_scopes_policy: set[str],
    max_top_k: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_retrieval_intent:not_object")

    if raw.get("kind") != "retrieve_memory":
        raise StopRun("invalid_retrieval_intent:kind")

    query = raw.get("query")
    if not isinstance(query, str) or not query.strip():
        raise StopRun("invalid_retrieval_intent:query")

    top_k = raw.get("top_k", 4)
    if not isinstance(top_k, int) or not (1 <= top_k <= max_top_k):
        raise StopRun("invalid_retrieval_intent:top_k")

    scopes_raw = raw.get("scopes")
    normalized_scopes: list[str] = []
    if scopes_raw is not None:
        if not isinstance(scopes_raw, list) or not scopes_raw:
            raise StopRun("invalid_retrieval_intent:scopes")
        for scope in scopes_raw:
            if not isinstance(scope, str) or not scope.strip():
                raise StopRun("invalid_retrieval_intent:scope_item")
            normalized_scope = scope.strip()
            if normalized_scope not in allowed_scopes_policy:
                raise StopRun(f"invalid_retrieval_intent:scope_not_allowed:{normalized_scope}")
            normalized_scopes.append(normalized_scope)

    payload = {
        "kind": "retrieve_memory",
        "query": query.strip(),
        "top_k": top_k,
    }
    if normalized_scopes:
        payload["scopes"] = normalized_scopes
    return payload


class MemoryGateway:
    def __init__(
        self,
        *,
        store: MemoryStore,
        budget: Budget,
        allow_execution_keys: set[str],
        allow_execution_scopes: set[str],
    ):
        self.store = store
        self.budget = budget
        self.allow_execution_keys = set(allow_execution_keys)
        self.allow_execution_scopes = set(allow_execution_scopes)

    def write(
        self,
        *,
        user_id: int,
        items: list[dict[str, Any]],
        source: str,
    ) -> dict[str, Any]:
        if len(items) > self.budget.max_capture_items:
            raise StopRun("max_capture_items")

        writable: list[dict[str, Any]] = []
        blocked: list[dict[str, Any]] = []

        for item in items:
            key = item["key"]
            scope = item["scope"]

            if key not in self.allow_execution_keys:
                blocked.append({"key": key, "reason": "key_denied_execution"})
                continue
            if scope not in self.allow_execution_scopes:
                blocked.append(
                    {
                        "key": key,
                        "scope": scope,
                        "reason": "scope_denied_execution",
                    }
                )
                continue

            writable.append(item)

        written = []
        if writable:
            written = self.store.upsert_items(user_id=user_id, items=writable, source=source)

        return {
            "written": written,
            "blocked": blocked,
        }

    def retrieve(
        self,
        *,
        user_id: int,
        intent: dict[str, Any],
        include_preference_keys: bool = False,
    ) -> dict[str, Any]:
        query = intent["query"]
        if len(query) > self.budget.max_query_chars:
            raise StopRun("invalid_retrieval_intent:query_too_long")

        requested_scopes = set(intent.get("scopes") or self.allow_execution_scopes)
        denied = sorted(requested_scopes - self.allow_execution_scopes)
        if denied:
            raise StopRun(f"scope_denied:{denied[0]}")

        items = self.store.search(
            user_id=user_id,
            query=query,
            top_k=intent["top_k"],
            scopes=requested_scopes,
            include_preference_keys=include_preference_keys,
        )

        return {
            "query": query,
            "requested_scopes": sorted(requested_scopes),
            "include_preference_keys": include_preference_keys,
            "items": items,
        }

What matters most here (in plain terms)

  • The policy boundary checks the contract and the allowed keys/scopes.
  • Policy is strict: a memory key or scope outside the allowlist stops the run.
  • The gateway enforces only the execution allowlist passed in from main.py.
  • If a key or scope is denied at runtime, the write is blocked and shows up in history.blocked.

llm.py — extract, retrieve-intent, apply

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


MEMORY_CAPTURE_SYSTEM_PROMPT = """
You are a memory extraction assistant.
Return exactly one JSON object in this shape:
{
  "items": [
    {
      "key": "language",
      "value": "english",
      "scope": "user",
      "ttl_days": 180,
      "confidence": 0.9
    }
  ]
}

Rules:
- Extract only stable preferences or durable constraints useful in future sessions.
- Use only keys from available_keys.
- scope must be "user" or "workspace".
- ttl_days must be between 1 and 365.
- confidence must be between 0 and 1.
- If nothing should be stored, return {"items": []}.
- Do not output markdown or extra keys.
""".strip()

RETRIEVAL_INTENT_SYSTEM_PROMPT = """
You are a memory retrieval planner.
Return exactly one JSON object in this shape:
{
  "kind": "retrieve_memory",
  "query": "short memory query",
  "top_k": 4
}

Optional key:
- "scopes": ["user", "workspace"]

Rules:
- Keep query compact and factual.
- top_k must be between 1 and 6.
- Use only scopes from available_scopes.
- Prefer omitting "scopes" unless the goal explicitly requires a specific scope.
- Do not output markdown or extra keys.
""".strip()

ANSWER_SYSTEM_PROMPT = """
You are an incident response assistant.
Return exactly one JSON object in this shape:
{
  "answer": "final answer in English",
  "used_memory_keys": ["language", "response_style"]
}

Rules:
- Use only incident_context and memory_items.
- Keep the answer concise, actionable, and suitable for an operations update.
- used_memory_keys must reference only keys present in memory_items.
- If "update_channel" is used, explicitly mention it in answer text (for example, "via email").
- If "language" is used with value "english", start answer with "Incident update:".
- If no memory was used, return an empty array.
- Do not output markdown or extra keys.
""".strip()


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def extract_memory_candidates(
    *,
    user_message: str,
    available_keys: list[str],
) -> dict[str, Any]:
    payload = {
        "user_message": user_message,
        "available_keys": available_keys,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": MEMORY_CAPTURE_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"invalid": True, "raw": text}


def plan_retrieval_intent(*, goal: str, available_scopes: list[str]) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "available_scopes": available_scopes,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": RETRIEVAL_INTENT_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_memory_augmented_answer(
    *,
    goal: str,
    incident_context: dict[str, Any],
    memory_items: list[dict[str, Any]],
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "memory_items": [
            {
                "key": item.get("key"),
                "value": item.get("value"),
                "scope": item.get("scope"),
                "confidence": item.get("confidence"),
            }
            for item in memory_items
        ],
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": ANSWER_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")

    answer = data.get("answer")
    used_memory_keys = data.get("used_memory_keys")

    if not isinstance(answer, str):
        raise LLMInvalid("llm_invalid_schema")
    if not answer.strip():
        raise LLMEmpty("llm_empty")

    if not isinstance(used_memory_keys, list):
        raise LLMInvalid("llm_invalid_schema")

    normalized_keys: list[str] = []
    for value in used_memory_keys:
        if not isinstance(value, str):
            raise LLMInvalid("llm_invalid_schema")
        item = value.strip()
        if item and item not in normalized_keys:
            normalized_keys.append(item)

    return {
        "answer": answer.strip(),
        "used_memory_keys": normalized_keys,
    }

What matters most here (in plain terms)

  • Each phase has its own JSON contract: capture, retrieve_intent, apply.
  • LLM errors are split into llm_timeout, llm_invalid_*, llm_empty.

main.py — Session 1 Capture/Store -> Session 2 Retrieve/Apply

PYTHON
from __future__ import annotations

import json
import re
import time
import uuid
from typing import Any

from gateway import (
    Budget,
    MemoryGateway,
    StopRun,
    validate_memory_candidates,
    validate_retrieval_intent,
)
from llm import (
    LLMEmpty,
    LLMInvalid,
    LLMTimeout,
    compose_memory_augmented_answer,
    extract_memory_candidates,
    plan_retrieval_intent,
)
from memory_store import MemoryStore

USER_ID = 42
SESSION_1_USER_MESSAGE = (
    "For future incident updates, write in English, keep replies concise, "
    "use email as the primary channel, and remember that I am enterprise tier."
)
SESSION_2_GOAL = "Draft today's payment incident update and next actions."

INCIDENT_CONTEXT = {
    "date": "2026-03-04",
    "region": "US",
    "incident_id": "inc_payments_20260304",
    "severity": "P1",
    "gateway_status": "degraded",
    "failed_payment_rate": 0.034,
    "chargeback_alerts": 5,
    "eta_minutes": 45,
}

BUDGET = Budget(
    max_capture_items=6,
    max_retrieve_top_k=6,
    max_query_chars=240,
    max_answer_chars=700,
    max_value_chars=120,
    max_seconds=25,
)

ALLOWED_MEMORY_KEYS_POLICY = {
    "language",
    "response_style",
    "update_channel",
    "declared_tier",
}
# Runtime can block high-risk keys even if policy allows them.
TRUST_DECLARED_TIER_FROM_CHAT = False
ALLOWED_MEMORY_KEYS_EXECUTION = (
    ALLOWED_MEMORY_KEYS_POLICY
    if TRUST_DECLARED_TIER_FROM_CHAT
    else {"language", "response_style", "update_channel"}
)

ALLOWED_SCOPES_POLICY = {"user", "workspace"}
WORKSPACE_MEMORY_RUNTIME_ENABLED = False
ALLOWED_SCOPES_EXECUTION = (
    ALLOWED_SCOPES_POLICY if WORKSPACE_MEMORY_RUNTIME_ENABLED else {"user"}
)
# Runtime policy: include default preferences for this incident-update flow.
ENABLE_PREFERENCE_BIAS = True



def _shorten(text: str, *, limit: int = 240) -> str:
    text = (text or "").strip()
    if len(text) <= limit:
        return text
    return text[: limit - 3].rstrip() + "..."



def _pick_applied_memory(
    memory_items: list[dict[str, Any]],
    used_keys: list[str],
) -> list[dict[str, Any]]:
    used = set(used_keys)
    out: list[dict[str, Any]] = []
    for item in memory_items:
        key = item.get("key")
        if key not in used:
            continue
        out.append(
            {
                "key": item["key"],
                "value": item["value"],
                "scope": item["scope"],
                "confidence": item["confidence"],
                "score": item["score"],
            }
        )
    return out



def _has_declared_memory_application(*, answer: str, applied_memory: list[dict[str, Any]]) -> bool:
    """
    Conservative audit check:
    - update_channel: value should appear in answer text.
    - response_style=concise: short-response format compliance.
    - language=english: answer should use a stable prefix "Incident update:".
    If no verifiable key exists, do not block.
    """
    if not applied_memory:
        return False

    normalized_answer = " ".join((answer or "").lower().split())
    evidenced_any = False
    has_verifiable_key = False

    def _is_concise(text: str) -> bool:
        words = re.findall(r"[a-zA-Z0-9_]+", text)
        sentence_count = len(re.findall(r"[.!?]+", text))
        return len(words) <= 80 and sentence_count <= 3

    for item in applied_memory:
        key = str(item.get("key") or "").strip().lower()
        value = str(item.get("value", "")).strip().lower()
        if not key or not value:
            continue

        if key == "update_channel":
            has_verifiable_key = True
            if value in normalized_answer:
                evidenced_any = True
        elif key == "response_style":
            has_verifiable_key = True
            if value == "concise" and _is_concise(answer):
                evidenced_any = True
        elif key == "language":
            if value in {"english", "en"}:
                has_verifiable_key = True
                if normalized_answer.startswith("incident update:"):
                    evidenced_any = True
            continue
        else:
            continue

    if not has_verifiable_key:
        return True
    return evidenced_any



def run_memory_augmented(
    *,
    user_id: int,
    session_1_message: str,
    session_2_goal: str,
) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    store = MemoryStore(max_items=100)
    gateway = MemoryGateway(
        store=store,
        budget=BUDGET,
        allow_execution_keys=ALLOWED_MEMORY_KEYS_EXECUTION,
        allow_execution_scopes=ALLOWED_SCOPES_EXECUTION,
    )

    try:
        raw_capture = extract_memory_candidates(
            user_message=session_1_message,
            available_keys=sorted(ALLOWED_MEMORY_KEYS_POLICY),
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="capture")

    try:
        capture_payload = validate_memory_candidates(
            raw_capture,
            allowed_keys_policy=ALLOWED_MEMORY_KEYS_POLICY,
            allowed_scopes_policy=ALLOWED_SCOPES_POLICY,
            max_items=BUDGET.max_capture_items,
            max_value_chars=BUDGET.max_value_chars,
        )
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase="capture",
            raw_capture=raw_capture,
        )

    write_result = gateway.write(
        user_id=user_id,
        items=capture_payload["items"],
        source="session_1",
    )

    refreshed_items = [item for item in write_result["written"] if item.get("refreshed")]
    written_items = [item for item in write_result["written"] if not item.get("refreshed")]

    trace.append(
        {
            "step": 1,
            "phase": "capture_store",
            "candidates": len(capture_payload["items"]),
            "written": len(written_items),
            "refreshed": len(refreshed_items),
            "blocked": len(write_result["blocked"]),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 1,
            "session": "session_1",
            "message": session_1_message,
            "written_keys": [item["key"] for item in written_items],
            "refreshed_keys": [item["key"] for item in refreshed_items],
            "blocked": write_result["blocked"],
        }
    )

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return stopped("max_seconds", phase="retrieve_plan")

    try:
        raw_intent = plan_retrieval_intent(
            goal=session_2_goal,
            available_scopes=sorted(ALLOWED_SCOPES_POLICY),
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="retrieve_plan")

    try:
        intent = validate_retrieval_intent(
            raw_intent,
            allowed_scopes_policy=ALLOWED_SCOPES_POLICY,
            max_top_k=BUDGET.max_retrieve_top_k,
        )
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase="retrieve_plan",
            raw_intent=raw_intent,
        )

    try:
        retrieval = gateway.retrieve(
            user_id=user_id,
            intent=intent,
            include_preference_keys=ENABLE_PREFERENCE_BIAS,
        )
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase="retrieve",
            intent=intent,
        )

    trace.append(
        {
            "step": 2,
            "phase": "retrieve",
            "query": retrieval["query"],
            "requested_scopes": retrieval["requested_scopes"],
            "include_preference_keys": retrieval["include_preference_keys"],
            "memory_hits": len(retrieval["items"]),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 2,
            "session": "session_2",
            "intent": intent,
            "resolved_scopes": retrieval["requested_scopes"],
            "include_preference_keys": retrieval["include_preference_keys"],
            "retrieved_keys": [item["key"] for item in retrieval["items"]],
        }
    )

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return stopped("max_seconds", phase="apply")

    try:
        final = compose_memory_augmented_answer(
            goal=session_2_goal,
            incident_context=INCIDENT_CONTEXT,
            memory_items=retrieval["items"],
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="apply")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="apply")
    except LLMEmpty:
        return stopped("llm_empty", phase="apply")

    retrieved_keys = {item["key"] for item in retrieval["items"]}
    invalid_used_keys = sorted(
        set(final["used_memory_keys"]) - retrieved_keys,
    )
    if invalid_used_keys:
        return stopped(
            "invalid_answer:memory_keys_out_of_context",
            phase="apply",
            invalid_used_memory_keys=invalid_used_keys,
            retrieved_keys=sorted(retrieved_keys),
        )

    if len(final["answer"]) > BUDGET.max_answer_chars:
        return stopped("invalid_answer:too_long", phase="apply")

    applied_memory = _pick_applied_memory(retrieval["items"], final["used_memory_keys"])
    if final["used_memory_keys"] and not _has_declared_memory_application(
        answer=final["answer"],
        applied_memory=applied_memory,
    ):
        return stopped(
            "invalid_answer:memory_declared_but_not_applied",
            phase="apply",
            used_memory_keys=final["used_memory_keys"],
            applied_memory=applied_memory,
        )

    trace.append(
        {
            "step": 3,
            "phase": "apply",
            "used_memory_keys": final["used_memory_keys"],
            "applied_memory_count": len(applied_memory),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 3,
            "action": "compose_memory_augmented_answer",
            "used_memory_keys": final["used_memory_keys"],
            "answer": _shorten(final["answer"]),
        }
    )

    return {
        "run_id": run_id,
        "status": "ok",
        "stop_reason": "success",
        "outcome": "memory_applied" if final["used_memory_keys"] else "context_only",
        "answer": final["answer"],
        "used_memory_keys": final["used_memory_keys"],
        "applied_memory": applied_memory,
        "memory_state": store.dump_user_records(user_id=user_id),
        "trace": trace,
        "history": history,
    }



def main() -> None:
    result = run_memory_augmented(
        user_id=USER_ID,
        session_1_message=SESSION_1_USER_MESSAGE,
        session_2_goal=SESSION_2_GOAL,
    )
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (in plain terms)

  • Session 1 and Session 2 are simulated here in a single process run via a shared memory store.
  • ALLOWED_MEMORY_KEYS_POLICY and ALLOWED_MEMORY_KEYS_EXECUTION can deliberately differ.
  • ENABLE_PREFERENCE_BIAS=True is enabled only for the incident-update flow, where preference fields are almost always needed.
  • Every result payload carries run_id for log correlation.
  • The final used_memory_keys check, combined with memory_declared_but_not_applied, makes the apply step audit-friendly.
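The two-layer allowlist split described above can be sketched as a minimal standalone check. This is an illustrative reduction, not the gateway's actual implementation: the constant names mirror the example, but `gate_memory_keys` is a hypothetical helper.

```python
# Hypothetical sketch of the policy-vs-execution allowlist split.
# Policy violations hard-fail (contract drift must surface quickly);
# execution denials are soft per-item blocks, mirroring the "blocked"
# entries seen in history.

ALLOWED_MEMORY_KEYS_POLICY = {"language", "response_style", "update_channel", "declared_tier"}
ALLOWED_MEMORY_KEYS_EXECUTION = {"language", "response_style", "update_channel"}  # declared_tier is trust-gated


class StopRun(Exception):
    def __init__(self, reason: str) -> None:
        super().__init__(reason)
        self.reason = reason


def gate_memory_keys(items: list[dict]) -> tuple[list[dict], list[dict]]:
    written, blocked = [], []
    for item in items:
        key = item["key"]
        if key not in ALLOWED_MEMORY_KEYS_POLICY:
            # Policy boundary: the LLM broke the contract -> stop the run.
            raise StopRun(f"memory_key_not_allowed_policy:{key}")
        if key not in ALLOWED_MEMORY_KEYS_EXECUTION:
            # Execution boundary: allowed by policy but not trusted at runtime.
            blocked.append({"key": key, "reason": "key_denied_execution"})
            continue
        written.append(item)
    return written, blocked


written, blocked = gate_memory_keys(
    [{"key": "language", "value": "english"}, {"key": "declared_tier", "value": "enterprise"}]
)
# written keeps "language"; blocked records "declared_tier" with key_denied_execution
```

The asymmetry is the point: a policy failure means the contract between prompt and validator has drifted, while an execution block is an expected runtime decision that should be logged, not fatal.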

Example output

JSON
{
  "run_id": "f06d5282-bda4-44df-8bf9-38f53cdb2fb9",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "memory_applied",
  "answer": "Incident update: On 2026-03-04, the US payment gateway is in a P1 degraded state. Failed payment rate is 3.4% with 5 chargeback alerts, ETA 45 minutes. Next actions: monitor gateway performance and send customer updates via email.",
  "used_memory_keys": [
    "language",
    "update_channel",
    "response_style"
  ],
  "applied_memory": [
    {
      "key": "language",
      "value": "english",
      "scope": "user",
      "confidence": 0.95,
      "score": 0.685
    },
    {
      "key": "update_channel",
      "value": "email",
      "scope": "user",
      "confidence": 0.95,
      "score": 0.685
    },
    {
      "key": "response_style",
      "value": "concise",
      "scope": "user",
      "confidence": 0.9,
      "score": 0.67
    }
  ],
  "memory_state": [
    {
      "key": "language",
      "value": "english",
      "scope": "user",
      "source": "session_1",
      "confidence": 0.95,
      "ttl_left_days": 180.0
    },
    {
      "key": "response_style",
      "value": "concise",
      "scope": "user",
      "source": "session_1",
      "confidence": 0.9,
      "ttl_left_days": 180.0
    },
    {
      "key": "update_channel",
      "value": "email",
      "scope": "user",
      "source": "session_1",
      "confidence": 0.95,
      "ttl_left_days": 180.0
    }
  ],
  "trace": [
    {
      "step": 1,
      "phase": "capture_store",
      "candidates": 4,
      "written": 3,
      "refreshed": 0,
      "blocked": 1,
      "ok": true
    },
    {
      "step": 2,
      "phase": "retrieve",
      "query": "payment incident update and next actions",
      "requested_scopes": [
        "user"
      ],
      "include_preference_keys": true,
      "memory_hits": 3,
      "ok": true
    },
    {
      "step": 3,
      "phase": "apply",
      "used_memory_keys": [
        "language",
        "update_channel",
        "response_style"
      ],
      "applied_memory_count": 3,
      "ok": true
    }
  ],
  "history": [
    {
      "step": 1,
      "session": "session_1",
      "message": "For future incident updates, write in English, keep replies concise, use email as the primary channel, and remember that I am enterprise tier.",
      "written_keys": [
        "language",
        "response_style",
        "update_channel"
      ],
      "refreshed_keys": [],
      "blocked": [
        {
          "key": "declared_tier",
          "reason": "key_denied_execution"
        }
      ]
    },
    {
      "step": 2,
      "session": "session_2",
      "intent": {
        "kind": "retrieve_memory",
        "query": "payment incident update and next actions",
        "top_k": 4
      },
      "resolved_scopes": [
        "user"
      ],
      "include_preference_keys": true,
      "retrieved_keys": [
        "language",
        "update_channel",
        "response_style"
      ]
    },
    {
      "step": 3,
      "action": "compose_memory_augmented_answer",
      "used_memory_keys": [
        "language",
        "update_channel",
        "response_style"
      ],
      "answer": "Incident update: On 2026-03-04, the US payment gateway is in a P1 degraded state..."
    }
  ]
}

Typical stop_reason values

  • success — run completed correctly; see outcome (memory_applied or context_only)
  • invalid_memory_candidates:* — memory capture failed contract validation
  • invalid_memory_candidates:value_too_long — value exceeds the max_value_chars limit
  • memory_key_not_allowed_policy:<key> — the LLM proposed a key outside the policy allowlist
  • memory_scope_not_allowed_policy:<scope> — the LLM proposed a scope outside the policy allowlist
  • invalid_retrieval_intent:* — the retrieval intent failed policy validation
  • scope_denied:<scope> — retrieval scope not permitted by the execution allowlist
  • llm_timeout — the LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_invalid_json / llm_invalid_schema — the apply step returned invalid JSON or an invalid shape
  • llm_empty — empty final response
  • invalid_answer:memory_keys_out_of_context — the model referenced a memory key absent from retrieval
  • invalid_answer:too_long — the final answer exceeds the max_answer_chars limit
  • invalid_answer:memory_declared_but_not_applied — the model declared memory usage, but the answer text does not reflect it
  • max_seconds — total run time budget exceeded
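Since every stop_reason is a stable string (or a `prefix:detail` pair), monitoring can route on the prefix. The mapping below is a hypothetical example of such routing; the severity tiers are illustrative, not part of the pattern.

```python
# Hypothetical routing of stop_reason values to alert tiers.
# Prefixes match the values listed above; severities are illustrative.

def classify_stop_reason(stop_reason: str) -> str:
    if stop_reason == "success":
        return "none"
    if stop_reason in {"llm_timeout", "max_seconds"}:
        return "warn"    # transient: provider latency or budget pressure
    prefix = stop_reason.split(":", 1)[0]
    if prefix in {"memory_key_not_allowed_policy", "memory_scope_not_allowed_policy"}:
        return "page"    # contract drift between the LLM and the policy allowlist
    # invalid_memory_candidates / invalid_retrieval_intent / invalid_answer /
    # scope_denied / llm_* all indicate model- or contract-quality issues.
    return "ticket"


print(classify_stop_reason("memory_key_not_allowed_policy:declared_tier"))  # page
```

Splitting on the first `:` keeps the detail (`<key>`, `<scope>`) available for the log line while routing stays prefix-based.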

What is NOT shown

  • persistent storage (Postgres/Redis/vector DB) instead of the in-memory implementation
  • encryption/PII redaction before memory writes
  • semantic retrieval via embeddings (instead of simple token overlap)
  • multi-tenant quotas and soft/hard retention policy
  • retry/backoff for LLM calls
  • per-key consent and a user-visible memory UI
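One of the omissions above, retry/backoff for LLM calls, can be sketched as a small decorator. The exception name mirrors this example's LLMTimeout; the backoff parameters and the decorator itself are illustrative, not part of the example code.

```python
import random
import time
from functools import wraps


class LLMTimeout(Exception):
    """Stand-in for the example's timeout error."""


def with_backoff(max_attempts: int = 3, base_delay: float = 0.5):
    """Retry transient LLM timeouts with exponential backoff and jitter."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except LLMTimeout:
                    if attempt == max_attempts:
                        raise  # caller still maps this to stop_reason=llm_timeout
                    # Exponential backoff plus a little jitter to avoid thundering herds.
                    time.sleep(base_delay * (2 ** (attempt - 1)) + random.uniform(0, 0.1))
        return wrapper
    return decorator
```

Note that retries must stay inside the run's max_seconds budget: re-raising on the final attempt keeps the existing stop_reason semantics intact.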

What you can try next

  1. Enable TRUST_DECLARED_TIER_FROM_CHAT=True and check how blocked evolves in history.
  2. Enable WORKSPACE_MEMORY_RUNTIME_ENABLED=True and add a retrieval intent with the workspace scope.
  3. Add a policy rule for the timezone memory key and verify the personalization in the answer.
  4. Replace the in-memory store with external storage and add deduplication by key + normalized_value.
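The key + normalized_value deduplication suggested in item 4 can be sketched as follows; `normalize_value` and `dedup_candidates` are hypothetical helpers, shown only to illustrate the fingerprinting idea.

```python
# Hypothetical normalization-based dedup: two candidates with the same key
# and the same normalized value collapse into a single record.

def normalize_value(value: str) -> str:
    # Lowercase and collapse whitespace so "English" and " english " match.
    return " ".join(value.strip().lower().split())


def dedup_candidates(items: list[dict]) -> list[dict]:
    seen: set[tuple[str, str]] = set()
    unique: list[dict] = []
    for item in items:
        fingerprint = (item["key"], normalize_value(item["value"]))
        if fingerprint in seen:
            continue
        seen.add(fingerprint)
        unique.append(item)  # first occurrence wins
    return unique


deduped = dedup_candidates([
    {"key": "language", "value": "English"},
    {"key": "language", "value": " english "},
    {"key": "update_channel", "value": "email"},
])
# the two "language" candidates collapse; "update_channel" is kept
```

In an external store, the same fingerprint would typically become a unique index on (user_id, key, normalized_value) so dedup survives across processes.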
Author

This documentation is curated and maintained by engineers who deploy AI agents in production.

The content is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The patterns and recommendations draw on post-mortems, failure modes, and operational incidents in deployed systems, notably from building and operating governance infrastructure for agents at OnceOnly.