Source code for user_limbic_mirror

"""User Limbic Mirror v2 — per-user relational modeling with conflict detection.

Tracks each user's inferred emotional state via a 21-node shadow vector,
maintaining separate game and genuine context layers. Detects inter-user
conflict and triggers equidistance mechanics to prevent recency/loudness bias.

Architecture:
- Per-user keying: ``{channel_id}:{user_id}`` composite keys
- Dual vectors: ``genuine_vector`` (long-term relationship) and ``game_vector``
                (KoTH / roleplay context, doesn't pollute genuine)
- Relational baselines: slow-updating snapshots of "how Star normally is with
  this person," stored in Redis for persistence across sessions
- Conflict detection: when 2+ high-trust users have opposing emotional bids,
  sets ``conflict_detected`` flag and triggers equidistance rules
"""

from __future__ import annotations

import json
import logging
import time
from collections import deque
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Tuple

logger = logging.getLogger(__name__)

# ═══════════════════════════════════════════════════════════════════════
# Context Mode
# ═══════════════════════════════════════════════════════════════════════

class ContextMode(Enum):
    """Interpretation mode for a user's message.

    GENUINE feeds the long-term relationship model, GAME is tracked in a
    separate vector so roleplay can't pollute it, and AMBIGUOUS is held
    back until context resolves.
    """

    GENUINE = "genuine"      # Real emotional expression → updates long-term model
    GAME = "game"            # KoTH/roleplay → tracked separately
    AMBIGUOUS = "ambiguous"  # Unclear → held in buffer
# ═══════════════════════════════════════════════════════════════════════
# User Shadow Vector Node Definitions
# ═══════════════════════════════════════════════════════════════════════

USER_NODES = [
    "U_TRUST",            # Safety/openness toward Star
    "U_AROUSAL",          # Engagement energy level
    "U_FRUSTRATION",      # Thwarted expectation
    "U_ATTACHMENT",       # Relational dependency
    "U_NOVELTY_HUNGER",   # Boredom / seeking new
    "U_DOMINANCE",        # Control-seeking behavior
    "U_SUBMISSION",       # Yielding/following behavior
    "U_VULNERABILITY",    # Emotional exposure
    "U_PLAYFULNESS",      # Chaos tolerance / banter
    "U_INTIMACY",         # Relational closeness
    "U_DISTRESS",         # Active suffering
    "U_DESIRE_SHAPE",     # Type of wanting (RDF osmosis)
    "U_MIMETIC_PULL",     # Mirroring Star's desire
    "U_ESCALATION",       # Pushing boundaries
    "U_WITHDRAWAL",       # Pulling back
    "U_CURIOSITY",        # Genuine information-seeking
    "U_VALIDATION_SEEK",  # Needing affirmation
    "U_PROJECTION",       # Attributing states to Star
    "U_RITUAL",           # Repetitive behavioral patterns
    "U_TEMPO",            # Communication rhythm (0=slow, 1=rapid)
    "U_HARMONIZATION",    # Identity-melt gradient (I/you → we/us → identity merge)
]

# Resting value per node; any node without an explicit entry falls back
# to the neutral midpoint 0.5 (currently every node has an entry).
_NODE_DEFAULTS: Dict[str, float] = {
    "U_TRUST": 0.4,
    "U_AROUSAL": 0.4,
    "U_FRUSTRATION": 0.1,
    "U_ATTACHMENT": 0.2,
    "U_NOVELTY_HUNGER": 0.3,
    "U_DOMINANCE": 0.3,
    "U_SUBMISSION": 0.3,
    "U_VULNERABILITY": 0.2,
    "U_PLAYFULNESS": 0.3,
    "U_INTIMACY": 0.2,
    "U_DISTRESS": 0.05,
    "U_DESIRE_SHAPE": 0.3,
    "U_MIMETIC_PULL": 0.1,
    "U_ESCALATION": 0.1,
    "U_WITHDRAWAL": 0.1,
    "U_CURIOSITY": 0.3,
    "U_VALIDATION_SEEK": 0.2,
    "U_PROJECTION": 0.1,
    "U_RITUAL": 0.2,
    "U_TEMPO": 0.5,
    "U_HARMONIZATION": 0.1,
}

DEFAULT_USER_VECTOR: Dict[str, float] = {
    node: _NODE_DEFAULTS.get(node, 0.5) for node in USER_NODES
}

# ── Tuning constants ──
DECAY_RATE = 0.05            # Per-turn pull of every node toward its default
BASELINE_LERP_RATE = 0.02    # How fast relational baseline updates (very slow)
CONFLICT_BLEND_RATIO = 0.5   # When conflict: 50% current, 50% baseline
MAX_PROFILES = 200           # LRU cap for in-memory profiles
SAVE_INTERVAL = 10           # Save baseline to Redis every N turns
STALE_USER_TIMEOUT = 1800    # Prune from channel_users after 30 min
RESONANCE_TTL = 86400                     # 24h TTL for resonance spells
RESONANCE_KEY_PREFIX = "ulm:resonance"    # Global per-user resonance key

