Essence of the pattern (brief)
Memory-Augmented Agent is a pattern in which the agent stores important facts across sessions and uses them in later responses.
The LLM proposes what to remember and what to retrieve; the memory policy/execution layer controls what can actually be written or read.
What this example demonstrates
- two phases (Session 1/2) in a single run; cross-process persistence is not shown
- LLM-based memory extraction via a JSON contract (`items[]`)
- policy boundary for memory write/read contracts
- execution boundary (runtime allowlist) for memory keys and scopes
- runtime trust-gating: execution can block sensitive keys even if policy allows them
- in this demo, the policy allowlist acts as a hard fail to catch drift and contract errors quickly
- memory TTL lifecycle and bounded in-memory storage
- final response with verification of `used_memory_keys` against the memory actually retrieved
- explicit `stop_reason`, `trace`, and `history` for production monitoring
Architecture
- The LLM extracts memory candidates from the user message (`items`).
- The policy boundary validates shape, keys, `ttl_days`, and `confidence`.
- The execution boundary decides which records are actually written (runtime allowlist).
- In the next session, the LLM plans a retrieval intent (`kind`/`query`/`top_k`/`scopes`).
- The gateway executes retrieval only in scopes allowed by the runtime.
- The LLM composes the answer from `incident_context` + `memory_items`.
- The system verifies that `used_memory_keys` reference only memory that was actually retrieved.
Key contract: the LLM may propose records and retrieval intents, but the policy/execution layer defines what is valid and what actually runs.
The policy allowlist defines what the model may request; the execution allowlist defines what the runtime actually permits right now.
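The split between the two allowlists can be sketched in isolation. This is a minimal sketch using the key sets from `main.py`; `check_key` is a hypothetical helper, not part of the example code:

```python
# Minimal sketch of the two-layer allowlist. Policy violations fail hard
# (contract drift); execution denials are expected runtime outcomes.
ALLOWED_MEMORY_KEYS_POLICY = {"language", "response_style", "update_channel", "declared_tier"}
ALLOWED_MEMORY_KEYS_EXECUTION = {"language", "response_style", "update_channel"}

def check_key(key: str) -> str:
    if key not in ALLOWED_MEMORY_KEYS_POLICY:
        # The model proposed something policy never allowed: stop the run.
        raise ValueError(f"memory_key_not_allowed_policy:{key}")
    if key not in ALLOWED_MEMORY_KEYS_EXECUTION:
        # Policy allows it, but the runtime does not trust it right now.
        return "blocked"
    return "written"
```

Note the asymmetry: a policy miss raises, while an execution miss is merely recorded, which is exactly how the gateway distinguishes contract errors from runtime trust decisions.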
Project structure
examples/
└── agent-patterns/
└── memory-augmented-agent/
└── python/
├── main.py # Session1 capture/store -> Session2 retrieve/apply
├── llm.py # extraction + retrieval planning + final response
├── gateway.py # policy/execution boundary for memory operations
├── memory_store.py # in-memory store with TTL and relevance scoring
├── requirements.txt
└── README.md
How to run
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/memory-augmented-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ is required.
Option with export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Option with .env (optional)
cat > .env <<'ENVFILE'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
ENVFILE
set -a
source .env
set +a
python main.py
This is the shell variant (macOS/Linux). On Windows it is easier to set variables with `set` or, if you prefer, to use python-dotenv to load `.env` automatically.
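A cross-platform alternative is to load `.env` from Python itself. This is a sketch assuming the `python-dotenv` package is installed; it is not shown in this example's requirements:

```python
# Optional cross-platform .env loading; falls back gracefully if
# python-dotenv is not installed.
import os

try:
    from dotenv import load_dotenv  # pip install python-dotenv
    load_dotenv()  # reads .env from the current directory, if present
except ImportError:
    pass  # rely on environment variables exported by the shell

api_key = os.getenv("OPENAI_API_KEY")
```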
Task
Imagine an operations scenario for an incident assistant:
Session 1: the user defines stable preferences (language, response style, update channel).
Session 2: they ask for a short update on a payments incident.
In Session 2, the goal intentionally includes "update" and "next actions" to show preference fields being applied in a structured update.
The agent must:
- store only useful memory facts
- not store keys that the runtime forbids
- retrieve relevant memory in the next session
- apply it in the final answer
Solution
In this example:
- the LLM runs `extract_memory_candidates(...)` and `plan_retrieval_intent(...)`
- the gateway validates the contracts and enforces the execution allowlist
- `MemoryStore` stores records with TTL and returns the most relevant facts
- `ENABLE_PREFERENCE_BIAS` is a runtime switch for this flow: not an "always include" flag but a controlled bias; preference keys receive a small score bonus and can enter `top_k` even without token overlap
- the final answer passes the allowlist check: `used_memory_keys` ⊆ `retrieved_keys`
- the `response_style=concise` validation in this demo is format compliance (length/sentence count), not a semantic evaluation of tone
- the result includes a full `trace` and a compact `history`
Code
memory_store.py — TTL memory and retrieval relevance scoring
from __future__ import annotations
import re
import time
from dataclasses import dataclass
from typing import Any
DEFAULT_BOOST_KEYS = {"language", "response_style", "update_channel"}
def _tokenize(text: str) -> set[str]:
return set(re.findall(r"[a-zA-Z0-9_]+", (text or "").lower()))
@dataclass
class MemoryRecord:
user_id: int
key: str
value: str
scope: str
source: str
confidence: float
updated_at: float
expires_at: float
class MemoryStore:
def __init__(self, *, max_items: int):
self.max_items = max_items
self._records: dict[tuple[int, str, str], MemoryRecord] = {}
def _evict_if_needed(self) -> None:
if len(self._records) <= self.max_items:
return
oldest_key = min(self._records.items(), key=lambda item: item[1].updated_at)[0]
self._records.pop(oldest_key, None)
def upsert_items(
self,
*,
user_id: int,
items: list[dict[str, Any]],
source: str,
) -> list[dict[str, Any]]:
now = time.time()
written: list[dict[str, Any]] = []
for item in items:
key = str(item["key"]).strip()
value = str(item["value"]).strip()
scope = str(item.get("scope", "user")).strip() or "user"
ttl_days = float(item.get("ttl_days", 180))
ttl_days = max(1.0, min(365.0, ttl_days))
confidence = float(item.get("confidence", 0.8))
confidence = max(0.0, min(1.0, confidence))
expires_at = now + ttl_days * 86400.0
record_key = (user_id, scope, key)
existing = self._records.get(record_key)
if existing and existing.value == value:
# Stable value: refresh metadata without creating noisy rewrites.
existing.source = source
existing.confidence = confidence
existing.updated_at = now
existing.expires_at = expires_at
written.append(
{
"key": key,
"value": value,
"scope": scope,
"source": source,
"confidence": round(confidence, 3),
"ttl_days": int(ttl_days),
"refreshed": True,
}
)
continue
row = MemoryRecord(
user_id=user_id,
key=key,
value=value,
scope=scope,
source=source,
confidence=confidence,
updated_at=now,
expires_at=expires_at,
)
self._records[record_key] = row
self._evict_if_needed()
written.append(
{
"key": key,
"value": value,
"scope": scope,
"source": source,
"confidence": round(confidence, 3),
"ttl_days": int(ttl_days),
"refreshed": False,
}
)
return written
def search(
self,
*,
user_id: int,
query: str,
top_k: int,
scopes: set[str],
include_preference_keys: bool = False,
) -> list[dict[str, Any]]:
now = time.time()
query_tokens = _tokenize(query)
if not query_tokens:
return []
hits: list[tuple[float, MemoryRecord]] = []
for row in list(self._records.values()):
if row.user_id != user_id:
continue
if row.scope not in scopes:
continue
if row.expires_at <= now:
self._records.pop((row.user_id, row.scope, row.key), None)
continue
text_tokens = _tokenize(f"{row.key} {row.value}")
overlap = len(query_tokens & text_tokens)
if overlap == 0 and not (include_preference_keys and row.key in DEFAULT_BOOST_KEYS):
continue
score = overlap + (row.confidence * 0.3)
if include_preference_keys and row.key in DEFAULT_BOOST_KEYS:
score += 0.4
if score <= 0:
continue
hits.append((score, row))
hits.sort(key=lambda item: (item[0], item[1].updated_at), reverse=True)
result: list[dict[str, Any]] = []
for score, row in hits[:top_k]:
result.append(
{
"key": row.key,
"value": row.value,
"scope": row.scope,
"source": row.source,
"confidence": round(row.confidence, 3),
"score": round(score, 3),
}
)
return result
def dump_user_records(self, *, user_id: int) -> list[dict[str, Any]]:
now = time.time()
rows: list[MemoryRecord] = []
for row in list(self._records.values()):
if row.user_id != user_id:
continue
if row.expires_at <= now:
self._records.pop((row.user_id, row.scope, row.key), None)
continue
rows.append(row)
rows.sort(key=lambda item: item.updated_at, reverse=True)
snapshot: list[dict[str, Any]] = []
for row in rows:
ttl_left_days = max(0.0, (row.expires_at - now) / 86400.0)
snapshot.append(
{
"key": row.key,
"value": row.value,
"scope": row.scope,
"source": row.source,
"confidence": round(row.confidence, 3),
"ttl_left_days": round(ttl_left_days, 1),
}
)
return snapshot
The key points here (in plain terms)
- Memory is isolated by `user_id` and `scope`.
- There is a lifecycle: TTL plus cleanup of expired records.
- The preference-key boost is controlled by runtime policy (`include_preference_keys`), not by the wording of the retrieval query.
- `search(...)` returns relevant memory items, not the entire state.
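The relevance score computed inside `search(...)` reduces to a small formula: token overlap plus a confidence tie-breaker plus an optional preference bonus. A standalone sketch of just that step (`relevance` is a hypothetical helper extracted for illustration):

```python
import re

PREFERENCE_KEYS = {"language", "response_style", "update_channel"}

def _tokenize(text: str) -> set[str]:
    return set(re.findall(r"[a-zA-Z0-9_]+", (text or "").lower()))

def relevance(query: str, key: str, value: str, confidence: float,
              include_preference_keys: bool = False) -> float:
    # Token overlap between the query and "key value", plus a small
    # confidence tie-breaker, plus an optional preference-key bonus.
    overlap = len(_tokenize(query) & _tokenize(f"{key} {value}"))
    if overlap == 0 and not (include_preference_keys and key in PREFERENCE_KEYS):
        return 0.0
    score = overlap + confidence * 0.3
    if include_preference_keys and key in PREFERENCE_KEYS:
        score += 0.4
    return score
```

With the bias enabled, a preference record with confidence 0.95 and zero token overlap scores 0.95 * 0.3 + 0.4 ≈ 0.685, which matches the `score` values shown in the example output section.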
gateway.py — policy/execution boundary for memory
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from memory_store import MemoryStore
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_capture_items: int = 6
max_retrieve_top_k: int = 6
max_query_chars: int = 240
max_answer_chars: int = 700
max_value_chars: int = 120
max_seconds: int = 25
def _is_number(value: Any) -> bool:
return isinstance(value, (int, float)) and not isinstance(value, bool)
def validate_memory_candidates(
raw: Any,
*,
allowed_keys_policy: set[str],
allowed_scopes_policy: set[str],
max_items: int,
max_value_chars: int,
) -> dict[str, Any]:
if not isinstance(raw, dict):
raise StopRun("invalid_memory_candidates:not_object")
items = raw.get("items")
if not isinstance(items, list):
raise StopRun("invalid_memory_candidates:items")
normalized: list[dict[str, Any]] = []
for item in items:
if not isinstance(item, dict):
raise StopRun("invalid_memory_candidates:item")
required_keys = {"key", "value"}
if not required_keys.issubset(item.keys()):
raise StopRun("invalid_memory_candidates:missing_keys")
key = item.get("key")
value = item.get("value")
scope = item.get("scope", "user")
ttl_days = item.get("ttl_days", 180)
confidence = item.get("confidence", 0.8)
if not isinstance(key, str) or not key.strip():
raise StopRun("invalid_memory_candidates:key")
key = key.strip()
if key not in allowed_keys_policy:
raise StopRun(f"memory_key_not_allowed_policy:{key}")
if not isinstance(value, str) or not value.strip():
raise StopRun("invalid_memory_candidates:value")
value = value.strip()
if len(value) > max_value_chars:
raise StopRun("invalid_memory_candidates:value_too_long")
if not isinstance(scope, str) or not scope.strip():
raise StopRun("invalid_memory_candidates:scope")
scope = scope.strip()
if scope not in allowed_scopes_policy:
raise StopRun(f"memory_scope_not_allowed_policy:{scope}")
if not _is_number(ttl_days):
raise StopRun("invalid_memory_candidates:ttl_days")
ttl_days = int(float(ttl_days))
ttl_days = max(1, min(365, ttl_days))
if not _is_number(confidence):
raise StopRun("invalid_memory_candidates:confidence")
confidence = float(confidence)
confidence = max(0.0, min(1.0, confidence))
normalized.append(
{
"key": key,
"value": value,
"scope": scope,
"ttl_days": ttl_days,
"confidence": round(confidence, 3),
}
)
if len(normalized) > max_items:
raise StopRun("invalid_memory_candidates:too_many_items")
return {"items": normalized}
def validate_retrieval_intent(
raw: Any,
*,
allowed_scopes_policy: set[str],
max_top_k: int,
) -> dict[str, Any]:
if not isinstance(raw, dict):
raise StopRun("invalid_retrieval_intent:not_object")
if raw.get("kind") != "retrieve_memory":
raise StopRun("invalid_retrieval_intent:kind")
query = raw.get("query")
if not isinstance(query, str) or not query.strip():
raise StopRun("invalid_retrieval_intent:query")
top_k = raw.get("top_k", 4)
if not isinstance(top_k, int) or not (1 <= top_k <= max_top_k):
raise StopRun("invalid_retrieval_intent:top_k")
scopes_raw = raw.get("scopes")
normalized_scopes: list[str] = []
if scopes_raw is not None:
if not isinstance(scopes_raw, list) or not scopes_raw:
raise StopRun("invalid_retrieval_intent:scopes")
for scope in scopes_raw:
if not isinstance(scope, str) or not scope.strip():
raise StopRun("invalid_retrieval_intent:scope_item")
normalized_scope = scope.strip()
if normalized_scope not in allowed_scopes_policy:
raise StopRun(f"invalid_retrieval_intent:scope_not_allowed:{normalized_scope}")
normalized_scopes.append(normalized_scope)
payload = {
"kind": "retrieve_memory",
"query": query.strip(),
"top_k": top_k,
}
if normalized_scopes:
payload["scopes"] = normalized_scopes
return payload
class MemoryGateway:
def __init__(
self,
*,
store: MemoryStore,
budget: Budget,
allow_execution_keys: set[str],
allow_execution_scopes: set[str],
):
self.store = store
self.budget = budget
self.allow_execution_keys = set(allow_execution_keys)
self.allow_execution_scopes = set(allow_execution_scopes)
def write(
self,
*,
user_id: int,
items: list[dict[str, Any]],
source: str,
) -> dict[str, Any]:
if len(items) > self.budget.max_capture_items:
raise StopRun("max_capture_items")
writable: list[dict[str, Any]] = []
blocked: list[dict[str, Any]] = []
for item in items:
key = item["key"]
scope = item["scope"]
if key not in self.allow_execution_keys:
blocked.append({"key": key, "reason": "key_denied_execution"})
continue
if scope not in self.allow_execution_scopes:
blocked.append(
{
"key": key,
"scope": scope,
"reason": "scope_denied_execution",
}
)
continue
writable.append(item)
written = []
if writable:
written = self.store.upsert_items(user_id=user_id, items=writable, source=source)
return {
"written": written,
"blocked": blocked,
}
def retrieve(
self,
*,
user_id: int,
intent: dict[str, Any],
include_preference_keys: bool = False,
) -> dict[str, Any]:
query = intent["query"]
if len(query) > self.budget.max_query_chars:
raise StopRun("invalid_retrieval_intent:query_too_long")
requested_scopes = set(intent.get("scopes") or self.allow_execution_scopes)
denied = sorted(requested_scopes - self.allow_execution_scopes)
if denied:
raise StopRun(f"scope_denied:{denied[0]}")
items = self.store.search(
user_id=user_id,
query=query,
top_k=intent["top_k"],
scopes=requested_scopes,
include_preference_keys=include_preference_keys,
)
return {
"query": query,
"requested_scopes": sorted(requested_scopes),
"include_preference_keys": include_preference_keys,
"items": items,
}
The key points here (in plain terms)
- The policy boundary verifies the contract and the allowed keys/scopes.
- Policy is strict: a memory `key`/`scope` outside the allowlist stops the run.
- The gateway only enforces the execution allowlist passed in from `main.py`.
- If a key/scope is forbidden at runtime, the write is blocked and shows up in `history.blocked`.
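The contrast between "policy stops the run" and "execution quietly blocks" shows up in how `write(...)` partitions items. A condensed standalone sketch (`partition_writes` is a hypothetical helper, not the real method):

```python
from typing import Any

def partition_writes(items: list[dict[str, Any]],
                     allow_execution_keys: set[str]) -> dict[str, list]:
    # Items reaching this point already passed policy validation, so an
    # execution denial is recorded rather than raised and the run continues.
    writable: list[dict[str, Any]] = []
    blocked: list[dict[str, Any]] = []
    for item in items:
        if item["key"] in allow_execution_keys:
            writable.append(item)
        else:
            blocked.append({"key": item["key"], "reason": "key_denied_execution"})
    return {"writable": writable, "blocked": blocked}
```

This is the design choice behind `history.blocked`: runtime denials are observable audit data, not failures.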
llm.py — extract, retrieve-intent, apply
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
class LLMInvalid(Exception):
pass
MEMORY_CAPTURE_SYSTEM_PROMPT = """
You are a memory extraction assistant.
Return exactly one JSON object in this shape:
{
"items": [
{
"key": "language",
"value": "english",
"scope": "user",
"ttl_days": 180,
"confidence": 0.9
}
]
}
Rules:
- Extract only stable preferences or durable constraints useful in future sessions.
- Use only keys from available_keys.
- scope must be "user" or "workspace".
- ttl_days must be between 1 and 365.
- confidence must be between 0 and 1.
- If nothing should be stored, return {"items": []}.
- Do not output markdown or extra keys.
""".strip()
RETRIEVAL_INTENT_SYSTEM_PROMPT = """
You are a memory retrieval planner.
Return exactly one JSON object in this shape:
{
"kind": "retrieve_memory",
"query": "short memory query",
"top_k": 4
}
Optional key:
- "scopes": ["user", "workspace"]
Rules:
- Keep query compact and factual.
- top_k must be between 1 and 6.
- Use only scopes from available_scopes.
- Prefer omitting "scopes" unless the goal explicitly requires a specific scope.
- Do not output markdown or extra keys.
""".strip()
ANSWER_SYSTEM_PROMPT = """
You are an incident response assistant.
Return exactly one JSON object in this shape:
{
"answer": "final answer in English",
"used_memory_keys": ["language", "response_style"]
}
Rules:
- Use only incident_context and memory_items.
- Keep the answer concise, actionable, and suitable for an operations update.
- used_memory_keys must reference only keys present in memory_items.
- If "update_channel" is used, explicitly mention it in answer text (for example, "via email").
- If "language" is used with value "english", start answer with "Incident update:".
- If no memory was used, return an empty array.
- Do not output markdown or extra keys.
""".strip()
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def extract_memory_candidates(
*,
user_message: str,
available_keys: list[str],
) -> dict[str, Any]:
payload = {
"user_message": user_message,
"available_keys": available_keys,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": MEMORY_CAPTURE_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
return json.loads(text)
except json.JSONDecodeError:
return {"invalid": True, "raw": text}
def plan_retrieval_intent(*, goal: str, available_scopes: list[str]) -> dict[str, Any]:
payload = {
"goal": goal,
"available_scopes": available_scopes,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": RETRIEVAL_INTENT_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
return json.loads(text)
except json.JSONDecodeError:
return {"kind": "invalid", "raw": text}
def compose_memory_augmented_answer(
*,
goal: str,
incident_context: dict[str, Any],
memory_items: list[dict[str, Any]],
) -> dict[str, Any]:
payload = {
"goal": goal,
"incident_context": incident_context,
"memory_items": [
{
"key": item.get("key"),
"value": item.get("value"),
"scope": item.get("scope"),
"confidence": item.get("confidence"),
}
for item in memory_items
],
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": ANSWER_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
data = json.loads(text)
except json.JSONDecodeError as exc:
raise LLMInvalid("llm_invalid_json") from exc
if not isinstance(data, dict):
raise LLMInvalid("llm_invalid_json")
answer = data.get("answer")
used_memory_keys = data.get("used_memory_keys")
if not isinstance(answer, str):
raise LLMInvalid("llm_invalid_schema")
if not answer.strip():
raise LLMEmpty("llm_empty")
if not isinstance(used_memory_keys, list):
raise LLMInvalid("llm_invalid_schema")
normalized_keys: list[str] = []
for value in used_memory_keys:
if not isinstance(value, str):
raise LLMInvalid("llm_invalid_schema")
item = value.strip()
if item and item not in normalized_keys:
normalized_keys.append(item)
return {
"answer": answer.strip(),
"used_memory_keys": normalized_keys,
}
The key points here (in plain terms)
- Each phase has its own JSON contract: `capture`, `retrieve_intent`, `apply`.
- LLM errors are separated into `llm_timeout`, `llm_invalid_*`, and `llm_empty`.
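Keeping the error classes separate lets the caller map each failure to its own `stop_reason`, as `main.py` does. A minimal sketch of that dispatch (`call_with_stop_reason` is a hypothetical helper):

```python
# Each LLM error class maps to a distinct stop_reason, mirroring the
# except-clauses around compose_memory_augmented_answer in main.py.
class LLMTimeout(Exception): pass
class LLMEmpty(Exception): pass
class LLMInvalid(Exception): pass

def call_with_stop_reason(fn):
    try:
        return {"status": "ok", "result": fn()}
    except LLMTimeout:
        return {"status": "stopped", "stop_reason": "llm_timeout"}
    except LLMInvalid as exc:
        # e.g. "llm_invalid_json" or "llm_invalid_schema"
        return {"status": "stopped", "stop_reason": str(exc)}
    except LLMEmpty:
        return {"status": "stopped", "stop_reason": "llm_empty"}
```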
main.py — Session1 Capture/Store -> Session2 Retrieve/Apply
from __future__ import annotations
import json
import re
import time
import uuid
from typing import Any
from gateway import (
Budget,
MemoryGateway,
StopRun,
validate_memory_candidates,
validate_retrieval_intent,
)
from llm import (
LLMEmpty,
LLMInvalid,
LLMTimeout,
compose_memory_augmented_answer,
extract_memory_candidates,
plan_retrieval_intent,
)
from memory_store import MemoryStore
USER_ID = 42
SESSION_1_USER_MESSAGE = (
"For future incident updates, write in English, keep replies concise, "
"use email as the primary channel, and remember that I am enterprise tier."
)
SESSION_2_GOAL = "Draft today's payment incident update and next actions."
INCIDENT_CONTEXT = {
"date": "2026-03-04",
"region": "US",
"incident_id": "inc_payments_20260304",
"severity": "P1",
"gateway_status": "degraded",
"failed_payment_rate": 0.034,
"chargeback_alerts": 5,
"eta_minutes": 45,
}
BUDGET = Budget(
max_capture_items=6,
max_retrieve_top_k=6,
max_query_chars=240,
max_answer_chars=700,
max_value_chars=120,
max_seconds=25,
)
ALLOWED_MEMORY_KEYS_POLICY = {
"language",
"response_style",
"update_channel",
"declared_tier",
}
# Runtime can block high-risk keys even if policy allows them.
TRUST_DECLARED_TIER_FROM_CHAT = False
ALLOWED_MEMORY_KEYS_EXECUTION = (
ALLOWED_MEMORY_KEYS_POLICY
if TRUST_DECLARED_TIER_FROM_CHAT
else {"language", "response_style", "update_channel"}
)
ALLOWED_SCOPES_POLICY = {"user", "workspace"}
WORKSPACE_MEMORY_RUNTIME_ENABLED = False
ALLOWED_SCOPES_EXECUTION = (
ALLOWED_SCOPES_POLICY if WORKSPACE_MEMORY_RUNTIME_ENABLED else {"user"}
)
# Runtime policy: include default preferences for this incident-update flow.
ENABLE_PREFERENCE_BIAS = True
def _shorten(text: str, *, limit: int = 240) -> str:
text = (text or "").strip()
if len(text) <= limit:
return text
return text[: limit - 3].rstrip() + "..."
def _pick_applied_memory(
memory_items: list[dict[str, Any]],
used_keys: list[str],
) -> list[dict[str, Any]]:
used = set(used_keys)
out: list[dict[str, Any]] = []
for item in memory_items:
key = item.get("key")
if key not in used:
continue
out.append(
{
"key": item["key"],
"value": item["value"],
"scope": item["scope"],
"confidence": item["confidence"],
"score": item["score"],
}
)
return out
def _has_declared_memory_application(*, answer: str, applied_memory: list[dict[str, Any]]) -> bool:
"""
Conservative audit check:
- update_channel: value should appear in answer text.
- response_style=concise: short-response format compliance.
- language=english: answer should use a stable prefix "Incident update:".
If no verifiable key exists, do not block.
"""
if not applied_memory:
return False
normalized_answer = " ".join((answer or "").lower().split())
evidenced_any = False
has_verifiable_key = False
def _is_concise(text: str) -> bool:
words = re.findall(r"[a-zA-Z0-9_]+", text)
sentence_count = len(re.findall(r"[.!?]+", text))
return len(words) <= 80 and sentence_count <= 3
for item in applied_memory:
key = str(item.get("key") or "").strip().lower()
value = str(item.get("value", "")).strip().lower()
if not key or not value:
continue
if key == "update_channel":
has_verifiable_key = True
if value in normalized_answer:
evidenced_any = True
elif key == "response_style":
has_verifiable_key = True
if value == "concise" and _is_concise(answer):
evidenced_any = True
elif key == "language":
if value in {"english", "en"}:
has_verifiable_key = True
if normalized_answer.startswith("incident update:"):
evidenced_any = True
continue
else:
continue
if not has_verifiable_key:
return True
return evidenced_any
def run_memory_augmented(
*,
user_id: int,
session_1_message: str,
session_2_goal: str,
) -> dict[str, Any]:
run_id = str(uuid.uuid4())
started = time.monotonic()
trace: list[dict[str, Any]] = []
history: list[dict[str, Any]] = []
def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
payload = {
"run_id": run_id,
"status": "stopped",
"stop_reason": stop_reason,
"phase": phase,
"trace": trace,
"history": history,
}
payload.update(extra)
return payload
store = MemoryStore(max_items=100)
gateway = MemoryGateway(
store=store,
budget=BUDGET,
allow_execution_keys=ALLOWED_MEMORY_KEYS_EXECUTION,
allow_execution_scopes=ALLOWED_SCOPES_EXECUTION,
)
try:
raw_capture = extract_memory_candidates(
user_message=session_1_message,
available_keys=sorted(ALLOWED_MEMORY_KEYS_POLICY),
)
except LLMTimeout:
return stopped("llm_timeout", phase="capture")
try:
capture_payload = validate_memory_candidates(
raw_capture,
allowed_keys_policy=ALLOWED_MEMORY_KEYS_POLICY,
allowed_scopes_policy=ALLOWED_SCOPES_POLICY,
max_items=BUDGET.max_capture_items,
max_value_chars=BUDGET.max_value_chars,
)
except StopRun as exc:
return stopped(
exc.reason,
phase="capture",
raw_capture=raw_capture,
)
write_result = gateway.write(
user_id=user_id,
items=capture_payload["items"],
source="session_1",
)
refreshed_items = [item for item in write_result["written"] if item.get("refreshed")]
written_items = [item for item in write_result["written"] if not item.get("refreshed")]
trace.append(
{
"step": 1,
"phase": "capture_store",
"candidates": len(capture_payload["items"]),
"written": len(written_items),
"refreshed": len(refreshed_items),
"blocked": len(write_result["blocked"]),
"ok": True,
}
)
history.append(
{
"step": 1,
"session": "session_1",
"message": session_1_message,
"written_keys": [item["key"] for item in written_items],
"refreshed_keys": [item["key"] for item in refreshed_items],
"blocked": write_result["blocked"],
}
)
if (time.monotonic() - started) > BUDGET.max_seconds:
return stopped("max_seconds", phase="retrieve_plan")
try:
raw_intent = plan_retrieval_intent(
goal=session_2_goal,
available_scopes=sorted(ALLOWED_SCOPES_POLICY),
)
except LLMTimeout:
return stopped("llm_timeout", phase="retrieve_plan")
try:
intent = validate_retrieval_intent(
raw_intent,
allowed_scopes_policy=ALLOWED_SCOPES_POLICY,
max_top_k=BUDGET.max_retrieve_top_k,
)
except StopRun as exc:
return stopped(
exc.reason,
phase="retrieve_plan",
raw_intent=raw_intent,
)
try:
retrieval = gateway.retrieve(
user_id=user_id,
intent=intent,
include_preference_keys=ENABLE_PREFERENCE_BIAS,
)
except StopRun as exc:
return stopped(
exc.reason,
phase="retrieve",
intent=intent,
)
trace.append(
{
"step": 2,
"phase": "retrieve",
"query": retrieval["query"],
"requested_scopes": retrieval["requested_scopes"],
"include_preference_keys": retrieval["include_preference_keys"],
"memory_hits": len(retrieval["items"]),
"ok": True,
}
)
history.append(
{
"step": 2,
"session": "session_2",
"intent": intent,
"resolved_scopes": retrieval["requested_scopes"],
"include_preference_keys": retrieval["include_preference_keys"],
"retrieved_keys": [item["key"] for item in retrieval["items"]],
}
)
if (time.monotonic() - started) > BUDGET.max_seconds:
return stopped("max_seconds", phase="apply")
try:
final = compose_memory_augmented_answer(
goal=session_2_goal,
incident_context=INCIDENT_CONTEXT,
memory_items=retrieval["items"],
)
except LLMTimeout:
return stopped("llm_timeout", phase="apply")
except LLMInvalid as exc:
return stopped(exc.args[0], phase="apply")
except LLMEmpty:
return stopped("llm_empty", phase="apply")
retrieved_keys = {item["key"] for item in retrieval["items"]}
invalid_used_keys = sorted(
set(final["used_memory_keys"]) - retrieved_keys,
)
if invalid_used_keys:
return stopped(
"invalid_answer:memory_keys_out_of_context",
phase="apply",
invalid_used_memory_keys=invalid_used_keys,
retrieved_keys=sorted(retrieved_keys),
)
if len(final["answer"]) > BUDGET.max_answer_chars:
return stopped("invalid_answer:too_long", phase="apply")
applied_memory = _pick_applied_memory(retrieval["items"], final["used_memory_keys"])
if final["used_memory_keys"] and not _has_declared_memory_application(
answer=final["answer"],
applied_memory=applied_memory,
):
return stopped(
"invalid_answer:memory_declared_but_not_applied",
phase="apply",
used_memory_keys=final["used_memory_keys"],
applied_memory=applied_memory,
)
trace.append(
{
"step": 3,
"phase": "apply",
"used_memory_keys": final["used_memory_keys"],
"applied_memory_count": len(applied_memory),
"ok": True,
}
)
history.append(
{
"step": 3,
"action": "compose_memory_augmented_answer",
"used_memory_keys": final["used_memory_keys"],
"answer": _shorten(final["answer"]),
}
)
return {
"run_id": run_id,
"status": "ok",
"stop_reason": "success",
"outcome": "memory_applied" if final["used_memory_keys"] else "context_only",
"answer": final["answer"],
"used_memory_keys": final["used_memory_keys"],
"applied_memory": applied_memory,
"memory_state": store.dump_user_records(user_id=user_id),
"trace": trace,
"history": history,
}
def main() -> None:
result = run_memory_augmented(
user_id=USER_ID,
session_1_message=SESSION_1_USER_MESSAGE,
session_2_goal=SESSION_2_GOAL,
)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
The key points here (in plain terms)
- Session 1 and Session 2 are simulated within a single process run via a shared memory store.
- `ALLOWED_MEMORY_KEYS_POLICY` and `ALLOWED_MEMORY_KEYS_EXECUTION` may intentionally differ.
- `ENABLE_PREFERENCE_BIAS=True` is enabled only for the incident-update flow, where the preference fields are almost always needed.
- Every result payload includes `run_id` for log correlation.
- The final `used_memory_keys` check plus `memory_declared_but_not_applied` make the apply step audit-friendly.
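The final allowlist check reduces to a set difference. A standalone sketch of the verification `main.py` performs (`check_used_memory_keys` is a hypothetical helper):

```python
def check_used_memory_keys(used: list[str], retrieved: list[str]) -> list[str]:
    # Returns the keys the model claimed to use but that were never
    # retrieved; a non-empty result corresponds to the stop_reason
    # "invalid_answer:memory_keys_out_of_context".
    return sorted(set(used) - set(retrieved))
```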
Example output
{
"run_id": "f06d5282-bda4-44df-8bf9-38f53cdb2fb9",
"status": "ok",
"stop_reason": "success",
"outcome": "memory_applied",
"answer": "Incident update: On 2026-03-04, the US payment gateway is in a P1 degraded state. Failed payment rate is 3.4% with 5 chargeback alerts, ETA 45 minutes. Next actions: monitor gateway performance and send customer updates via email.",
"used_memory_keys": [
"language",
"update_channel",
"response_style"
],
"applied_memory": [
{
"key": "language",
"value": "english",
"scope": "user",
"confidence": 0.95,
"score": 0.685
},
{
"key": "update_channel",
"value": "email",
"scope": "user",
"confidence": 0.95,
"score": 0.685
},
{
"key": "response_style",
"value": "concise",
"scope": "user",
"confidence": 0.9,
"score": 0.67
}
],
"memory_state": [
{
"key": "language",
"value": "english",
"scope": "user",
"source": "session_1",
"confidence": 0.95,
"ttl_left_days": 180.0
},
{
"key": "response_style",
"value": "concise",
"scope": "user",
"source": "session_1",
"confidence": 0.9,
"ttl_left_days": 180.0
},
{
"key": "update_channel",
"value": "email",
"scope": "user",
"source": "session_1",
"confidence": 0.95,
"ttl_left_days": 180.0
}
],
"trace": [
{
"step": 1,
"phase": "capture_store",
"candidates": 4,
"written": 3,
"refreshed": 0,
"blocked": 1,
"ok": true
},
{
"step": 2,
"phase": "retrieve",
"query": "payment incident update and next actions",
"requested_scopes": [
"user"
],
"include_preference_keys": true,
"memory_hits": 3,
"ok": true
},
{
"step": 3,
"phase": "apply",
"used_memory_keys": [
"language",
"update_channel",
"response_style"
],
"applied_memory_count": 3,
"ok": true
}
],
"history": [
{
"step": 1,
"session": "session_1",
"message": "For future incident updates, write in English, keep replies concise, use email as the primary channel, and remember that I am enterprise tier.",
"written_keys": [
"language",
"response_style",
"update_channel"
],
"refreshed_keys": [],
"blocked": [
{
"key": "declared_tier",
"reason": "key_denied_execution"
}
]
},
{
"step": 2,
"session": "session_2",
"intent": {
"kind": "retrieve_memory",
"query": "payment incident update and next actions",
"top_k": 4
},
"resolved_scopes": [
"user"
],
"include_preference_keys": true,
"retrieved_keys": [
"language",
"update_channel",
"response_style"
]
},
{
"step": 3,
"action": "compose_memory_augmented_answer",
"used_memory_keys": [
"language",
"update_channel",
"response_style"
],
"answer": "Incident update: On 2026-03-04, the US payment gateway is in a P1 degraded state..."
}
]
}
Typical stop_reason values
- `success` — the run finished correctly; check `outcome` (`memory_applied` or `context_only`)
- `invalid_memory_candidates:*` — memory capture failed contract validation
- `invalid_memory_candidates:value_too_long` — the value exceeds the `max_value_chars` limit
- `memory_key_not_allowed_policy:<key>` — the LLM proposed a key outside the policy allowlist
- `memory_scope_not_allowed_policy:<scope>` — the LLM proposed a scope outside the policy allowlist
- `invalid_retrieval_intent:*` — the retrieval intent failed policy validation
- `scope_denied:<scope>` — the retrieval scope is not allowed by the execution allowlist
- `llm_timeout` — the LLM did not respond within `OPENAI_TIMEOUT_SECONDS`
- `llm_invalid_json` / `llm_invalid_schema` — the apply step returned invalid JSON or an invalid shape
- `llm_empty` — empty final answer
- `invalid_answer:memory_keys_out_of_context` — the model referenced a memory key that was not retrieved
- `invalid_answer:too_long` — the final answer exceeds the `max_answer_chars` limit
- `invalid_answer:memory_declared_but_not_applied` — the model declared memory use, but the answer text does not reflect it
- `max_seconds` — the run's total time budget was exceeded
What is NOT shown
- persistent storage (Postgres/Redis/vector DB) instead of the in-memory implementation
- encryption/PII redaction before writing memory
- semantic retrieval via embeddings (instead of simple token overlap)
- multi-tenant quotas and soft/hard retention policies
- retry/backoff for LLM calls
- per-key consent and a user-visible memory UI
What to try next
- Enable `TRUST_DECLARED_TIER_FROM_CHAT=True` and see how `blocked` changes in `history`.
- Enable `WORKSPACE_MEMORY_RUNTIME_ENABLED=True` and add a retrieval intent with the `workspace` scope.
- Add a policy rule for the memory key `timezone` and validate personalization in the answer.
- Replace the in-memory store with external storage and add deduplication by `key + normalized_value`.
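For the last item, deduplication by `key + normalized_value` could look like this (a sketch; the normalization rule here is an assumption, not part of the example):

```python
def normalize_value(value: str) -> str:
    # Hypothetical normalization: lowercase and collapse whitespace.
    return " ".join(value.lower().split())

def dedupe(items: list[dict]) -> list[dict]:
    # Keep the first item for each (key, normalized value) fingerprint.
    seen: set[tuple[str, str]] = set()
    out: list[dict] = []
    for item in items:
        fingerprint = (item["key"], normalize_value(item["value"]))
        if fingerprint in seen:
            continue  # duplicate: same key and same normalized value
        seen.add(fingerprint)
        out.append(item)
    return out
```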