# Source code for ncm_semantic_triggers

"""NCM Semantic Trigger Matcher — cosine-similarity-based emotion detection.

Replaces the exact-word ``scan_text_for_triggers`` with embedding-based
semantic matching. Each emotion in ``reality_marble_recursion_index.yaml``
gets:

  1. An embedding of its *affect* description string.
  2. LLM-generated colloquial phrases a person might say when feeling
     that emotion, each also embedded.
  3. A ``mean_vec`` (L2-normalized centroid of all above embeddings) that
     serves as the emotion's semantic fingerprint for matching.

At runtime, the incoming text is embedded once and dot-producted against
every cached ``mean_vec``. Emotions above ``MATCH_THRESHOLD`` are returned
as ``(emotion_name, delta_vector)`` pairs, capped at ``TOP_N``.

Falls back to an empty list (caller should use ``scan_text_for_triggers``
from ``ncm_delta_parser``) when no embeddings are ready yet.

Redis key:  ncm:trigger_embed:v2:{sha256_hex_of_emotion_name}
Redis value: JSON object — see below
"""

from __future__ import annotations

import asyncio
import hashlib
import jsonutil as json
import logging
import random
import re
from typing import Dict, List, Optional, Tuple


import httpx

from gemini_embed_pool import embed_batch_via_gemini
from openrouter_client import OpenRouterClient
from vector_store import AsyncPgVectorCollection

logger = logging.getLogger(__name__)

import os
# True when SG_DISABLE_STARTUP_LLM_WARMING is set to "1", "true", "yes" or "on"
# (case-insensitive).  _build_and_store then skips LLM phrase generation and
# builds each fingerprint from the affect embedding alone.
DISABLE_LLM_PHRASES = os.environ.get("SG_DISABLE_STARTUP_LLM_WARMING", "").lower() in ("1", "true", "yes", "on")

# Chat-completions endpoint used by _generate_phrases.
OPENROUTER_CHAT_URL = "https://openrouter.ai/api/v1/chat/completions"
# Embedding model name passed to both embedding back-ends in _embed_texts.
EMBED_MODEL = "google/gemini-embedding-001"
# Prefix of every Redis key produced by _cache_key.
REDIS_KEY_PREFIX = "ncm:trigger_embed:v2:"
# NOTE: NUM_PHRASES and PHRASES_PER_CALL are assigned again further down in
# this module (both to 10).  Functions read these names at call time, so the
# later assignments are the values actually in effect.
NUM_PHRASES = 50
PHRASES_PER_CALL = 10  # Generate in batches across different models
# Bounds (in seconds) of the jittered Redis TTL chosen by _random_ttl.
_TTL_MIN = 7 * 86400  # 7 days
_TTL_MAX = 14 * 86400  # 14 days


def _random_ttl() -> int:
    """Return a jittered cache lifetime, in seconds.

    The value is drawn uniformly from the closed interval
    ``[_TTL_MIN, _TTL_MAX]`` (7 to 14 days).  Randomizing the expiry means
    keys written together do not all expire at the same instant.

    Returns:
        int: A TTL in seconds with ``_TTL_MIN <= ttl <= _TTL_MAX``.
    """
    shortest, longest = _TTL_MIN, _TTL_MAX
    ttl_seconds = random.randint(shortest, longest)
    return ttl_seconds


# Similarity threshold — only emotions whose cosine similarity is at or above
# this value fire; the exact-word fallback catches anything semantic misses.
# NOTE(review): this comment previously cited 0.70, but the value in effect
# is 0.55 — confirm which one is intended.
MATCH_THRESHOLD = 0.55
# Maximum emotions to return per text scan
TOP_N = 6
# Semantic matches fire far more often than the exact-word system the YAML
# deltas were tuned for.  This scale factor compensates.
SEMANTIC_SCALE = 0.5
# Maximum absolute delta per node from trigger matching
_TRIGGER_NODE_CAP = 0.5

