"""NCM Cue Variant Cache — LLM-generated rephrasing variants for cascade cues.
On first use of any cue/reason string a background task fires a cheap
OpenRouter LLM call to generate alternative phrasings, then immediately
batch-embeds them using google/gemini-embedding-001. The result is stored in
Redis with a randomized 7-14 day TTL and held in-memory so subsequent turns
pay no I/O cost.
Variant selection uses cosine similarity (dot product on unit vectors) against
the current limbic emotional context. Falls back to random.choice when the
context embedding is not yet available.
Redis layout:
- Key: ``ncm:cue_variant:{sha256_hex_of_original_string}``
- Value (v2): JSON object with ``original``, ``variants`` and ``mean_vec`` keys — see ``_entry_to_redis`` below
- Value (v1): JSON array of strings (old format — loaded as text-only,
re-embedded lazily on next ``ensure_cached`` call)
"""
from __future__ import annotations
import asyncio
import hashlib
import jsonutil as json
import logging
import random
import re
from collections import OrderedDict
from typing import Optional
import httpx
from gemini_embed_pool import embed_batch_via_gemini
from openrouter_client import OpenRouterClient
from core.distributed_lock import DistributedLock
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# OpenRouter chat-completions endpoint that ``_generate`` POSTs to.
OPENROUTER_CHAT_URL = "https://openrouter.ai/api/v1/chat/completions"
# Embedding model identifier handed to both embedding back-ends in ``_embed_texts``.
EMBED_MODEL = "google/gemini-embedding-001"
# Prefix of every Redis key built by ``_cache_key`` and scanned by ``load_all_from_redis``.
REDIS_KEY_PREFIX = "ncm:cue_variant:"
# Number of rephrasings requested in the system prompt and the cap applied by ``_parse_variants``.
NUM_VARIANTS = 6
# Inclusive bounds, in seconds, of the jittered Redis TTL picked by ``_random_ttl``.
_TTL_MIN = 7 * 86400 # 7 days
_TTL_MAX = 14 * 86400 # 14 days
def _random_ttl() -> int:
    """Pick a jittered expiry, in seconds, for a cached cue-variant entry.

    Drawing the TTL uniformly between ``_TTL_MIN`` and ``_TTL_MAX`` spreads
    key expirations out so cached entries do not all lapse at the same
    moment. ``CueVariantCache._write_redis`` uses the result as the ``ex=``
    argument of its Redis ``SET``.

    Returns:
        int: A TTL in seconds within the inclusive range
        ``[_TTL_MIN, _TTL_MAX]``.
    """
    # randrange excludes its stop value, so extend it by one to keep
    # _TTL_MAX reachable.
    upper_exclusive = _TTL_MAX + 1
    return random.randrange(_TTL_MIN, upper_exclusive)
# Default OpenRouter model identifiers for variant generation. ``_generate``
# tries them in list order and returns the first successful parse;
# ``CueVariantCache.__init__`` uses this list when no ``variant_models``
# override is supplied.
VARIANT_MODELS = [
"google/gemini-3.1-flash-lite",
"stepfun/step-3.5-flash:free",
"nvidia/nemotron-3-nano-30b-a3b:free",
"openai/gpt-oss-120b:free",
"z-ai/glm-4.5-air:free",
]
def _system_prompt() -> str:
    """Build the system prompt that asks the LLM for cue rephrasings.

    The prompt requests exactly ``NUM_VARIANTS`` terse alternative phrasings
    and shows the expected output shape as a JSON array of placeholder
    strings (``"variant1"`` ... ``"variantN"``). ``_generate`` sends it as the
    system message and ``_parse_variants`` decodes the reply.

    Returns:
        str: The complete system prompt text.
    """
    count = NUM_VARIANTS
    placeholders = [f'"variant{idx}"' for idx in range(1, count + 1)]
    example_array = ", ".join(placeholders)
    parts = [
        "You are a neurochemical prose writer for an AI character. ",
        f"Given an internal emotional/somatic cue phrase, write exactly {count} alternative ",
        "phrasings. Preserve the core meaning; vary vocabulary, rhythm, and metaphor. ",
        "Keep each variant terse (5–18 words). ",
        f"Output ONLY a JSON array with exactly {count} strings: ",
        f"[{example_array}]\n",
        "No other text, no markdown, no explanation.",
    ]
    return "".join(parts)
