# Source code for ncm_semantic_triggers

"""NCM Semantic Trigger Matcher — cosine-similarity-based emotion detection.

Replaces the exact-word ``scan_text_for_triggers`` with embedding-based
semantic matching. Each emotion in ``reality_marble_recursion_index.yaml``
gets:

  1. An embedding of its *affect* description string.
  2. LLM-generated colloquial phrases a person might say when feeling
     that emotion, each also embedded.
  3. A ``mean_vec`` (L2-normalized centroid of all above embeddings) that
     serves as the emotion's semantic fingerprint for matching.

At runtime, the incoming text is embedded once and dot-producted against
every cached ``mean_vec``. Emotions above ``MATCH_THRESHOLD`` are returned
as ``(emotion_name, delta_vector)`` pairs, capped at ``TOP_N``.

Falls back to an empty list (caller should use ``scan_text_for_triggers``
from ``ncm_delta_parser``) when no embeddings are ready yet.

Redis key:  ncm:trigger_embed:v2:{sha256_hex_of_emotion_name}
Redis value: JSON object — see below
"""

from __future__ import annotations

import asyncio
import hashlib
import json
import logging
import random
import re
from typing import Dict, List, Optional, Tuple


import httpx

from gemini_embed_pool import embed_batch_via_gemini
from openrouter_client import OpenRouterClient

logger = logging.getLogger(__name__)

OPENROUTER_CHAT_URL = "https://openrouter.ai/api/v1/chat/completions"
EMBED_MODEL = "google/gemini-embedding-001"
REDIS_KEY_PREFIX = "ncm:trigger_embed:v2:"
NUM_PHRASES = 50
PHRASES_PER_CALL = 10  # Generate in batches across different models
_TTL_MIN = 7 * 86400   # 7 days
_TTL_MAX = 14 * 86400  # 14 days

def _random_ttl() -> int:
    """Pick a jittered TTL between 7 and 14 days so cache entries expire staggered."""
    # randint(a, b) is defined as randrange(a, b + 1); same distribution.
    return random.randrange(_TTL_MIN, _TTL_MAX + 1)

# Similarity threshold — only fire emotions above this cosine similarity.
# 0.55 filters noise; the exact-word fallback catches anything semantic misses.
MATCH_THRESHOLD = 0.55
# Maximum emotions to return per text scan
TOP_N = 6
# Semantic matches fire far more often than the exact-word system the YAML
# deltas were tuned for.  This scale factor compensates.
SEMANTIC_SCALE = 0.5
# Maximum absolute delta per node from trigger matching
_TRIGGER_NODE_CAP = 0.5

# Paid model repeated 5× so 5 calls × 10 phrases/call = 50 total phrases.
PHRASE_MODELS = ["google/gemini-3.1-flash-lite-preview"] * 5

def _phrase_system() -> str:
    """Build the LLM system prompt requesting colloquial trigger phrases.

    Returns:
        str: Prompt instructing the model to emit a bare JSON array of
        exactly ``PHRASES_PER_CALL`` short, casual phrases.
    """
    count = PHRASES_PER_CALL
    sample = ", ".join('"phrase{}"'.format(i) for i in range(1, count + 1))
    pieces = [
        "You are a psychology writer. Given a named emotion and its character, ",
        f"write exactly {count} short phrases (max 20 words each) that someone might ",
        "naturally say or type when experiencing this emotion. Use casual, ",
        "everyday language — not clinical terms. Include variety: questions, ",
        "statements, exclamations, and stream-of-consciousness fragments. ",
        f"Output ONLY a JSON array of {count} strings: ",
        f"[{sample}]\n",
        "No other text, no markdown, no explanation.",
    ]
    return "".join(pieces)


# ─────────────────────────────────────────────────────────────────────
# Vector helpers (identical to ncm_variant_cache — no numpy dep)
# ─────────────────────────────────────────────────────────────────────

def _dot(a: list, b: list) -> float:
    """Internal helper: dot.

        Args:
            a (list): The a value.
            b (list): The b value.

        Returns:
            float: The result.
        """
    return sum(x * y for x, y in zip(a, b))


def _normalize(vec: list) -> list:
    """Internal helper: normalize.

        Args:
            vec (list): The vec value.

        Returns:
            list: List of results.
        """
    norm = sum(x * x for x in vec) ** 0.5
    if norm < 1e-10:
        return vec
    return [x / norm for x in vec]


