Source code for ncm_variant_cache

"""NCM Cue Variant Cache — LLM-generated rephrasing variants for cascade cues.

On first use of any cue/reason string a background task fires a cheap
OpenRouter LLM call to generate alternative phrasings, then immediately
batch-embeds them using google/gemini-embedding-001. The result is stored in
Redis with a randomized 7-14 day TTL and held in-memory so subsequent turns
pay no I/O cost.

Variant selection uses cosine similarity (dot product on unit vectors) against
the current limbic emotional context. Falls back to random.choice when the
context embedding is not yet available.

Redis layout:

- Key: ``ncm:cue_variant:{sha256_hex_of_original_string}``
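- Value (v2): JSON object ``{"original": str, "variants": [{"text": str,
  "vec": [float, ...]}, ...], "mean_vec": [float, ...]}`` — see
  ``_entry_to_redis`` below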
- Value (v1): JSON array of strings (old format — loaded as text-only,
  re-embedded lazily on next ``ensure_cached`` call)
"""

from __future__ import annotations

import asyncio
import hashlib
import jsonutil as json
import logging
import random
import re
from collections import OrderedDict
from typing import Optional

import httpx

from gemini_embed_pool import embed_batch_via_gemini
from openrouter_client import OpenRouterClient
from core.distributed_lock import DistributedLock


logger = logging.getLogger(__name__)

# Chat-completions endpoint that CueVariantCache._generate posts to.
OPENROUTER_CHAT_URL = "https://openrouter.ai/api/v1/chat/completions"
# Embedding model identifier handed to both embedding back-ends.
EMBED_MODEL = "google/gemini-embedding-001"
# Namespace for every cached variant entry (see _cache_key).
REDIS_KEY_PREFIX = "ncm:cue_variant:"
# How many rephrasings the LLM is asked for, and the cap kept per cue.
NUM_VARIANTS = 6

_SECONDS_PER_DAY = 24 * 60 * 60
_TTL_MIN = 7 * _SECONDS_PER_DAY  # 7 days
_TTL_MAX = 14 * _SECONDS_PER_DAY  # 14 days


def _random_ttl() -> int:
    """Pick a jittered Redis expiry, in seconds, between 7 and 14 days.

    Spreading expirations over a week-wide window keeps cached cue-variant
    entries from all lapsing at once, so LLM regeneration load is smoothed
    instead of arriving as a thundering herd. Used by
    ``CueVariantCache._write_redis`` as the ``ex=`` value of each ``SET``.

    Returns:
        int: A uniformly chosen TTL in the inclusive range
        ``[_TTL_MIN, _TTL_MAX]``.
    """
    # randint(a, b) is defined as randrange(a, b + 1): the upper bound stays
    # inclusive and the RNG stream is consumed exactly the same way.
    return random.randrange(_TTL_MIN, _TTL_MAX + 1)


# Default OpenRouter model identifiers for variant generation.
# CueVariantCache._generate tries them in list order and returns the first
# reply that _parse_variants turns into a non-empty list. A 429, an API
# error, an empty parse or an exception moves on to the next model.
# Instances can override the list via ``CueVariantCache(variant_models=...)``.
# NOTE(review): the ":free" suffixed entries are presumably free-tier routes
# with stricter rate limits — confirm before reordering.
VARIANT_MODELS = [
    "google/gemini-3.1-flash-lite",
    "stepfun/step-3.5-flash:free",
    "nvidia/nemotron-3-nano-30b-a3b:free",
    "openai/gpt-oss-120b:free",
    "z-ai/glm-4.5-air:free",
]


def _system_prompt() -> str:
    """Build the system prompt that asks the LLM for ``NUM_VARIANTS`` rephrasings.

    The prompt casts the model as a prose writer, asks for exactly
    ``NUM_VARIANTS`` terse alternative phrasings of a cue, and demands a bare
    JSON array of strings. It embeds a placeholder array
    (``["variant1", ...]``) so the expected shape is unambiguous. That shape
    is what ``_parse_variants`` later extracts. Used by
    ``CueVariantCache._generate``.

    Returns:
        str: The complete system-prompt text.
    """
    count = NUM_VARIANTS
    placeholder_list = ", ".join('"variant%d"' % idx for idx in range(1, count + 1))
    segments = (
        "You are a neurochemical prose writer for an AI character. ",
        f"Given an internal emotional/somatic cue phrase, write exactly {count} alternative ",
        "phrasings. Preserve the core meaning; vary vocabulary, rhythm, and metaphor. ",
        "Keep each variant terse (5–18 words). ",
        f"Output ONLY a JSON array with exactly {count} strings: ",
        f"[{placeholder_list}]\n",
        "No other text, no markdown, no explanation.",
    )
    return "".join(segments)


# ─────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────


def _cache_key(original: str) -> str:
    """Return the Redis key under which the variants of *original* are stored.

    The key is ``REDIS_KEY_PREFIX`` followed by the hex SHA-256 digest of the
    UTF-8 encoded string. Arbitrary cue text of any length or alphabet
    therefore maps to a fixed-length key.

    Args:
        original (str): The original cue/reason string.

    Returns:
        str: ``"<REDIS_KEY_PREFIX><64 hex chars>"``.
    """
    digest = hashlib.sha256()
    digest.update(original.encode("utf-8"))
    return REDIS_KEY_PREFIX + digest.hexdigest()


def _dot(a: list[float], b: list[float]) -> float:
    """Return the dot product of *a* and *b* (cosine similarity for unit vectors).

    Embeddings in this module are L2-normalized by ``_normalize`` before they
    are stored, so the plain element-wise dot product equals their cosine
    similarity. Components are paired with ``zip``, so when the lengths
    differ the trailing components of the longer vector are ignored. The
    function is pure arithmetic with no side effects.
    ``CueVariantCache.get_variant`` uses it to score each variant against the
    current context vector.

    Args:
        a (list[float]): First vector (expected to be unit-normalized).
        b (list[float]): Second vector (expected to be unit-normalized).

    Returns:
        float: The sum of pairwise products (``0`` for empty input).
    """
    pairwise = (left * right for left, right in zip(a, b))
    return sum(pairwise)


def _normalize(vec: list[float]) -> list[float]:
    """Scale *vec* to unit Euclidean length.

    Unit-length embeddings let cosine similarity collapse to a plain dot
    product (see ``_dot``). A vector whose norm is below ``1e-10``, including
    the empty vector, has no usable direction. It is returned unchanged, as
    the very same list object, instead of being divided by (almost) zero.
    The function has no side effects. ``_mean_vec`` and
    ``CueVariantCache._embed_texts`` use it.

    Args:
        vec (list[float]): The raw embedding vector.

    Returns:
        list[float]: A new unit-length list, or *vec* itself when its norm is
        effectively zero.
    """
    magnitude = sum(component * component for component in vec) ** 0.5
    return vec if magnitude < 1e-10 else [component / magnitude for component in vec]


def _mean_vec(vecs: list[list[float]]) -> list[float]:
    """Compute the L2-normalized centroid of a list of vectors.

    Averages the vectors component-wise and re-normalizes the result with
    ``_normalize``, giving one representative direction for a whole variant
    set. The result is stored as a cache entry's ``mean_vec``.

    Empty vectors are skipped. They can appear as placeholder ``vec`` slots
    when an embedding is missing. Vectors whose length differs from the first
    non-empty one are skipped too, because mixed dimensionalities have no
    meaningful average. The old index-based loop raised ``IndexError`` on such
    ragged input. This is a pure function with no side effects.

    Args:
        vecs (list[list[float]]): The vectors to average.

    Returns:
        list[float]: The unit-length centroid, or an empty list when no
        non-empty vector is supplied.
    """
    usable = [v for v in vecs if v]
    if not usable:
        return []
    dim = len(usable[0])
    same_dim = [v for v in usable if len(v) == dim]
    count = len(same_dim)
    # zip(*rows) transposes the vectors into per-dimension columns; each
    # column is summed in the same order the old per-index loop used.
    centroid = [sum(column) / count for column in zip(*same_dim)]
    return _normalize(centroid)


def _parse_variants(text: str) -> list[str]:
    """Recover the list of rephrasings from a raw, possibly messy LLM reply.

    ``_system_prompt`` asks for a bare JSON array, but models often wrap it in
    a Markdown code fence or add chatter around it. This helper:

    1. uses the contents of the first fenced block, if there is one;
    2. slices from the first ``[`` to the last ``]``;
    3. decodes that span as JSON;
    4. keeps only non-blank strings (whitespace-trimmed);
    5. caps the result at ``NUM_VARIANTS``.

    It does only string processing, with no I/O. ``CueVariantCache._generate``
    uses it on each model's completion.

    Args:
        text (str): Raw text content of an LLM completion (may be empty/None).

    Returns:
        list[str]: Up to ``NUM_VARIANTS`` trimmed variants; ``[]`` when nothing
        usable can be decoded.
    """
    if not text:
        return []
    body = text.strip()

    if "```" in body:
        fenced = re.search(r"```(?:json)?\s*(.*?)```", body, re.DOTALL)
        if fenced is not None:
            body = fenced.group(1).strip()

    lo, hi = body.find("["), body.rfind("]")
    if lo < 0 or hi <= lo:
        return []

    try:
        decoded = json.loads(body[lo : hi + 1])
    except (json.JSONDecodeError, ValueError):
        return []

    if not isinstance(decoded, list):
        return []
    cleaned = [
        item.strip() for item in decoded if isinstance(item, str) and item.strip()
    ]
    return cleaned[:NUM_VARIANTS]


def _entry_to_redis(original: str, texts: list[str], vecs: list[list[float]]) -> str:
    """Serialize a variant cache entry to its v2 Redis JSON representation.

    Builds the stored object for a cue. It holds the original string, a list
    of ``text``/``vec`` records (one per phrasing), and a ``mean_vec``
    centroid from ``_mean_vec``. Keeping ``original`` inside the value lets a
    key scan rebuild the in-memory map.

    Embeddings are written only when there is exactly one non-empty vector
    per text. In every other case the texts are still written, with
    ``"vec": []`` and an empty ``mean_vec``. Those other cases are:

    - embedding failed and ``vecs`` is ``[]``;
    - the batch is partial;
    - some slot is empty.

    Previously ``zip(texts, vecs)`` silently dropped every text when ``vecs``
    was empty, and mispaired texts with vectors on a partial batch. Readers
    should treat empty ``vec`` slots as missing embeddings and re-embed.

    This function only serializes; it does no I/O.

    Args:
        original (str): The original cue/reason string.
        texts (list[str]): The generated variant phrasings.
        vecs (list[list[float]]): Per-variant embeddings, expected to align
            one-to-one with ``texts``.

    Returns:
        str: The JSON-encoded v2 cache entry, ready to ``SET`` in Redis.
    """
    aligned = bool(vecs) and len(vecs) == len(texts) and all(vecs)
    if aligned:
        variants = [{"text": t, "vec": v} for t, v in zip(texts, vecs)]
        mean = _mean_vec(vecs)
    else:
        # Can't pair vectors reliably: keep the (expensive) LLM texts anyway.
        variants = [{"text": t, "vec": []} for t in texts]
        mean = []
    return json.dumps({"original": original, "variants": variants, "mean_vec": mean})


def _entry_from_redis(raw: str) -> dict | None:
    """Deserialize a cached variant value, tolerating both formats.

    Decodes a raw Redis value with the ``jsonutil`` loader and normalizes it
    into the in-memory entry shape regardless of vintage. A v1 value (a plain
    JSON list of strings) yields texts with no embeddings (and ``original``
    set to None), signalling the caller to lazily re-embed; a v2 value (an
    object with a ``variants`` key) yields aligned ``texts`` and ``vecs`` plus
    the stored ``original`` and ``mean_vec``. Returns None when the value is
    unparseable or carries no usable texts. Pure decoding with no side
    effects. Called by ``CueVariantCache.ensure_cached`` and
    ``CueVariantCache.load_all_from_redis``.

    Args:
        raw (str): The raw JSON value read from Redis.

    Returns:
        dict | None: A dict with keys ``original``, ``texts``, ``vecs``, and
        ``mean_vec``, or None on failure.
    """
    try:
        data = json.loads(raw)
    except Exception:
        return None

    # v1 format — plain list of strings
    if isinstance(data, list):
        texts = [s for s in data if isinstance(s, str)]
        if not texts:
            return None
        return {"original": None, "texts": texts, "vecs": [], "mean_vec": []}

    # v2 format
    if not isinstance(data, dict) or "variants" not in data:
        return None

    texts = []
    vecs = []
    for item in data.get("variants", []):
        if isinstance(item, dict) and "text" in item:
            texts.append(item["text"])
            vecs.append(item.get("vec", []))

    if not texts:
        return None

    return {
        "original": data.get("original"),
        "texts": texts,
        "vecs": vecs,
        "mean_vec": data.get("mean_vec", []),
    }


# ─────────────────────────────────────────────────────────────────────
# Main class
# ─────────────────────────────────────────────────────────────────────


class CueVariantCache:
    """Lazy LLM-backed variant cache for cascade cue and reason strings.

    Variant selection is context-aware. Before the cascade engine runs, call
    ``set_context(emotion_text)`` to register the current dominant-emotion
    string. ``get_variant()`` then picks the variant whose embedding has the
    highest cosine similarity to the context embedding. Until that context
    embedding is ready it falls back to ``random.choice``.

    Parameters
    ----------
    redis_client:
        An ``redis.asyncio.Redis`` instance. May be None.
    api_key:
        OpenRouter API key. When None, generation and context embedding are
        disabled (``set_context``/``ensure_cached``/``_generate`` return early).
    openrouter_client:
        Optional shared ``OpenRouterClient`` used for batch embedding. When
        None, embeddings go through ``embed_batch_via_gemini`` only.
    variant_models:
        Override for the models ``_generate`` tries, in order. Defaults to
        ``VARIANT_MODELS``.
    """

    def __init__(
        self,
        redis_client=None,
        api_key: Optional[str] = None,
        openrouter_client: OpenRouterClient | None = None,
        variant_models: Optional[list[str]] = None,
    ) -> None:
        """Initialize the instance. Performs no I/O.

        Args:
            redis_client: Redis connection client (may be None).
            api_key (Optional[str]): OpenRouter API key.
            openrouter_client: Shared OpenRouterClient for connection pooling
                and batch embedding. Embedding falls back to the Gemini key
                pool when None or when it fails.
            variant_models: Override models for LLM generation. When None,
                uses VARIANT_MODELS. Use e.g. ["google/gemini-3.1-flash-lite"]
                for paid-only, high-throughput pregeneration.
        """
        self._redis = redis_client
        self._api_key = api_key
        self._openrouter = openrouter_client
        self._variant_models = (
            variant_models if variant_models is not None else VARIANT_MODELS
        )
        # original → {"original": ..., "texts": [...], "vecs": [...], "mean_vec": [...]}
        self._mem: dict[str, dict] = {}
        # Strings currently being generated / embedded
        self._pending: set[str] = set()
        # context_text → embedding vector (LRU-evicted, max 128)
        self._query_cache: OrderedDict[str, list[float]] = OrderedDict()
        self._query_cache_max = 128
        # Context strings whose embedding is currently being fetched
        self._embed_pending: set[str] = set()
        # The query vector for the current turn (set by set_context)
        self._current_query_vec: list[float] | None = None
        # Context text most recently passed to set_context ("" for an empty
        # one, None if set_context was never called). Lets _embed_context
        # ignore results for contexts that have since been superseded.
        self._current_context: str | None = None
        # Background tasks, held strongly until done (the event loop only
        # keeps weak references to tasks) so drain() can await them.
        self._background_tasks: set[asyncio.Task] = set()

    # ------------------------------------------------------------------
    # Public hot-path (sync)
    # ------------------------------------------------------------------

    def set_context(self, context_text: str) -> None:
        """Register the current turn's emotional context for variant selection.

        Returns immediately without awaiting anything.

        - On a ``_query_cache`` hit, sets ``_current_query_vec`` so the next
          ``get_variant`` call is context-aware. It also refreshes the entry's
          recency so eviction is genuinely least-recently-used.
        - On a miss, clears the query vector, so this turn uses
          ``random.choice``. It then schedules a detached ``_embed_context``
          task, tracked in ``_background_tasks``, unless one is already
          pending for this text.
        - Without a running event loop nothing is scheduled, and the pending
          marker is dropped.

        Args:
            context_text (str): The current dominant-emotion / context string.

        Returns:
            None
        """
        # Record the latest request so a late embedding for an older context
        # cannot overwrite this turn's selection.
        self._current_context = context_text or ""
        if not context_text or not self._api_key:
            self._current_query_vec = None
            return

        cached = self._query_cache.get(context_text)
        if cached is not None:
            self._query_cache.move_to_end(context_text)
            self._current_query_vec = cached
            return

        # Not yet embedded — clear for this turn and schedule background fetch
        self._current_query_vec = None
        if context_text in self._embed_pending:
            return
        self._embed_pending.add(context_text)
        try:
            task = asyncio.get_running_loop().create_task(
                self._embed_context(context_text)
            )
        except RuntimeError:
            # No running loop (synchronous caller): nothing was scheduled.
            self._embed_pending.discard(context_text)
            return
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    def get_variant(self, s: str) -> str:
        """Return the most contextually resonant cached variant, or the original.

        Looks up ``s`` in ``_mem``. Scoring with ``_dot`` happens only when all
        of these hold:

        - ``set_context`` has set a current query vector;
        - there is exactly one vector per text;
        - every vector is non-empty.

        In that case the highest-scoring phrasing is returned. Otherwise, or
        if scoring raises, it falls back to ``random.choice`` over the cached
        texts. With no cache entry, ``s`` is returned unchanged. No I/O.

        Args:
            s (str): The original cue/reason string to find a variant for.

        Returns:
            str: The selected variant phrasing, or ``s`` itself when no
            cached variants exist.
        """
        entry = self._mem.get(s)
        if not entry:
            return s
        texts = entry.get("texts", [])
        if not texts:
            return s

        vecs = entry.get("vecs", [])
        qvec = self._current_query_vec
        if qvec and vecs and len(vecs) == len(texts):
            try:
                scores = [_dot(qvec, v) for v in vecs if v]
                # Empty vec slots shrink `scores`; only trust a full scoring.
                if len(scores) == len(texts):
                    return texts[scores.index(max(scores))]
            except Exception:
                pass  # malformed vectors: fall back to a random pick
        return random.choice(texts)

    # ------------------------------------------------------------------
    # Public warm-up paths (async)
    # ------------------------------------------------------------------

    async def ensure_cached(self, s: str) -> None:
        """Generate, embed, and cache variants for ``s`` if not already present.

        This is the lazy warm-up path for a single cue. It short-circuits when
        ``s`` is already embedded in ``_mem``. Otherwise it takes a
        ``DistributedLock`` (``sg:lock:variant_gen:{cue_hash}``) and then:

        1. tries Redis (``_load_cached_entry``);
        2. if that did not yield a fully embedded entry, embeds the known
           texts or runs the full generate-and-embed pipeline
           (``_build_entry``).

        Everything after a successful acquire sits inside ``try/finally``, so
        the lock is released on every path. That includes the early return
        on a complete Redis hit, which previously leaked the lock until its
        TTL expired.

        Side effects: Redis reads/writes, LLM and embedding network calls, and
        a distributed lock. Errors are logged, not raised.

        Args:
            s (str): The cue/reason string to ensure variants for.

        Returns:
            None
        """
        if not s or not self._api_key:
            return

        # Already in memory and has embeddings — nothing to do
        entry = self._mem.get(s)
        if entry and entry.get("vecs"):
            return

        cue_hash = hashlib.sha256(s.encode()).hexdigest()
        lock = DistributedLock(self._redis, f"sg:lock:variant_gen:{cue_hash}", ttl=30)
        if not await lock.acquire():
            return
        try:
            if await self._load_cached_entry(s):
                return
            await self._build_entry(s)
        except Exception as e:
            logger.warning("ensure_cached failed for %r: %s", s[:60], e)
        finally:
            await lock.release()

    async def _load_cached_entry(self, s: str) -> bool:
        """Try to populate ``_mem[s]`` from Redis; True when it is fully embedded.

        Any decodable entry is stored in ``_mem``, including a v1 entry or
        one without embeddings, so its texts can be re-embedded instead of
        regenerated. Read errors are logged and reported as a miss.

        Args:
            s (str): The cue/reason string.

        Returns:
            bool: True if a complete (embedded) entry was loaded.
        """
        if not self._redis:
            return False
        key = _cache_key(s)
        try:
            raw = await self._redis.get(key)
            if not raw:
                logger.info(
                    "CueVariantCache [Redis Miss]: Variants for '%r' not found in Redis cache under key '%s'",
                    s,
                    key,
                )
                return False
            parsed = _entry_from_redis(raw)
            if not parsed:
                logger.warning(
                    "CueVariantCache [Redis Corrupt]: Invalid JSON structure in Redis key '%s' for cue '%r'",
                    key,
                    s,
                )
                return False
            logger.info(
                "CueVariantCache [Redis Hit]: Loaded cached cue variants for '%r' from Redis key '%s'",
                s,
                key,
            )
            self._mem[s] = parsed
            return bool(parsed["vecs"])
        except Exception as e:
            logger.warning(
                "CueVariantCache [Redis Read Error]: Unexpected error reading cue '%r' key '%s': %s",
                s,
                key,
                e,
            )
            return False

    async def _build_entry(self, s: str) -> None:
        """Embed known texts for ``s`` or generate new ones, then cache them.

        If ``_mem`` already holds texts without embeddings, only the embedding
        step runs, and nothing is stored if it fails. Otherwise ``_generate``
        produces fresh texts. Those are stored even if embedding then fails,
        so they can be re-embedded later. The result goes into ``_mem`` and
        is written through ``_write_redis``.

        Args:
            s (str): The cue/reason string.
        """
        existing = self._mem.get(s)
        if existing and existing.get("texts") and not existing.get("vecs"):
            texts = existing["texts"]
            vecs = await self._embed_texts(texts)
            if not vecs:
                return
        else:
            texts = await self._generate(s)
            if not texts:
                return
            vecs = await self._embed_texts(texts)

        self._mem[s] = {
            "original": s,
            "texts": texts,
            "vecs": vecs,
            "mean_vec": _mean_vec(vecs) if vecs else [],
        }
        await self._write_redis(s, texts, vecs)

    async def load_all_from_redis(self) -> None:
        """Warm ``_mem`` by scanning every cached variant entry in Redis.

        ``SCAN``-pages the ``ncm:cue_variant:`` keyspace and fetches each
        page's values concurrently. A single ``asyncio.Semaphore(2)`` caps the
        number of concurrent ``GET``s. Values are decoded with
        ``_entry_from_redis``.

        Only entries that carry a string ``original`` and non-empty ``texts``
        are reinstated. The original string lives inside the value, so the
        entry can be keyed back into ``_mem`` without an LLM call. Read and
        decode failures are logged and skipped. This is a no-op when no Redis
        client is set.

        Returns:
            None
        """
        if not self._redis:
            return

        def key_str(k) -> str:
            """Render a Redis key (bytes or str) for logging."""
            return k.decode() if isinstance(k, bytes) else str(k)

        # Shared across pages: at most 2 concurrent GETs for the whole scan.
        sem = asyncio.Semaphore(2)

        async def sem_get(k):
            """Fetch one key while holding the shared semaphore."""
            async with sem:
                return await self._redis.get(k)

        try:
            pattern = f"{REDIS_KEY_PREFIX}*"
            cursor = 0
            loaded = 0
            while True:
                cursor, keys = await self._redis.scan(cursor, match=pattern, count=200)
                if keys:
                    results = await asyncio.gather(
                        *(sem_get(key) for key in keys), return_exceptions=True
                    )
                    for key, raw in zip(keys, results):
                        if isinstance(raw, BaseException):
                            logger.warning(
                                "CueVariantCache [Redis Read Error]: Unexpected error reading variant key '%s': %s",
                                key_str(key),
                                raw,
                            )
                            continue
                        if not raw:
                            continue
                        try:
                            parsed = _entry_from_redis(raw)
                            original = parsed.get("original") if parsed else None
                            if isinstance(original, str) and original and parsed.get("texts"):
                                self._mem[original] = parsed
                                loaded += 1
                                logger.info(
                                    "CueVariantCache [Redis Load]: Restored cached cue variants for '%r' from key '%s'",
                                    original,
                                    key_str(key),
                                )
                        except Exception as e:
                            logger.debug(
                                "Variant cache warm-up: skipping key '%s': %s",
                                key_str(key),
                                e,
                            )
                # A zero cursor ends the SCAN iteration, with or without keys.
                if cursor == 0:
                    break
            logger.info("Variant cache warm-up: loaded %d entries from Redis", loaded)
        except Exception as e:
            logger.debug("Variant cache warm-up scan failed: %s", e)

    # ------------------------------------------------------------------
    # Internal embedding helpers
    # ------------------------------------------------------------------

    async def _embed_context(self, context_text: str) -> None:
        """Embed a context string and cache its query vector.

        This is the background half of a ``set_context`` cache miss. It:

        1. takes a short-lived ``DistributedLock`` (``sg:lock:embed:{sha256}``);
        2. embeds the text with ``_embed_texts``;
        3. stores the vector in the LRU ``_query_cache``, trimming it to
           ``_query_cache_max``;
        4. promotes the vector to ``_current_query_vec`` only if no query
           vector is set and ``set_context`` has not since moved on to a
           different context.

        The pending marker is always cleared, including when the lock is not
        acquired. Previously that case left the text marked pending forever,
        so this process never retried it. The lock is released only if it
        was acquired.

        Args:
            context_text (str): The emotional-context string to embed.

        Returns:
            None
        """
        key = hashlib.sha256(context_text.encode()).hexdigest()
        lock = DistributedLock(self._redis, f"sg:lock:embed:{key}", ttl=30)
        acquired = False
        try:
            acquired = await lock.acquire()
            if not acquired:
                return
            vecs = await self._embed_texts([context_text])
            if vecs:
                vec = vecs[0]
                self._query_cache[context_text] = vec
                self._query_cache.move_to_end(context_text)
                while len(self._query_cache) > self._query_cache_max:
                    self._query_cache.popitem(last=False)
                # Only go live if this is still the context the caller wants.
                if self._current_query_vec is None and self._current_context in (
                    None,
                    context_text,
                ):
                    self._current_query_vec = vec
                logger.debug("Context embedding ready for: %.60s", context_text)
        except Exception as e:
            logger.debug("Context embed failed for %r: %s", context_text[:60], e)
        finally:
            self._embed_pending.discard(context_text)
            if acquired:
                await lock.release()

    async def drain(self) -> None:
        """Await all in-flight background tasks until none remain.

        Gathers the tasks in ``_background_tasks`` with exceptions suppressed.
        It repeats until the set is empty, so tasks scheduled while waiting
        are awaited too. Previously they were cleared from the set without
        being awaited. Intended as a shutdown barrier, e.g. before a
        pregeneration script exits.

        Returns:
            None
        """
        while self._background_tasks:
            batch = list(self._background_tasks)
            await asyncio.gather(*batch, return_exceptions=True)
            self._background_tasks.difference_update(batch)

    async def _embed_texts(self, texts: list[str]) -> list[list[float]]:
        """Batch-embed *texts* using ``google/gemini-embedding-001``.

        When an ``OpenRouterClient`` was provided, ``client.embed_batch()``
        is tried first. On an exception, or when it does not return one
        non-empty vector per text, the Gemini key pool
        (``embed_batch_via_gemini``) is used instead.

        Results are L2-normalized. They are returned only when exactly one
        non-empty vector exists per input text, so index ``i`` always belongs
        to ``texts[i]``. Any other outcome yields ``[]``. The old code
        filtered out empty vectors, which shifted later vectors onto the
        wrong texts.

        Args:
            texts (list[str]): Strings to embed.

        Returns:
            list[list[float]]: Unit vectors aligned with ``texts``, or ``[]``.
        """
        if not texts:
            return []
        expected = len(texts)

        # ── Preferred path: shared OpenRouterClient ──────────────
        if self._openrouter is not None:
            try:
                raw = await self._openrouter.embed_batch(
                    texts,
                    EMBED_MODEL,
                )
                vecs = [_normalize(v) for v in raw or [] if v]
                if len(vecs) == expected:
                    return vecs
                logger.warning(
                    "Shared client embed_batch returned %d usable vectors for %d texts, "
                    "falling back to direct HTTP",
                    len(vecs),
                    expected,
                )
            except Exception as e:
                logger.warning(
                    "Shared client embed_batch failed, "
                    "falling back to direct HTTP: %s",
                    e,
                )

        # ── Fallback: Gemini API via shared key pool ─────────────
        try:
            raw = await embed_batch_via_gemini(texts, EMBED_MODEL)
            vecs = [_normalize(v) for v in raw]
        except Exception as e:
            logger.debug("Embedding request failed: %s", e)
            return []
        if len(vecs) != expected or not all(vecs):
            logger.debug(
                "Embedding request returned %d vectors (some empty?) for %d texts",
                len(vecs),
                expected,
            )
            return []
        return vecs

    async def _write_redis(
        self, original: str, texts: list[str], vecs: list[list[float]]
    ) -> None:
        """Persist a v2 variant entry to Redis with a jittered 7-14 day TTL.

        Serializes via ``_entry_to_redis`` and ``SET``s the result under
        ``_cache_key(original)``, with ``ex=`` taken from ``_random_ttl``.
        This is a no-op without a Redis client. Write failures are logged,
        not raised.

        Args:
            original (str): The original cue/reason string.
            texts (list[str]): The variant phrasings to store.
            vecs (list[list[float]]): Per-variant embeddings aligned with
                ``texts`` (may be empty).

        Returns:
            None
        """
        if not self._redis:
            return
        try:
            ttl = _random_ttl()
            logger.info(
                "CueVariantCache [Redis Store Attempt]: Saving variants for cue '%r' to Redis key '%s' with TTL=%d",
                original,
                _cache_key(original),
                ttl,
            )
            payload = _entry_to_redis(original, texts, vecs)
            await self._redis.set(_cache_key(original), payload, ex=ttl)
            logger.info(
                "CueVariantCache [Redis Store Success]: Successfully cached variants for cue '%r' in Redis key '%s'",
                original,
                _cache_key(original),
            )
        except Exception as e:
            logger.warning(
                "CueVariantCache [Redis Store Error]: Failed to write cue '%r' to Redis key '%s': %s",
                original,
                _cache_key(original),
                e,
            )

    # ------------------------------------------------------------------
    # Internal LLM generation
    # ------------------------------------------------------------------

    async def _generate(self, original: str) -> list[str]:
        """Generate rephrasing variants for a cue via OpenRouter, rotating models.

        Posts a chat completion to the OpenRouter chat endpoint over one
        ``httpx.AsyncClient``. The system prompt comes from ``_system_prompt``
        and the cue is the user message. Each model in ``self._variant_models``
        is tried in turn, and the first non-empty ``_parse_variants`` result
        is returned. Per-model outcomes:

        - a 429 sleeps 5 s and moves on;
        - an API error, empty parse or exception is recorded as ``last_error``
          and skipped.

        The API key is sent in a header but never logged.

        Args:
            original (str): The cue/reason string to rephrase.

        Returns:
            list[str]: Variants from the first successful model, or ``[]``
            when every model fails.
        """
        if not self._api_key:
            return []

        headers = {
            "Authorization": f"Bearer {self._api_key}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://github.com/matrix-llm-bot",
            "X-Title": "Matrix LLM Bot",
        }
        user_msg = f'Original: "{original}"'
        # The prompt does not depend on the model; build it once.
        system_prompt = _system_prompt()
        last_error: str = ""

        async with httpx.AsyncClient(
            timeout=httpx.Timeout(30.0, connect=10.0)
        ) as client:
            for model in self._variant_models:
                payload = {
                    "model": model,
                    "messages": [
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": user_msg},
                    ],
                    "temperature": 0.9,
                    "max_tokens": 1024,
                }
                try:
                    resp = await client.post(
                        OPENROUTER_CHAT_URL, json=payload, headers=headers
                    )
                    if resp.status_code == 429:
                        logger.debug(
                            "Variant gen rate-limited on %s, waiting 5s then next",
                            model,
                        )
                        last_error = "rate limited"
                        await asyncio.sleep(5)
                        continue
                    resp.raise_for_status()
                    data = resp.json()
                    if "error" in data:
                        last_error = str(data["error"])
                        logger.debug("Variant gen error from %s: %s", model, last_error)
                        continue
                    text = (
                        data.get("choices", [{}])[0]
                        .get("message", {})
                        .get("content", "")
                    )
                    variants = _parse_variants(text)
                    if variants:
                        logger.info(
                            "Generated %d variants for %r via %s",
                            len(variants),
                            original[:50],
                            model,
                        )
                        return variants
                    logger.debug(
                        "Empty variant parse from %s (raw: %.200s)", model, text
                    )
                    last_error = "empty parse"
                except Exception as e:
                    last_error = str(e)
                    logger.debug("Variant gen exception on %s: %s", model, e)
                    continue

        logger.warning(
            "All variant models failed for %r — last error: %s",
            original[:60],
            last_error,
        )
        return []