# ─────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────
def _cache_key(original: str) -> str:
    """Derive the Redis key under which a cue's variants are stored.

    The key is ``REDIS_KEY_PREFIX`` followed by the hex SHA-256 digest of the
    UTF-8 encoded cue, so arbitrary cue text maps to a fixed-length key.

    Args:
        original (str): The original cue/reason string.

    Returns:
        str: The Redis key for ``original``.
    """
    hasher = hashlib.sha256()
    hasher.update(original.encode("utf-8"))
    return REDIS_KEY_PREFIX + hasher.hexdigest()
def _dot(a: list[float], b: list[float]) -> float:
"""Dot product of two vectors, used as cosine similarity.
Because every embedding in this module is L2-normalized via
``_normalize`` before storage, a plain element-wise dot product equals
the cosine similarity between the two vectors. Pure arithmetic with no
side effects. Called by ``CueVariantCache.get_variant`` to score each
candidate variant against the current emotional context vector.
Args:
a (list[float]): The first (unit-normalized) vector.
b (list[float]): The second (unit-normalized) vector.
Returns:
float: The dot product, equal to cosine similarity for unit vectors.
"""
return sum(x * y for x, y in zip(a, b))
def _normalize(vec: list[float]) -> list[float]:
"""L2-normalize a vector so its Euclidean length becomes 1.
Scaling embeddings to unit length lets later cosine comparisons collapse
to a cheap dot product (see ``_dot``). A near-zero vector is returned
unchanged to avoid division by zero. Pure function with no side effects.
Called by ``_mean_vec`` and by ``CueVariantCache._embed_texts`` to
normalize both centroid and freshly fetched embedding vectors.
Args:
vec (list[float]): The raw embedding vector.
Returns:
list[float]: The unit-length vector, or the original when its norm is
effectively zero.
"""
norm = sum(x * x for x in vec) ** 0.5
if norm < 1e-10:
return vec
return [x / norm for x in vec]
def _mean_vec(vecs: list[list[float]]) -> list[float]:
"""Compute the L2-normalized centroid of a list of vectors.
Averages the input vectors component-wise and re-normalizes the result
via ``_normalize``, producing a single representative direction for the
whole variant set that is stored as the entry's ``mean_vec``. Pure
function with no side effects. Called by ``_entry_to_redis`` and by
``CueVariantCache.ensure_cached`` when assembling cache entries.
Args:
vecs (list[list[float]]): The vectors to average; all assumed to share
the same dimensionality.
Returns:
list[float]: The unit-length centroid, or an empty list when ``vecs``
is empty.
"""
if not vecs:
return []
dim = len(vecs[0])
centroid = [sum(v[i] for v in vecs) / len(vecs) for i in range(dim)]
return _normalize(centroid)
def _parse_variants(text: str) -> list[str]:
"""Extract a list of variant strings from a possibly messy LLM response.
Tolerantly recovers the JSON array of rephrasings that ``_system_prompt``
instructs the model to return, even when wrapped in a Markdown code fence
or surrounded by stray text. It strips any fenced block, slices from the
first ``[`` to the last ``]``, parses that span with the ``jsonutil``
loader, keeps only non-empty strings, and caps the result at
``NUM_VARIANTS``. Pure string processing with no I/O or side effects.
Called by ``CueVariantCache._generate`` to clean each model's raw
completion before the variants are embedded.
Args:
text (str): The raw text content of an LLM completion.
Returns:
list[str]: The parsed variant strings (trimmed, non-empty), at most
``NUM_VARIANTS`` long; an empty list when nothing parses.
"""
if not text:
return []
text = text.strip()
if "```" in text:
m = re.search(r"```(?:json)?\s*(.*?)```", text, re.DOTALL)
if m:
text = m.group(1).strip()
start = text.find("[")
end = text.rfind("]")
if start == -1 or end <= start:
return []
try:
arr = json.loads(text[start : end + 1])
if isinstance(arr, list):
valid = [s.strip() for s in arr if isinstance(s, str) and s.strip()]
if valid:
return valid[:NUM_VARIANTS]
except (json.JSONDecodeError, ValueError):
pass
return []
def _entry_to_redis(original: str, texts: list[str], vecs: list[list[float]]) -> str:
"""Serialize a variant cache entry to its v2 Redis JSON representation.
Builds the on-disk object for a cue: the original string, a list of
``text``/``vec`` variant records pairing each phrasing with its embedding,
and a ``mean_vec`` centroid (from ``_mean_vec``) used for fast context
scoring. Storing ``original`` inside the value is what lets
``load_all_from_redis`` reconstruct the in-memory map from a key scan. Pure
serialization via the ``jsonutil`` dumper with no I/O. Called by
``CueVariantCache._write_redis`` just before the value is persisted.
Args:
original (str): The original cue/reason string.
texts (list[str]): The generated variant phrasings.
vecs (list[list[float]]): The per-variant embeddings, aligned with
``texts``.
Returns:
str: The JSON-encoded v2 cache entry ready to ``SET`` in Redis.
"""
mean = _mean_vec(vecs) if vecs else []
variants = [{"text": t, "vec": v} for t, v in zip(texts, vecs)]
return json.dumps({"original": original, "variants": variants, "mean_vec": mean})
def _entry_from_redis(raw: str) -> dict | None:
"""Deserialize a cached variant value, tolerating both formats.
Decodes a raw Redis value with the ``jsonutil`` loader and normalizes it
into the in-memory entry shape regardless of vintage. A v1 value (a plain
JSON list of strings) yields texts with no embeddings (and ``original``
set to None), signalling the caller to lazily re-embed; a v2 value (an
object with a ``variants`` key) yields aligned ``texts`` and ``vecs`` plus
the stored ``original`` and ``mean_vec``. Returns None when the value is
unparseable or carries no usable texts. Pure decoding with no side
effects. Called by ``CueVariantCache.ensure_cached`` and
``CueVariantCache.load_all_from_redis``.
Args:
raw (str): The raw JSON value read from Redis.
Returns:
dict | None: A dict with keys ``original``, ``texts``, ``vecs``, and
``mean_vec``, or None on failure.
"""
try:
data = json.loads(raw)
except Exception:
return None
# v1 format — plain list of strings
if isinstance(data, list):
texts = [s for s in data if isinstance(s, str)]
if not texts:
return None
return {"original": None, "texts": texts, "vecs": [], "mean_vec": []}
# v2 format
if not isinstance(data, dict) or "variants" not in data:
return None
texts = []
vecs = []
for item in data.get("variants", []):
if isinstance(item, dict) and "text" in item:
texts.append(item["text"])
vecs.append(item.get("vec", []))
if not texts:
return None
return {
"original": data.get("original"),
"texts": texts,
"vecs": vecs,
"mean_vec": data.get("mean_vec", []),
}
# ─────────────────────────────────────────────────────────────────────
# Main class
# ─────────────────────────────────────────────────────────────────────
[docs]
class CueVariantCache:
"""Lazy LLM-backed variant cache for cascade cue and reason strings.
Variant selection is context-aware: before the cascade engine runs,
call ``set_context(emotion_text)`` to register the current dominant-
emotion string. ``get_variant()`` then picks the variant whose
embedding has the highest cosine similarity to the context embedding.
Falls back to ``random.choice`` until the context embedding is ready.
Parameters
----------
redis_client:
An ``redis.asyncio.Redis`` instance. May be None.
api_key:
OpenRouter API key. When None the cache is a no-op.
"""
[docs]
def __init__(
self,
redis_client=None,
api_key: Optional[str] = None,
openrouter_client: OpenRouterClient | None = None,
variant_models: Optional[list[str]] = None,
) -> None:
"""Initialize the instance.
Args:
redis_client: Redis connection client.
api_key (Optional[str]): The api key value.
openrouter_client: Shared OpenRouterClient for connection
pooling and batch embedding. Falls back to direct HTTP
when None.
variant_models: Override models for LLM generation. When None,
uses VARIANT_MODELS. Use e.g. ["google/gemini-3.1-flash-lite"]
for paid-only, high-throughput pregeneration.
"""
self._redis = redis_client
self._api_key = api_key
self._openrouter = openrouter_client
self._variant_models = (
variant_models if variant_models is not None else VARIANT_MODELS
)
# original → {"texts": [...], "vecs": [...], "mean_vec": [...]}
self._mem: dict[str, dict] = {}
# Strings currently being generated / embedded
self._pending: set[str] = set()
# context_text → embedding vector (LRU-evicted, max 128)
self._query_cache: OrderedDict[str, list[float]] = OrderedDict()
self._query_cache_max = 128
# Strings whose context embedding is currently being fetched
self._embed_pending: set[str] = set()
# The query vector for the current turn (set by set_context)
self._current_query_vec: list[float] | None = None
# L2: Track background tasks for clean shutdown
self._background_tasks: set[asyncio.Task] = set()
# ------------------------------------------------------------------
# Public hot-path (sync)
# ------------------------------------------------------------------
[docs]
def set_context(self, context_text: str) -> None:
    """Register the current turn's emotional context for variant selection.

    Synchronous and non-blocking. On a ``_query_cache`` hit the cached
    embedding becomes ``_current_query_vec`` immediately and the entry is
    marked most-recently-used, so eviction in ``_embed_context`` removes
    the least recently *used* context rather than the oldest inserted one.
    On a miss the query vector is cleared (``get_variant`` then falls back
    to random selection) and, unless a fetch for the same text is already
    in flight, a detached ``_embed_context`` task is scheduled and tracked
    in ``_background_tasks``.

    With an empty ``context_text`` or no API key the query vector is just
    cleared. If no event loop is running, nothing is scheduled and the
    pending marker is removed again.

    Args:
        context_text (str): The current dominant-emotion / context string.

    Returns:
        None
    """
    if not context_text or not self._api_key:
        self._current_query_vec = None
        return

    cached = self._query_cache.get(context_text)
    if cached is not None:
        # Refresh recency so the bounded cache behaves as a true LRU.
        self._query_cache.move_to_end(context_text)
        self._current_query_vec = cached
        return

    # Not yet embedded — clear for this turn and schedule background fetch
    self._current_query_vec = None
    if context_text in self._embed_pending:
        return

    self._embed_pending.add(context_text)
    try:
        task = asyncio.get_running_loop().create_task(
            self._embed_context(context_text)
        )
    except RuntimeError:
        # No running loop (or it refused the task): allow a later retry.
        self._embed_pending.discard(context_text)
        return
    self._background_tasks.add(task)
    task.add_done_callback(self._background_tasks.discard)