def _mean_vec(vecs: list) -> list:
    """L2-normalized centroid of a list of equal-length vectors.

    Args:
        vecs (list): List of numeric vectors; dimensionality is taken
            from the first entry.

    Returns:
        list: Unit-length centroid, or [] for empty input.
    """
    if not vecs:
        return []
    count = len(vecs)
    width = len(vecs[0])
    centroid = []
    for axis in range(width):
        centroid.append(sum(v[axis] for v in vecs) / count)
    return _normalize(centroid)


def _cache_key(emotion_name: str) -> str:
    """Redis key for an emotion's cached embeddings.

    Args:
        emotion_name (str): Emotion name from the recursion index.

    Returns:
        str: ``REDIS_KEY_PREFIX`` + SHA-256 hex digest of the name.
    """
    digest = hashlib.sha256(emotion_name.encode("utf-8")).hexdigest()
    return REDIS_KEY_PREFIX + digest


def _parse_phrases(text: str) -> list[str]:
    """Extract a list of strings from a possibly messy LLM response.

    Tolerates markdown code fences and leading/trailing chatter around
    the JSON array; returns at most ``PHRASES_PER_CALL`` cleaned phrases.
    """
    if not text:
        return []
    candidate = text.strip()
    # Prefer the contents of a fenced block when the model wrapped its output.
    if "```" in candidate:
        fence = re.search(r"```(?:json)?\s*(.*?)```", candidate, re.DOTALL)
        if fence is not None:
            candidate = fence.group(1).strip()
    lo = candidate.find("[")
    hi = candidate.rfind("]")
    if lo == -1 or hi <= lo:
        return []
    try:
        parsed = json.loads(candidate[lo : hi + 1])
    except (json.JSONDecodeError, ValueError):
        return []
    if not isinstance(parsed, list):
        return []
    cleaned: list[str] = []
    for item in parsed:
        if isinstance(item, str):
            stripped = item.strip()
            if stripped:
                cleaned.append(stripped)
    return cleaned[:PHRASES_PER_CALL]