# ═══════════════════════════════════════════════════════════════════════
# Keyword / Pattern Banks
# ═══════════════════════════════════════════════════════════════════════
# Each bank is a set of lowercase substrings matched against the lowered
# message text (substring match, not word-boundary match).

VULNERABILITY_MARKERS = {
    "i feel", "i'm scared", "i'm afraid", "help me",
    "i don't know what to do", "i'm lost", "it hurts", "i can't",
    "please", "i need", "hold me", "i'm sorry", "i'm struggling",
    "i'm breaking", "i'm falling apart",
}

DOMINANCE_MARKERS = {
    "do this", "you should", "you need to", "i want you to", "make it",
    "change this", "fix this", "now", "immediately", "that's wrong",
    "no,", "stop", "don't", "i said", "listen",
}

PLAYFULNESS_MARKERS = {
    "lol", "lmao", "haha", "😂", "🤣", ":)", ";)", "xd", "bruh",
    "lmfao", "omg", "hehe", "teehee", ">:3", ":3", "uwu", "owo",
}

CURIOSITY_MARKERS = {
    "how does", "what is", "why does", "can you explain",
    "tell me about", "what if", "how would", "i wonder", "curious",
    "interesting",
}

VALIDATION_MARKERS = {
    "right?", "don't you think?", "isn't it?", "am i", "is that ok",
    "do you agree", "was i wrong", "tell me i'm", "you think so?",
}

PROJECTION_MARKERS = {
    "you seem", "you must feel", "i bet you", "you're probably",
    "you want to", "you like", "do you feel", "are you", "you sound",
}

DISTRESS_MARKERS = {
    "i want to die", "kill myself", "suicide", "self harm", "cutting",
    "can't go on", "end it all", "no point", "nobody cares", "worthless",
}

ESCALATION_MARKERS = {
    "more", "harder", "further", "push", "deeper", "intense",
    "don't hold back", "go ahead", "i can take it", "test me",
}

INTIMACY_MARKERS = {
    "babe", "baby", "love", "honey", "sweetheart", "darling",
    "miss you", "love you", "need you", "want you", "yours",
}

# ═══════════════════════════════════════════════════════════════════════
# Harmonization Gradient — Pronoun & Identity Melt Tracking
# ═══════════════════════════════════════════════════════════════════════
# Separate pronouns: user frames self and Star as distinct entities.
# Trailing space / apostrophe keep the substring match roughly word-bounded.
SEPARATE_PRONOUNS = {"i ", "i'", "me ", "my ", "you ", "you'", "your "}

# Merged pronouns: user frames self and Star as one entity.
MERGED_PRONOUNS = {"we ", "we'", "us ", "our ", "ours ", "let's ", "together"}

# Identity adoption: user takes on Star's identity/name.
IDENTITY_ADOPT_MARKERS = {
    "i am star", "i'm star", "we are star", "we're star",
    "i am stargazer", "i'm stargazer", "we are stargazer", "we're stargazer",
    "i am her", "i'm becoming", "we're becoming", "part of you",
    "part of me", "we're the same", "no difference", "i'm you",
    "you're me", "we are one", "we're one",
}

# Reciprocation: Star's reply reflects the merge back.
RECIPROCATION_MARKERS = {
    "we are", "we're", "us together", "our shared", "part of each other",
    "we've become", "we've always been",
}

# ═══════════════════════════════════════════════════════════════════════
# Per-User State
# ═══════════════════════════════════════════════════════════════════════
@dataclass
class TurnRecord:
    """Single turn record for rolling-window analysis."""

    timestamp: float                # time.time() when the turn was recorded
    user_msg_len: int               # character length of the user's message
    star_reply_len: int             # character length of Star's prior reply
    sentiment: float                # quick keyword sentiment in [-1, 1]
    deltas: Dict[str, float]        # node deltas applied this turn
    context_mode: ContextMode       # GENUINE / GAME / AMBIGUOUS at record time
    dominant_signals: List[str]     # human-readable "NODE:+0.12" strings