[docs]
def get_variant(self, s: str) -> str:
    """Return a cached rephrasing of ``s``, preferring the best context match.

    Looks ``s`` up in ``_mem``. When a query vector is set and the entry
    has one non-empty embedding per text, each variant is scored with
    ``_dot`` against ``_current_query_vec`` and the highest-scoring text
    is returned (the first one on ties). Otherwise — no query vector,
    missing or misaligned embeddings, or an error while scoring — a
    variant is drawn with ``random.choice``. If ``s`` has no cached
    texts it is returned unchanged.

    Args:
        s (str): The original cue/reason string.

    Returns:
        str: The chosen variant, or ``s`` itself when nothing is cached.
    """
    cached = self._mem.get(s)
    candidates = cached.get("texts", []) if cached else None
    if not candidates:
        return s

    embeddings = cached.get("vecs", [])
    query = self._current_query_vec
    if query and embeddings and len(embeddings) == len(candidates):
        try:
            similarities = []
            for embedding in embeddings:
                if embedding:
                    similarities.append(_dot(query, embedding))
            # Only trust the scores when every variant produced one.
            if len(similarities) == len(candidates):
                return candidates[similarities.index(max(similarities))]
        except Exception:
            pass
    return random.choice(candidates)
# ------------------------------------------------------------------
# Public warm-up paths (async)
# ------------------------------------------------------------------
[docs]
async def ensure_cached(self, s: str) -> None:
    """Generate, embed, and cache variants for ``s`` if not already present.

    Steps:

    1. Return at once when ``s`` is empty, no API key is configured, or
       ``_mem`` already holds an entry with embeddings.
    2. Take a ``DistributedLock`` keyed by the cue's SHA-256; return if it
       cannot be acquired.
    3. Read Redis (when configured). A hit with embeddings is loaded into
       ``_mem`` and ends the call; a hit without embeddings keeps its
       texts for re-embedding.
    4. Otherwise either re-embed the known texts or run the full pipeline
       (``_generate`` then ``_embed_texts``), update ``_mem`` and persist
       through ``_write_redis``.

    Everything after a successful acquire runs inside one ``try/finally``,
    so the lock is released on every exit path — including the early
    return on a complete Redis hit. Redis read errors and pipeline errors
    are logged, not raised. Embeddings are only kept when there is exactly
    one vector per text; otherwise the entry is stored without them.

    Args:
        s (str): The cue/reason string to ensure variants for.

    Returns:
        None
    """
    if not s or not self._api_key:
        return

    # Already in memory and has embeddings — nothing to do
    entry = self._mem.get(s)
    if entry and entry.get("vecs"):
        return

    cue_hash = hashlib.sha256(s.encode()).hexdigest()
    lock = DistributedLock(self._redis, f"sg:lock:variant_gen:{cue_hash}", ttl=30)
    if not await lock.acquire():
        return

    try:
        # Check Redis first
        if self._redis:
            try:
                raw = await self._redis.get(_cache_key(s))
                if raw:
                    parsed = _entry_from_redis(raw)
                    if parsed:
                        logger.info(
                            "CueVariantCache [Redis Hit]: Loaded cached cue variants for '%r' from Redis key '%s'",
                            s,
                            _cache_key(s),
                        )
                        self._mem[s] = parsed
                        if parsed["vecs"]:
                            # Full v2 entry — done; ``finally`` frees the lock.
                            return
                        # v1 or embedding-less entry — fall through so the
                        # texts get (re-)embedded below.
                    else:
                        logger.warning(
                            "CueVariantCache [Redis Corrupt]: Invalid JSON structure in Redis key '%s' for cue '%r'",
                            _cache_key(s),
                            s,
                        )
                else:
                    logger.info(
                        "CueVariantCache [Redis Miss]: Variants for '%r' not found in Redis cache under key '%s'",
                        s,
                        _cache_key(s),
                    )
            except Exception as e:
                logger.warning(
                    "CueVariantCache [Redis Read Error]: Unexpected error reading cue '%r' key '%s': %s",
                    s,
                    _cache_key(s),
                    e,
                )

        try:
            # Do we already have texts (from a Redis hit) or need to generate?
            existing = self._mem.get(s)
            if existing and existing.get("texts") and not existing.get("vecs"):
                # Need to embed only
                texts = existing["texts"]
                vecs = await self._embed_texts(texts)
                # A short result cannot be paired with texts by position.
                if vecs and len(vecs) == len(texts):
                    self._mem[s] = {
                        "original": s,
                        "texts": texts,
                        "vecs": vecs,
                        "mean_vec": _mean_vec(vecs),
                    }
                    await self._write_redis(s, texts, vecs)
            else:
                # Full generation pipeline
                texts = await self._generate(s)
                if texts:
                    vecs = await self._embed_texts(texts)
                    if not vecs or len(vecs) != len(texts):
                        vecs = []
                    self._mem[s] = {
                        "original": s,
                        "texts": texts,
                        "vecs": vecs,
                        "mean_vec": _mean_vec(vecs) if vecs else [],
                    }
                    # NOTE(review): with no vectors the stored payload has
                    # an empty ``variants`` list (see ``_entry_to_redis``).
                    await self._write_redis(s, texts, vecs)
        except Exception as e:
            logger.warning("ensure_cached failed for %r: %s", s[:60], e)
    finally:
        await lock.release()
