"""NCM Semantic Trigger Matcher — cosine-similarity-based emotion detection.
Replaces the exact-word ``scan_text_for_triggers`` with embedding-based
semantic matching. Each emotion in ``reality_marble_recursion_index.yaml``
gets:
1. An embedding of its *affect* description string.
2. LLM-generated colloquial phrases a person might say when feeling
that emotion, each also embedded.
3. A ``mean_vec`` (L2-normalized centroid of all above embeddings) that
serves as the emotion's semantic fingerprint for matching.
At runtime, the incoming text is embedded once and dot-producted against
every cached ``mean_vec``. Emotions above ``MATCH_THRESHOLD`` are returned
as ``(emotion_name, delta_vector)`` pairs, capped at ``TOP_N``.
Falls back to an empty list (caller should use ``scan_text_for_triggers``
from ``ncm_delta_parser``) when no embeddings are ready yet.
Redis key: ncm:trigger_embed:v2:{sha256_hex_of_emotion_name}
Redis value: JSON object — see below
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import random
import re
from typing import Dict, List, Optional, Tuple
import httpx
from gemini_embed_pool import embed_batch_via_gemini
from openrouter_client import OpenRouterClient
logger = logging.getLogger(__name__)
OPENROUTER_CHAT_URL = "https://openrouter.ai/api/v1/chat/completions"
EMBED_MODEL = "google/gemini-embedding-001"
REDIS_KEY_PREFIX = "ncm:trigger_embed:v2:"
NUM_PHRASES = 50
PHRASES_PER_CALL = 10 # Generate in batches across different models
_TTL_MIN = 7 * 86400 # 7 days
_TTL_MAX = 14 * 86400 # 14 days
def _random_ttl() -> int:
return random.randint(_TTL_MIN, _TTL_MAX)
# Similarity threshold — only fire emotions above this cosine similarity.
# 0.70 filters noise; exact-word fallback catches anything semantic misses.
MATCH_THRESHOLD = 0.55
# Maximum emotions to return per text scan
TOP_N = 6
# Semantic matches fire far more often than the exact-word system the YAML
# deltas were tuned for. This scale factor compensates.
SEMANTIC_SCALE = 0.5
# Maximum absolute delta per node from trigger matching
_TRIGGER_NODE_CAP = 0.5
# Paid model repeated 5× so 5 calls × 10 phrases/call = 50 total phrases.
PHRASE_MODELS = ["google/gemini-3.1-flash-lite-preview"] * 5
def _phrase_system() -> str:
"""Internal helper: phrase system.
Returns:
str: Result string.
"""
n = PHRASES_PER_CALL
examples = ", ".join(f'"phrase{i}"' for i in range(1, n + 1))
return (
"You are a psychology writer. Given a named emotion and its character, "
f"write exactly {n} short phrases (max 20 words each) that someone might "
"naturally say or type when experiencing this emotion. Use casual, "
"everyday language — not clinical terms. Include variety: questions, "
"statements, exclamations, and stream-of-consciousness fragments. "
f"Output ONLY a JSON array of {n} strings: "
f"[{examples}]\n"
"No other text, no markdown, no explanation."
)
# ─────────────────────────────────────────────────────────────────────
# Vector helpers (identical to ncm_variant_cache — no numpy dep)
# ─────────────────────────────────────────────────────────────────────
def _dot(a: list, b: list) -> float:
"""Internal helper: dot.
Args:
a (list): The a value.
b (list): The b value.
Returns:
float: The result.
"""
return sum(x * y for x, y in zip(a, b))
def _normalize(vec: list) -> list:
"""Internal helper: normalize.
Args:
vec (list): The vec value.
Returns:
list: List of results.
"""
norm = sum(x * x for x in vec) ** 0.5
if norm < 1e-10:
return vec
return [x / norm for x in vec]
def _mean_vec(vecs: list) -> list:
"""Internal helper: mean vec.
Args:
vecs (list): The vecs value.
Returns:
list: List of results.
"""
if not vecs:
return []
dim = len(vecs[0])
centroid = [sum(v[i] for v in vecs) / len(vecs) for i in range(dim)]
return _normalize(centroid)
def _cache_key(emotion_name: str) -> str:
    """Redis key for *emotion_name*: prefix plus SHA-256 hex of the name."""
    digest = hashlib.sha256(emotion_name.encode("utf-8")).hexdigest()
    return REDIS_KEY_PREFIX + digest
def _parse_phrases(text: str) -> list[str]:
"""Extract a list of strings from a possibly messy LLM response."""
if not text:
return []
text = text.strip()
if "```" in text:
m = re.search(r"```(?:json)?\s*(.*?)```", text, re.DOTALL)
if m:
text = m.group(1).strip()
start = text.find("[")
end = text.rfind("]")
if start == -1 or end <= start:
return []
try:
arr = json.loads(text[start : end + 1])
if isinstance(arr, list):
return [s.strip() for s in arr if isinstance(s, str) and s.strip()][:PHRASES_PER_CALL]
except (json.JSONDecodeError, ValueError):
pass
return []
[docs]
class SemanticTriggerMatcher:
    """Cosine-similarity emotion trigger detector.

    Parameters
    ----------
    redis_client:
        A ``redis.asyncio.Redis`` instance. May be None.
    api_key:
        OpenRouter API key. When None the matcher is a no-op and
        ``find_triggers`` always returns an empty list.
    """

    def __init__(
        self,
        redis_client=None,
        api_key: Optional[str] = None,
        openrouter_client: OpenRouterClient | None = None,
    ) -> None:
        """Set up an idle matcher; no I/O or network work happens here.

        Args:
            redis_client: Redis connection client.
            api_key (Optional[str]): OpenRouter API key.
            openrouter_client: Shared OpenRouterClient for connection
                pooling and batch embedding. Falls back to direct HTTP
                when None.
        """
        self._redis = redis_client
        self._api_key = api_key
        self._openrouter = openrouter_client
        # emotion_name → {"name", "affect", "vec", "phrase_vecs", "mean_vec"}
        self._embeddings: Dict[str, dict] = {}
        # Names whose embedding entry is being built by a background task.
        self._pending: set[str] = set()
# ------------------------------------------------------------------
# Public hot-path
# ------------------------------------------------------------------
[docs]
async def find_triggers(
self,
text: str,
threshold: float = MATCH_THRESHOLD,
top_n: int = TOP_N,
) -> List[Tuple[str, Dict[str, float]]]:
"""Embed *text* and return top-N emotions above *threshold*.
Returns list of (emotion_name, delta_vector) tuples, sorted by
descending similarity. Returns an empty list if no embeddings are
ready (caller should fall back to exact matching).
"""
if not text or not self._api_key or not self._embeddings:
return []
try:
vecs = await self._embed_texts([text])
if not vecs:
return []
query_vec = vecs[0]
scored: list[tuple[float, str]] = []
for name, entry in self._embeddings.items():
mean = entry.get("mean_vec", [])
if not mean:
continue
score = _dot(query_vec, mean)
if score >= threshold:
scored.append((score, name))
if not scored:
return []
scored.sort(reverse=True)
results: List[Tuple[str, Dict[str, float]]] = []
from ncm_delta_parser import get_emotion_delta
for score, name in scored[:top_n]:
delta = get_emotion_delta(name)
if delta:
# Scale by cosine similarity AND SEMANTIC_SCALE.
# Cos-sim handles discrimination (strong > weak match).
# SEMANTIC_SCALE compensates for semantic matching
# firing more often than the exact-word system the
# YAML deltas were authored for.
scaled = {
k: max(-_TRIGGER_NODE_CAP, min(_TRIGGER_NODE_CAP, v * score * SEMANTIC_SCALE))
for k, v in delta.items()
}
results.append((name, scaled))
return results
except Exception as e:
logger.debug("SemanticTriggerMatcher.find_triggers error: %s", e)
return []
# ------------------------------------------------------------------
# Startup warm-up
# ------------------------------------------------------------------
[docs]
async def ensure_all_cached(self) -> None:
"""Build the full emotion embedding index.
For each emotion in the recursion index:
1. Load from Redis if already cached.
2. Otherwise embed the affect string + generate/embed 5 variant
trigger phrases, then store in Redis.
Designed to run once at startup as a background task. Safe to call
multiple times — already-cached emotions are skipped.
"""
if not self._api_key:
return
try:
from ncm_delta_parser import get_all_emotions
all_emotions = get_all_emotions()
except Exception as e:
logger.warning("SemanticTriggerMatcher: failed to load emotions: %s", e)
return
# Phase 1: load any already-cached entries from Redis
if self._redis:
await self._load_all_from_redis(set(all_emotions.keys()))
# Phase 2: collect what still needs embedding
missing = [
(name, entry)
for name, entry in all_emotions.items()
if name not in self._embeddings
and name not in self._pending
]
if not missing:
logger.info(
"SemanticTriggerMatcher: all %d emotions already cached",
len(self._embeddings),
)
return
logger.info(
"SemanticTriggerMatcher: embedding %d emotions", len(missing)
)
# Phase 3: batch-embed all affect strings in one shot
names = [n for n, _ in missing]
affect_texts = [e.get("affect", n) for n, e in missing]
for name in names:
self._pending.add(name)
try:
affect_vecs = await self._embed_texts(affect_texts)
except Exception as e:
logger.warning("SemanticTriggerMatcher: batch embed failed: %s", e)
for name in names:
self._pending.discard(name)
return
# Phase 4: for each emotion, optionally generate variant phrases then store.
# Semaphore limits concurrent _build_and_store tasks to avoid a
# storm of parallel embedding HTTP requests at startup.
sem = asyncio.Semaphore(5)
async def _build_limited(
name: str, affect_str: str, affect_vec: list,
) -> None:
async with sem:
await self._build_and_store(name, affect_str, affect_vec)
for i, (name, emo_entry) in enumerate(missing):
affect_vec = affect_vecs[i] if i < len(affect_vecs) else []
affect_str = emo_entry.get("affect", name)
asyncio.create_task(
_build_limited(name, affect_str, affect_vec)
)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
async def _load_all_from_redis(self, emotion_names: set[str]) -> None:
"""Load all cached trigger embeddings from Redis into _embeddings."""
if not self._redis:
return
loaded = 0
for name in emotion_names:
try:
raw = await self._redis.get(_cache_key(name))
if not raw:
continue
data = json.loads(raw)
if isinstance(data, dict) and data.get("mean_vec"):
self._embeddings[name] = data
loaded += 1
except Exception:
pass
if loaded:
logger.info(
"SemanticTriggerMatcher: loaded %d emotion embeddings from Redis",
loaded,
)
async def _build_and_store(
self,
name: str,
affect: str,
affect_vec: list,
) -> None:
"""Generate variant phrases, embed them, compute mean_vec, store."""
try:
# Generate colloquial trigger phrases via LLM
phrases = await self._generate_phrases(name, affect)
all_vecs = [affect_vec] if affect_vec else []
phrase_vecs: list[list] = []
if phrases:
try:
p_vecs = await self._embed_texts(phrases)
phrase_vecs = p_vecs
all_vecs.extend(p_vecs)
except Exception as e:
logger.debug(
"Phrase embed failed for %s: %s", name, e
)
mean = _mean_vec(all_vecs) if all_vecs else affect_vec
entry = {
"name": name,
"affect": affect,
"vec": affect_vec,
"phrase_vecs": phrase_vecs,
"mean_vec": mean,
}
self._embeddings[name] = entry
if self._redis:
try:
await self._redis.set(
_cache_key(name), json.dumps(entry),
ex=_random_ttl(),
)
except Exception as e:
logger.debug("Redis write failed for %s: %s", name, e)
logger.debug(
"SemanticTriggerMatcher: cached %s (%d phrase vecs)",
name, len(phrase_vecs),
)
except Exception as e:
logger.warning(
"SemanticTriggerMatcher: _build_and_store failed for %s: %s",
name, e,
)
finally:
self._pending.discard(name)
    async def _generate_phrases(self, name: str, affect: str) -> list[str]:
        """Generate colloquial phrases for this emotion via multiple LLM calls.

        Makes up to 5 calls (one per model) to collect ~50 diverse phrases.

        Args:
            name: Emotion name, interpolated into the user message.
            affect: The emotion's affect/character description string.

        Returns:
            list[str]: Case-insensitively de-duplicated phrases, capped at
            ``NUM_PHRASES``. Empty when no API key is set or all calls fail.
        """
        if not self._api_key:
            return []
        headers = {
            "Authorization": f"Bearer {self._api_key}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://github.com/matrix-llm-bot",
            "X-Title": "Matrix LLM Bot",
        }
        user_msg = f'Emotion: "{name}"\nCharacter: "{affect}"'
        all_phrases: list[str] = []
        # Shuffle so repeated warm-ups don't always hit models in the same order.
        models = list(PHRASE_MODELS)
        random.shuffle(models)
        async with httpx.AsyncClient(
            timeout=httpx.Timeout(30.0, connect=10.0)
        ) as client:
            for model in models:
                payload = {
                    "model": model,
                    "messages": [
                        {"role": "system", "content": _phrase_system()},
                        {"role": "user", "content": user_msg},
                    ],
                    # High temperature for phrase diversity across calls.
                    "temperature": 0.9,
                    "max_tokens": 10240,
                }
                try:
                    resp = await client.post(
                        OPENROUTER_CHAT_URL, json=payload, headers=headers
                    )
                    # Rate-limited: back off briefly, then try the next model.
                    if resp.status_code == 429:
                        await asyncio.sleep(5)
                        continue
                    resp.raise_for_status()
                    data = resp.json()
                    # OpenRouter may return HTTP 200 with an error body.
                    if "error" in data:
                        continue
                    text = (
                        data.get("choices", [{}])[0]
                        .get("message", {})
                        .get("content", "")
                    )
                    phrases = _parse_phrases(text)
                    if phrases:
                        all_phrases.extend(phrases)
                except Exception:
                    # Best-effort: any network/parse failure just moves on.
                    continue
                # Stop early if we have enough
                if len(all_phrases) >= NUM_PHRASES:
                    break
        if not all_phrases:
            # NOTE(review): also fires when every model returned an empty
            # array, not only on hard failures.
            logger.debug("All phrase models failed for %s", name)
        # Deduplicate while preserving order, cap at NUM_PHRASES
        seen = set()
        unique: list[str] = []
        for p in all_phrases:
            p_lower = p.lower()
            if p_lower not in seen:
                seen.add(p_lower)
                unique.append(p)
        return unique[:NUM_PHRASES]
async def _embed_texts(self, texts: list[str]) -> list[list]:
"""Batch-embed texts using google/gemini-embedding-001.
When an ``OpenRouterClient`` was provided at init time, delegates
to ``client.embed_batch()`` for shared connection pooling,
dual-provider failover, and the batch embedding API. Otherwise
falls back to direct HTTP (original implementation).
"""
if not texts:
return []
# ── Preferred path: shared OpenRouterClient ──────────────
if self._openrouter is not None:
try:
raw = await self._openrouter.embed_batch(
texts, EMBED_MODEL,
)
return [
_normalize(v) if v else []
for v in raw
]
except Exception as e:
logger.warning(
"Shared client embed_batch failed, "
"falling back to direct HTTP: %s", e,
)
# Fall through to direct HTTP below
# ── Fallback: Gemini API via shared key pool ─────────────
try:
raw = await embed_batch_via_gemini(texts, EMBED_MODEL)
return [_normalize(v) if v else [] for v in raw]
except Exception as e:
logger.debug("Gemini embed fallback failed: %s", e)
return []