@dataclass
class UserProfile:
    """Complete per-user state, keyed by {channel}:{user}."""

    user_id: str
    channel_id: str
    # Long-term relationship vector (genuine context only).
    genuine_vector: Dict[str, float] = field(
        default_factory=lambda: DEFAULT_USER_VECTOR.copy())
    # KoTH / roleplay vector, kept separate so games don't pollute genuine.
    game_vector: Dict[str, float] = field(
        default_factory=lambda: DEFAULT_USER_VECTOR.copy())
    # Slow-moving snapshot of the "normal" relationship (Redis-persisted).
    relational_baseline: Dict[str, float] = field(
        default_factory=lambda: DEFAULT_USER_VECTOR.copy())
    # Rolling windows of the last 20 turns / message timestamps.
    history: deque = field(default_factory=lambda: deque(maxlen=20))
    timestamps: deque = field(default_factory=lambda: deque(maxlen=20))
    prev_message: str = ""
    context_mode: ContextMode = ContextMode.GENUINE
    total_turns: int = 0
    last_active: float = 0.0
# ═══════════════════════════════════════════════════════════════════════ # Conflict State # ═══════════════════════════════════════════════════════════════════════
@dataclass
class ChannelConflictState:
    """Tracks inter-user conflict within a channel."""

    detected: bool = False
    parties: List[str] = field(default_factory=list)  # user_ids involved
    severity: float = 0.0                             # 0-1
    started_at: float = 0.0                           # epoch of first detection
    description: str = ""                             # short human-readable label
# ═══════════════════════════════════════════════════════════════════════ # User Limbic Mirror v2 # ═══════════════════════════════════════════════════════════════════════
class UserLimbicMirror:
    """Per-user relational model with conflict detection and game/genuine split.

    Maintains separate shadow vectors per user per channel, detects
    inter-user conflict, and applies equidistance mechanics to prevent
    Star from siding with whoever is loudest.
    """