[docs]
async def load_all_from_redis(self) -> None:
    """Warm ``_mem`` by scanning every cached variant entry in Redis.

    Pages through keys matching ``REDIS_KEY_PREFIX*`` with ``SCAN``,
    fetches each page's values concurrently (at most two ``GET`` calls in
    flight) and decodes them with ``_entry_from_redis``. Only entries that
    carry both ``original`` and ``texts`` are reinstated, keyed by their
    stored ``original``; v1 values have no ``original`` and are skipped.

    Per-key read failures are logged at warning level and skipped;
    per-key decode problems are logged at debug level and skipped. A
    failure of the scan itself is logged at warning level and ends the
    warm-up without raising. Does nothing when no Redis client is set.

    Returns:
        None
    """
    if not self._redis:
        return

    # One limiter for the whole warm-up. Pages are gathered one after the
    # other, so this still caps concurrency at two GETs at any moment.
    sem = asyncio.Semaphore(2)

    async def sem_get(k):
        """Fetch a single Redis key while holding the shared semaphore."""
        async with sem:
            return await self._redis.get(k)

    try:
        pattern = f"{REDIS_KEY_PREFIX}*"
        cursor = 0
        loaded = 0
        while True:
            cursor, keys = await self._redis.scan(cursor, match=pattern, count=200)
            if keys:
                results = await asyncio.gather(
                    *(sem_get(key) for key in keys),
                    return_exceptions=True,
                )
                for key, raw in zip(keys, results):
                    key_name = key.decode() if isinstance(key, bytes) else str(key)
                    # BaseException also covers a gathered CancelledError.
                    if isinstance(raw, BaseException):
                        logger.warning(
                            "CueVariantCache [Redis Read Error]: Unexpected error reading variant key '%s': %s",
                            key_name,
                            raw,
                        )
                        continue
                    if not raw:
                        continue
                    try:
                        parsed = _entry_from_redis(raw)
                        if parsed and parsed.get("original") and parsed.get("texts"):
                            self._mem[parsed["original"]] = parsed
                            loaded += 1
                            logger.info(
                                "CueVariantCache [Redis Load]: Restored cached cue variants for '%r' from key '%s'",
                                parsed["original"],
                                key_name,
                            )
                    except Exception as e:
                        # Best effort: one bad value must not stop the scan.
                        logger.debug(
                            "CueVariantCache [Redis Load]: Skipping unusable value under key '%s': %s",
                            key_name,
                            e,
                        )
            # SCAN signals completion with cursor 0; empty pages may occur
            # before that.
            if cursor == 0:
                break
        logger.info("Variant cache warm-up: loaded %d entries from Redis", loaded)
    except Exception as e:
        logger.warning("Variant cache warm-up scan failed: %s", e)