# Optimize to make exactly 1 LLM completions call per emotion to avoid warming storms.
PHRASE_MODELS = ["google/gemini-3.1-flash-lite"]
# NOTE: these two assignments supersede the NUM_PHRASES / PHRASES_PER_CALL
# values set near the top of the module; with one model and 10 phrases per
# call, a single completion can satisfy NUM_PHRASES.
NUM_PHRASES = 10
PHRASES_PER_CALL = 10


def _phrase_system() -> str:
    """Build the system prompt used when asking an LLM for trigger phrases.

    The prompt asks for exactly ``PHRASES_PER_CALL`` short, casual phrases
    and shows the expected output shape as a JSON array of placeholder
    strings (``"phrase1"`` … ``"phraseN"``).

    Returns:
        str: The assembled system prompt.
    """
    count = PHRASES_PER_CALL
    placeholders = [f'"phrase{idx}"' for idx in range(1, count + 1)]
    sample_array = ", ".join(placeholders)
    segments = [
        "You are a psychology writer. Given a named emotion and its character, ",
        f"write exactly {count} short phrases (max 20 words each) that someone might ",
        "naturally say or type when experiencing this emotion. Use casual, ",
        "everyday language — not clinical terms. Include variety: questions, ",
        "statements, exclamations, and stream-of-consciousness fragments. ",
        f"Output ONLY a JSON array of {count} strings: ",
        f"[{sample_array}]\n",
        "No other text, no markdown, no explanation.",
    ]
    return "".join(segments)


# ─────────────────────────────────────────────────────────────────────
# Vector helpers (identical to ncm_variant_cache — no numpy dep)
# ─────────────────────────────────────────────────────────────────────


def _dot(a: list, b: list) -> float:
    """Internal helper: dot.

    Args:
        a (list): The a value.
        b (list): The b value.

    Returns:
        float: The result.
    """
    return sum(x * y for x, y in zip(a, b))


def _normalize(vec: list) -> list:
    """Internal helper: normalize.

    Args:
        vec (list): The vec value.

    Returns:
        list: List of results.
    """
    norm = sum(x * x for x in vec) ** 0.5
    if norm < 1e-10:
        return vec
    return [x / norm for x in vec]


def _mean_vec(vecs: list) -> list:
    """Internal helper: mean vec.

    Args:
        vecs (list): The vecs value.

    Returns:
        list: List of results.
    """
    if not vecs:
        return []
    dim = len(vecs[0])
    centroid = [sum(v[i] for v in vecs) / len(vecs) for i in range(dim)]
    return _normalize(centroid)


def _cache_key(emotion_name: str) -> str:
    """Derive the Redis key under which an emotion's entry is stored.

    The key is ``REDIS_KEY_PREFIX`` followed by the hex SHA-256 digest of
    the UTF-8 encoded emotion name.

    Args:
        emotion_name (str): Name of the emotion.

    Returns:
        str: The full Redis key.
    """
    encoded_name = emotion_name.encode("utf-8")
    digest = hashlib.sha256(encoded_name).hexdigest()
    return REDIS_KEY_PREFIX + digest


def _parse_phrases(text: str) -> list[str]:
    """Extract a list of phrase strings from a possibly messy LLM response.

    Tolerantly recovers the JSON array of colloquial trigger phrases that
    ``_phrase_system`` asks the model to emit, even when the completion is
    wrapped in a Markdown code fence or padded with stray prose. It strips
    any fenced block, slices from the first ``[`` to the last ``]``, parses
    that span with the ``jsonutil`` loader, keeps only non-empty strings, and
    caps the result at ``PHRASES_PER_CALL``. Pure string processing with no
    I/O or side effects. Called by ``SemanticTriggerMatcher._generate_phrases``
    in this module (and by the standalone ``scripts/pregenerate_ncm_triggers``
    pregeneration helper) to clean each per-model phrase batch.

    Args:
        text (str): The raw text content of an LLM completion.

    Returns:
        list[str]: The parsed phrases (trimmed, non-empty), at most
        ``PHRASES_PER_CALL`` long; an empty list when nothing parses.
    """
    if not text:
        return []
    text = text.strip()
    if "```" in text:
        m = re.search(r"```(?:json)?\s*(.*?)```", text, re.DOTALL)
        if m:
            text = m.group(1).strip()
    start = text.find("[")
    end = text.rfind("]")
    if start == -1 or end <= start:
        return []
    try:
        arr = json.loads(text[start : end + 1])
        if isinstance(arr, list):
            return [s.strip() for s in arr if isinstance(s, str) and s.strip()][
                :PHRASES_PER_CALL
            ]
    except (json.JSONDecodeError, ValueError):
        pass
    return []