class SemanticTriggerMatcher:
    """Cosine-similarity emotion trigger detector.

    Parameters
    ----------
    redis_client:
        A ``redis.asyncio.Redis`` instance. May be None.
    api_key:
        OpenRouter API key. When None the matcher is a no-op and
        ``find_triggers`` always returns an empty list.
    """

    def __init__(
        self,
        redis_client=None,
        api_key: Optional[str] = None,
        openrouter_client: OpenRouterClient | None = None,
    ) -> None:
        """Initialize the matcher.

        Args:
            redis_client: Redis connection client (or None for in-memory only).
            api_key (Optional[str]): OpenRouter API key; None disables matching.
            openrouter_client: Shared OpenRouterClient for connection pooling
                and batch embedding. Falls back to direct HTTP when None.
        """
        self._redis = redis_client
        self._api_key = api_key
        self._openrouter = openrouter_client
        # emotion_name → {"name", "affect", "vec", "phrase_vecs", "mean_vec"}
        self._embeddings: Dict[str, dict] = {}
        # Emotions currently being built, to avoid duplicate work across calls.
        self._pending: set[str] = set()
        # Strong references to background build tasks — asyncio only keeps a
        # weak reference to tasks, so unreferenced fire-and-forget tasks can
        # be garbage-collected before they finish.
        self._bg_tasks: set = set()

    # ------------------------------------------------------------------
    # Public hot-path
    # ------------------------------------------------------------------

    async def find_triggers(
        self,
        text: str,
        threshold: float = MATCH_THRESHOLD,
        top_n: int = TOP_N,
    ) -> List[Tuple[str, Dict[str, float]]]:
        """Embed *text* and return top-N emotions above *threshold*.

        Returns list of (emotion_name, delta_vector) tuples, sorted by
        descending similarity. Returns an empty list if no embeddings
        are ready (caller should fall back to exact matching).
        """
        if not text or not self._api_key or not self._embeddings:
            return []
        try:
            vecs = await self._embed_texts([text])
            if not vecs:
                return []
            query_vec = vecs[0]
            scored: list[tuple[float, str]] = []
            for name, entry in self._embeddings.items():
                mean = entry.get("mean_vec", [])
                if not mean:
                    continue
                # Both vectors are L2-normalized, so dot product == cosine sim.
                score = _dot(query_vec, mean)
                if score >= threshold:
                    scored.append((score, name))
            if not scored:
                return []
            scored.sort(reverse=True)
            results: List[Tuple[str, Dict[str, float]]] = []
            # Local import — ncm_delta_parser imports are kept lazy here.
            from ncm_delta_parser import get_emotion_delta
            for score, name in scored[:top_n]:
                delta = get_emotion_delta(name)
                if delta:
                    # Scale by cosine similarity AND SEMANTIC_SCALE.
                    # Cos-sim handles discrimination (strong > weak match).
                    # SEMANTIC_SCALE compensates for semantic matching
                    # firing more often than the exact-word system the
                    # YAML deltas were authored for.
                    scaled = {
                        k: max(
                            -_TRIGGER_NODE_CAP,
                            min(_TRIGGER_NODE_CAP, v * score * SEMANTIC_SCALE),
                        )
                        for k, v in delta.items()
                    }
                    results.append((name, scaled))
            return results
        except Exception as e:
            logger.debug("SemanticTriggerMatcher.find_triggers error: %s", e)
            return []

    # ------------------------------------------------------------------
    # Startup warm-up
    # ------------------------------------------------------------------

    async def ensure_all_cached(self) -> None:
        """Build the full emotion embedding index.

        For each emotion in the recursion index:
          1. Load from Redis if already cached.
          2. Otherwise embed the affect string + generate/embed up to
             ``NUM_PHRASES`` variant trigger phrases, then store in Redis.

        Designed to run once at startup as a background task. Safe to
        call multiple times — already-cached emotions are skipped.
        """
        if not self._api_key:
            return
        try:
            from ncm_delta_parser import get_all_emotions
            all_emotions = get_all_emotions()
        except Exception as e:
            logger.warning("SemanticTriggerMatcher: failed to load emotions: %s", e)
            return

        # Phase 1: load any already-cached entries from Redis
        if self._redis:
            await self._load_all_from_redis(set(all_emotions.keys()))

        # Phase 2: collect what still needs embedding
        missing = [
            (name, entry)
            for name, entry in all_emotions.items()
            if name not in self._embeddings and name not in self._pending
        ]
        if not missing:
            logger.info(
                "SemanticTriggerMatcher: all %d emotions already cached",
                len(self._embeddings),
            )
            return

        logger.info(
            "SemanticTriggerMatcher: embedding %d emotions", len(missing)
        )

        # Phase 3: batch-embed all affect strings in one shot
        names = [n for n, _ in missing]
        affect_texts = [e.get("affect", n) for n, e in missing]
        for name in names:
            self._pending.add(name)
        try:
            affect_vecs = await self._embed_texts(affect_texts)
        except Exception as e:
            logger.warning("SemanticTriggerMatcher: batch embed failed: %s", e)
            for name in names:
                self._pending.discard(name)
            return

        # Phase 4: for each emotion, optionally generate variant phrases
        # then store. Semaphore limits concurrent _build_and_store tasks
        # to avoid a storm of parallel embedding HTTP requests at startup.
        sem = asyncio.Semaphore(5)

        async def _build_limited(
            name: str,
            affect_str: str,
            affect_vec: list,
        ) -> None:
            async with sem:
                await self._build_and_store(name, affect_str, affect_vec)

        for i, (name, emo_entry) in enumerate(missing):
            affect_vec = affect_vecs[i] if i < len(affect_vecs) else []
            affect_str = emo_entry.get("affect", name)
            task = asyncio.create_task(
                _build_limited(name, affect_str, affect_vec)
            )
            # Keep a strong reference until the task completes (see __init__).
            self._bg_tasks.add(task)
            task.add_done_callback(self._bg_tasks.discard)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _load_all_from_redis(self, emotion_names: set[str]) -> None:
        """Load all cached trigger embeddings from Redis into _embeddings."""
        if not self._redis:
            return
        loaded = 0
        for name in emotion_names:
            try:
                raw = await self._redis.get(_cache_key(name))
                if not raw:
                    continue
                data = json.loads(raw)
                # Only accept entries with a usable fingerprint vector.
                if isinstance(data, dict) and data.get("mean_vec"):
                    self._embeddings[name] = data
                    loaded += 1
            except Exception:
                # Best-effort cache load: a corrupt entry is rebuilt later.
                pass
        if loaded:
            logger.info(
                "SemanticTriggerMatcher: loaded %d emotion embeddings from Redis",
                loaded,
            )

    async def _build_and_store(
        self,
        name: str,
        affect: str,
        affect_vec: list,
    ) -> None:
        """Generate variant phrases, embed them, compute mean_vec, store."""
        try:
            # Generate colloquial trigger phrases via LLM
            phrases = await self._generate_phrases(name, affect)
            all_vecs = [affect_vec] if affect_vec else []
            phrase_vecs: list[list] = []
            if phrases:
                try:
                    p_vecs = await self._embed_texts(phrases)
                    phrase_vecs = p_vecs
                    all_vecs.extend(p_vecs)
                except Exception as e:
                    logger.debug(
                        "Phrase embed failed for %s: %s", name, e
                    )
            # Fall back to the bare affect vector when nothing embedded.
            mean = _mean_vec(all_vecs) if all_vecs else affect_vec
            entry = {
                "name": name,
                "affect": affect,
                "vec": affect_vec,
                "phrase_vecs": phrase_vecs,
                "mean_vec": mean,
            }
            self._embeddings[name] = entry
            if self._redis:
                try:
                    await self._redis.set(
                        _cache_key(name),
                        json.dumps(entry),
                        ex=_random_ttl(),
                    )
                except Exception as e:
                    logger.debug("Redis write failed for %s: %s", name, e)
            logger.debug(
                "SemanticTriggerMatcher: cached %s (%d phrase vecs)",
                name,
                len(phrase_vecs),
            )
        except Exception as e:
            logger.warning(
                "SemanticTriggerMatcher: _build_and_store failed for %s: %s",
                name,
                e,
            )
        finally:
            self._pending.discard(name)

    async def _generate_phrases(self, name: str, affect: str) -> list[str]:
        """Generate colloquial phrases for this emotion via multiple LLM calls.

        Makes up to 5 calls (one per model) to collect ~50 diverse phrases.
        """
        if not self._api_key:
            return []
        headers = {
            "Authorization": f"Bearer {self._api_key}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://github.com/matrix-llm-bot",
            "X-Title": "Matrix LLM Bot",
        }
        user_msg = f'Emotion: "{name}"\nCharacter: "{affect}"'
        all_phrases: list[str] = []
        models = list(PHRASE_MODELS)
        random.shuffle(models)
        async with httpx.AsyncClient(
            timeout=httpx.Timeout(30.0, connect=10.0)
        ) as client:
            for model in models:
                payload = {
                    "model": model,
                    "messages": [
                        {"role": "system", "content": _phrase_system()},
                        {"role": "user", "content": user_msg},
                    ],
                    "temperature": 0.9,
                    "max_tokens": 10240,
                }
                try:
                    resp = await client.post(
                        OPENROUTER_CHAT_URL, json=payload, headers=headers
                    )
                    if resp.status_code == 429:
                        # Rate-limited: back off briefly and try the next model.
                        await asyncio.sleep(5)
                        continue
                    resp.raise_for_status()
                    data = resp.json()
                    if "error" in data:
                        continue
                    text = (
                        data.get("choices", [{}])[0]
                        .get("message", {})
                        .get("content", "")
                    )
                    phrases = _parse_phrases(text)
                    if phrases:
                        all_phrases.extend(phrases)
                except Exception:
                    # Any per-model failure just moves on to the next model.
                    continue
                # Stop early if we have enough
                if len(all_phrases) >= NUM_PHRASES:
                    break
        if not all_phrases:
            logger.debug("All phrase models failed for %s", name)
        # Deduplicate while preserving order, cap at NUM_PHRASES
        seen = set()
        unique: list[str] = []
        for p in all_phrases:
            p_lower = p.lower()
            if p_lower not in seen:
                seen.add(p_lower)
                unique.append(p)
        return unique[:NUM_PHRASES]

    async def _embed_texts(self, texts: list[str]) -> list[list]:
        """Batch-embed texts using google/gemini-embedding-001.

        When an ``OpenRouterClient`` was provided at init time, delegates
        to ``client.embed_batch()`` for shared connection pooling,
        dual-provider failover, and the batch embedding API. Otherwise
        falls back to the Gemini API via the shared key pool.
        """
        if not texts:
            return []
        # ── Preferred path: shared OpenRouterClient ──────────────
        if self._openrouter is not None:
            try:
                raw = await self._openrouter.embed_batch(
                    texts,
                    EMBED_MODEL,
                )
                return [_normalize(v) if v else [] for v in raw]
            except Exception as e:
                logger.warning(
                    "Shared client embed_batch failed, "
                    "falling back to direct HTTP: %s",
                    e,
                )
                # Fall through to direct HTTP below
        # ── Fallback: Gemini API via shared key pool ─────────────
        try:
            raw = await embed_batch_via_gemini(texts, EMBED_MODEL)
            return [_normalize(v) if v else [] for v in raw]
        except Exception as e:
            logger.debug("Gemini embed fallback failed: %s", e)
            return []