# ------------------------------------------------------------------
# Internal embedding helpers
# ------------------------------------------------------------------
async def _embed_context(self, context_text: str) -> None:
    """Embed a context string and cache its query vector for later turns.

    Background completion of a ``set_context`` cache miss. Under a
    ``DistributedLock`` keyed by the text's SHA-256 it calls
    ``_embed_texts`` and stores the vector in ``_query_cache`` as the
    most-recently-used entry, evicting from the oldest end while the cache
    exceeds ``_query_cache_max``. If no query vector is set at that moment
    it also becomes ``_current_query_vec``.

    The pending marker is cleared on every exit path — including when the
    lock cannot be acquired or acquiring it raises — so a later
    ``set_context`` call for the same text can retry. The lock is released
    only if it was actually acquired. Errors are logged at debug level and
    not raised.

    NOTE(review): the "still the intended context" check only tests
    ``_current_query_vec is None``; if a different context was registered
    in the meantime and also missed the cache, this vector is installed
    for it. Fixing that needs the current context text to be tracked.

    Args:
        context_text (str): The emotional-context string to embed.

    Returns:
        None
    """
    lock = None
    acquired = False
    try:
        key = hashlib.sha256(context_text.encode()).hexdigest()
        lock = DistributedLock(self._redis, f"sg:lock:embed:{key}", ttl=30)
        acquired = await lock.acquire()
        if not acquired:
            return
        vecs = await self._embed_texts([context_text])
        if vecs:
            # Insert (or refresh) first, then trim. Re-embedding a key
            # that is already cached must not evict an unrelated entry.
            self._query_cache[context_text] = vecs[0]
            self._query_cache.move_to_end(context_text)
            while len(self._query_cache) > self._query_cache_max:
                self._query_cache.popitem(last=False)
            # If no vector is live for this turn, start using this one.
            if self._current_query_vec is None:
                self._current_query_vec = vecs[0]
            logger.debug("Context embedding ready for: %.60s", context_text)
    except Exception as e:
        logger.debug("Context embed failed for %r: %s", context_text[:60], e)
    finally:
        # Clear the marker before releasing so a failing release cannot
        # leave this context permanently "pending".
        self._embed_pending.discard(context_text)
        if acquired and lock is not None:
            await lock.release()
