"""NCM Semantic Trigger Matcher — cosine-similarity-based emotion detection.
Replaces the exact-word ``scan_text_for_triggers`` with embedding-based
semantic matching. Each emotion in ``reality_marble_recursion_index.yaml``
gets:
1. An embedding of its *affect* description string.
2. LLM-generated colloquial phrases a person might say when feeling
that emotion, each also embedded.
3. A ``mean_vec`` (L2-normalized centroid of all above embeddings) that
serves as the emotion's semantic fingerprint for matching.
At runtime, the incoming text is embedded once and dot-producted against
every cached ``mean_vec``. Emotions above ``MATCH_THRESHOLD`` are returned
as ``(emotion_name, delta_vector)`` pairs, capped at ``TOP_N``.
Falls back to an empty list (caller should use ``scan_text_for_triggers``
from ``ncm_delta_parser``) when no embeddings are ready yet.
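Redis key: ncm:trigger_embed:v2:{sha256_hex_of_emotion_name}
Redis value: JSON object with the keys ``name``, ``affect``, ``vec``,
``phrase_vecs`` and ``mean_vec`` (assembled in ``_build_and_store``)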
"""
from __future__ import annotations
import asyncio
import hashlib
import jsonutil as json
import logging
import random
import re
from typing import Dict, List, Optional, Tuple
import httpx
from gemini_embed_pool import embed_batch_via_gemini
from openrouter_client import OpenRouterClient
from vector_store import AsyncPgVectorCollection
logger = logging.getLogger(__name__)
import os

# When the SG_DISABLE_STARTUP_LLM_WARMING environment variable is set to a
# truthy token, ``_build_and_store`` skips LLM phrase generation and builds
# each fingerprint from the affect embedding alone.
DISABLE_LLM_PHRASES = os.environ.get("SG_DISABLE_STARTUP_LLM_WARMING", "").lower() in ("1", "true", "yes", "on")
# Chat-completions endpoint used by ``_generate_phrases``.
OPENROUTER_CHAT_URL = "https://openrouter.ai/api/v1/chat/completions"
# Embedding model passed to both embedding back-ends in ``_embed_texts``.
EMBED_MODEL = "google/gemini-embedding-001"
# Prefix of every Redis key produced by ``_cache_key``.
REDIS_KEY_PREFIX = "ncm:trigger_embed:v2:"
# NUM_PHRASES and PHRASES_PER_CALL are intentionally not assigned here: the
# module assigns them once, next to PHRASE_MODELS further down. A second,
# earlier pair (50 / 10) used to live here but was always overwritten before
# any function could read it, so it only misled readers.
# Bounds (in seconds) of the jittered cache lifetime drawn by ``_random_ttl``.
_TTL_MIN = 7 * 86400  # 7 days
_TTL_MAX = 14 * 86400  # 14 days
def _random_ttl() -> int:
    """Return a jittered cache lifetime in seconds.

    The value is drawn uniformly from the inclusive range
    ``[_TTL_MIN, _TTL_MAX]`` (7 to 14 days). ``_build_and_store`` passes it
    as the ``ex=`` expiry of its Redis ``set`` call, so entries written in
    the same warm-up run do not all expire at the same moment.

    Returns:
        int: A TTL ``n`` with ``_TTL_MIN <= n <= _TTL_MAX``.
    """
    # randrange excludes its stop value, hence +1 to keep _TTL_MAX reachable.
    upper_exclusive = _TTL_MAX + 1
    return random.randrange(_TTL_MIN, upper_exclusive)
# Similarity threshold — ``find_triggers`` skips any hit whose similarity is
# below this value (a hit exactly at the threshold still fires).
# NOTE(review): this comment used to quote 0.70; the value actually in force
# is the 0.55 assigned below. Callers are expected to fall back to exact-word
# matching for anything the semantic pass misses.
MATCH_THRESHOLD = 0.55
# Maximum emotions to return per text scan
TOP_N = 6
# Semantic matches fire far more often than the exact-word system the YAML
# deltas were tuned for. This scale factor compensates.
SEMANTIC_SCALE = 0.5
# Maximum absolute delta per node from trigger matching
_TRIGGER_NODE_CAP = 0.5
# Keep warm-up to a single LLM completion call per emotion to avoid warming
# storms: one model, and PHRASES_PER_CALL equal to NUM_PHRASES, so the first
# successful call already satisfies the early-exit check in
# ``_generate_phrases``.
PHRASE_MODELS = ["google/gemini-3.1-flash-lite"]
# Target number of distinct phrases kept per emotion.
NUM_PHRASES = 10
# Number of phrases requested from (and accepted out of) one completion.
PHRASES_PER_CALL = 10
def _phrase_system() -> str:
    """Build the system prompt used for phrase generation.

    The prompt asks the model for exactly ``PHRASES_PER_CALL`` short,
    colloquial phrases and shows the expected output shape as a JSON array
    of placeholder strings (``"phrase1"``, ``"phrase2"``, ...).

    Returns:
        str: The complete system prompt.
    """
    count = PHRASES_PER_CALL
    # Placeholder items illustrating the JSON array the model must emit.
    placeholders = [f'"phrase{idx}"' for idx in range(1, count + 1)]
    sample_array = "[" + ", ".join(placeholders) + "]"
    pieces = [
        "You are a psychology writer. Given a named emotion and its character, ",
        f"write exactly {count} short phrases (max 20 words each) that someone might ",
        "naturally say or type when experiencing this emotion. Use casual, ",
        "everyday language — not clinical terms. Include variety: questions, ",
        "statements, exclamations, and stream-of-consciousness fragments. ",
        f"Output ONLY a JSON array of {count} strings: ",
        sample_array + "\n",
        "No other text, no markdown, no explanation.",
    ]
    return "".join(pieces)
# ─────────────────────────────────────────────────────────────────────
# Vector helpers (identical to ncm_variant_cache — no numpy dep)
# ─────────────────────────────────────────────────────────────────────
def _dot(a: list, b: list) -> float:
"""Internal helper: dot.
Args:
a (list): The a value.
b (list): The b value.
Returns:
float: The result.
"""
return sum(x * y for x, y in zip(a, b))
def _normalize(vec: list) -> list:
"""Internal helper: normalize.
Args:
vec (list): The vec value.
Returns:
list: List of results.
"""
norm = sum(x * x for x in vec) ** 0.5
if norm < 1e-10:
return vec
return [x / norm for x in vec]
def _mean_vec(vecs: list) -> list:
"""Internal helper: mean vec.
Args:
vecs (list): The vecs value.
Returns:
list: List of results.
"""
if not vecs:
return []
dim = len(vecs[0])
centroid = [sum(v[i] for v in vecs) / len(vecs) for i in range(dim)]
return _normalize(centroid)
def _cache_key(emotion_name: str) -> str:
    """Return the Redis key under which an emotion's entry is cached.

    Args:
        emotion_name (str): The emotion name.

    Returns:
        str: ``REDIS_KEY_PREFIX`` followed by the SHA-256 hex digest of the
        UTF-8 encoded name.
    """
    hasher = hashlib.sha256()
    hasher.update(emotion_name.encode("utf-8"))
    return REDIS_KEY_PREFIX + hasher.hexdigest()
def _parse_phrases(text: str) -> list[str]:
"""Extract a list of phrase strings from a possibly messy LLM response.
Tolerantly recovers the JSON array of colloquial trigger phrases that
``_phrase_system`` asks the model to emit, even when the completion is
wrapped in a Markdown code fence or padded with stray prose. It strips
any fenced block, slices from the first ``[`` to the last ``]``, parses
that span with the ``jsonutil`` loader, keeps only non-empty strings, and
caps the result at ``PHRASES_PER_CALL``. Pure string processing with no
I/O or side effects. Called by ``SemanticTriggerMatcher._generate_phrases``
in this module (and by the standalone ``scripts/pregenerate_ncm_triggers``
pregeneration helper) to clean each per-model phrase batch.
Args:
text (str): The raw text content of an LLM completion.
Returns:
list[str]: The parsed phrases (trimmed, non-empty), at most
``PHRASES_PER_CALL`` long; an empty list when nothing parses.
"""
if not text:
return []
text = text.strip()
if "```" in text:
m = re.search(r"```(?:json)?\s*(.*?)```", text, re.DOTALL)
if m:
text = m.group(1).strip()
start = text.find("[")
end = text.rfind("]")
if start == -1 or end <= start:
return []
try:
arr = json.loads(text[start : end + 1])
if isinstance(arr, list):
return [s.strip() for s in arr if isinstance(s, str) and s.strip()][
:PHRASES_PER_CALL
]
except (json.JSONDecodeError, ValueError):
pass
return []
[docs]
class SemanticTriggerMatcher:
    """Cosine-similarity emotion trigger detector.

    Parameters
    ----------
    redis_client:
        An ``redis.asyncio.Redis`` instance. May be None, in which case the
        Redis cache is neither read nor written.
    api_key:
        OpenRouter API key. When None the matcher is a no-op:
        ``find_triggers`` always returns an empty list and
        ``ensure_all_cached`` returns without doing anything.
    openrouter_client:
        Optional shared ``OpenRouterClient``. When given, ``_embed_texts``
        tries its ``embed_batch`` first and only then falls back to the
        Gemini key pool.
    """
[docs]
def __init__(
    self,
    redis_client=None,
    api_key: Optional[str] = None,
    openrouter_client: OpenRouterClient | None = None,
) -> None:
    """Store collaborators and set up empty warm-up state.

    Nothing is fetched or embedded here; the only object constructed is the
    ``AsyncPgVectorCollection`` handle.

    Args:
        redis_client: Redis connection client, or None to run without the
            Redis cache.
        api_key (Optional[str]): OpenRouter API key; None disables the
            matcher.
        openrouter_client: Shared ``OpenRouterClient`` used by
            ``_embed_texts`` for batch embedding when provided.
    """
    # Injected collaborators.
    self._openrouter = openrouter_client
    self._api_key = api_key
    self._redis = redis_client

    # Budget for the Redis ping retry loop during warm-up
    # (30 attempts, 2.0 s pause after each failed one).
    self._redis_retry_interval = 2.0
    self._redis_max_retries = 30

    # Warm-up bookkeeping: one shared task, the lock guarding its creation,
    # and the names of emotions whose build is currently in flight.
    self._warming_lock = asyncio.Lock()
    self._warming_task: Optional[asyncio.Task] = None
    self._pending: set[str] = set()

    # emotion_name → {"name", "affect", "vec", "phrase_vecs", "mean_vec"}
    self._embeddings: Dict[str, dict] = dict()

    # pgvector-backed store queried first by ``find_triggers``.
    self._collection = AsyncPgVectorCollection("ncm_system", "emotion_triggers")
# ------------------------------------------------------------------
# Public hot-path
# ------------------------------------------------------------------
[docs]
async def find_triggers(
    self,
    text: str,
    threshold: float = MATCH_THRESHOLD,
    top_n: int = TOP_N,
    query_embedding: list[float] | None = None,
) -> List[Tuple[str, Dict[str, float]]]:
    """Return the emotions whose fingerprints best match ``text``.

    Unless a non-empty ``query_embedding`` is supplied, ``text`` is embedded
    once through ``self._embed_texts``. The query vector is sent to the
    pgvector collection (``self._collection.query``). Only when that lookup
    produces no hits — it raised, or returned nothing — are the in-memory
    entries of ``self._embeddings`` scored by dot product against their
    ``mean_vec``. The in-memory dictionary holds just the emotions loaded or
    built during this process's warm-up, so it must not override a
    successful pgvector result.

    Every hit at or above ``threshold`` whose emotion has a delta template
    (``ncm_delta_parser.get_emotion_delta``, imported lazily) contributes
    that template scaled by ``similarity * SEMANTIC_SCALE`` and clamped to
    ``±_TRIGGER_NODE_CAP`` per node.

    All failures are swallowed and reported as an empty list, which tells
    the caller to fall back to exact-word matching.

    Args:
        text (str): The text to scan. Ignored for embedding purposes when
            ``query_embedding`` is given.
        threshold (float): Minimum similarity for an emotion to fire.
            Defaults to ``MATCH_THRESHOLD``.
        top_n (int): Maximum number of hits requested from pgvector / kept
            from the in-memory scan. Defaults to ``TOP_N``.
        query_embedding (list[float], optional): Precomputed embedding of
            the query text; when non-empty, no embedding call is made.

    Returns:
        List[Tuple[str, Dict[str, float]]]: ``(emotion_name, delta_vector)``
        pairs in the order the hits were produced (in-memory hits are sorted
        by descending similarity). Empty when there is no API key, no usable
        query vector, no match, or an error occurred.
    """
    if not self._api_key:
        logger.info("SemanticTriggerMatcher.find_triggers: no API key configured. Bypassing semantic trigger match.")
        return []
    if not query_embedding:
        if not text:
            logger.debug("SemanticTriggerMatcher.find_triggers: empty text and no query_embedding. Bypassing.")
            return []
        try:
            logger.debug("SemanticTriggerMatcher.find_triggers: generating query embedding for text: '%s...'", text[:60])
            vecs = await self._embed_texts([text])
            if not vecs or not vecs[0]:
                logger.warning("SemanticTriggerMatcher.find_triggers: embedding generation returned empty vector.")
                return []
            query_embedding = vecs[0]
        except Exception as e:
            logger.error("SemanticTriggerMatcher.find_triggers: embedding generation failed: %s", e)
            return []
    else:
        logger.debug("SemanticTriggerMatcher.find_triggers: reusing precomputed query_embedding.")
    try:
        # Primary: query pgvector
        logger.debug("SemanticTriggerMatcher.find_triggers: querying pgvector store with embedding (dim=%d)", len(query_embedding))
        try:
            hits = await self._collection.query(
                embedding=query_embedding,
                n_results=top_n,
            )
            logger.debug("SemanticTriggerMatcher.find_triggers: pgvector query returned %d hits.", len(hits))
        except Exception as e:
            logger.error("SemanticTriggerMatcher.find_triggers: pgvector query failed: %s. Falling back to in-memory.", e)
            hits = []
        # Fallback: score the in-memory dictionary, but only when pgvector
        # gave us nothing. Running this unconditionally would let a partial
        # in-memory set replace a complete pgvector result.
        if not hits and self._embeddings:
            logger.info("SemanticTriggerMatcher.find_triggers: evaluating %d in-memory emotions.", len(self._embeddings))
            scored: list[dict] = []
            for name, entry in self._embeddings.items():
                mean = entry.get("mean_vec", [])
                if not mean:
                    continue
                score = _dot(query_embedding, mean)
                scored.append({
                    "id": name,
                    "similarity": score,
                    "document": entry.get("affect", name),
                })
            # Sort descending by similarity
            scored.sort(key=lambda h: h["similarity"], reverse=True)
            hits = scored[:top_n]
        results: List[Tuple[str, Dict[str, float]]] = []
        from ncm_delta_parser import get_emotion_delta
        for hit in hits:
            name = hit["id"]
            score = hit["similarity"]
            if score is None or score < threshold:
                logger.debug("SemanticTriggerMatcher.find_triggers: hit '%s' score %.4f is below threshold %.4f", name, score or 0.0, threshold)
                continue
            delta = get_emotion_delta(name)
            if delta:
                # Scale by cosine similarity AND SEMANTIC_SCALE.
                # Cos-sim handles discrimination (strong > weak match).
                # SEMANTIC_SCALE compensates for semantic matching
                # firing more often than the exact-word system the
                # YAML deltas were authored for.
                scaled = {
                    k: max(
                        -_TRIGGER_NODE_CAP,
                        min(_TRIGGER_NODE_CAP, v * score * SEMANTIC_SCALE),
                    )
                    for k, v in delta.items()
                }
                logger.info("SemanticTriggerMatcher.find_triggers: MATCHED emotion '%s' similarity %.4f, delta vector: %s", name, score, scaled)
                results.append((name, scaled))
        return results
    except Exception as e:
        # Best-effort hot path: never let a matching failure reach the caller.
        logger.debug("SemanticTriggerMatcher.find_triggers error: %s", e)
        return []
# ------------------------------------------------------------------
# Startup warm-up
# ------------------------------------------------------------------
[docs]
async def ensure_all_cached(self) -> None:
    """Run the emotion-index warm-up once and wait for it.

    The real work lives in ``_ensure_all_cached_internal``; this wrapper
    makes sure only one warm-up task is ever created per instance:

    * Without an API key the call returns immediately.
    * If a warm-up task already exists, the call just awaits that task.
    * Otherwise the task is created under ``self._warming_lock`` and awaited.

    Exceptions raised by the warm-up task are logged as warnings and not
    re-raised. Nothing in the visible code resets ``self._warming_task``, so
    after the first run later calls only await the same (finished) task and
    do not start a new warm-up.
    """
    if not self._api_key:
        return
    if self._warming_task is not None:
        # Someone else already started warming: piggy-back on their task.
        logger.info("SemanticTriggerMatcher: trigger cache warming already in progress; awaiting existing warming task")
        try:
            await self._warming_task
        except Exception as e:
            logger.warning("SemanticTriggerMatcher: existing warming task failed: %s", e)
        return
    async with self._warming_lock:
        # Double check within the lock
        if self._warming_task is None:
            self._warming_task = asyncio.create_task(self._ensure_all_cached_internal())
        try:
            await self._warming_task
        except Exception as e:
            logger.warning("SemanticTriggerMatcher: warming failed: %s", e)
async def _ensure_all_cached_internal(self) -> None:
    """Perform the warm-up behind ``ensure_all_cached``.

    Steps:

    1. Load every emotion via ``ncm_delta_parser.get_all_emotions`` and make
       sure the pgvector collection exists.
    2. Ask pgvector which ids it already holds (up to 1000) and compute the
       emotions that are missing there. Return early when none are.
    3. If Redis answers a ping (retried ``self._redis_max_retries`` times),
       load the missing emotions' cached entries into ``self._embeddings``
       and upsert those that carry a ``mean_vec`` into pgvector.
    4. For whatever is still missing (and not already pending), embed the
       affect strings in one batch and run ``_build_and_store`` per emotion,
       at most two at a time.

    The per-emotion build tasks are collected and awaited before this
    coroutine returns. Keeping the references prevents the event loop's weak
    task references from letting a build disappear mid-run, and it means the
    warm-up task only completes once the builds are actually done.

    Failures in individual stages are logged and, where possible, the
    warm-up continues with the next stage.

    Returns:
        None
    """
    try:
        from ncm_delta_parser import get_all_emotions
        all_emotions = get_all_emotions()
    except Exception as e:
        logger.warning("SemanticTriggerMatcher: failed to load emotions: %s", e)
        return
    try:
        await self._collection.ensure()
    except Exception as e:
        logger.error("SemanticTriggerMatcher: failed to ensure pgvector table: %s", e)
    pg_ids = set()
    try:
        pg_res = await self._collection.get(limit=1000)
        pg_ids = set(pg_res.get("ids", []))
    except Exception as e:
        # Treat an unreadable collection as empty: everything counts as missing.
        logger.warning("SemanticTriggerMatcher: failed to fetch from pgvector: %s", e)
    missing_in_pg = [name for name in all_emotions if name not in pg_ids]
    if not missing_in_pg:
        logger.info("SemanticTriggerMatcher: all %d emotions already present in pgvector", len(all_emotions))
        return
    logger.info(
        "SemanticTriggerMatcher: found %d emotions missing from pgvector: %s",
        len(missing_in_pg),
        missing_in_pg,
    )
    redis_ready = False
    if not self._redis:
        logger.warning(
            "SemanticTriggerMatcher [Redis Offline/Absent]: Redis client is not available. "
            "Trigger warming will bypass cache."
        )
    else:
        for attempt in range(1, self._redis_max_retries + 1):
            try:
                await self._redis.ping()
                redis_ready = True
                logger.info("SemanticTriggerMatcher [Redis Online]: Successfully connected to Redis.")
                break
            except Exception as e:
                logger.warning(
                    "SemanticTriggerMatcher [Redis Connection Attempt %d/%d Failed]: Failed to ping Redis: %s. "
                    "Retrying in %s seconds...",
                    attempt,
                    self._redis_max_retries,
                    e,
                    self._redis_retry_interval,
                )
                await asyncio.sleep(self._redis_retry_interval)
        if not redis_ready:
            logger.error(
                "SemanticTriggerMatcher [Redis Connection Timeout]: Failed to connect to Redis after %d attempts.",
                self._redis_max_retries,
            )
    if redis_ready:
        await self._load_all_from_redis(set(missing_in_pg))
    # Split the missing emotions into "recoverable from Redis" (migrate to
    # pgvector) and "needs a fresh build".
    migrated_names = []
    migrated_docs = []
    migrated_metas = []
    migrated_vecs = []
    still_missing = []
    for name in missing_in_pg:
        if name in self._embeddings:
            entry = self._embeddings[name]
            mean_vec = entry.get("mean_vec", [])
            if mean_vec:
                migrated_names.append(name)
                migrated_docs.append(entry.get("affect", name))
                migrated_metas.append({"phrase_vecs": entry.get("phrase_vecs", [])})
                migrated_vecs.append(mean_vec)
        elif name not in self._pending:
            still_missing.append((name, all_emotions[name]))
    if migrated_names:
        try:
            await self._collection.upsert(
                ids=migrated_names,
                documents=migrated_docs,
                metadatas=migrated_metas,
                embeddings=migrated_vecs
            )
            logger.info(
                "SemanticTriggerMatcher: successfully migrated %d emotions from Redis to pgvector: %s",
                len(migrated_names),
                migrated_names
            )
        except Exception as e:
            logger.error("SemanticTriggerMatcher: failed to migrate emotions to pgvector: %s", e)
    if not still_missing:
        logger.info("SemanticTriggerMatcher: all missing emotions successfully resolved/migrated.")
        return
    logger.info(
        "SemanticTriggerMatcher: %d emotions need LLM phrase generation and embedding: %s",
        len(still_missing),
        [name for name, _ in still_missing],
    )
    names = [n for n, _ in still_missing]
    affect_texts = [e.get("affect", n) for n, e in still_missing]
    # Mark as in flight; ``_build_and_store`` clears each name when it ends.
    for name in names:
        self._pending.add(name)
    try:
        affect_vecs = await self._embed_texts(affect_texts)
    except Exception as e:
        logger.warning("SemanticTriggerMatcher: batch embed failed: %s", e)
        for name in names:
            self._pending.discard(name)
        return
    # At most two emotions are built concurrently.
    sem = asyncio.Semaphore(2)

    async def _build_limited(
        name: str,
        affect_str: str,
        affect_vec: list,
    ) -> None:
        """Run ``_build_and_store`` for one emotion under the semaphore."""
        async with sem:
            await self._build_and_store(name, affect_str, affect_vec)

    build_tasks = []
    for i, (name, emo_entry) in enumerate(still_missing):
        # A short embedding batch leaves trailing emotions without an affect vector.
        affect_vec = affect_vecs[i] if i < len(affect_vecs) else []
        affect_str = emo_entry.get("affect", name)
        build_tasks.append(
            asyncio.create_task(_build_limited(name, affect_str, affect_vec))
        )
    # Hold on to the tasks and wait for them. ``return_exceptions=True``
    # keeps one failing build from cancelling or masking the others.
    await asyncio.gather(*build_tasks, return_exceptions=True)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
async def _load_all_from_redis(self, emotion_names: set[str]) -> None:
    """Load cached emotion entries from Redis into ``self._embeddings``.

    One ``GET`` is issued per requested emotion on the key produced by
    ``_cache_key``; an ``asyncio.Semaphore(2)`` keeps at most two reads in
    flight. Each value is decoded with the module's ``json`` loader and
    stored under the emotion name when it is a dict. Read errors, cache
    misses and undecodable or non-dict payloads are logged and skipped.

    Does nothing when ``self._redis`` is unset. Called from
    ``_ensure_all_cached_internal``.

    Args:
        emotion_names (set[str]): Emotion names to look up in the cache.

    Returns:
        None
    """
    if not self._redis:
        return
    ordered_names = list(emotion_names)
    gate = asyncio.Semaphore(2)

    async def fetch_one(name):
        """Read one emotion's raw cache value while holding ``gate``.

        Args:
            name (str): Emotion name; turned into a Redis key by
                ``_cache_key``.

        Returns:
            Whatever ``self._redis.get`` returns for that key.
        """
        async with gate:
            return await self._redis.get(_cache_key(name))

    # Exceptions are returned in place so one failed read cannot abort the rest.
    outcomes = await asyncio.gather(
        *[fetch_one(name) for name in ordered_names],
        return_exceptions=True,
    )
    hit_count = 0
    for name, payload in zip(ordered_names, outcomes):
        key = _cache_key(name)
        if isinstance(payload, Exception):
            logger.warning(
                "SemanticTriggerMatcher [Redis Read Error]: Unexpected error reading emotion '%s' key '%s': %s",
                name,
                key,
                payload,
            )
            continue
        if not payload:
            logger.info(
                "SemanticTriggerMatcher [Redis Miss]: Emotion '%s' not found in Redis cache under key '%s'",
                name,
                key,
            )
            continue
        try:
            decoded = json.loads(payload)
        except (json.JSONDecodeError, ValueError):
            logger.warning(
                "SemanticTriggerMatcher [Redis Corrupt]: Invalid JSON structure in Redis key '%s' for emotion '%s'",
                key,
                name,
            )
            continue
        if not isinstance(decoded, dict):
            # Valid JSON, but not the object shape an entry must have.
            logger.warning(
                "SemanticTriggerMatcher [Redis Corrupt]: Invalid JSON structure in Redis key '%s' for emotion '%s'",
                key,
                name,
            )
            continue
        self._embeddings[name] = decoded
        hit_count += 1
        logger.info(
            "SemanticTriggerMatcher [Redis Hit]: Loaded cached emotion '%s' from Redis key '%s'",
            name,
            key,
        )
    if hit_count:
        logger.info(
            "SemanticTriggerMatcher: loaded %d emotion embeddings from Redis",
            hit_count,
        )
async def _build_and_store(
    self,
    name: str,
    affect: str,
    affect_vec: list,
) -> None:
    """Build one emotion's semantic fingerprint and persist it.

    Unless ``DISABLE_LLM_PHRASES`` is set, colloquial phrases are requested
    from ``_generate_phrases`` and embedded with ``_embed_texts``. The
    non-empty phrase vectors plus ``affect_vec`` (when present) are averaged
    into an L2-normalized centroid by ``_mean_vec``.

    ``_embed_texts`` returns ``[]`` placeholders for texts it could not
    embed. Those are dropped before averaging, because ``_mean_vec`` indexes
    every vector up to the first one's length and would otherwise raise and
    lose the whole emotion.

    When no usable vector exists at all, nothing is persisted. A cached
    entry with an empty ``mean_vec`` would be reloaded from Redis on the
    next warm-up and then be neither migrated nor rebuilt until its TTL
    expired.

    Otherwise the entry is stored in ``self._embeddings``, upserted into the
    pgvector collection and, when Redis is configured, written under
    ``_cache_key(name)`` with a TTL from ``_random_ttl``. Store failures are
    logged, not raised. ``name`` is always removed from ``self._pending``.

    Args:
        name (str): The emotion name.
        affect (str): The emotion's affect description string.
        affect_vec (list): Precomputed embedding of ``affect``; may be empty
            when that embedding was unavailable.

    Returns:
        None
    """
    try:
        phrases = []
        if not DISABLE_LLM_PHRASES:
            # Generate colloquial trigger phrases via LLM
            phrases = await self._generate_phrases(name, affect)
        all_vecs = [affect_vec] if affect_vec else []
        phrase_vecs: list[list] = []
        if phrases:
            try:
                p_vecs = await self._embed_texts(phrases)
                # Drop the empty placeholders of phrases that failed to embed.
                phrase_vecs = [v for v in p_vecs if v]
                all_vecs.extend(phrase_vecs)
            except Exception as e:
                logger.debug("Phrase embed failed for %s: %s", name, e)
        mean = _mean_vec(all_vecs) if all_vecs else affect_vec
        if not mean:
            # Nothing could be embedded: leave the emotion uncached so a
            # later warm-up can retry instead of pinning an empty entry.
            logger.warning(
                "SemanticTriggerMatcher: no usable embedding for %s; skipping cache write",
                name,
            )
            return
        entry = {
            "name": name,
            "affect": affect,
            "vec": affect_vec,
            "phrase_vecs": phrase_vecs,
            "mean_vec": mean,
        }
        self._embeddings[name] = entry
        try:
            await self._collection.upsert(
                ids=[name],
                documents=[affect],
                metadatas=[{"phrase_vecs": phrase_vecs}],
                embeddings=[mean]
            )
            logger.info("SemanticTriggerMatcher: successfully saved emotion '%s' to pgvector", name)
        except Exception as e:
            logger.error("SemanticTriggerMatcher: failed to save emotion '%s' to pgvector: %s", name, e)
        if self._redis:
            try:
                ttl = _random_ttl()
                logger.info(
                    "SemanticTriggerMatcher [Redis Store Attempt]: Saving emotion '%s' to Redis key '%s' with TTL=%d",
                    name,
                    _cache_key(name),
                    ttl,
                )
                await self._redis.set(
                    _cache_key(name),
                    json.dumps(entry),
                    ex=ttl,
                )
                logger.info(
                    "SemanticTriggerMatcher [Redis Store Success]: Successfully cached emotion '%s' in Redis key '%s'",
                    name,
                    _cache_key(name),
                )
            except Exception as e:
                logger.warning(
                    "SemanticTriggerMatcher [Redis Store Error]: Failed to write emotion '%s' to Redis key '%s': %s",
                    name,
                    _cache_key(name),
                    e,
                )
        logger.debug(
            "SemanticTriggerMatcher: cached %s (%d phrase vecs)",
            name,
            len(phrase_vecs),
        )
    except Exception as e:
        # One bad emotion must not abort the surrounding warm-up fan-out.
        logger.warning(
            "SemanticTriggerMatcher: _build_and_store failed for %s: %s",
            name,
            e,
        )
    finally:
        self._pending.discard(name)
async def _generate_phrases(self, name: str, affect: str) -> list[str]:
    """Generate colloquial phrases for this emotion via LLM calls.

    Iterates ``PHRASE_MODELS`` in shuffled order (one chat-completion call
    per model against ``OPENROUTER_CHAT_URL``), stopping early once at least
    ``NUM_PHRASES`` phrases are collected, then deduplicates
    case-insensitively (first spelling wins) and caps the result at
    ``NUM_PHRASES``.

    A failing model never aborts the loop: HTTP 429 responses trigger a
    5 second pause before moving on, and error payloads and exceptions are
    logged at debug level with the model name so the cause is not lost.

    Args:
        name (str): The emotion name.
        affect (str): The emotion's affect description.

    Returns:
        list[str]: Up to ``NUM_PHRASES`` distinct phrases; empty when there
        is no API key or every model failed.
    """
    if not self._api_key:
        return []
    headers = {
        "Authorization": f"Bearer {self._api_key}",
        "Content-Type": "application/json",
        "HTTP-Referer": "https://github.com/matrix-llm-bot",
        "X-Title": "Matrix LLM Bot",
    }
    user_msg = f'Emotion: "{name}"\nCharacter: "{affect}"'
    # The system prompt does not depend on the model; build it once.
    system_prompt = _phrase_system()
    all_phrases: list[str] = []
    models = list(PHRASE_MODELS)
    random.shuffle(models)
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(30.0, connect=10.0)
    ) as client:
        for model in models:
            payload = {
                "model": model,
                "messages": [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_msg},
                ],
                "temperature": 0.9,
                "max_tokens": 10240,
            }
            try:
                resp = await client.post(
                    OPENROUTER_CHAT_URL, json=payload, headers=headers
                )
                if resp.status_code == 429:
                    logger.debug(
                        "Phrase model %s rate-limited for %s; backing off",
                        model,
                        name,
                    )
                    # The pause also throttles the caller, which holds a
                    # concurrency slot while this coroutine sleeps.
                    await asyncio.sleep(5)
                    continue
                resp.raise_for_status()
                data = resp.json()
                if "error" in data:
                    logger.debug(
                        "Phrase model %s returned an error payload for %s: %s",
                        model,
                        name,
                        data.get("error") if isinstance(data, dict) else data,
                    )
                    continue
                text = (
                    data.get("choices", [{}])[0]
                    .get("message", {})
                    .get("content", "")
                )
                phrases = _parse_phrases(text)
                if phrases:
                    all_phrases.extend(phrases)
            except Exception as e:
                # Best effort per model: record why it failed, then move on.
                logger.debug(
                    "Phrase generation via %s failed for %s: %s",
                    model,
                    name,
                    e,
                )
                continue
            # Stop early if we have enough
            if len(all_phrases) >= NUM_PHRASES:
                break
    if not all_phrases:
        logger.debug("All phrase models failed for %s", name)
    # Deduplicate while preserving order, cap at NUM_PHRASES
    seen = set()
    unique: list[str] = []
    for p in all_phrases:
        p_lower = p.lower()
        if p_lower not in seen:
            seen.add(p_lower)
            unique.append(p)
    return unique[:NUM_PHRASES]
async def _embed_texts(self, texts: list[str]) -> list[list]:
    """Batch-embed texts with ``EMBED_MODEL`` and L2-normalize the result.

    Two back-ends are tried in order:

    1. The shared ``OpenRouterClient`` given at init time, through its
       ``embed_batch`` method.
    2. ``embed_batch_via_gemini`` (the Gemini key pool), used when no shared
       client exists or the shared call raised.

    Args:
        texts (list[str]): Texts to embed.

    Returns:
        list[list]: One unit-length vector per returned embedding, with an
        empty list in place of any falsy embedding. An empty list overall
        when ``texts`` is empty or both back-ends failed; failures are
        logged, not raised.
    """
    if not texts:
        return []

    def _unit_rows(rows):
        # Falsy rows stay as [] placeholders so positions keep lining up.
        return [_normalize(row) if row else [] for row in rows]

    # ── Preferred path: shared OpenRouterClient ──────────────
    shared_client = self._openrouter
    if shared_client is not None:
        try:
            pooled = await shared_client.embed_batch(texts, EMBED_MODEL)
            return _unit_rows(pooled)
        except Exception as e:
            logger.warning(
                "Shared client embed_batch failed, falling back to direct HTTP: %s",
                e,
            )
            # Continue with the key-pool fallback below.
    # ── Fallback: Gemini API via shared key pool ─────────────
    try:
        pooled = await embed_batch_via_gemini(texts, EMBED_MODEL)
        return _unit_rows(pooled)
    except Exception as e:
        logger.debug("Gemini embed fallback failed: %s", e)
        return []