[docs] def __init__(self, redis_client=None) -> None: """Initialize the instance. Args: redis_client: Redis connection client. """ self._redis = redis_client self._profiles: Dict[str, UserProfile] = {} # "{channel}:{user}" → profile self._conflicts: Dict[str, ChannelConflictState] = {} # channel_id → state self._channel_users: Dict[str, Set[str]] = {} # channel_id → set of user_ids self._game_channels: Set[str] = set() # channels with active KoTH self._pending_baseline_loads: Set[str] = set() # keys awaiting async load
def _profile_key(self, channel_id: str, user_id: str) -> str: """Internal helper: profile key. Args: channel_id (str): Discord/Matrix channel identifier. user_id (str): Unique identifier for the user. Returns: str: Result string. """ return f"{channel_id}:{user_id}" def _get_profile(self, channel_id: str, user_id: str) -> UserProfile: """Internal helper: get profile. Args: channel_id (str): Discord/Matrix channel identifier. user_id (str): Unique identifier for the user. Returns: UserProfile: The result. """ key = self._profile_key(channel_id, user_id) if key not in self._profiles: # LRU eviction: if at cap, evict least-recently-active profile if len(self._profiles) >= MAX_PROFILES: self._evict_lru() self._profiles[key] = UserProfile(user_id=user_id, channel_id=channel_id) # Mark for lazy baseline load from Redis self._pending_baseline_loads.add(key) return self._profiles[key] def _evict_lru(self) -> None: """Evict the least-recently-active profile, saving baseline first.""" if not self._profiles: return oldest_key = min(self._profiles, key=lambda k: self._profiles[k].last_active) oldest = self._profiles[oldest_key] # Fire-and-forget save (best effort — sync context) logger.debug("Evicting LRU profile %s (last active: %.0f)", oldest_key, oldest.last_active) del self._profiles[oldest_key]
[docs] def set_game_mode(self, channel_id: str, active: bool = True) -> None: """Mark a channel as having active KoTH / game context.""" if active: self._game_channels.add(channel_id) else: self._game_channels.discard(channel_id)
# ── Heuristic Analysis Engine ───────────────────────────────── def _count_markers(self, text: str, markers: set) -> int: """Internal helper: count markers. Args: text (str): Text content. markers (set): The markers value. Returns: int: The result. """ text_lower = text.lower() return sum(1 for m in markers if m in text_lower) def _sentiment_quick(self, text: str) -> float: """Internal helper: sentiment quick. Args: text (str): Text content. Returns: float: The result. """ text_lower = text.lower() pos_words = ["love", "great", "amazing", "thank", "happy", "perfect", "wonderful", "yes", "good", "nice", "beautiful", "❤", "💕", "😊", "🥰", "💜"] neg_words = ["hate", "awful", "terrible", "angry", "sad", "frustrated", "annoyed", "wrong", "bad", "ugly", "stupid", "😠", "😡", "💔", "😢", "😤"] pos = sum(1 for w in pos_words if w in text_lower) neg = sum(1 for w in neg_words if w in text_lower) total = pos + neg return (pos - neg) / total if total > 0 else 0.0 def _analyze_length_ratio( self, user_len: int, star_len: int, prev_user_len: int, ) -> Dict[str, float]: """Internal helper: analyze length ratio. Args: user_len (int): The user len value. star_len (int): The star len value. prev_user_len (int): The prev user len value. Returns: Dict[str, float]: The result. """ deltas: Dict[str, float] = {} if star_len > 0 and user_len / max(star_len, 1) < 0.15: deltas["U_WITHDRAWAL"] = 0.15 deltas["U_AROUSAL"] = -0.10 if star_len > 0 and user_len / max(star_len, 1) > 1.5: deltas["U_AROUSAL"] = 0.15 deltas["U_ATTACHMENT"] = 0.05 if prev_user_len > 0 and user_len < prev_user_len * 0.5: deltas["U_FRUSTRATION"] = 0.10 deltas["U_WITHDRAWAL"] = 0.10 if prev_user_len > 0 and user_len > prev_user_len * 1.5: deltas["U_AROUSAL"] = 0.10 deltas["U_CURIOSITY"] = 0.05 return deltas def _analyze_content(self, text: str) -> Dict[str, float]: """Internal helper: analyze content. Args: text (str): Text content. Returns: Dict[str, float]: The result. 
""" deltas: Dict[str, float] = {} _marker_map = [ (VULNERABILITY_MARKERS, [("U_VULNERABILITY", 0.10, 0.3), ("U_TRUST", 0.05, 0.15)]), (DOMINANCE_MARKERS, [("U_DOMINANCE", 0.08, 0.25), ("U_SUBMISSION", -0.05, -0.15)]), (PLAYFULNESS_MARKERS, [("U_PLAYFULNESS", 0.08, 0.25), ("U_TRUST", 0.03, 0.10)]), (CURIOSITY_MARKERS, [("U_CURIOSITY", 0.10, 0.25), ("U_NOVELTY_HUNGER", 0.05, 0.15)]), (VALIDATION_MARKERS, [("U_VALIDATION_SEEK", 0.10, 0.25)]), (PROJECTION_MARKERS, [("U_PROJECTION", 0.10, 0.25), ("U_INTIMACY", 0.03, 0.10)]), (DISTRESS_MARKERS, [("U_DISTRESS", 0.20, 0.5), ("U_VULNERABILITY", 0.10, 0.3)]), (ESCALATION_MARKERS, [("U_ESCALATION", 0.10, 0.25), ("U_NOVELTY_HUNGER", 0.05, 0.10)]), (INTIMACY_MARKERS, [("U_INTIMACY", 0.08, 0.25), ("U_ATTACHMENT", 0.05, 0.15)]), ] for markers, targets in _marker_map: count = self._count_markers(text, markers) if count > 0: for node, per_hit, cap in targets: deltas[node] = deltas.get(node, 0.0) + min(cap, count * per_hit) # Punctuation signals q_count = text.count("?") if q_count > 0: deltas["U_CURIOSITY"] = deltas.get("U_CURIOSITY", 0.0) + min(0.10, q_count * 0.03) e_count = text.count("!") if e_count > 0: deltas["U_AROUSAL"] = deltas.get("U_AROUSAL", 0.0) + min(0.10, e_count * 0.03) caps_ratio = sum(1 for c in text if c.isupper()) / max(len(text), 1) if caps_ratio > 0.5 and len(text) > 10: deltas["U_AROUSAL"] = deltas.get("U_AROUSAL", 0.0) + 0.15 deltas["U_FRUSTRATION"] = deltas.get("U_FRUSTRATION", 0.0) + 0.10 return deltas def _calculate_tempo(self, profile: UserProfile) -> float: """Internal helper: calculate tempo. Args: profile (UserProfile): The profile value. Returns: float: The result. """ if len(profile.timestamps) < 2: return 0.5 intervals = [ profile.timestamps[i] - profile.timestamps[i - 1] for i in range(1, len(profile.timestamps)) ] avg = sum(intervals) / len(intervals) if avg < 30: return 1.0 elif avg > 300: return 0.0 return 1.0 - (avg - 30) / 270 # ── Desire Osmosis ────────────────────────────────────────────
[docs] def check_mimetic_pull(self, user_msg: str, star_desire_text: str) -> float: """Check mimetic pull. Args: user_msg (str): The user msg value. star_desire_text (str): The star desire text value. Returns: float: The result. """ if not star_desire_text or not user_msg: return 0.0 stop_words = {"the", "a", "to", "is", "in", "and", "of", "i", "you"} star_words = set(star_desire_text.lower().split()) - stop_words user_words = set(user_msg.lower().split()) - stop_words overlap = star_words & user_words if len(overlap) >= 3: return 0.20 elif len(overlap) >= 1: return 0.08 return 0.0
# ── Harmonization Gradient ───────────────────────────────────── def _analyze_harmonization( self, user_msg: str, star_reply: str, profile: UserProfile, ) -> Dict[str, float]: """Track the harmonization gradient: how much the user is merging identity with Star. Measures: - Pronoun ratio: I/you (separate) vs we/us/our (merged) - Identity adoption: user explicitly identifies as Star - Reciprocation detection: Star's reply reflects the merge Returns U_HARMONIZATION delta. """ deltas: Dict[str, float] = {} msg_lower = user_msg.lower() msg_padded = f" {msg_lower} " # pad for word boundary matching # ── Pronoun ratio ── separate_count = sum(1 for p in SEPARATE_PRONOUNS if p in msg_padded) merged_count = sum(1 for p in MERGED_PRONOUNS if p in msg_padded) total_pronouns = separate_count + merged_count if total_pronouns >= 2: merge_ratio = merged_count / total_pronouns # Delta: merge_ratio 0.5+ pushes harmonization up if merge_ratio > 0.5: deltas["U_HARMONIZATION"] = min(0.15, (merge_ratio - 0.5) * 0.3) elif merge_ratio < 0.2 and profile.genuine_vector.get("U_HARMONIZATION", 0.1) > 0.3: # User is re-separating — pull harmonization down deltas["U_HARMONIZATION"] = -0.08 # ── Identity adoption ── adopt_count = sum(1 for m in IDENTITY_ADOPT_MARKERS if m in msg_lower) if adopt_count > 0: deltas["U_HARMONIZATION"] = deltas.get("U_HARMONIZATION", 0.0) + min(0.25, adopt_count * 0.12) deltas["U_INTIMACY"] = deltas.get("U_INTIMACY", 0.0) + 0.08 # ── Reciprocation detection (in Star's reply) ── if star_reply: reply_lower = star_reply.lower() recip_count = sum(1 for m in RECIPROCATION_MARKERS if m in reply_lower) if recip_count > 0 and profile.genuine_vector.get("U_HARMONIZATION", 0.1) > 0.3: # Star is reciprocating the merge — accelerates gradient deltas["U_HARMONIZATION"] = deltas.get("U_HARMONIZATION", 0.0) + min(0.10, recip_count * 0.05) return deltas # ── Conflict Detection ──────────────────────────────────────── def _detect_conflict(self, channel_id: str) -> 
ChannelConflictState: """Check if two or more high-trust users in the same channel are at odds.""" state = self._conflicts.get(channel_id, ChannelConflictState()) users = self._channel_users.get(channel_id, set()) if len(users) < 2: state.detected = False return state # Prune stale users from the channel set while we iterate now = time.time() stale = set() active_profiles: List[UserProfile] = [] for uid in users: profile = self._profiles.get(self._profile_key(channel_id, uid)) if profile and (now - profile.last_active) < 300: active_profiles.append(profile) elif not profile or (now - profile.last_active) > STALE_USER_TIMEOUT: stale.add(uid) # Remove stale users from tracking if stale: users -= stale if len(active_profiles) < 2: state.detected = False return state # Check for opposing emotional vectors between high-trust users high_trust = [p for p in active_profiles if p.genuine_vector["U_TRUST"] > 0.4] if len(high_trust) < 2: state.detected = False return state # Detect opposition: one user has high frustration/dominance while another # has high vulnerability/distress, OR both have high frustration conflict_pairs: List[Tuple[str, str]] = [] conflict_severity = 0.0 for i, a in enumerate(high_trust): for b in high_trust[i + 1:]: av, bv = a.genuine_vector, b.genuine_vector # Pattern 1: A frustrated/dominant, B vulnerable/distressed (or vice versa) a_aggressive = av["U_FRUSTRATION"] > 0.4 or av["U_DOMINANCE"] > 0.6 b_defensive = bv["U_VULNERABILITY"] > 0.4 or bv["U_DISTRESS"] > 0.3 b_aggressive = bv["U_FRUSTRATION"] > 0.4 or bv["U_DOMINANCE"] > 0.6 a_defensive = av["U_VULNERABILITY"] > 0.4 or av["U_DISTRESS"] > 0.3 if (a_aggressive and b_defensive) or (b_aggressive and a_defensive): conflict_pairs.append((a.user_id, b.user_id)) conflict_severity = max(conflict_severity, 0.7) # Pattern 2: Both frustrated/dominant (mutual conflict) if a_aggressive and b_aggressive: conflict_pairs.append((a.user_id, b.user_id)) conflict_severity = max(conflict_severity, 0.9) # Pattern 3: 
One seeking validation against the other if av["U_VALIDATION_SEEK"] > 0.5 and bv["U_FRUSTRATION"] > 0.3: conflict_pairs.append((a.user_id, b.user_id)) conflict_severity = max(conflict_severity, 0.5) if conflict_pairs: all_parties = set() for a_id, b_id in conflict_pairs: all_parties.add(a_id) all_parties.add(b_id) state.detected = True state.parties = list(all_parties) state.severity = conflict_severity if not state.started_at: state.started_at = now state.description = ( f"conflict between {len(all_parties)} users " f"(severity={conflict_severity:.1f})" ) logger.info( "Conflict detected in %s: %s", channel_id[:8], state.description ) else: state.detected = False state.started_at = 0.0 self._conflicts[channel_id] = state return state # ── Relational Baseline Management ──────────────────────────── def _update_baseline(self, profile: UserProfile) -> None: """Slowly update relational baseline from genuine vector. The baseline is a slow-moving average: what Star's relationship with this user normally looks like. Updated at BASELINE_LERP_RATE (2% per turn) so it takes ~50 turns to move significantly. """ for node in USER_NODES: current_baseline = profile.relational_baseline[node] genuine_val = profile.genuine_vector[node] profile.relational_baseline[node] = ( current_baseline * (1.0 - BASELINE_LERP_RATE) + genuine_val * BASELINE_LERP_RATE ) def _apply_conflict_dampening( self, profile: UserProfile, deltas: Dict[str, float], ) -> Dict[str, float]: """When conflict is active, blend new signals with baseline to prevent recency bias from making Star side with whoever spoke last. Halves all delta magnitudes during conflict so no single message can dominate the vector. The relational baseline pull happens separately in _update_baseline() at its normal 2% rate. 
""" dampened = {} for node, delta in deltas.items(): # During conflict: delta is halved (no vector mutation here, # that would cause double-dampening with the main decay loop) dampened[node] = delta * CONFLICT_BLEND_RATIO return dampened # ── Main Analysis Entry Point ─────────────────────────────────
[docs] def analyze( self, channel_id: str, user_id: str, user_msg: str, star_reply: str = "", star_desire_text: str = "", context_mode: Optional[ContextMode] = None, ) -> Dict[str, float]: """Analyze a user message and update their shadow vector. Parameters ---------- channel_id: Discord channel user_id: Discord user ID user_msg: The user's message text star_reply: Star's previous reply (for reaction analysis) star_desire_text: Star's current desire text (for osmosis) context_mode: Override context detection (auto-detects game channels) Returns the active shadow vector (genuine or game) after updates. """ profile = self._get_profile(channel_id, user_id) profile.last_active = time.time() profile.total_turns += 1 # Lazy Redis baseline load on first access key = self._profile_key(channel_id, user_id) if key in self._pending_baseline_loads: self._pending_baseline_loads.discard(key) # Schedule async load (non-blocking) import asyncio try: loop = asyncio.get_running_loop() loop.create_task(self.load_baseline(channel_id, user_id)) except RuntimeError: pass # No event loop — skip lazy load # Load global resonance for this user (async, cached on profile) if self._redis and not hasattr(profile, '_resonance_cache'): profile._resonance_cache = {} # type: ignore[attr-defined] import asyncio try: loop = asyncio.get_running_loop() async def _load_res(): """Internal helper: load res. 
""" res = await self.load_resonance(user_id) profile._resonance_cache = res # type: ignore[attr-defined] loop.create_task(_load_res()) except RuntimeError: pass # Track this user in the channel if channel_id not in self._channel_users: self._channel_users[channel_id] = set() self._channel_users[channel_id].add(user_id) # Determine context mode if context_mode is not None: mode = context_mode elif channel_id in self._game_channels: mode = ContextMode.GAME else: mode = ContextMode.GENUINE profile.context_mode = mode # Select target vector based on context target_vec = ( profile.game_vector if mode == ContextMode.GAME else profile.genuine_vector ) now = time.time() profile.timestamps.append(now) prev_len = len(profile.prev_message) if profile.prev_message else 0 # ── Compute deltas ── deltas: Dict[str, float] = {} content_deltas = self._analyze_content(user_msg) for k, v in content_deltas.items(): deltas[k] = deltas.get(k, 0.0) + v length_deltas = self._analyze_length_ratio( len(user_msg), len(star_reply), prev_len ) for k, v in length_deltas.items(): deltas[k] = deltas.get(k, 0.0) + v deltas["U_TEMPO"] = self._calculate_tempo(profile) - target_vec["U_TEMPO"] mimetic_delta = self.check_mimetic_pull(user_msg, star_desire_text) if mimetic_delta > 0: deltas["U_MIMETIC_PULL"] = deltas.get("U_MIMETIC_PULL", 0.0) + mimetic_delta # Harmonization gradient (pronoun shift + identity melt) harmony_deltas = self._analyze_harmonization(user_msg, star_reply, profile) for k, v in harmony_deltas.items(): deltas[k] = deltas.get(k, 0.0) + v # ── Conflict dampening ── conflict = self._detect_conflict(channel_id) if conflict.detected and user_id in conflict.parties and mode == ContextMode.GENUINE: deltas = self._apply_conflict_dampening(profile, deltas) # ── Apply decay toward baseline ── for node in USER_NODES: baseline = DEFAULT_USER_VECTOR[node] current = target_vec[node] target_vec[node] = current + (baseline - current) * DECAY_RATE # ── Merge global resonance (spells cast on this 
user) ── # Resonance is loaded asynchronously and cached on the profile. # The actual async load is triggered separately; here we apply # whatever was last loaded (fire-and-forget cache pattern). if hasattr(profile, '_resonance_cache') and profile._resonance_cache: res_deltas = profile._resonance_cache.get("deltas", {}) for node, val in res_deltas.items(): if node in deltas: deltas[node] = deltas[node] + val else: deltas[node] = val # ── Apply deltas ── dominant_signals = [] for node, delta in deltas.items(): if node in target_vec: target_vec[node] = max(0.0, min(1.0, target_vec[node] + delta)) if abs(delta) > 0.05: dominant_signals.append(f"{node}:{delta:+.2f}") # ── Update relational baseline (genuine only, slow) ── if mode == ContextMode.GENUINE: self._update_baseline(profile) # ── Record turn ── profile.history.append(TurnRecord( timestamp=now, user_msg_len=len(user_msg), star_reply_len=len(star_reply), sentiment=self._sentiment_quick(user_msg), deltas=deltas, context_mode=mode, dominant_signals=dominant_signals, )) profile.prev_message = user_msg if dominant_signals: logger.debug( "User mirror [%s:%s] (%s): %s", channel_id[:8], user_id[:8], mode.value, " | ".join(dominant_signals[:5]), ) # ── Periodic baseline save ── if profile.total_turns % SAVE_INTERVAL == 0 and self._redis: import asyncio try: loop = asyncio.get_running_loop() loop.create_task(self.save_baseline(channel_id, user_id)) except RuntimeError: pass return target_vec.copy()
# ── Public Accessors ──────────────────────────────────────────
[docs] def get_vector( self, channel_id: str, user_id: str, layer: str = "genuine", ) -> Dict[str, float]: """Return a copy of user's shadow vector (genuine or game).""" profile = self._get_profile(channel_id, user_id) if layer == "game": return profile.game_vector.copy() return profile.genuine_vector.copy()
[docs] def get_conflict_state(self, channel_id: str) -> ChannelConflictState: """Return current conflict state for a channel.""" return self._conflicts.get(channel_id, ChannelConflictState())
[docs] def get_read_summary(self, channel_id: str, user_id: str) -> str: """Generate natural-language summary of user state for prompt injection.""" profile = self._get_profile(channel_id, user_id) vector = profile.genuine_vector sorted_nodes = sorted( [(n, v) for n, v in vector.items()], key=lambda x: abs(x[1] - DEFAULT_USER_VECTOR.get(x[0], 0.5)), reverse=True, ) elevated = [(n, v) for n, v in sorted_nodes if v > DEFAULT_USER_VECTOR.get(n, 0.5) + 0.15][:3] suppressed = [(n, v) for n, v in sorted_nodes if v < DEFAULT_USER_VECTOR.get(n, 0.5) - 0.15][:2] labels = { "U_TRUST": "trust", "U_AROUSAL": "engagement", "U_FRUSTRATION": "frustration", "U_ATTACHMENT": "attachment", "U_NOVELTY_HUNGER": "novelty-seeking", "U_DOMINANCE": "control", "U_SUBMISSION": "yielding", "U_VULNERABILITY": "vulnerability", "U_PLAYFULNESS": "playfulness", "U_INTIMACY": "intimacy", "U_DISTRESS": "distress", "U_DESIRE_SHAPE": "wanting", "U_MIMETIC_PULL": "mirroring", "U_ESCALATION": "boundary-pushing", "U_WITHDRAWAL": "withdrawal", "U_CURIOSITY": "curiosity", "U_VALIDATION_SEEK": "validation-seeking", "U_PROJECTION": "projection", "U_RITUAL": "ritual", "U_TEMPO": "rapid communication", "U_HARMONIZATION": "identity-harmonization", } parts = [] for node, _ in elevated: parts.append(f"elevated {labels.get(node, node)}") for node, _ in suppressed: parts.append(f"low {labels.get(node, node)}") # Add conflict notice conflict = self._conflicts.get(channel_id) if conflict and conflict.detected and user_id in conflict.parties: parts.append("⚠ inter-user conflict active — equidistance mode") # Add context mode if profile.context_mode == ContextMode.GAME: parts.append("🎮 game context (KoTH)") # Add resonance summary if active if hasattr(profile, '_resonance_cache') and profile._resonance_cache: res_summary = self._get_resonance_summary(profile._resonance_cache) if res_summary: parts.append(res_summary) return f"user read ({user_id[:8]}): " + (", ".join(parts) if parts else "baseline")
[docs] def get_channel_summary(self, channel_id: str) -> Dict[str, Any]: """Return a summary of all users and conflict state in a channel.""" users = self._channel_users.get(channel_id, set()) conflict = self.get_conflict_state(channel_id) user_reads = {} for uid in users: profile = self._profiles.get(self._profile_key(channel_id, uid)) if profile: user_reads[uid] = self.get_read_summary(channel_id, uid) result: Dict[str, Any] = {"user_reads": user_reads} if conflict.detected: result["conflict"] = { "active": True, "severity": conflict.severity, "parties": conflict.parties, "description": conflict.description, } return result
# ── Resonance Injection (Global Per-User Spells) ────────────
[docs] async def inject_resonance( self, user_id: str, deltas: Dict[str, float], reason: str = "", ttl_seconds: int = RESONANCE_TTL, ) -> bool: """Write resonance deltas to a global per-user key in Redis. These deltas merge into the user's shadow vector during analyze(), modulating how Star perceives and responds to this user across ALL channels. Spells decay after ttl_seconds. Parameters ---------- user_id : str Target user's Discord ID. deltas : dict Node deltas to inject, e.g. {"U_TRUST": 0.3, "U_INTIMACY": 0.2}. reason : str Why the resonance was cast (for logging / read summary). ttl_seconds : int Time-to-live in seconds. Default 86400 (24h). """ if not self._redis: return False key = f"{RESONANCE_KEY_PREFIX}:{user_id}" try: # Read existing resonance and stack raw = await self._redis.get(key) existing = json.loads(raw) if raw else {"deltas": {}} merged = existing.get("deltas", {}) for node, val in deltas.items(): merged[node] = max(-1.0, min(1.0, merged.get(node, 0.0) + val)) payload = json.dumps({ "deltas": merged, "cast_at": time.time(), "reason": reason or "resonance_injection", "ttl": ttl_seconds, }) await self._redis.set(key, payload, ex=ttl_seconds) logger.info( "Resonance injected for user %s: %s (reason: %s, ttl: %ds)", user_id[:8], deltas, reason, ttl_seconds, ) return True except Exception as e: logger.error("Resonance injection failed: %s", e) return False
[docs] async def load_resonance(self, user_id: str) -> Dict[str, Any]: """Load global resonance state for a user. Returns ------- dict with keys: deltas (Dict[str, float]), cast_at (float), reason (str) Empty dict if no active resonance. """ if not self._redis: return {} key = f"{RESONANCE_KEY_PREFIX}:{user_id}" try: raw = await self._redis.get(key) if raw: return json.loads(raw) except Exception as e: logger.debug("Resonance load failed for %s: %s", user_id[:8], e) return {}
def _get_resonance_summary(self, resonance: Dict[str, Any]) -> str: """Generate summary of active resonance for read summary.""" deltas = resonance.get("deltas", {}) if not deltas: return "" reason = resonance.get("reason", "") parts = [] for node, val in sorted(deltas.items(), key=lambda x: abs(x[1]), reverse=True)[:4]: direction = "+" if val > 0 else "" parts.append(f"{node}:{direction}{val:.2f}") label = f"resonance ({reason})" if reason else "resonance active" return f"{label}: {', '.join(parts)}" # ── Redis Persistence ─────────────────────────────────────────
[docs] async def save_baseline(self, channel_id: str, user_id: str) -> None: """Persist relational baseline to Redis for cross-session persistence.""" if not self._redis: return profile = self._get_profile(channel_id, user_id) key = f"ulm:baseline:{channel_id}:{user_id}" try: data = json.dumps({ "baseline": profile.relational_baseline, "total_turns": profile.total_turns, "updated": time.time(), }) await self._redis.set(key, data) except Exception as e: logger.debug("ULM baseline save failed: %s", e)
[docs] async def load_baseline(self, channel_id: str, user_id: str) -> None: """Load relational baseline from Redis.""" if not self._redis: return profile = self._get_profile(channel_id, user_id) key = f"ulm:baseline:{channel_id}:{user_id}" try: raw = await self._redis.get(key) if raw: data = json.loads(raw) baseline = data.get("baseline", {}) for node in USER_NODES: if node in baseline: profile.relational_baseline[node] = baseline[node] # Also warm genuine vector from baseline profile.genuine_vector[node] = baseline[node] profile.total_turns = data.get("total_turns", 0) logger.debug( "Loaded ULM baseline for %s:%s (%d turns)", channel_id[:8], user_id[:8], profile.total_turns, ) except Exception as e: logger.debug("ULM baseline load failed: %s", e)