[docs]
async def drain(self) -> None:
    """Wait for every tracked background task, then forget them.

    Gathers the tasks currently in ``_background_tasks`` (spawned by
    ``set_context``) with ``return_exceptions=True`` so a failing task
    does not abort the wait, and empties the set afterwards. Returns
    immediately when nothing is tracked.

    Returns:
        None
    """
    tracked = self._background_tasks
    if not tracked:
        return
    await asyncio.gather(*tracked, return_exceptions=True)
    tracked.clear()
async def _embed_texts(self, texts: list[str]) -> list[list[float]]:
"""Batch-embed *texts* using ``google/gemini-embedding-001``.
When an ``OpenRouterClient`` was provided at init time, delegates
to ``client.embed_batch()`` for shared connection pooling,
dual-provider failover, and the batch embedding API. Otherwise
falls back to direct HTTP (original implementation).
"""
if not texts:
return []
# ── Preferred path: shared OpenRouterClient ──────────────
if self._openrouter is not None:
try:
raw = await self._openrouter.embed_batch(
texts,
EMBED_MODEL,
)
return [_normalize(v) for v in raw if v]
except Exception as e:
logger.warning(
"Shared client embed_batch failed, "
"falling back to direct HTTP: %s",
e,
)
# Fall through to direct HTTP below
# ── Fallback: Gemini API via shared key pool ─────────────
try:
raw = await embed_batch_via_gemini(texts, EMBED_MODEL)
return [_normalize(v) for v in raw]
except Exception as e:
logger.debug("Embedding request failed: %s", e)
return []
async def _write_redis(
self, original: str, texts: list[str], vecs: list[list[float]]
) -> None:
"""Persist a v2 variant entry to Redis with a jittered 7-14 day TTL.
Serializes the cue, its variant texts, and their embeddings via
``_entry_to_redis`` and writes the result with ``SET`` under ``_cache_key(original)``
with an ``ex=`` expiry from ``_random_ttl`` (so entries do not all
expire together and trigger a regeneration herd). A no-op when
``self._redis`` is unset; write failures are logged and swallowed
rather than raised. Side effects: a single Redis write. Called by
``ensure_cached`` after both the re-embed and the full-generation
branches, and directly in the telemetry tests.
Args:
original (str): The original cue/reason string.
texts (list[str]): The variant phrasings to store.
vecs (list[list[float]]): The per-variant embeddings aligned with
``texts``.
Returns:
None
"""
if not self._redis:
return
try:
ttl = _random_ttl()
logger.info(
"CueVariantCache [Redis Store Attempt]: Saving variants for cue '%r' to Redis key '%s' with TTL=%d",
original,
_cache_key(original),
ttl,
)
payload = _entry_to_redis(original, texts, vecs)
await self._redis.set(_cache_key(original), payload, ex=ttl)
logger.info(
"CueVariantCache [Redis Store Success]: Successfully cached variants for cue '%r' in Redis key '%s'",
original,
_cache_key(original),
)
except Exception as e:
logger.warning(
"CueVariantCache [Redis Store Error]: Failed to write cue '%r' to Redis key '%s': %s",
original,
_cache_key(original),
e,
)
# ------------------------------------------------------------------
# Internal LLM generation
# ------------------------------------------------------------------
async def _generate(self, original: str) -> list[str]:
"""Generate rephrasing variants for a cue via OpenRouter, rotating models.
Posts a chat completion (system prompt from ``_system_prompt``, the cue
as the user message) directly to the OpenRouter chat endpoint over a
fresh ``httpx.AsyncClient``, trying each model in ``self._variant_models``
in turn and returning the first batch that ``_parse_variants`` extracts
successfully. A 429 sleeps briefly and advances to the next model;
API-level errors and exceptions are recorded and skipped so one bad
model cannot stall the rest. Side effects: outbound LLM HTTP calls and
logging; no Redis or filesystem access, and the OpenRouter API key is
read but never logged. Called by ``ensure_cached`` in this module.
Args:
original (str): The cue/reason string to rephrase.
Returns:
list[str]: The parsed variant phrasings from the first model that
succeeds, or an empty list when every model fails.
"""
if not self._api_key:
return []
headers = {
"Authorization": f"Bearer {self._api_key}",
"Content-Type": "application/json",
"HTTP-Referer": "https://github.com/matrix-llm-bot",
"X-Title": "Matrix LLM Bot",
}
user_msg = f'Original: "{original}"'
last_error: str = ""
async with httpx.AsyncClient(
timeout=httpx.Timeout(30.0, connect=10.0)
) as client:
for model in self._variant_models:
payload = {
"model": model,
"messages": [
{"role": "system", "content": _system_prompt()},
{"role": "user", "content": user_msg},
],
"temperature": 0.9,
"max_tokens": 1024,
}
try:
resp = await client.post(
OPENROUTER_CHAT_URL, json=payload, headers=headers
)
if resp.status_code == 429:
logger.debug(
"Variant gen rate-limited on %s, waiting 5s then next",
model,
)
last_error = "rate limited"
await asyncio.sleep(5)
continue
resp.raise_for_status()
data = resp.json()
if "error" in data:
last_error = str(data["error"])
logger.debug("Variant gen error from %s: %s", model, last_error)
continue
text = (
data.get("choices", [{}])[0]
.get("message", {})
.get("content", "")
)
variants = _parse_variants(text)
if variants:
logger.info(
"Generated %d variants for %r via %s",
len(variants),
original[:50],
model,
)
return variants
logger.debug(
"Empty variant parse from %s (raw: %.200s)", model, text
)
last_error = "empty parse"
except Exception as e:
last_error = str(e)
logger.debug("Variant gen exception on %s: %s", model, e)
continue
logger.warning(
"All variant models failed for %r — last error: %s",
original[:60],
last_error,
)
return []