class SemanticTriggerMatcher:
    """Cosine-similarity emotion trigger detector.

    Parameters
    ----------
    redis_client:
        An ``redis.asyncio.Redis`` instance.  May be None.
    api_key:
        OpenRouter API key.  When None the matcher is a no-op and
        ``find_triggers`` always returns an empty list.
    openrouter_client:
        Optional shared ``OpenRouterClient`` used for batch embedding.
        When None, embedding goes through the Gemini key pool instead.
    """

    def __init__(
        self,
        redis_client=None,
        api_key: Optional[str] = None,
        openrouter_client: OpenRouterClient | None = None,
    ) -> None:
        """Initialize the instance.

        Args:
            redis_client: Redis connection client (may be None).
            api_key (Optional[str]): OpenRouter API key; None disables
                matching and warm-up.
            openrouter_client: Shared OpenRouterClient for batch embedding.
                ``_embed_texts`` falls back to the Gemini key pool when this
                is None or its call fails.
        """
        self._redis = redis_client
        self._api_key = api_key
        self._openrouter = openrouter_client
        self._collection = AsyncPgVectorCollection("ncm_system", "emotion_triggers")
        # emotion_name → {"name", "affect", "vec", "phrase_vecs", "mean_vec"}
        self._embeddings: Dict[str, dict] = {}
        # Names currently being built by a background task.
        self._pending: set[str] = set()
        self._warming_task: Optional[asyncio.Task] = None
        self._warming_lock = asyncio.Lock()
        self._redis_max_retries = 30
        self._redis_retry_interval = 2.0
        # Strong references to fire-and-forget build tasks.  The event loop
        # only keeps weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._build_tasks: set[asyncio.Task] = set()

    # ------------------------------------------------------------------
    # Public hot-path
    # ------------------------------------------------------------------

    async def find_triggers(
        self,
        text: str,
        threshold: float = MATCH_THRESHOLD,
        top_n: int = TOP_N,
        query_embedding: list[float] | None = None,
    ) -> List[Tuple[str, Dict[str, float]]]:
        """Return the top emotions whose similarity to ``text`` meets ``threshold``.

        The query vector is ``query_embedding`` when supplied, otherwise
        ``text`` is embedded through ``_embed_texts``.  The pgvector
        collection is queried first; when that query fails or returns no
        hits and the in-memory ``self._embeddings`` map is populated, the
        query is dot-producted against each cached ``mean_vec`` instead and
        the ``top_n`` best are used.  Each hit at or above ``threshold`` is
        converted to a delta vector: the emotion's template from
        ``ncm_delta_parser.get_emotion_delta`` scaled by the similarity and
        ``SEMANTIC_SCALE``, with every node clamped to
        ``±_TRIGGER_NODE_CAP``.

        All failures are logged and result in an empty list rather than an
        exception.

        Args:
            text (str): Text to scan for emotional triggers.
            threshold (float): Minimum similarity for an emotion to fire.
            top_n (int): Maximum number of candidate hits to consider.
            query_embedding (list[float], optional): Precomputed embedding
                of the query; when given, no embedding call is made.

        Returns:
            List[Tuple[str, Dict[str, float]]]: ``(emotion_name, delta)``
            pairs in hit order.  Empty when there is no API key, no usable
            query vector, no qualifying hit, or an error occurred.
        """
        if not self._api_key:
            logger.info("SemanticTriggerMatcher.find_triggers: no API key configured. Bypassing semantic trigger match.")
            return []

        if not query_embedding:
            if not text:
                logger.debug("SemanticTriggerMatcher.find_triggers: empty text and no query_embedding. Bypassing.")
                return []
            try:
                logger.debug("SemanticTriggerMatcher.find_triggers: generating query embedding for text: '%s...'", text[:60])
                vecs = await self._embed_texts([text])
                if not vecs or not vecs[0]:
                    logger.warning("SemanticTriggerMatcher.find_triggers: embedding generation returned empty vector.")
                    return []
                query_embedding = vecs[0]
            except Exception as e:
                logger.error("SemanticTriggerMatcher.find_triggers: embedding generation failed: %s", e)
                return []
        else:
            logger.debug("SemanticTriggerMatcher.find_triggers: reusing precomputed query_embedding.")

        try:
            # Primary: query pgvector
            logger.debug("SemanticTriggerMatcher.find_triggers: querying pgvector store with embedding (dim=%d)", len(query_embedding))
            try:
                hits = await self._collection.query(
                    embedding=query_embedding,
                    n_results=top_n,
                )
                logger.debug("SemanticTriggerMatcher.find_triggers: pgvector query returned %d hits.", len(hits))
            except Exception as e:
                logger.error("SemanticTriggerMatcher.find_triggers: pgvector query failed: %s. Falling back to in-memory.", e)
                hits = []

            # Fallback to the in-memory dictionary, but only when pgvector
            # produced nothing (error or empty result) so that a healthy
            # pgvector answer is never overridden.
            if not hits and self._embeddings:
                logger.info("SemanticTriggerMatcher.find_triggers: evaluating %d in-memory emotions.", len(self._embeddings))
                scored: list[dict] = []
                for name, entry in self._embeddings.items():
                    mean = entry.get("mean_vec", [])
                    if not mean:
                        continue
                    score = _dot(query_embedding, mean)
                    scored.append({
                        "id": name,
                        "similarity": score,
                        "document": entry.get("affect", name),
                    })
                # Sort descending by similarity
                scored.sort(key=lambda h: h["similarity"], reverse=True)
                hits = scored[:top_n]

            results: List[Tuple[str, Dict[str, float]]] = []
            from ncm_delta_parser import get_emotion_delta

            for hit in hits:
                name = hit["id"]
                score = hit["similarity"]
                if score is None or score < threshold:
                    logger.debug("SemanticTriggerMatcher.find_triggers: hit '%s' score %.4f is below threshold %.4f", name, score or 0.0, threshold)
                    continue

                delta = get_emotion_delta(name)
                if delta:
                    # Scale by cosine similarity AND SEMANTIC_SCALE.
                    # Cos-sim handles discrimination (strong > weak match).
                    # SEMANTIC_SCALE compensates for semantic matching
                    # firing more often than the exact-word system the
                    # YAML deltas were authored for.
                    scaled = {
                        k: max(
                            -_TRIGGER_NODE_CAP,
                            min(_TRIGGER_NODE_CAP, v * score * SEMANTIC_SCALE),
                        )
                        for k, v in delta.items()
                    }
                    logger.info("SemanticTriggerMatcher.find_triggers: MATCHED emotion '%s' similarity %.4f, delta vector: %s", name, score, scaled)
                    results.append((name, scaled))
            return results
        except Exception as e:
            logger.debug("SemanticTriggerMatcher.find_triggers error: %s", e)
            return []

    # ------------------------------------------------------------------
    # Startup warm-up
    # ------------------------------------------------------------------

    async def ensure_all_cached(self) -> None:
        """Build the full emotion embedding index.

        Starts ``_ensure_all_cached_internal`` as a task the first time it
        is called and awaits it.  Later calls (including concurrent ones)
        await that same task instead of starting another, so warm-up runs
        at most once per instance.  Exceptions from the task are logged,
        not raised.  Does nothing when no API key is configured.
        """
        if not self._api_key:
            return

        if self._warming_task is not None:
            logger.info("SemanticTriggerMatcher: trigger cache warming already in progress; awaiting existing warming task")
            try:
                await self._warming_task
            except Exception as e:
                logger.warning("SemanticTriggerMatcher: existing warming task failed: %s", e)
            return

        async with self._warming_lock:
            # Double check within the lock
            if self._warming_task is None:
                self._warming_task = asyncio.create_task(self._ensure_all_cached_internal())
            try:
                await self._warming_task
            except Exception as e:
                logger.warning("SemanticTriggerMatcher: warming failed: %s", e)

    async def _ensure_all_cached_internal(self) -> None:
        """Run the actual emotion-embedding warm-up.

        Phases:

        1. Load every emotion via ``ncm_delta_parser.get_all_emotions`` and
           ensure the pgvector collection exists.
        2. Determine which emotions are absent from pgvector.
        3. If Redis answers a ping (retried up to ``_redis_max_retries``
           times), load those emotions from Redis and upsert the ones with
           a usable ``mean_vec`` into pgvector.
        4. For everything still missing, batch-embed the affect strings and
           schedule one background ``_build_and_store`` task per emotion,
           at most two running at a time.

        The build tasks are not awaited here; they are tracked in
        ``self._build_tasks`` until they complete.
        """
        try:
            from ncm_delta_parser import get_all_emotions

            all_emotions = get_all_emotions()
        except Exception as e:
            logger.warning("SemanticTriggerMatcher: failed to load emotions: %s", e)
            return

        try:
            await self._collection.ensure()
        except Exception as e:
            logger.error("SemanticTriggerMatcher: failed to ensure pgvector table: %s", e)

        pg_ids = set()
        try:
            pg_res = await self._collection.get(limit=1000)
            pg_ids = set(pg_res.get("ids", []))
        except Exception as e:
            logger.warning("SemanticTriggerMatcher: failed to fetch from pgvector: %s", e)

        missing_in_pg = [name for name in all_emotions if name not in pg_ids]
        if not missing_in_pg:
            logger.info("SemanticTriggerMatcher: all %d emotions already present in pgvector", len(all_emotions))
            return

        logger.info(
            "SemanticTriggerMatcher: found %d emotions missing from pgvector: %s",
            len(missing_in_pg),
            missing_in_pg,
        )

        redis_ready = False
        if not self._redis:
            logger.warning(
                "SemanticTriggerMatcher [Redis Offline/Absent]: Redis client is not available. "
                "Trigger warming will bypass cache."
            )
        else:
            for attempt in range(1, self._redis_max_retries + 1):
                try:
                    await self._redis.ping()
                    redis_ready = True
                    logger.info("SemanticTriggerMatcher [Redis Online]: Successfully connected to Redis.")
                    break
                except Exception as e:
                    logger.warning(
                        "SemanticTriggerMatcher [Redis Connection Attempt %d/%d Failed]: Failed to ping Redis: %s. "
                        "Retrying in %s seconds...",
                        attempt,
                        self._redis_max_retries,
                        e,
                        self._redis_retry_interval,
                    )
                    await asyncio.sleep(self._redis_retry_interval)
            if not redis_ready:
                logger.error(
                    "SemanticTriggerMatcher [Redis Connection Timeout]: Failed to connect to Redis after %d attempts.",
                    self._redis_max_retries,
                )

        if redis_ready:
            await self._load_all_from_redis(set(missing_in_pg))

        migrated_names = []
        migrated_docs = []
        migrated_metas = []
        migrated_vecs = []
        still_missing = []

        for name in missing_in_pg:
            entry = self._embeddings.get(name)
            mean_vec = entry.get("mean_vec", []) if entry else []
            if mean_vec:
                migrated_names.append(name)
                migrated_docs.append(entry.get("affect", name))
                migrated_metas.append({"phrase_vecs": entry.get("phrase_vecs", [])})
                migrated_vecs.append(mean_vec)
            elif name not in self._pending:
                # Either nothing is cached, or the cached entry has no
                # fingerprint (find_triggers would skip it) — rebuild it.
                still_missing.append((name, all_emotions[name]))

        if migrated_names:
            try:
                await self._collection.upsert(
                    ids=migrated_names,
                    documents=migrated_docs,
                    metadatas=migrated_metas,
                    embeddings=migrated_vecs,
                )
                logger.info(
                    "SemanticTriggerMatcher: successfully migrated %d emotions from Redis to pgvector: %s",
                    len(migrated_names),
                    migrated_names,
                )
            except Exception as e:
                logger.error("SemanticTriggerMatcher: failed to migrate emotions to pgvector: %s", e)

        if not still_missing:
            logger.info("SemanticTriggerMatcher: all missing emotions successfully resolved/migrated.")
            return

        logger.info(
            "SemanticTriggerMatcher: %d emotions need LLM phrase generation and embedding: %s",
            len(still_missing),
            [name for name, _ in still_missing],
        )

        names = [n for n, _ in still_missing]
        affect_texts = [e.get("affect", n) for n, e in still_missing]

        for name in names:
            self._pending.add(name)

        try:
            affect_vecs = await self._embed_texts(affect_texts)
        except Exception as e:
            logger.warning("SemanticTriggerMatcher: batch embed failed: %s", e)
            for name in names:
                self._pending.discard(name)
            return

        sem = asyncio.Semaphore(2)

        async def _build_limited(
            name: str,
            affect_str: str,
            affect_vec: list,
        ) -> None:
            """Run ``_build_and_store`` for one emotion under the semaphore."""
            async with sem:
                await self._build_and_store(name, affect_str, affect_vec)

        for i, (name, emo_entry) in enumerate(still_missing):
            # A short/empty embedding result leaves later emotions without
            # an affect vector; they are then built from phrases alone.
            affect_vec = affect_vecs[i] if i < len(affect_vecs) else []
            affect_str = emo_entry.get("affect", name)
            task = asyncio.create_task(_build_limited(name, affect_str, affect_vec))
            # Hold a reference until the task finishes so it cannot be
            # garbage-collected mid-execution.
            self._build_tasks.add(task)
            task.add_done_callback(self._build_tasks.discard)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _load_all_from_redis(self, emotion_names: set[str]) -> None:
        """Bulk-load cached trigger embeddings from Redis into memory.

        Issues one ``GET`` per requested emotion on the key from
        ``_cache_key``, with at most two in flight at once.  Each value
        that decodes to a dict is stored in ``self._embeddings`` under the
        emotion name.  Misses, read errors and undecodable or non-dict
        values are logged and skipped.  Does nothing when ``self._redis``
        is unset.

        Args:
            emotion_names (set[str]): Emotion names to look up.

        Returns:
            None
        """
        if not self._redis:
            return

        # Concurrently fetch all emotion keys from Redis, throttled to 2 connections
        names_list = list(emotion_names)
        sem = asyncio.Semaphore(2)

        async def sem_get(name):
            """Read one emotion's raw Redis value while holding ``sem``.

            Args:
                name (str): Emotion name; hashed into the key by
                    ``_cache_key``.

            Returns:
                Whatever ``self._redis.get`` returns for that key.
            """
            async with sem:
                return await self._redis.get(_cache_key(name))

        tasks = [sem_get(name) for name in names_list]
        # return_exceptions=True: one failed read must not abort the rest.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        loaded = 0
        for name, raw in zip(names_list, results):
            if isinstance(raw, Exception):
                logger.warning(
                    "SemanticTriggerMatcher [Redis Read Error]: Unexpected error reading emotion '%s' key '%s': %s",
                    name,
                    _cache_key(name),
                    raw,
                )
                continue
            if not raw:
                logger.info(
                    "SemanticTriggerMatcher [Redis Miss]: Emotion '%s' not found in Redis cache under key '%s'",
                    name,
                    _cache_key(name),
                )
                continue
            try:
                data = json.loads(raw)
            except (json.JSONDecodeError, ValueError):
                logger.warning(
                    "SemanticTriggerMatcher [Redis Corrupt]: Invalid JSON structure in Redis key '%s' for emotion '%s'",
                    _cache_key(name),
                    name,
                )
                continue
            if isinstance(data, dict):
                self._embeddings[name] = data
                loaded += 1
                logger.info(
                    "SemanticTriggerMatcher [Redis Hit]: Loaded cached emotion '%s' from Redis key '%s'",
                    name,
                    _cache_key(name),
                )
            else:
                logger.warning(
                    "SemanticTriggerMatcher [Redis Corrupt]: Invalid JSON structure in Redis key '%s' for emotion '%s'",
                    _cache_key(name),
                    name,
                )

        if loaded:
            logger.info(
                "SemanticTriggerMatcher: loaded %d emotion embeddings from Redis",
                loaded,
            )

    async def _build_and_store(
        self,
        name: str,
        affect: str,
        affect_vec: list,
    ) -> None:
        """Build one emotion's semantic fingerprint and persist it.

        Unless ``DISABLE_LLM_PHRASES`` is set, phrases are requested from
        ``_generate_phrases`` and embedded with ``_embed_texts``.  The
        non-empty phrase vectors plus ``affect_vec`` (when non-empty) are
        averaged by ``_mean_vec`` into ``mean_vec``.  The resulting entry
        is stored in ``self._embeddings``, upserted into the pgvector
        collection, and — when a Redis client is present — written to
        Redis with a TTL from ``_random_ttl``.  Every failure is logged
        and swallowed, and ``name`` is always removed from
        ``self._pending``.

        Args:
            name (str): The emotion name.
            affect (str): The emotion's affect description string.
            affect_vec (list): Embedding of ``affect``, or an empty list
                when none is available.

        Returns:
            None
        """
        try:
            phrases = []
            if not DISABLE_LLM_PHRASES:
                # Generate colloquial trigger phrases via LLM
                phrases = await self._generate_phrases(name, affect)

            all_vecs = [affect_vec] if affect_vec else []
            phrase_vecs: list[list] = []

            if phrases:
                try:
                    p_vecs = await self._embed_texts(phrases)
                    # _embed_texts yields [] for items that failed to
                    # embed; an empty vector would break _mean_vec (ragged
                    # input), so keep only usable vectors.
                    phrase_vecs = [v for v in p_vecs if v]
                    all_vecs.extend(phrase_vecs)
                except Exception as e:
                    logger.debug("Phrase embed failed for %s: %s", name, e)

            mean = _mean_vec(all_vecs) if all_vecs else affect_vec

            entry = {
                "name": name,
                "affect": affect,
                "vec": affect_vec,
                "phrase_vecs": phrase_vecs,
                "mean_vec": mean,
            }
            self._embeddings[name] = entry

            try:
                await self._collection.upsert(
                    ids=[name],
                    documents=[affect],
                    metadatas=[{"phrase_vecs": phrase_vecs}],
                    embeddings=[mean],
                )
                logger.info("SemanticTriggerMatcher: successfully saved emotion '%s' to pgvector", name)
            except Exception as e:
                logger.error("SemanticTriggerMatcher: failed to save emotion '%s' to pgvector: %s", name, e)

            if self._redis:
                try:
                    ttl = _random_ttl()
                    logger.info(
                        "SemanticTriggerMatcher [Redis Store Attempt]: Saving emotion '%s' to Redis key '%s' with TTL=%d",
                        name,
                        _cache_key(name),
                        ttl,
                    )
                    await self._redis.set(
                        _cache_key(name),
                        json.dumps(entry),
                        ex=ttl,
                    )
                    logger.info(
                        "SemanticTriggerMatcher [Redis Store Success]: Successfully cached emotion '%s' in Redis key '%s'",
                        name,
                        _cache_key(name),
                    )
                except Exception as e:
                    logger.warning(
                        "SemanticTriggerMatcher [Redis Store Error]: Failed to write emotion '%s' to Redis key '%s': %s",
                        name,
                        _cache_key(name),
                        e,
                    )

            logger.debug(
                "SemanticTriggerMatcher: cached %s (%d phrase vecs)",
                name,
                len(phrase_vecs),
            )
        except Exception as e:
            logger.warning(
                "SemanticTriggerMatcher: _build_and_store failed for %s: %s",
                name,
                e,
            )
        finally:
            self._pending.discard(name)

    async def _generate_phrases(self, name: str, affect: str) -> list[str]:
        """Generate colloquial phrases for this emotion via LLM calls.

        Iterates ``PHRASE_MODELS`` in random order (one chat-completion
        call per model), stopping early once at least ``NUM_PHRASES``
        phrases are collected, then deduplicates case-insensitively
        (keeping first occurrences) and caps the result at ``NUM_PHRASES``.
        A 429 response sleeps five seconds and moves on to the next model;
        any other failure also moves on.

        Args:
            name (str): The emotion name.
            affect (str): The emotion's affect description string.

        Returns:
            list[str]: Unique phrases, possibly empty.
        """
        if not self._api_key:
            return []

        headers = {
            "Authorization": f"Bearer {self._api_key}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://github.com/matrix-llm-bot",
            "X-Title": "Matrix LLM Bot",
        }
        user_msg = f'Emotion: "{name}"\nCharacter: "{affect}"'
        all_phrases: list[str] = []

        models = list(PHRASE_MODELS)
        random.shuffle(models)

        async with httpx.AsyncClient(
            timeout=httpx.Timeout(30.0, connect=10.0)
        ) as client:
            for model in models:
                payload = {
                    "model": model,
                    "messages": [
                        {"role": "system", "content": _phrase_system()},
                        {"role": "user", "content": user_msg},
                    ],
                    "temperature": 0.9,
                    "max_tokens": 10240,
                }
                try:
                    resp = await client.post(
                        OPENROUTER_CHAT_URL, json=payload, headers=headers
                    )
                    if resp.status_code == 429:
                        await asyncio.sleep(5)
                        continue
                    resp.raise_for_status()
                    data = resp.json()
                    if "error" in data:
                        continue
                    text = (
                        data.get("choices", [{}])[0]
                        .get("message", {})
                        .get("content", "")
                    )
                    phrases = _parse_phrases(text)
                    if phrases:
                        all_phrases.extend(phrases)
                except Exception as e:
                    # Best-effort: a failing model is skipped, but leave a
                    # trace so failures are diagnosable.
                    logger.debug(
                        "Phrase generation via %s failed for %s: %s",
                        model,
                        name,
                        e,
                    )
                    continue

                # Stop early if we have enough
                if len(all_phrases) >= NUM_PHRASES:
                    break

        if not all_phrases:
            logger.debug("All phrase models failed for %s", name)

        # Deduplicate while preserving order, cap at NUM_PHRASES
        seen = set()
        unique: list[str] = []
        for p in all_phrases:
            p_lower = p.lower()
            if p_lower not in seen:
                seen.add(p_lower)
                unique.append(p)
        return unique[:NUM_PHRASES]

    async def _embed_texts(self, texts: list[str]) -> list[list]:
        """Batch-embed texts with ``EMBED_MODEL`` and L2-normalize each vector.

        When an ``OpenRouterClient`` was provided at init time, its
        ``embed_batch`` is tried first.  If no client was provided, or that
        call raises, the texts are embedded through
        ``embed_batch_via_gemini`` instead.  This method does not raise:
        when both paths fail it logs and returns an empty list.

        Args:
            texts (list[str]): Texts to embed.

        Returns:
            list[list]: One vector per result returned by the back-end,
            with ``[]`` standing in for any falsy item; ``[]`` overall
            when ``texts`` is empty or every path failed.
        """
        if not texts:
            return []

        # ── Preferred path: shared OpenRouterClient ──────────────
        if self._openrouter is not None:
            try:
                raw = await self._openrouter.embed_batch(
                    texts,
                    EMBED_MODEL,
                )
                return [_normalize(v) if v else [] for v in raw]
            except Exception as e:
                logger.warning(
                    "Shared client embed_batch failed, "
                    "falling back to direct HTTP: %s",
                    e,
                )
                # Fall through to the Gemini key-pool path below

        # ── Fallback: Gemini API via shared key pool ─────────────
        try:
            raw = await embed_batch_via_gemini(texts, EMBED_MODEL)
            return [_normalize(v) if v else [] for v in raw]
        except Exception as e:
            logger.debug("Gemini embed fallback failed: %s", e)
            return []