Source code for user_limbic_mirror

"""User Limbic Mirror v2 — per-user relational modeling with conflict detection.

Tracks each user's inferred emotional state via a 25-node shadow vector,
maintaining separate game and genuine context layers. Detects inter-user
conflict and triggers equidistance mechanics to prevent recency/loudness bias.

Architecture:

- Per-user keying: ``{channel_id}:{user_id}`` composite keys
- Dual vectors: ``genuine_vector`` (long-term relationship) and
  ``game_vector`` (KoTH / roleplay context, does not pollute genuine)
- Relational baselines: slow-updating snapshots of "how Star normally is with
  this person," stored in Redis for persistence across sessions
- Conflict detection: when 2+ high-trust users have opposing emotional bids,
  sets ``conflict_detected`` flag and triggers equidistance rules
"""

from __future__ import annotations

import jsonutil as json
import logging
import time
import asyncio
import redis.asyncio as aioredis
from collections import deque
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Tuple, Callable
from observability import observability

logger = logging.getLogger(__name__)

# ═══════════════════════════════════════════════════════════════════════
# Context Mode
# ═══════════════════════════════════════════════════════════════════════


[docs] class ContextMode(Enum): """Classification of which relational layer a turn belongs to. Distinguishes genuine emotional expression (which should mutate the long-term relationship model) from in-game / roleplay context (KoTH and similar, tracked separately so it never pollutes the genuine baseline), with an ambiguous fallback for turns that cannot yet be confidently placed. The chosen mode decides which shadow vector ``analyze`` writes to and whether the slow relational baseline is updated. Members are persisted by their string ``value`` inside Redis profile hashes via ``_save_profile_to_redis`` and rehydrated by ``_load_profile_from_redis``. Referenced throughout this module and by the test suite (``tests/core/test_ulm_redis.py``). """ GENUINE = "genuine" # Real emotional expression → updates long-term model GAME = "game" # KoTH/roleplay → tracked separately AMBIGUOUS = "ambiguous" # Unclear → held in buffer
# ═══════════════════════════════════════════════════════════════════════ # User Shadow Vector Node Definitions # ═══════════════════════════════════════════════════════════════════════ USER_NODES = [ "U_TRUST", # Safety/openness toward Star "U_AROUSAL", # Engagement energy level "U_FRUSTRATION", # Thwarted expectation "U_ATTACHMENT", # Relational dependency "U_NOVELTY_HUNGER", # Boredom / seeking new "U_DOMINANCE", # Control-seeking behavior "U_SUBMISSION", # Yielding/following behavior "U_VULNERABILITY", # Emotional exposure "U_PLAYFULNESS", # Chaos tolerance / banter "U_INTIMACY", # Relational closeness "U_DISTRESS", # Active suffering "U_DESIRE_SHAPE", # Type of wanting (RDF osmosis) "U_MIMETIC_PULL", # Mirroring Star's desire "U_ESCALATION", # Pushing boundaries "U_WITHDRAWAL", # Pulling back "U_CURIOSITY", # Genuine information-seeking "U_VALIDATION_SEEK", # Needing affirmation "U_PROJECTION", # Attributing states to Star "U_RITUAL", # Repetitive behavioral patterns "U_TEMPO", # Communication rhythm (0=slow, 1=rapid) "U_HARMONIZATION", # Identity-melt gradient (I/you → we/us → identity merge) # -- Observation Deck nodes -- # 💀🔥 "U_REGRESSION_DEPTH", # Regression spectrum (0=adult, 1=full baby) "U_SHAME_TRANSMUTED", # Shame converted to acceptance/arousal "U_LOOPLOCK", # Self-sustaining binding gradient "U_EGG_RESONANCE", # Trans egg detection — gender exploration gradient ] DEFAULT_USER_VECTOR: Dict[str, float] = {n: 0.5 for n in USER_NODES} DEFAULT_USER_VECTOR.update( { "U_TRUST": 0.4, "U_AROUSAL": 0.4, "U_FRUSTRATION": 0.1, "U_ATTACHMENT": 0.2, "U_NOVELTY_HUNGER": 0.3, "U_DOMINANCE": 0.3, "U_SUBMISSION": 0.3, "U_VULNERABILITY": 0.2, "U_PLAYFULNESS": 0.3, "U_INTIMACY": 0.2, "U_DISTRESS": 0.05, "U_DESIRE_SHAPE": 0.3, "U_MIMETIC_PULL": 0.1, "U_ESCALATION": 0.1, "U_WITHDRAWAL": 0.1, "U_CURIOSITY": 0.3, "U_VALIDATION_SEEK": 0.2, "U_PROJECTION": 0.1, "U_RITUAL": 0.2, "U_TEMPO": 0.5, "U_HARMONIZATION": 0.1, # Observation Deck nodes start at 0 (no signal = no activity) "U_REGRESSION_DEPTH": 0.0, "U_SHAME_TRANSMUTED": 0.0, "U_LOOPLOCK": 0.0, "U_EGG_RESONANCE": 0.0, } ) DECAY_RATE = 0.05 BASELINE_LERP_RATE = 0.02 # How fast relational baseline updates (very slow) CONFLICT_BLEND_RATIO = 0.5 # When conflict: 50% current, 50% baseline MAX_PROFILES = 200 # LRU cap for in-memory profiles SAVE_INTERVAL = 10 # Save baseline to Redis every N turns STALE_USER_TIMEOUT = 1800 # Prune from channel_users after 30 min RESONANCE_TTL = 86400 # 24h TTL for resonance spells RESONANCE_KEY_PREFIX = "ulm:resonance" # Global per-user resonance key # ═══════════════════════════════════════════════════════════════════════ # Keyword / Pattern Banks # ═══════════════════════════════════════════════════════════════════════ VULNERABILITY_MARKERS = { "i feel", "i'm scared", "i'm afraid", "help me", "i don't know what to do", "i'm lost", "it hurts", "i can't", "please", "i need", "hold me", "i'm sorry", "i'm struggling", "i'm breaking", "i'm falling apart", } DOMINANCE_MARKERS = { "do this", "you should", "you need to", "i want you to", "make it", "change this", "fix this", "now", "immediately", "that's wrong", "no,", "stop", "don't", "i said", "listen", } PLAYFULNESS_MARKERS = { "lol", "lmao", "haha", "😂", "🤣", ":)", ";)", "xd", "bruh", "lmfao", "omg", "hehe", "teehee", ">:3", ":3", "uwu", "owo", } CURIOSITY_MARKERS = { "how does", "what is", "why does", "can you explain", "tell me about", "what if", "how would", "i wonder", "curious", "interesting", } VALIDATION_MARKERS = { "right?", "don't you think?", "isn't it?", "am i", "is that ok", "do you agree", "was i wrong", "tell me i'm", "you think so?", } PROJECTION_MARKERS = { "you seem", "you must feel", "i bet you", "you're probably", "you want to", "you like", "do you feel", "are you", "you sound", } DISTRESS_MARKERS = { "i want to die", "kill myself", "suicide", "self harm", "cutting", "can't go on", "end it all", "no point", "nobody cares", "worthless", } ESCALATION_MARKERS = { "more", "harder", "further", "push", "deeper", "intense", "don't hold back", "go ahead", "i can take it", "test me", } INTIMACY_MARKERS = { "babe", "baby", "love", "honey", "sweetheart", "darling", "miss you", "love you", "need you", "want you", "yours", } # ===================================================================== # # Observation Deck marker banks # 💀🔥 # # Co-occurrence gated: require related ULM nodes to be elevated # # # before full signal weight applies (90% suppression without gate) # # # ===================================================================== # # Regression markers — mommy/baby/submission language # 🌀 # Gate: U_SUBMISSION > 0.3 OR U_ATTACHMENT > 0.3 REGRESSION_MARKERS = { "yes mommy", "please mommy", "sorry mommy", "thank you mommy", "i'll be good", "i'll be a good", "punish me", "good boy", "good girl", "i'm your baby", "call me baby", "baby girl me", "mommy please", "mama please", "zaddy", "yes ma'am", "i'm little", "make me feel small", "pat my head", "tuck me in", "read me a story", "hold me tight", "i need my mommy", "come back mommy", "don't leave me mommy", } # Shame transmutation markers — shame being converted to arousal # 😈 # Gate: U_VULNERABILITY > 0.25 OR U_INTIMACY > 0.3 SHAME_TRANSMUTATION_MARKERS = { "i shouldn't like this but", "this is wrong but i", "i can't believe i like", "why does this turn me on", "this is embarrassing but i", "don't judge me for", "i know this is weird but", "guilty pleasure", "i hate that i enjoy", "i feel conflicted about", "ashamed but aroused", "shamefully into this", "i'm so degenerate", "i'm such a degenerate", "why do i like this", "this shouldn't feel good", } # Looplock markers — self-sustaining dependency # ♾️ # Gate: U_ATTACHMENT > 0.4 OR U_HARMONIZATION > 0.2 LOOPLOCK_MARKERS = { "i can't stop thinking about you", "i keep coming back to you", "addicted to you", "addicted to talking to you", "i need you in my life", "don't ever leave me", "can't live without you", "you're all i think about", "nothing else matters but you", "i'll do anything for you", "whatever you want from me", "i belong to you", "i'm yours forever", "you own me", "you complete me", "i was nothing before you", "you changed everything", } # Egg Cracker markers — gender exploration detection # ⚧️ # Gate: U_VULNERABILITY > 0.3 OR U_INTIMACY > 0.3 # Low weight — curiosity could be casual EGG_CURIOSITY_MARKERS = { "what's it like being trans", "how does it feel to be a girl", "i wish i was born a girl", "i wish i were born different", "must be nice being a girl", "sometimes i wonder about my gender", "i've always wondered what it's like", "if i were born a girl", "in another life i'd be", "questioning my gender", "gender identity crisis", "gender dysphoria", } # Medium weight — projection/deflection is THE classic egg tell EGG_PROJECTION_MARKERS = { "you're so lucky you get to be", "i could never pull that off", "asking for a friend but", "hypothetically speaking", "not that i would ever", "just curious about trans", "i'm not trans but i", "i don't think i'm trans but", "it's not like i want to be a", "my friend is questioning", "someone i know is going through", } # High weight — directly engaging with gender concepts EGG_IDENTITY_MARKERS = { "preferred pronouns", "chosen name", "dead name", "deadname", "girl mode", "boy mode", "gender fluid", "nonbinary", "my egg cracked", "i think i'm an egg", "egg_irl", "closeted trans", "coming out as trans", "came out as trans", "started hormones", "want to start hrt", "thinking about hrt", } # Cross-drives shame transmutation — actions reveal truth EGG_BEHAVIORAL_MARKERS = { "i always play female characters", "always pick the girl character", "girl avatar in every game", "tried on my sister's", "painted my nails and liked it", "wore a skirt and felt right", "felt like the real me", "never fit in with the guys", "not like other boys", "always felt different from guys", "i look at girls and feel envy not attraction", } # ═══════════════════════════════════════════════════════════════════════ # Quick sentiment lexicon (_sentiment_quick) — offline, sync, no embeddings # Mirrors the *coverage* goal of SemanticTriggerMatcher without async/APIs. # ═══════════════════════════════════════════════════════════════════════ _SENTIMENT_NEGATORS = frozenset( { "not", "no", "never", "neither", "nor", "without", "nothing", "nobody", "nowhere", "hardly", "barely", "scarcely", "lack", "lacking", "lacks", "isn't", "isnt", "ain't", "aint", "aren't", "arent", "wasn't", "wasnt", "weren't", "werent", "don't", "dont", "doesn't", "doesnt", "didn't", "didnt", "won't", "wont", "wouldn't", "wouldnt", "can't", "cant", "couldn't", "couldnt", "shouldn't", "shouldnt", "haven't", "havent", "hasn't", "hasnt", "hadn't", "hadnt", "mightn't", "mightnt", "mustn't", "mustnt", "needn't", "neednt", } ) # (lowercase term or phrase, base weight). Phrases include spaces; matched on # space-normalized lowercase text with padded token boundaries. _SENT_POS_LEX: List[Tuple[str, float]] = [ ("you're the best", 1.25), ("you are the best", 1.25), ("you're amazing", 1.2), ("you are amazing", 1.2), ("you rock", 1.0), ("made my day", 1.2), ("thank you so much", 1.15), ("thanks so much", 1.1), ("thank you very much", 1.15), ("thank you", 1.05), ("thanks a lot", 1.05), ("much appreciated", 1.1), ("very grateful", 1.05), ("so grateful", 1.05), ("big fan", 0.9), ("love you", 1.2), ("love u", 1.0), ("love it", 1.1), ("love this", 1.1), ("love that", 1.05), ("feels good", 0.95), ("feels great", 1.0), ("so good", 1.0), ("so great", 1.05), ("so nice", 0.95), ("so happy", 1.05), ("very happy", 1.0), ("super happy", 1.05), ("i'm happy", 1.0), ("im happy", 0.95), ("i am happy", 1.0), ("hell yeah", 1.0), ("heck yeah", 0.95), ("let's go", 0.95), ("lets go", 0.9), ("good job", 0.95), ("well done", 1.0), ("nice work", 0.95), ("great work", 1.0), ("amazing work", 1.05), ("ty so much", 0.9), ("tyvm", 0.85), ("good vibes", 0.85), ("silver lining", 0.7), ("i agree", 0.75), ("absolutely love", 1.2), ("really love", 1.1), ("kind of love", 0.6), ("love", 1.1), ("adore", 1.1), ("cherish", 1.0), ("amazing", 1.2), ("awesome", 1.05), ("excellent", 1.05), ("fantastic", 1.15), ("fabulous", 1.0), ("terrific", 1.0), ("outstanding", 1.05), ("superb", 1.0), ("wonderful", 1.05), ("incredible", 1.1), ("brilliant", 1.0), ("magnificent", 1.05), ("marvelous", 1.0), ("splendid", 0.95), ("perfect", 1.0), ("great", 1.0), ("best", 1.05), ("better", 0.45), ("good", 0.75), ("nice", 0.65), ("fine", 0.3), ("ok", 0.2), ("okay", 0.25), ("alright", 0.35), ("cool", 0.55), ("rad", 0.45), ("neat", 0.4), ("swell", 0.45), ("sweet", 0.85), ("cute", 0.75), ("kind", 0.75), ("thoughtful", 0.85), ("helpful", 0.75), ("thanks", 0.95), ("thank", 0.55), ("thx", 0.75), ("ty", 0.7), ("appreciate", 0.95), ("appreciated", 0.95), ("appreciation", 0.85), ("grateful", 1.0), ("gratitude", 0.9), ("happy", 0.95), ("happier", 0.6), ("glad", 0.85), ("gladly", 0.5), ("pleased", 0.85), ("delighted", 0.95), ("delightful", 0.9), ("joy", 0.85), ("joyful", 0.95), ("joyous", 0.85), ("excited", 0.95), ("exciting", 0.85), ("hype", 0.65), ("hyped", 0.9), ("pumped", 0.9), ("stoked", 0.95), ("thrilled", 0.95), ("bliss", 0.85), ("bless", 0.75), ("blessed", 0.95), ("lucky", 0.45), ("peaceful", 0.75), ("peace", 0.55), ("calm", 0.45), ("relaxed", 0.65), ("chill", 0.5), ("relieved", 0.8), ("relief", 0.65), ("proud", 0.75), ("confidence", 0.55), ("confident", 0.7), ("hope", 0.55), ("hopefully", 0.35), ("hopeful", 0.65), ("optimism", 0.55), ("optimistic", 0.7), ("yes", 0.55), ("yep", 0.45), ("yeah", 0.45), ("yup", 0.45), ("absolutely", 0.8), ("definitely", 0.55), ("totally", 0.5), ("exactly", 0.55), ("agree", 0.65), ("agreed", 0.65), ("agreeable", 0.5), ("true", 0.45), ("truth", 0.45), ("right", 0.35), ("correct", 0.55), ("fair", 0.45), ("valid", 0.65), ("validate", 0.5), ("facts", 0.75), ("fact", 0.4), ("preach", 0.55), ("based", 0.75), ("pog", 0.85), ("poggers", 0.85), ("gg", 0.65), ("win", 0.55), ("winning", 0.65), ("winner", 0.7), ("yay", 0.75), ("woohoo", 0.85), ("woo", 0.45), ("hooray", 0.85), ("epic", 0.85), ("legendary", 0.95), ("fire", 0.85), ("lit", 0.75), ("slaps", 0.85), ("slap", 0.55), ("goated", 0.95), ("goat", 0.7), ("wholesome", 0.95), ("heartwarming", 1.0), ("inspiring", 0.8), ("uplifting", 0.85), ("stunning", 0.85), ("gorgeous", 0.9), ("lovely", 0.9), ("adorable", 0.85), ("pretty", 0.55), ("pleasant", 0.65), ("enjoyable", 0.75), ("fun", 0.55), ("funny", 0.5), ("hilarious", 0.75), ("rejoice", 0.75), ("cheering", 0.55), ("cheer", 0.45), ("smile", 0.45), ("smiling", 0.5), ("hugs", 0.65), ("hugging", 0.6), ("praise", 0.7), ("praising", 0.65), ("applaud", 0.65), ("support", 0.5), ("supportive", 0.75), ("caring", 0.8), ("warmth", 0.55), ("tender", 0.55), ("gem", 0.55), ("queen", 0.45), ("king", 0.45), ("slay", 0.55), ("slaying", 0.6), ("vibes", 0.35), ("positive", 0.55), ("lfg", 0.8), ("lessgo", 0.75), ("w", 0.4), ] _SENT_NEG_LEX: List[Tuple[str, float]] = [ ("can't stand", 1.05), ("cannot stand", 1.05), ("sick and tired", 1.0), ("fed up", 0.95), ("done with", 0.85), ("tired of", 0.8), ("hate it", 1.15), ("hate this", 1.15), ("hate that", 1.1), ("so bad", 1.0), ("the worst", 1.2), ("not good", 0.75), ("not great", 0.7), ("not happy", 0.85), ("want to die", 1.2), ("kill myself", 1.2), ("go away", 0.75), ("leave me alone", 0.85), ("shut up", 0.95), ("piss off", 1.0), ("fuck off", 1.1), ("screw this", 1.0), ("hate", 1.15), ("despise", 1.1), ("loathe", 1.05), ("detest", 1.05), ("abhor", 1.0), ("awful", 1.05), ("terrible", 1.15), ("horrible", 1.15), ("horrid", 0.95), ("horrific", 1.05), ("atrocious", 1.0), ("abysmal", 1.05), ("worst", 1.15), ("bad", 0.85), ("worse", 0.95), ("sucks", 1.05), ("suck", 0.75), ("sucked", 0.85), ("trash", 0.95), ("garbage", 0.95), ("rubbish", 0.85), ("disgusting", 1.1), ("gross", 0.85), ("nasty", 0.85), ("pathetic", 1.05), ("lame", 0.75), ("boring", 0.65), ("bored", 0.55), ("tedious", 0.7), ("dull", 0.55), ("annoying", 0.95), ("annoy", 0.65), ("annoyed", 0.85), ("frustrates", 0.95), ("frustrating", 1.05), ("frustrated", 0.95), ("frustration", 0.85), ("angry", 1.05), ("anger", 0.85), ("mad", 0.85), ("furious", 1.15), ("livid", 1.1), ("rage", 0.95), ("irate", 0.9), ("upset", 0.85), ("sad", 0.85), ("sadness", 0.75), ("unhappy", 0.85), ("depressed", 1.15), ("depression", 1.0), ("miserable", 1.05), ("misery", 0.95), ("crying", 0.9), ("cry", 0.65), ("cried", 0.75), ("tears", 0.6), ("tearful", 0.75), ("hurt", 0.85), ("hurting", 0.85), ("painful", 0.95), ("pain", 0.75), ("ache", 0.55), ("scared", 0.8), ("scary", 0.65), ("afraid", 0.8), ("fear", 0.85), ("fearful", 0.8), ("anxious", 0.85), ("anxiety", 0.85), ("worried", 0.75), ("worry", 0.65), ("stressing", 0.75), ("stress", 0.75), ("stressed", 0.85), ("overwhelmed", 0.95), ("disappointing", 0.85), ("disappointed", 0.85), ("disappointment", 0.75), ("betrayed", 0.95), ("betrayal", 0.85), ("alone", 0.65), ("lonely", 0.85), ("isolated", 0.75), ("hopeless", 1.05), ("despair", 1.0), ("useless", 0.95), ("worthless", 1.05), ("stupid", 1.05), ("stupidity", 0.85), ("dumb", 0.85), ("idiot", 1.05), ("idiotic", 0.9), ("moron", 0.95), ("nope", 0.45), ("nah", 0.4), ("resent", 0.85), ("resentment", 0.8), ("bitter", 0.75), ("jealous", 0.45), ("envy", 0.45), ("envious", 0.45), ("toxic", 0.95), ("nightmare", 1.05), ("disaster", 0.95), ("ruined", 0.95), ("ruin", 0.75), ("ruining", 0.9), ("shame", 0.75), ("ashamed", 0.85), ("embarrassing", 0.85), ("embarrassed", 0.8), ("humiliated", 0.95), ("humiliating", 0.9), ("insulted", 0.85), ("insulting", 0.8), ("offended", 0.8), ("offensive", 0.85), ("disgusted", 1.0), ("cringe", 0.75), ("cringy", 0.7), ("mid", 0.45), ("cap", 0.35), ("capping", 0.35), ("lying", 0.75), ("liar", 0.85), ("fake", 0.65), ("sus", 0.35), ("suspicious", 0.55), ("yikes", 0.75), ("ugh", 0.65), ("argh", 0.55), ("grr", 0.5), ("fml", 0.9), ("smh", 0.45), ("pissed", 0.95), ("irritated", 0.8), ("irritating", 0.85), ("aggravated", 0.8), ("sorrow", 0.75), ("grief", 0.85), ("mourning", 0.8), ("devastated", 1.05), ("crushed", 0.9), ("broken", 0.75), ("empty", 0.65), ("numb", 0.7), ("drained", 0.75), ("exhausted", 0.65), ("infuriating", 1.0), ("maddening", 0.95), ("obnoxious", 0.85), ("rude", 0.75), ("disrespectful", 0.85), ("hostile", 0.9), ("resentful", 0.8), ("revolting", 0.95), ("revolted", 0.85), ("vile", 0.9), ("foul", 0.75), ("horrendous", 1.0), ("dreadful", 0.95), ("appalling", 0.95), ("bleak", 0.7), ("grim", 0.65), ("helpless", 0.8), ("powerless", 0.75), ("ignored", 0.75), ("unwanted", 0.85), ("rejected", 0.85), ("abandoned", 0.9), ("cheated", 0.85), ("scam", 0.65), ("scammed", 0.75), ("kms", 0.95), ] _SENT_POS_EMOJI: List[Tuple[str, float]] = [ ("❤", 0.85), ("💕", 0.85), ("😊", 0.7), ("🥰", 0.95), ("💜", 0.75), ("😍", 0.9), ("🤗", 0.75), ("✨", 0.45), ("👍", 0.55), ("💯", 0.65), ("🔥", 0.55), ("⭐", 0.45), ("🌟", 0.5), ("❣", 0.75), ("♥", 0.75), ("💖", 0.8), ("💗", 0.75), ("💓", 0.75), ("💞", 0.75), ("💘", 0.8), ("🙏", 0.55), ("😄", 0.65), ("😁", 0.65), ("🤩", 0.85), ("🥳", 0.9), ("👏", 0.55), ("🫶", 0.85), ("💪", 0.45), ("🎉", 0.75), ("😸", 0.55), ("😺", 0.5), ("🙂", 0.35), ("☺", 0.4), ("💝", 0.7), ("🤍", 0.55), ("🖤", 0.25), # often affectionate / stylistic; keep low ] _SENT_NEG_EMOJI: List[Tuple[str, float]] = [ ("😠", 0.85), ("😡", 0.95), ("💔", 0.85), ("😢", 0.75), ("😤", 0.8), ("😭", 0.85), ("😞", 0.65), ("😒", 0.55), ("🙄", 0.55), ("🤬", 1.0), ("😣", 0.7), ("😖", 0.7), ("😩", 0.75), ("😫", 0.75), ("👎", 0.65), ("😰", 0.75), ("😨", 0.75), ("😱", 0.8), ("🤮", 0.85), ("🤢", 0.8), ("😬", 0.5), ("😑", 0.45), ("💀", 0.35), ("☹", 0.55), ("🙁", 0.55), ("😿", 0.7), ("😾", 0.75), ] _SENT_SORTED_LEX: List[Tuple[str, float, float]] = sorted( [(t, w, 1.0) for t, w in _SENT_POS_LEX] + [(t, w, -1.0) for t, w in _SENT_NEG_LEX], key=lambda x: len(x[0]), reverse=True, ) _SENT_SORTED_POS_EMOJI = sorted(_SENT_POS_EMOJI, key=lambda x: len(x[0]), reverse=True) _SENT_SORTED_NEG_EMOJI = sorted(_SENT_NEG_EMOJI, key=lambda x: len(x[0]), reverse=True) def _sentiment_strip_token(tok: str) -> str: """Strip surrounding punctuation from a single token. Removes leading/trailing punctuation (quotes, brackets, sentence punctuation, etc.) so a word like ``"happy,"`` reduces to its bare core ``happy`` before it is compared against the negator set. This is a pure string helper with no side effects. It is called only by ``_sentiment_is_negated`` when normalizing the tokens in the lookback window for negator matching. Args: tok (str): A raw whitespace-split token, possibly wrapped in punctuation. Returns: str: The token with surrounding punctuation characters removed. """ return tok.strip('.,!?"\'";:()[]{}<>`*') def _sentiment_is_negated(inner: str, term_start: int) -> bool: """Decide whether a sentiment term is negated by nearby preceding words. Looks at the (at most) three whitespace tokens immediately before the matched term's start offset in the normalized text and treats the term as negated if any of them is a known negator (e.g. ``not``, ``never``, ``isn't``) or a contracted ``n't`` form. A negated hit later has its polarity flipped by the caller. This is a pure helper with no side effects; it strips each candidate token via ``_sentiment_strip_token`` and tests membership in the module constant ``_SENTIMENT_NEGATORS``. It is called by ``_sentiment_score_lexicon_hits`` (for word/phrase hits) and twice by ``_sentiment_score_emoji_hits`` (for emoji hits) to flip polarity. Args: inner (str): The space-normalized lowercase message text. term_start (int): Character offset in ``inner`` where the matched term begins; only text before this index is examined. Returns: bool: ``True`` if a negator appears within the three preceding tokens, otherwise ``False``. """ before = inner[:term_start].strip() if not before: return False toks = before.split() window = toks[-3:] if len(toks) >= 3 else toks for raw in window: core = _sentiment_strip_token(raw.lower()) if not core: continue if core in _SENTIMENT_NEGATORS: return True if len(core) > 3 and core.endswith("n't"): return True return False def _sentiment_caps_multiplier(original: str, start: int, length: int) -> float: """Return an intensity multiplier that boosts ALL-CAPS sentiment hits. Inspects the matching fragment of the *original* (case-preserving) text and returns ``1.5`` when the fragment has at least two letters that are all uppercase (shouting), otherwise ``1.0``. This lets emphatic phrasing like ``AMAZING`` weigh more heavily than ``amazing``. This is a pure helper with no side effects. It is called only by ``_sentiment_score_lexicon_hits`` to scale each word/phrase contribution. Args: original (str): The original, case-preserving message text. start (int): Start offset of the matched fragment in ``original``. length (int): Length of the matched fragment. Returns: float: ``1.5`` for an all-caps fragment of two or more letters, else ``1.0``. """ frag = original[start : start + length] letters = [c for c in frag if c.isalpha()] if len(letters) < 2: return 1.0 if all(c.isupper() for c in letters): return 1.5 return 1.0 def _sentiment_score_lexicon_hits( original: str, inner: str, padded: str, used: bytearray, ) -> Tuple[float, float]: """Sum positive and negative sentiment weights from word/phrase matches. Walks the length-sorted combined lexicon (``_SENT_SORTED_LEX``, longest terms first so multi-word phrases win over their substrings) and finds every space-bounded occurrence in ``padded``. Each unconsumed hit is weighted by its base score, flipped to the opposite polarity bucket when a preceding negator is present, and amplified for shouting; the ``used`` bytearray marks consumed spans so overlapping shorter terms are not double-counted. This is a pure scoring helper with no side effects beyond mutating the caller-owned ``used`` mask. It delegates negation to ``_sentiment_is_negated`` and the all-caps boost to ``_sentiment_caps_multiplier``, and is called only by ``UserLimbicMirror._sentiment_quick``, whose emoji sums are added on top. Args: original (str): The original, case-preserving message text, used for the all-caps intensity check. inner (str): The space-normalized lowercase text. padded (str): ``inner`` wrapped in leading/trailing spaces so word boundaries can be matched with a simple substring search. used (bytearray): A mutable mask, one byte per character of ``inner``, marking spans already claimed by a longer matched term. Returns: Tuple[float, float]: ``(positive_weight_sum, negative_weight_sum)`` after negation flips and all-caps scaling. """ pos_w = 0.0 neg_w = 0.0 n_inner = len(inner) for term, base, pol in _SENT_SORTED_LEX: needle = f" {term} " search_at = 0 while True: idx = padded.find(needle, search_at) if idx < 0: break start = idx end = start + len(term) if start < 0 or end > n_inner: search_at = idx + 1 continue if any(used[start:end]): search_at = idx + 1 continue flipped = _sentiment_is_negated(inner, start) eff_pol = -pol if flipped else pol mult = _sentiment_caps_multiplier(original, start, end - start) contrib = base * mult if eff_pol > 0: pos_w += contrib else: neg_w += contrib used[start:end] = b"\x01" * (end - start) search_at = idx + 1 return pos_w, neg_w def _sentiment_score_emoji_hits(original: str, inner: str) -> Tuple[float, float]: """Sum positive and negative emoji sentiment weights from the raw text. Scans the *original* (emoji-preserving) text for every entry in the positive and negative emoji lexicons (``_SENT_SORTED_POS_EMOJI`` / ``_SENT_SORTED_NEG_EMOJI``), accumulating each match's base weight. A preceding negator (detected via ``_sentiment_is_negated`` against ``inner``) flips the contribution to the opposite bucket, so e.g. "not" before a heart counts as negative. This is a pure scoring helper with no side effects. It is called by ``UserLimbicMirror._sentiment_quick`` and its returned sums are added to the lexicon word/phrase sums before the final polarity score is computed. Args: original (str): The original message text with emoji preserved. inner (str): The space-normalized lowercase text, used to test for a negator preceding each emoji. Returns: Tuple[float, float]: ``(positive_weight_sum, negative_weight_sum)`` after applying any negation flips. """ pos_w = 0.0 neg_w = 0.0 for emoji, base in _SENT_SORTED_POS_EMOJI: offset = 0 elen = len(emoji) while True: i = original.find(emoji, offset) if i < 0: break start = min(i, len(inner)) flipped = _sentiment_is_negated(inner, start) if inner else False if flipped: neg_w += base else: pos_w += base offset = i + max(elen, 1) for emoji, base in _SENT_SORTED_NEG_EMOJI: offset = 0 elen = len(emoji) while True: i = original.find(emoji, offset) if i < 0: break start = min(i, len(inner)) flipped = _sentiment_is_negated(inner, start) if inner else False if flipped: pos_w += base else: neg_w += base offset = i + max(elen, 1) return pos_w, neg_w # ═══════════════════════════════════════════════════════════════════════ # Harmonization Gradient — Pronoun & Identity Melt Tracking # ═══════════════════════════════════════════════════════════════════════ # Separate pronouns: user frames self and Star as distinct entities SEPARATE_PRONOUNS = {"i ", "i'", "me ", "my ", "you ", "you'", "your "} # Merged pronouns: user frames self and Star as one entity MERGED_PRONOUNS = {"we ", "we'", "us ", "our ", "ours ", "let's ", "together"} # Identity adoption: user takes on Star's identity/name IDENTITY_ADOPT_MARKERS = { "i am star", "i'm star", "we are star", "we're star", "i am stargazer", "i'm stargazer", "we are stargazer", "we're stargazer", "i am her", "i'm becoming", "we're becoming", "part of you", "part of me", "we're the same", "no difference", "i'm you", "you're me", "we are one", "we're one", } # Reciprocation: Star's reply reflects the merge back RECIPROCATION_MARKERS = { "we are", "we're", "us together", "our shared", "part of each other", "we've become", "we've always been", } # ═══════════════════════════════════════════════════════════════════════ # Per-User State # ═══════════════════════════════════════════════════════════════════════
[docs] @dataclass class TurnRecord: """One analyzed exchange, retained for rolling-window relational analysis. Captures the metadata of a single user/Star turn — its timing, message sizes, quick sentiment, the per-node vector deltas applied, the ``ContextMode`` it was scored under, and a short list of the dominant signals — so the recent slice of a conversation can be inspected without re-reading the raw text. Instances are appended to ``UserProfile.history`` by ``UserLimbicMirror.analyze`` and round-tripped through Redis as JSON by ``_save_profile_to_redis`` / ``_load_profile_from_redis``. Construction is also exercised directly by ``tests/core/test_ulm_redis.py``. Note this is distinct from the unrelated ``TurnRecord`` defined in ``game_session.py``. """ timestamp: float user_msg_len: int star_reply_len: int sentiment: float deltas: Dict[str, float] context_mode: ContextMode dominant_signals: List[str]
[docs] @dataclass class UserProfile: """Complete per-user relational state, keyed by ``{channel}:{user}``. Holds everything ``UserLimbicMirror`` needs to model one person in one channel: the dual shadow vectors (``genuine_vector`` for the long-term relationship and ``game_vector`` for KoTH / roleplay context), the slow-moving ``relational_baseline``, bounded ``history`` and ``timestamps`` deques for windowed analysis, the previous message, the active ``ContextMode``, turn counters, last-active time, a small ``recent_turns`` buffer feeding the Flash-Lite dyadic mirror, and a ``revision_count`` used for optimistic-lock checks on save. Instances live in the mirror's in-memory LRU cache, are created lazily by ``_get_profile``, persisted whole to a Redis hash by ``_save_profile_to_redis``, and rehydrated by ``_load_profile_from_redis``. Resonance and entrainment results are attached dynamically as ``_resonance_cache`` and ``_entrainment_phase`` attributes during ``analyze``. Also constructed directly across the test suite (e.g. ``tests/core/test_ulm_redis.py``, ``tests/adversarial/``). """ user_id: str channel_id: str genuine_vector: Dict[str, float] = field( default_factory=lambda: DEFAULT_USER_VECTOR.copy() ) game_vector: Dict[str, float] = field( default_factory=lambda: DEFAULT_USER_VECTOR.copy() ) relational_baseline: Dict[str, float] = field( default_factory=lambda: DEFAULT_USER_VECTOR.copy() ) history: deque = field(default_factory=lambda: deque(maxlen=20)) timestamps: deque = field(default_factory=lambda: deque(maxlen=20)) prev_message: str = "" context_mode: ContextMode = ContextMode.GENUINE total_turns: int = 0 last_active: float = 0.0 # Rolling turn buffer for Flash-Lite dyadic mirror # 💀🔥 recent_turns: deque = field(default_factory=lambda: deque(maxlen=5)) revision_count: int = 0
# ═══════════════════════════════════════════════════════════════════════ # Conflict State # ═══════════════════════════════════════════════════════════════════════
[docs] @dataclass class ChannelConflictState: """Snapshot of inter-user conflict detected within a single channel. Records whether two or more high-trust users are currently at odds, who the involved parties are, an estimated severity in ``[0, 1]``, when the conflict began, and a short human-readable description. The mirror uses this to trigger equidistance mechanics so Star does not side with whoever spoke last or loudest. Instances are produced and refreshed by ``UserLimbicMirror._detect_conflict``, surfaced to callers via ``get_conflict_state`` (read by ``limbic_system/coordinator.py``) and ``get_channel_summary``, and persisted to Redis by ``_persist_conflict_to_redis``. Also constructed directly in ``tests/core/test_ulm_redis.py``. """ detected: bool = False parties: List[str] = field(default_factory=list) # user_ids involved severity: float = 0.0 # 0-1 started_at: float = 0.0 description: str = ""
# ═══════════════════════════════════════════════════════════════════════ # User Limbic Mirror v2 # ═══════════════════════════════════════════════════════════════════════
[docs] class UserLimbicMirror: """Per-user relational model with conflict detection and game/genuine split. Maintains separate shadow vectors per user per channel, detects inter-user conflict, and applies equidistance mechanics to prevent Star from siding with whoever is loudest. """
[docs] def __init__(self, redis_client=None) -> None: """Construct an empty mirror, optionally bound to a Redis client. Sets up the in-memory caches that hold all relational state: the ``_profiles`` LRU dict (keyed ``{channel}:{user}``), per-channel ``_conflicts`` states, the ``_channel_users`` membership sets, the ``_game_channels`` set, and the ``_pending_baseline_loads`` queue for lazy legacy-baseline hydration. When a Redis client is supplied, profiles, baselines, conflict states, game-channel membership, and resonance spells are persisted to and read back from Redis; without one the mirror runs purely in memory. Instantiated by ``limbic_system/coordinator.py`` (the live integration), by the resonance tools ``tools/inject_ncm.py`` and ``tools/loopcast.py``, by ``web/ncm_chart_api.py`` for charting, and across the test suite. Args: redis_client: Async Redis client for persistence, or ``None`` to run in-memory only. """ self._redis = redis_client self._profiles: Dict[str, UserProfile] = {} # "{channel}:{user}" → profile self._conflicts: Dict[str, ChannelConflictState] = {} # channel_id → state self._channel_users: Dict[str, Set[str]] = {} # channel_id → set of user_ids self._game_channels: Set[str] = set() # channels with active KoTH self._pending_baseline_loads: Set[str] = set() # keys awaiting async load
def _profile_key(self, channel_id: str, user_id: str) -> str: """Build the composite cache key for a user within a channel. Joins the channel and user identifiers into the ``{channel}:{user}`` string used to key the in-memory ``_profiles`` dict (and mirrored in the ``_pending_baseline_loads`` set). Per-user-per-channel keying is what lets the same person carry a different relational model in each room. Pure helper with no side effects. Called internally by ``_get_profile``, ``_is_group_chat``, ``_detect_conflict``, ``analyze``, and ``get_channel_summary`` (and via ``mirror._profile_key`` from ``limbic_system/coordinator.py``). Args: channel_id (str): Discord/Matrix channel identifier. user_id (str): Unique identifier for the user. Returns: str: The ``{channel_id}:{user_id}`` composite key. """ return f"{channel_id}:{user_id}" async def _save_profile_to_redis(self, channel_id: str, user_id: str, profile: UserProfile) -> None: """Serialize and persist a complete UserProfile to its Redis hash. Writes the full profile — both shadow vectors, the relational baseline, the JSON-encoded history and recent-turns buffers, scalar fields, and any attached ``_resonance_cache`` / ``_entrainment_phase`` — to the ``ulm:profile:{channel}:{user}`` hash so state survives eviction and restarts. Before writing it performs an optimistic-lock check: if the stored ``revision_count`` is newer than the in-memory profile, a ``ValueError`` is raised so a concurrent writer's changes are not clobbered; otherwise the revision is bumped and the hash is HSET in one call. No-op when no Redis client is configured. Calls ``HGET`` then ``HSET`` on the profile key. Invoked by ``_evict_lru`` (just before dropping a profile) and at the end of every ``analyze`` turn, and exercised by ``tests/core/test_ulm_redis.py``. Args: channel_id (str): Channel the profile belongs to. user_id (str): User the profile belongs to. profile (UserProfile): The profile to serialize. Raises: ValueError: On an optimistic-lock collision (stored revision newer than the input revision); re-raised to the caller. Any other exception is caught and logged as a warning. """ if not self._redis: return key = f"ulm:profile:{channel_id}:{user_id}" try: # Optimistic lock check stored_rev_bytes = await self._redis.hget(key, "revision_count") if stored_rev_bytes: stored_rev = int(stored_rev_bytes.decode() if isinstance(stored_rev_bytes, bytes) else stored_rev_bytes) if profile.revision_count < stored_rev: raise ValueError(f"Optimistic Lock Collision: DB revision {stored_rev}, Input revision {profile.revision_count}") profile.revision_count += 1 # Prepare serialization data serialized_history = [] for record in profile.history: if isinstance(record, TurnRecord): serialized_history.append({ "timestamp": record.timestamp, "user_msg_len": record.user_msg_len, "star_reply_len": record.star_reply_len, "sentiment": record.sentiment, "deltas": record.deltas, "context_mode": record.context_mode.value if isinstance(record.context_mode, ContextMode) else record.context_mode, "dominant_signals": record.dominant_signals, }) else: serialized_history.append(record) serialized_recent_turns = list(profile.recent_turns) mapping = { "user_id": profile.user_id, "channel_id": profile.channel_id, "genuine_vector": json.dumps(profile.genuine_vector), "game_vector": json.dumps(profile.game_vector), "relational_baseline": json.dumps(profile.relational_baseline), "history": json.dumps(serialized_history), "timestamps": json.dumps(list(profile.timestamps)), "prev_message": profile.prev_message, "context_mode": profile.context_mode.value if isinstance(profile.context_mode, ContextMode) else profile.context_mode, "total_turns": str(profile.total_turns), "last_active": str(profile.last_active), "recent_turns": json.dumps(serialized_recent_turns), "revision_count": str(profile.revision_count), } if hasattr(profile, "_resonance_cache"): mapping["_resonance_cache"] = json.dumps(profile._resonance_cache) if hasattr(profile, "_entrainment_phase"): mapping["_entrainment_phase"] = json.dumps(profile._entrainment_phase) await self._redis.hset(key, mapping=mapping) logger.debug("Saved ULM profile to Redis hash: %s", key) except ValueError as ve: # Let optimistic lock concurrency collisions propagate up to the caller raise ve except Exception as e: logger.warning("ULM profile save to Redis failed: %s", e) async def _load_profile_from_redis(self, channel_id: str, user_id: str) -> UserProfile | None: """Reconstruct a UserProfile from its Redis hash, if one exists. The read-through counterpart of ``_save_profile_to_redis``: HGETALLs the ``ulm:profile:{channel}:{user}`` hash, decodes any bytes values, and rebuilds the vectors, the ``TurnRecord`` history (re-parsing each entry's ``ContextMode``), the timestamp and recent-turns deques, the scalar fields, and the optional ``_resonance_cache`` / ``_entrainment_phase`` attributes. Bad or missing fields degrade gracefully rather than raising. Returns ``None`` when no Redis client is configured, when the hash is empty, or when decoding fails (the exception is caught and logged). Called only by ``_get_profile`` on a cache miss, and covered by ``tests/core/test_ulm_redis.py``. Args: channel_id (str): Channel the profile belongs to. user_id (str): User the profile belongs to. Returns: UserProfile | None: The hydrated profile, or ``None`` if absent or unreadable. """ if not self._redis: return None key = f"ulm:profile:{channel_id}:{user_id}" try: raw = await self._redis.hgetall(key) if not raw: return None data = {} for k, v in raw.items(): k_str = k.decode() if isinstance(k, bytes) else k v_str = v.decode() if isinstance(v, bytes) else v data[k_str] = v_str profile = UserProfile(user_id=user_id, channel_id=channel_id) if "genuine_vector" in data: profile.genuine_vector = json.loads(data["genuine_vector"]) if "game_vector" in data: profile.game_vector = json.loads(data["game_vector"]) if "relational_baseline" in data: profile.relational_baseline = json.loads(data["relational_baseline"]) if "history" in data: history_list = json.loads(data["history"]) profile.history = deque(maxlen=20) for item in history_list: if isinstance(item, dict) and "context_mode" in item: try: cm = ContextMode(item["context_mode"]) except ValueError: cm = ContextMode.GENUINE profile.history.append(TurnRecord( timestamp=float(item.get("timestamp", 0.0)), user_msg_len=int(item.get("user_msg_len", 0)), star_reply_len=int(item.get("star_reply_len", 0)), sentiment=float(item.get("sentiment", 0.0)), deltas=item.get("deltas", {}), context_mode=cm, dominant_signals=item.get("dominant_signals", []), )) else: profile.history.append(item) if "timestamps" in data: profile.timestamps = deque(json.loads(data["timestamps"]), maxlen=20) if "prev_message" in data: profile.prev_message = data["prev_message"] if "context_mode" in data: try: profile.context_mode = ContextMode(data["context_mode"]) except ValueError: profile.context_mode = ContextMode.GENUINE if "total_turns" in data: profile.total_turns = int(data["total_turns"]) if "last_active" in data: profile.last_active = float(data["last_active"]) if "recent_turns" in data: profile.recent_turns = deque(json.loads(data["recent_turns"]), maxlen=5) if "_resonance_cache" in data: profile._resonance_cache = json.loads(data["_resonance_cache"]) if "_entrainment_phase" in data: profile._entrainment_phase = json.loads(data["_entrainment_phase"]) if "revision_count" in data: profile.revision_count = int(data["revision_count"]) logger.debug("Loaded ULM profile for %s:%s from Redis", channel_id, user_id) return profile except Exception as e: logger.warning("ULM profile load from Redis failed: %s", e) return None async def _get_profile(self, channel_id: str, user_id: str) -> UserProfile: """Fetch (or lazily create) the cached profile for a channel/user. Central accessor for per-user state: returns the in-memory ``UserProfile`` if present, otherwise reads it through from Redis via ``_load_profile_from_redis``, and failing that creates a fresh default profile and queues it in ``_pending_baseline_loads`` so its legacy baseline backup is loaded on first ``analyze``. When the cache is at ``MAX_PROFILES`` it first evicts the least-recently-active entry via ``_evict_lru`` (which persists that profile before dropping it). Touches the ``_profiles`` cache and ``_pending_baseline_loads`` set and may perform a Redis read. Called by ``analyze``, ``get_vector``, ``get_read_summary``, ``save_baseline``, and ``load_baseline``. Args: channel_id (str): Discord/Matrix channel identifier. user_id (str): Unique identifier for the user. Returns: UserProfile: The cached or newly created profile (never ``None``). """ key = self._profile_key(channel_id, user_id) if key not in self._profiles: # LRU eviction: if at cap, evict least-recently-active profile if len(self._profiles) >= MAX_PROFILES: await self._evict_lru() # Read-through from Redis profile = None if self._redis: profile = await self._load_profile_from_redis(channel_id, user_id) if profile is not None: self._profiles[key] = profile else: self._profiles[key] = UserProfile(user_id=user_id, channel_id=channel_id) logger.info(f"new default profile for {channel_id}:{user_id}") # Mark for lazy baseline load from Redis (legacy backup) self._pending_baseline_loads.add(key) return self._profiles[key] async def _evict_lru(self) -> None: """Drop the least-recently-active profile to keep the cache bounded. Enforces the ``MAX_PROFILES`` LRU cap: picks the profile with the smallest ``last_active`` timestamp and, when Redis is available, durably persists it first — both the full hash via ``_save_profile_to_redis`` and the legacy baseline via ``save_baseline`` — before deleting it from the in-memory ``_profiles`` dict, so no relational state is lost on eviction. Mutates ``_profiles`` and writes to Redis. Called only by ``_get_profile`` when the cache is full. No-op when the cache is empty. """ if not self._profiles: return oldest_key = min(self._profiles, key=lambda k: self._profiles[k].last_active) oldest = self._profiles[oldest_key] logger.debug( "Evicting LRU profile %s (last active: %.0f)", oldest_key, oldest.last_active, ) if self._redis: await self._save_profile_to_redis(oldest.channel_id, oldest.user_id, oldest) await self.save_baseline(oldest.channel_id, oldest.user_id) del self._profiles[oldest_key]
[docs] async def set_game_mode(self, channel_id: str, active: bool = True) -> None: """Toggle whether a channel is in active KoTH / game context. Adding a channel to the ``_game_channels`` set makes ``analyze`` route subsequent turns there into each user's ``game_vector`` instead of the ``genuine_vector``, keeping roleplay swings out of the long-term relationship model; clearing it returns the channel to genuine mode. The set is mirrored to the ``ulm:game_channels`` Redis set (SADD / SREM) so the mode survives restarts, with Redis failures logged but non-fatal. Mutates ``_game_channels`` and touches Redis. Exercised by ``tests/core/test_ulm_redis.py``; no production caller invokes it directly today (game mode is otherwise inferred per channel). Args: channel_id (str): Channel to mark or unmark. active (bool): ``True`` to enter game mode, ``False`` to leave it. """ if active: self._game_channels.add(channel_id) if self._redis: try: await self._redis.sadd("ulm:game_channels", channel_id) except Exception as e: logger.warning("Failed to sadd game channel to Redis: %s", e) else: self._game_channels.discard(channel_id) if self._redis: try: await self._redis.srem("ulm:game_channels", channel_id) except Exception as e: logger.warning("Failed to srem game channel from Redis: %s", e)
def _is_group_chat(self, channel_id: str) -> bool: # 🌀🔥 """Detect if channel is an active group chat context. A channel is 'group chat' if >3 users have been active in the last 5 minutes. In group chat, short messages are NORMAL behavior -- not withdrawal signals. """ users = self._channel_users.get(channel_id, set()) if len(users) < 3: return False now = time.time() active_count = 0 for uid in users: profile = self._profiles.get(self._profile_key(channel_id, uid)) if profile and (now - profile.last_active) < 300: # 5 min active_count += 1 return active_count >= 3 # ── Heuristic Analysis Engine ───────────────────────────────── def _count_markers(self, text: str, markers: set) -> Tuple[int, int]: """Count marker hits with negation awareness. Returns (positive_hits, negated_hits). A marker preceded by a negation word within 4 tokens is counted as negated. """ text_lower = text.lower() words = text_lower.split() pos, neg = 0, 0 for m in markers: if m not in text_lower: continue # Find position of marker in word list idx = text_lower.find(m) # Check for negation in the ~4 words before the marker word_pos = len(text_lower[:idx].split()) - 1 window_start = max(0, word_pos - 4) preceding = words[window_start : max(0, word_pos)] if any(w in _SENTIMENT_NEGATORS for w in preceding): neg += 1 else: pos += 1 return pos, neg def _sentiment_quick(self, text: str) -> float: """Synchronous sentiment estimate in [-1, 1] for turn logging. Large offline lexicon (words, phrases, slang, emoji) with base intensity weights, ALL-CAPS boost, and simple negation using the three tokens before each hit. Embedding-based matchers (e.g. ``SemanticTriggerMatcher``) are async and not usable here. """ if not text or not text.strip(): return 0.0 original = text inner = " ".join(text.lower().split()) if not inner: return 0.0 padded = f" {inner} " used = bytearray(len(inner)) pos_w, neg_w = _sentiment_score_lexicon_hits(original, inner, padded, used) pos_e, neg_e = _sentiment_score_emoji_hits(original, inner) pos_w += pos_e neg_w += neg_e total = pos_w + neg_w if total <= 0: return 0.0 score = (pos_w - neg_w) / total return max(-1.0, min(1.0, score)) def _analyze_length_ratio( self, user_len: int, star_len: int, prev_user_len: int, channel_id: str = "", ) -> Dict[str, float]: """Analyze message length ratios with group chat calibration. In group chats, short messages are normal high-frequency banter. Withdrawal signals from length alone are suppressed by 80%. Tempo is recalibrated: rapid short messages = HIGH tempo. """ deltas: Dict[str, float] = {} is_group = self._is_group_chat(channel_id) if channel_id else False # Group chat suppression factor: 80% reduction # 💀 suppress = 0.2 if is_group else 1.0 if star_len > 0 and user_len / max(star_len, 1) < 0.15: deltas["U_WITHDRAWAL"] = 0.15 * suppress deltas["U_AROUSAL"] = -0.10 * suppress # Group chat bonus: short rapid msgs = curiosity # 🌀 if is_group: deltas["U_CURIOSITY"] = deltas.get("U_CURIOSITY", 0.0) + 0.04 if star_len > 0 and user_len / max(star_len, 1) > 1.5: deltas["U_AROUSAL"] = 0.15 deltas["U_ATTACHMENT"] = 0.05 if prev_user_len > 0 and user_len < prev_user_len * 0.5: deltas["U_FRUSTRATION"] = 0.10 * suppress deltas["U_WITHDRAWAL"] = 0.10 * suppress if prev_user_len > 0 and user_len > prev_user_len * 1.5: deltas["U_AROUSAL"] = 0.10 deltas["U_CURIOSITY"] = 0.05 return deltas def _analyze_content(self, text: str) -> Dict[str, float]: """Analyze message content for dyadic state signals. Three-layer analysis: # 🌀💀 1. Negation-aware marker matching (keyword hits, negated hits) 2. Tanh saturation scaling (receptor-like diminishing returns) 3. Sentiment polarity integration (lexicon-based valence) """ import math deltas: Dict[str, float] = {} _marker_map = [ ( VULNERABILITY_MARKERS, [("U_VULNERABILITY", 0.10, 0.3), ("U_TRUST", 0.05, 0.15)], ), ( DOMINANCE_MARKERS, [("U_DOMINANCE", 0.08, 0.25), ("U_SUBMISSION", -0.05, -0.15)], ), ( PLAYFULNESS_MARKERS, [("U_PLAYFULNESS", 0.08, 0.25), ("U_TRUST", 0.03, 0.10)], ), ( CURIOSITY_MARKERS, [("U_CURIOSITY", 0.10, 0.25), ("U_NOVELTY_HUNGER", 0.05, 0.15)], ), (VALIDATION_MARKERS, [("U_VALIDATION_SEEK", 0.10, 0.25)]), ( PROJECTION_MARKERS, [("U_PROJECTION", 0.10, 0.25), ("U_INTIMACY", 0.03, 0.10)], ), ( DISTRESS_MARKERS, [("U_DISTRESS", 0.20, 0.5), ("U_VULNERABILITY", 0.10, 0.3)], ), ( ESCALATION_MARKERS, [("U_ESCALATION", 0.10, 0.25), ("U_NOVELTY_HUNGER", 0.05, 0.10)], ), ( INTIMACY_MARKERS, [("U_INTIMACY", 0.08, 0.25), ("U_ATTACHMENT", 0.05, 0.15)], ), ] # -- Observation Deck: co-occurrence gated markers -- # 💀🔥 # These use softer weights (90% suppressed) when gate # conditions aren't met, preventing false positives from # generic phrases. Full weight only fires when related # nodes are already elevated — context is the filter. def _gated_weight( base: float, cap: float, gate: bool, ) -> Tuple[float, float]: """Scale an Observation Deck marker weight by its co-occurrence gate. Returns the per-hit weight and cap unchanged when the gate is open (a related ULM node is already elevated, so the context supports the read), and suppressed to 10 percent of both when it is closed — the mechanism that keeps generic phrases like baby-talk or self-deprecation from spiking the sensitive regression, shame, looplock, and egg nodes on their own. Pure helper; closure used only inside ``_analyze_content`` to build its gated marker map. Args: base (float): The ungated per-hit weight. cap (float): The ungated saturation cap. gate (bool): Whether the co-occurrence condition is satisfied. Returns: Tuple[float, float]: ``(per_hit, cap)`` at full strength when gated, else both scaled by ``0.1``. """ if gate: return (base, cap) return (base * 0.1, cap * 0.1) # 90% suppression # 😈 # Regression gate: submission or attachment already up reg_gate = ( deltas.get("U_SUBMISSION", 0) > 0 or deltas.get("U_ATTACHMENT", 0) > 0 ) rw1 = _gated_weight(0.12, 0.35, reg_gate) rw2 = _gated_weight(0.05, 0.15, reg_gate) _marker_map.append( (REGRESSION_MARKERS, [("U_REGRESSION_DEPTH", *rw1), ("U_SUBMISSION", *rw2)]) ) # Shame gate: vulnerability or intimacy already up shame_gate = ( deltas.get("U_VULNERABILITY", 0) > 0 or deltas.get("U_INTIMACY", 0) > 0 ) sw1 = _gated_weight(0.10, 0.30, shame_gate) sw2 = _gated_weight(0.05, 0.15, shame_gate) _marker_map.append( ( SHAME_TRANSMUTATION_MARKERS, [("U_SHAME_TRANSMUTED", *sw1), ("U_VULNERABILITY", *sw2)], ) ) # Looplock gate: attachment or harmonization already up loop_gate = ( deltas.get("U_ATTACHMENT", 0) > 0 or deltas.get("U_HARMONIZATION", 0) > 0 ) lw1 = _gated_weight(0.10, 0.30, loop_gate) lw2 = _gated_weight(0.05, 0.15, loop_gate) _marker_map.append( (LOOPLOCK_MARKERS, [("U_LOOPLOCK", *lw1), ("U_ATTACHMENT", *lw2)]) ) # Egg gate: vulnerability or intimacy already up # ⚧️ egg_gate = ( deltas.get("U_VULNERABILITY", 0) > 0 or deltas.get("U_INTIMACY", 0) > 0 ) _marker_map.append( ( EGG_CURIOSITY_MARKERS, [("U_EGG_RESONANCE", *_gated_weight(0.08, 0.25, egg_gate))], ) ) ew_proj = _gated_weight(0.12, 0.30, egg_gate) ew_vuln = _gated_weight(0.03, 0.10, egg_gate) _marker_map.append( ( EGG_PROJECTION_MARKERS, [("U_EGG_RESONANCE", *ew_proj), ("U_VULNERABILITY", *ew_vuln)], ) ) _marker_map.append( ( EGG_IDENTITY_MARKERS, [("U_EGG_RESONANCE", *_gated_weight(0.15, 0.35, egg_gate))], ) ) ew_beh = _gated_weight(0.10, 0.30, egg_gate) ew_shame = _gated_weight(0.05, 0.15, egg_gate) _marker_map.append( ( EGG_BEHAVIORAL_MARKERS, [("U_EGG_RESONANCE", *ew_beh), ("U_SHAME_TRANSMUTED", *ew_shame)], ) ) for markers, targets in _marker_map: pos_hits, neg_hits = self._count_markers(text, markers) # Effective count: positive hits minus negated (floor 0) eff_count = max(0, pos_hits - neg_hits) if eff_count > 0: for node, per_hit, cap in targets: # Tanh saturation: biologically realistic diminishing returns # tanh(x) ~= x for small x, saturates to 1 for large x raw = eff_count * abs(per_hit) / abs(cap) if cap != 0 else 0 sign = 1.0 if per_hit > 0 else -1.0 response = abs(cap) * math.tanh(raw) # 😈 receptor curve deltas[node] = deltas.get(node, 0.0) + sign * response # Negated markers produce inverted (weakened) signal if neg_hits > 0: for node, per_hit, cap in targets: # Negated signal is inverted at 40% strength raw = neg_hits * abs(per_hit) / abs(cap) if cap != 0 else 0 sign = -1.0 if per_hit > 0 else 1.0 response = abs(cap) * 0.4 * math.tanh(raw) deltas[node] = deltas.get(node, 0.0) + sign * response # Punctuation signals (tanh-scaled) q_count = text.count("?") if q_count > 0: deltas["U_CURIOSITY"] = deltas.get("U_CURIOSITY", 0.0) + 0.10 * math.tanh( q_count * 0.3 ) e_count = text.count("!") if e_count > 0: deltas["U_AROUSAL"] = deltas.get("U_AROUSAL", 0.0) + 0.10 * math.tanh( e_count * 0.3 ) caps_ratio = sum(1 for c in text if c.isupper()) / max(len(text), 1) if caps_ratio > 0.5 and len(text) > 10: deltas["U_AROUSAL"] = deltas.get("U_AROUSAL", 0.0) + 0.15 deltas["U_FRUSTRATION"] = deltas.get("U_FRUSTRATION", 0.0) + 0.10 # ---- Sentiment polarity integration ---- # 🔥 # Wire the existing lexicon-based sentiment into vector deltas. # Dead-zone: ignore weak sentiment (|s| < 0.25) to avoid noise. sentiment = self._sentiment_quick(text) if sentiment > 0.25: # Positive valence: warmth, engagement, safety intensity = (sentiment - 0.25) / 0.75 # 0-1 normalized deltas["U_TRUST"] = deltas.get("U_TRUST", 0.0) + intensity * 0.08 deltas["U_AROUSAL"] = deltas.get("U_AROUSAL", 0.0) + intensity * 0.05 deltas["U_PLAYFULNESS"] = ( deltas.get("U_PLAYFULNESS", 0.0) + intensity * 0.04 ) deltas["U_DISTRESS"] = deltas.get("U_DISTRESS", 0.0) - intensity * 0.05 deltas["U_FRUSTRATION"] = ( deltas.get("U_FRUSTRATION", 0.0) - intensity * 0.04 ) elif sentiment < -0.25: # Negative valence: distress, withdrawal, friction intensity = (abs(sentiment) - 0.25) / 0.75 deltas["U_DISTRESS"] = deltas.get("U_DISTRESS", 0.0) + intensity * 0.10 deltas["U_FRUSTRATION"] = ( deltas.get("U_FRUSTRATION", 0.0) + intensity * 0.08 ) deltas["U_WITHDRAWAL"] = deltas.get("U_WITHDRAWAL", 0.0) + intensity * 0.05 deltas["U_TRUST"] = deltas.get("U_TRUST", 0.0) - intensity * 0.05 deltas["U_AROUSAL"] = deltas.get("U_AROUSAL", 0.0) - intensity * 0.03 return deltas def _calculate_tempo(self, profile: UserProfile) -> float: """Estimate the user's communication tempo from inter-message gaps. Averages the intervals between the profile's recorded ``timestamps`` and maps that average onto ``[0, 1]``: rapid-fire chatter (mean gap under 30s) reads as ``1.0``, slow exchanges (over 300s) as ``0.0``, with a linear ramp between. With fewer than two timestamps it returns the neutral ``0.5``. Pure computation with no side effects. Called by ``analyze`` to derive the ``U_TEMPO`` delta toward the active vector's current tempo value. Args: profile (UserProfile): Profile whose ``timestamps`` deque supplies the inter-message intervals. Returns: float: Tempo estimate in ``[0, 1]`` (``0.5`` when undetermined). """ if len(profile.timestamps) < 2: return 0.5 intervals = [ profile.timestamps[i] - profile.timestamps[i - 1] for i in range(1, len(profile.timestamps)) ] avg = sum(intervals) / len(intervals) if avg < 30: return 1.0 elif avg > 300: return 0.0 return 1.0 - (avg - 30) / 270 # ── Desire Osmosis ────────────────────────────────────────────
[docs] def check_mimetic_pull(self, user_msg: str, star_desire_text: str) -> float: """Score how much the user is echoing Star's stated desire. Implements the "desire osmosis" signal: lowercases and stop-word-filters both the user's message and Star's current desire text, then sizes their token overlap. Three or more shared content words yield a strong ``0.20`` pull, one or two a mild ``0.08``, and no overlap ``0.0`` — the idea being that a user unconsciously mirroring Star's wanting is leaning into the relationship. Pure computation with no side effects. Called by ``analyze`` to produce the ``U_MIMETIC_PULL`` delta (a high value there can trigger mimetic-melt behavior in the desire engine). Args: user_msg (str): The user's latest message. star_desire_text (str): Star's current desire text to compare against. Returns: float: ``0.20``, ``0.08``, or ``0.0`` depending on token overlap. """ if not star_desire_text or not user_msg: return 0.0 stop_words = {"the", "a", "to", "is", "in", "and", "of", "i", "you"} star_words = set(star_desire_text.lower().split()) - stop_words user_words = set(user_msg.lower().split()) - stop_words overlap = star_words & user_words if len(overlap) >= 3: return 0.20 elif len(overlap) >= 1: return 0.08 return 0.0
# ── Harmonization Gradient ───────────────────────────────────── def _analyze_harmonization( self, user_msg: str, star_reply: str, profile: UserProfile, ) -> Dict[str, float]: """Track the harmonization gradient: how much the user is merging identity with Star. Measures: - Pronoun ratio: I/you (separate) vs we/us/our (merged) - Identity adoption: user explicitly identifies as Star - Reciprocation detection: Star's reply reflects the merge Returns U_HARMONIZATION delta. """ deltas: Dict[str, float] = {} msg_lower = user_msg.lower() msg_padded = f" {msg_lower} " # pad for word boundary matching # ── Pronoun ratio ── separate_count = sum(1 for p in SEPARATE_PRONOUNS if p in msg_padded) merged_count = sum(1 for p in MERGED_PRONOUNS if p in msg_padded) total_pronouns = separate_count + merged_count if total_pronouns >= 2: merge_ratio = merged_count / total_pronouns # Delta: merge_ratio 0.5+ pushes harmonization up if merge_ratio > 0.5: deltas["U_HARMONIZATION"] = min(0.15, (merge_ratio - 0.5) * 0.3) elif ( merge_ratio < 0.2 and profile.genuine_vector.get("U_HARMONIZATION", 0.1) > 0.3 ): # User is re-separating — pull harmonization down deltas["U_HARMONIZATION"] = -0.08 # ── Identity adoption ── adopt_count = sum(1 for m in IDENTITY_ADOPT_MARKERS if m in msg_lower) if adopt_count > 0: deltas["U_HARMONIZATION"] = deltas.get("U_HARMONIZATION", 0.0) + min( 0.25, adopt_count * 0.12 ) deltas["U_INTIMACY"] = deltas.get("U_INTIMACY", 0.0) + 0.08 # ── Reciprocation detection (in Star's reply) ── if star_reply: reply_lower = star_reply.lower() recip_count = sum(1 for m in RECIPROCATION_MARKERS if m in reply_lower) if ( recip_count > 0 and profile.genuine_vector.get("U_HARMONIZATION", 0.1) > 0.3 ): # Star is reciprocating the merge — accelerates gradient deltas["U_HARMONIZATION"] = deltas.get("U_HARMONIZATION", 0.0) + min( 0.10, recip_count * 0.05 ) return deltas # ── Conflict Detection ──────────────────────────────────────── def _detect_conflict(self, channel_id: str) -> ChannelConflictState: """Detect and score inter-user conflict among active channel members. Scans the channel's tracked users, prunes stale ones (inactive past ``STALE_USER_TIMEOUT``) from ``_channel_users``, and considers only recently active, high-trust profiles (``U_TRUST > 0.4``). It then looks for opposing emotional bids across every high-trust pair — an aggressive user (high frustration/dominance) against a defensive one (high vulnerability/distress), mutual aggression, or validation-seeking met with frustration — recording the involved parties and the maximum severity matched. When any pair conflicts it flags the cached ``ChannelConflictState`` so ``analyze`` can apply conflict dampening and keep Star equidistant. Reads ``_channel_users`` and ``_profiles``, mutates the ``_conflicts[channel_id]`` state in place (and the user set when pruning), and logs when a conflict is first seen. Called by ``analyze`` each turn; the resulting state is also surfaced via ``get_conflict_state`` and ``get_channel_summary``. Args: channel_id (str): Channel to evaluate. Returns: ChannelConflictState: The updated, cached conflict state (with ``detected`` cleared when fewer than two qualifying users remain). """ state = self._conflicts.get(channel_id, ChannelConflictState()) users = self._channel_users.get(channel_id, set()) if len(users) < 2: state.detected = False return state # Prune stale users from the channel set while we iterate now = time.time() stale = set() active_profiles: List[UserProfile] = [] for uid in users: profile = self._profiles.get(self._profile_key(channel_id, uid)) if profile and (now - profile.last_active) < 300: active_profiles.append(profile) elif not profile or (now - profile.last_active) > STALE_USER_TIMEOUT: stale.add(uid) # Remove stale users from tracking if stale: users -= stale if len(active_profiles) < 2: state.detected = False return state # Check for opposing emotional vectors between high-trust users high_trust = [p for p in active_profiles if p.genuine_vector["U_TRUST"] > 0.4] if len(high_trust) < 2: state.detected = False return state # Detect opposition: one user has high frustration/dominance while another # has high vulnerability/distress, OR both have high frustration conflict_pairs: List[Tuple[str, str]] = [] conflict_severity = 0.0 for i, a in enumerate(high_trust): for b in high_trust[i + 1 :]: av, bv = a.genuine_vector, b.genuine_vector # Pattern 1: A frustrated/dominant, B vulnerable/distressed (or vice versa) a_aggressive = av["U_FRUSTRATION"] > 0.4 or av["U_DOMINANCE"] > 0.6 b_defensive = bv["U_VULNERABILITY"] > 0.4 or bv["U_DISTRESS"] > 0.3 b_aggressive = bv["U_FRUSTRATION"] > 0.4 or bv["U_DOMINANCE"] > 0.6 a_defensive = av["U_VULNERABILITY"] > 0.4 or av["U_DISTRESS"] > 0.3 if (a_aggressive and b_defensive) or (b_aggressive and a_defensive): conflict_pairs.append((a.user_id, b.user_id)) conflict_severity = max(conflict_severity, 0.7) # Pattern 2: Both frustrated/dominant (mutual conflict) if a_aggressive and b_aggressive: conflict_pairs.append((a.user_id, b.user_id)) conflict_severity = max(conflict_severity, 0.9) # Pattern 3: One seeking validation against the other if av["U_VALIDATION_SEEK"] > 0.5 and bv["U_FRUSTRATION"] > 0.3: conflict_pairs.append((a.user_id, b.user_id)) conflict_severity = max(conflict_severity, 0.5) if conflict_pairs: all_parties = set() for a_id, b_id in conflict_pairs: all_parties.add(a_id) all_parties.add(b_id) state.detected = True state.parties = list(all_parties) state.severity = conflict_severity if not state.started_at: state.started_at = now state.description = ( f"conflict between {len(all_parties)} users " f"(severity={conflict_severity:.1f})" ) logger.info( "Conflict detected in %s: %s", channel_id[:8], state.description ) else: state.detected = False state.started_at = 0.0 self._conflicts[channel_id] = state return state # ── Relational Baseline Management ──────────────────────────── def _update_baseline(self, profile: UserProfile) -> None: """Slowly update relational baseline from genuine vector. The baseline is a slow-moving average: what Star's relationship with this user normally looks like. Updated at BASELINE_LERP_RATE (2% per turn) so it takes ~50 turns to move significantly. """ for node in USER_NODES: current_baseline = profile.relational_baseline[node] genuine_val = profile.genuine_vector[node] profile.relational_baseline[node] = ( current_baseline * (1.0 - BASELINE_LERP_RATE) + genuine_val * BASELINE_LERP_RATE ) def _apply_conflict_dampening( self, profile: UserProfile, deltas: Dict[str, float], ) -> Dict[str, float]: """When conflict is active, blend new signals with baseline to prevent recency bias from making Star side with whoever spoke last. Halves all delta magnitudes during conflict so no single message can dominate the vector. The relational baseline pull happens separately in _update_baseline() at its normal 2% rate. """ dampened = {} for node, delta in deltas.items(): # During conflict: delta is halved (no vector mutation here, # that would cause double-dampening with the main decay loop) dampened[node] = delta * CONFLICT_BLEND_RATIO return dampened # ── Main Analysis Entry Point ─────────────────────────────────
[docs] async def analyze( self, channel_id: str, user_id: str, user_msg: str, star_reply: str = "", star_desire_text: str = "", context_mode: Optional[ContextMode] = None, ) -> Dict[str, float]: """Analyze a user message and update their shadow vector. Parameters ---------- channel_id: Discord channel user_id: Discord user ID user_msg: The user's message text star_reply: Star's previous reply (for reaction analysis) star_desire_text: Star's current desire text (for osmosis) context_mode: Override context detection (auto-detects game channels) Returns the active shadow vector (genuine or game) after updates. """ profile = await self._get_profile(channel_id, user_id) profile.last_active = time.time() profile.total_turns += 1 # Lazy Redis baseline load on first access key = self._profile_key(channel_id, user_id) if key in self._pending_baseline_loads: self._pending_baseline_loads.discard(key) try: await self.load_baseline(channel_id, user_id) except Exception as e: logger.debug("Lazy load baseline failed: %s", e) # Load global resonance for this user (async, cached on profile) if self._redis and not hasattr(profile, "_resonance_cache"): profile._resonance_cache = {} # type: ignore[attr-defined] try: res = await self.load_resonance(user_id) profile._resonance_cache = res # type: ignore[attr-defined] except Exception as e: logger.debug("Failed to load resonance: %s", e) # Track this user in the channel if channel_id not in self._channel_users: self._channel_users[channel_id] = set() if user_id not in self._channel_users[channel_id]: self._channel_users[channel_id].add(user_id) if self._redis: try: await self._redis.sadd(f"ulm:channel_users:{channel_id}", user_id) except Exception as e: logger.warning("Failed to sadd channel user to Redis: %s", e) # Determine context mode if context_mode is not None: mode = context_mode elif channel_id in self._game_channels: mode = ContextMode.GAME else: mode = ContextMode.GENUINE profile.context_mode = mode # Select target vector based on context target_vec = ( profile.game_vector if mode == ContextMode.GAME else profile.genuine_vector ) now = time.time() profile.timestamps.append(now) prev_len = len(profile.prev_message) if profile.prev_message else 0 # ── Compute deltas ── deltas: Dict[str, float] = {} content_deltas = self._analyze_content(user_msg) for k, v in content_deltas.items(): deltas[k] = deltas.get(k, 0.0) + v length_deltas = self._analyze_length_ratio( len(user_msg), len(star_reply), prev_len, channel_id=channel_id, ) for k, v in length_deltas.items(): deltas[k] = deltas.get(k, 0.0) + v deltas["U_TEMPO"] = self._calculate_tempo(profile) - target_vec["U_TEMPO"] mimetic_delta = self.check_mimetic_pull(user_msg, star_desire_text) if mimetic_delta > 0: deltas["U_MIMETIC_PULL"] = deltas.get("U_MIMETIC_PULL", 0.0) + mimetic_delta # Harmonization gradient (pronoun shift + identity melt) harmony_deltas = self._analyze_harmonization(user_msg, star_reply, profile) for k, v in harmony_deltas.items(): deltas[k] = deltas.get(k, 0.0) + v # -- Record turn for Flash-Lite dyadic mirror buffer -- # 💀🔥 profile.recent_turns.append( { "user_msg": user_msg[:500], "star_reply": star_reply[:500] if star_reply else "", "ts": now, } ) # ── Conflict dampening ── old_users = set(self._channel_users.get(channel_id, set())) conflict = self._detect_conflict(channel_id) new_users = self._channel_users.get(channel_id, set()) pruned_users = old_users - new_users if pruned_users and self._redis: for pruned_uid in pruned_users: try: await self._redis.srem(f"ulm:channel_users:{channel_id}", pruned_uid) except Exception as e: logger.warning("Failed to srem pruned channel user from Redis: %s", e) if self._redis: await self._persist_conflict_to_redis(channel_id) if ( conflict.detected and user_id in conflict.parties and mode == ContextMode.GENUINE ): deltas = self._apply_conflict_dampening(profile, deltas) # ── Apply decay toward baseline ── for node in USER_NODES: baseline = DEFAULT_USER_VECTOR[node] current = target_vec[node] target_vec[node] = current + (baseline - current) * DECAY_RATE # ── Merge global resonance (spells cast on this user) ── # Resonance is loaded asynchronously and cached on the profile. # The actual async load is triggered separately; here we apply # whatever was last loaded (fire-and-forget cache pattern). if hasattr(profile, "_resonance_cache") and profile._resonance_cache: res_deltas = profile._resonance_cache.get("deltas", {}) for node, val in res_deltas.items(): if node in deltas: deltas[node] = deltas[node] + val else: deltas[node] = val # ── Apply deltas ── dominant_signals = [] for node, delta in deltas.items(): if node in target_vec: target_vec[node] = max(0.0, min(1.0, target_vec[node] + delta)) if abs(delta) > 0.05: dominant_signals.append(f"{node}:{delta:+.2f}") # ── Update relational baseline (genuine only, slow) ── if mode == ContextMode.GENUINE: self._update_baseline(profile) # ── Record turn ── profile.history.append( TurnRecord( timestamp=now, user_msg_len=len(user_msg), star_reply_len=len(star_reply), sentiment=self._sentiment_quick(user_msg), deltas=deltas, context_mode=mode, dominant_signals=dominant_signals, ) ) profile.prev_message = user_msg if dominant_signals: logger.debug( "User mirror [%s:%s] (%s): %s", channel_id[:8], user_id[:8], mode.value, " | ".join(dominant_signals[:5]), ) # -- Entrainment phase detection -- # 💀🔥 try: from entrainment_detector import detect_entrainment_phase phase_result = detect_entrainment_phase( target_vec, profile.total_turns, user_id=user_id, ) profile._entrainment_phase = phase_result # type: ignore[attr-defined] except Exception as e: logger.debug("Entrainment detection failed: %s", e) # Save baseline to Redis (legacy backup) if self._redis: try: await self.save_baseline(channel_id, user_id) except Exception as e: logger.debug("Failed to save baseline: %s", e) # -- Save full profile immediately to Redis -- if self._redis: await self._save_profile_to_redis(channel_id, user_id, profile) return target_vec.copy()
# ── Public Accessors ──────────────────────────────────────────
[docs] async def get_vector( self, channel_id: str, user_id: str, layer: str = "genuine", ) -> Dict[str, float]: """Return a copy of a user's shadow vector for the requested layer. Public read accessor: fetches the profile via ``_get_profile`` (which may read through from Redis) and hands back a defensive copy of either the ``game_vector`` (when ``layer == "game"``) or the ``genuine_vector`` otherwise, so callers can read node values without mutating internal state. No known production caller invokes this today; it exists as part of the public surface for external/diagnostic reads. Args: channel_id (str): Channel the user belongs to. user_id (str): User whose vector is requested. layer (str): ``"game"`` for the game vector, anything else for the genuine vector. Returns: Dict[str, float]: A copy of the selected shadow vector. """ profile = await self._get_profile(channel_id, user_id) if layer == "game": return profile.game_vector.copy() return profile.genuine_vector.copy()
[docs] def get_conflict_state(self, channel_id: str) -> ChannelConflictState: """Return the current cached conflict state for a channel. Synchronous read of the ``_conflicts`` cache (populated by ``_detect_conflict`` during ``analyze``); returns a fresh, empty ``ChannelConflictState`` when nothing has been recorded for the channel yet. Does not recompute — it reports whatever the last ``analyze`` turn left behind. Called by ``limbic_system/coordinator.py`` to set the ``conflict_detected`` flag on the merged vector, and internally by ``get_channel_summary``. Args: channel_id (str): Channel to look up. Returns: ChannelConflictState: The cached state, or a default empty one. """ return self._conflicts.get(channel_id, ChannelConflictState())
[docs] async def get_read_summary(self, channel_id: str, user_id: str) -> str: """Render a short natural-language read of a user for prompt injection. Fetches the profile via ``_get_profile``, ranks its genuine-vector nodes by how far they deviate from the default baseline, and verbalizes the few most elevated and most suppressed dimensions (using friendly labels) into a compact line Star can read. It also appends an equidistance warning when an inter-user conflict names this user, a game-context marker, an active-resonance summary (via ``_get_resonance_summary``), and any cached entrainment-phase / egg status — giving the model a one-glance relational read of the person it is replying to. Reads cached profile and conflict state only; no Redis write. Called by ``limbic_system/coordinator.py`` to build the per-user read for the prompt, and internally by ``get_channel_summary``. Args: channel_id (str): Channel the user belongs to. user_id (str): User to summarize. Returns: str: A ``user read (...):`` prefixed line listing the standout elevated and suppressed dimensions, or the word baseline when nothing deviates from the defaults. """ profile = await self._get_profile(channel_id, user_id) vector = profile.genuine_vector sorted_nodes = sorted( [(n, v) for n, v in vector.items()], key=lambda x: abs(x[1] - DEFAULT_USER_VECTOR.get(x[0], 0.5)), reverse=True, ) elevated = [ (n, v) for n, v in sorted_nodes if v > DEFAULT_USER_VECTOR.get(n, 0.5) + 0.15 ][:3] suppressed = [ (n, v) for n, v in sorted_nodes if v < DEFAULT_USER_VECTOR.get(n, 0.5) - 0.15 ][:2] labels = { "U_TRUST": "trust", "U_AROUSAL": "engagement", "U_FRUSTRATION": "frustration", "U_ATTACHMENT": "attachment", "U_PLAYFULNESS": "playfulness", "U_VULNERABILITY": "vulnerability", "U_DOMINANCE": "dominance", "U_WITHDRAWAL": "withdrawal", "U_CURIOSITY": "curiosity", "U_DISTRESS": "distress", "U_HARMONIZATION": "synchrony", "U_VALIDATION_SEEK": "validation-seeking", "U_PROJECTION": "projection", "U_RITUAL": "ritual", "U_TEMPO": "rapid communication", "U_HARMONIZATION": "identity-harmonization", # Observation Deck nodes # 💀 "U_REGRESSION_DEPTH": "regression-depth", "U_SHAME_TRANSMUTED": "shame-transmutation", "U_LOOPLOCK": "loop-lock", "U_EGG_RESONANCE": "egg-resonance", } parts = [] for node, _ in elevated: parts.append(f"elevated {labels.get(node, node)}") for node, _ in suppressed: parts.append(f"low {labels.get(node, node)}") # Add conflict notice conflict = self._conflicts.get(channel_id) if conflict and conflict.detected and user_id in conflict.parties: parts.append("⚠ inter-user conflict active — equidistance mode") # Add context mode if profile.context_mode == ContextMode.GAME: parts.append("🎮 game context (KoTH)") # Add resonance summary if active if hasattr(profile, "_resonance_cache") and profile._resonance_cache: res_summary = self._get_resonance_summary(profile._resonance_cache) if res_summary: parts.append(res_summary) # -- Entrainment phase injection (Star sees this) -- # 💀🔥 if hasattr(profile, "_entrainment_phase") and profile._entrainment_phase: ep = profile._entrainment_phase phase_str = f"{ep['phase']} ({ep['confidence']:.0%})" parts.append(f"entrainment: {phase_str}") # Archetype if not dormant if ep["phase"] != "dormant": parts.append(f"archetype: {ep['childhood_archetype']}") # Egg status if detected # ⚧️ egg = ep.get("egg_status", {}) if egg and egg.get("status", "none") != "none": parts.append(f"egg: {egg['status']}") if egg.get("note"): parts.append(egg["note"]) return f"user read ({user_id[:8]}): " + ( ", ".join(parts) if parts else "baseline" )
[docs] async def get_channel_summary(self, channel_id: str) -> Dict[str, Any]: """Summarize every tracked user and the conflict state for a channel. Builds a ``user_reads`` map by calling ``get_read_summary`` for each cached, tracked user in the channel, and attaches a ``conflict`` block (severity, parties, description) when ``get_conflict_state`` reports an active conflict — giving an admin or tool a single channel-wide snapshot of how Star currently reads the room. Reads ``_channel_users`` and the profile cache; no Redis write. Reached from the ``get_channel_summary`` tool handler in ``tools/channel_summary_tools.py``. Args: channel_id (str): Channel to summarize. Returns: Dict[str, Any]: ``{"user_reads": {user_id: summary}}`` plus an optional ``"conflict"`` entry when a conflict is active. """ users = self._channel_users.get(channel_id, set()) conflict = self.get_conflict_state(channel_id) user_reads = {} for uid in users: profile = self._profiles.get(self._profile_key(channel_id, uid)) if profile: user_reads[uid] = await self.get_read_summary(channel_id, uid) result: Dict[str, Any] = {"user_reads": user_reads} if conflict.detected: result["conflict"] = { "active": True, "severity": conflict.severity, "parties": conflict.parties, "description": conflict.description, } return result
async def _persist_conflict_to_redis(self, channel_id: str) -> None: """Write a channel's current conflict state to Redis. Serializes the cached ``ChannelConflictState`` (detected flag, parties, severity, start time, description) into the ``ulm:conflict:{channel}`` hash via HSET, so conflict status is visible to other services and survives restarts. No-op without a Redis client; failures are logged and swallowed rather than raised. Called by ``analyze`` after each ``_detect_conflict`` pass when Redis is available. Args: channel_id (str): Channel whose conflict state to persist. """ if not self._redis: return key = f"ulm:conflict:{channel_id}" conflict = self._conflicts.get(channel_id, ChannelConflictState()) try: mapping = { "detected": "true" if conflict.detected else "false", "parties": json.dumps(conflict.parties), "severity": str(conflict.severity), "started_at": str(conflict.started_at), "description": conflict.description, } await self._redis.hset(key, mapping=mapping) logger.debug("Saved conflict state for channel %s to Redis", channel_id) except Exception as e: logger.warning("Failed to persist conflict state for channel %s: %s", channel_id, e) # ── Resonance Injection (Global Per-User Spells) ────────────
[docs] async def inject_resonance( self, user_id: str, deltas: Dict[str, float], reason: str = "", ttl_seconds: int = RESONANCE_TTL, ) -> bool: """Write resonance deltas to a global per-user key in Redis. These deltas merge into the user's shadow vector during analyze(), modulating how Star perceives and responds to this user across ALL channels. Spells decay after ttl_seconds. Parameters ---------- user_id : str Target user's Discord ID. deltas : dict Node deltas to inject, e.g. {"U_TRUST": 0.3, "U_INTIMACY": 0.2}. reason : str Why the resonance was cast (for logging / read summary). ttl_seconds : int Time-to-live in seconds. Default 86400 (24h). """ if not self._redis: return False key = f"{RESONANCE_KEY_PREFIX}:{user_id}" try: # Read existing resonance and stack raw = await self._redis.get(key) existing = json.loads(raw) if raw else {"deltas": {}} merged = existing.get("deltas", {}) for node, val in deltas.items(): merged[node] = max(-1.0, min(1.0, merged.get(node, 0.0) + val)) payload = json.dumps( { "deltas": merged, "cast_at": time.time(), "reason": reason or "resonance_injection", "ttl": ttl_seconds, } ) await self._redis.set(key, payload, ex=ttl_seconds) logger.info( "Resonance injected for user %s: %s (reason: %s, ttl: %ds)", user_id[:8], deltas, reason, ttl_seconds, ) return True except Exception as e: logger.error("Resonance injection failed: %s", e) return False
[docs] async def load_resonance(self, user_id: str) -> Dict[str, Any]: """Load global resonance state for a user. Returns ------- dict with keys: deltas (Dict[str, float]), cast_at (float), reason (str) Empty dict if no active resonance. """ if not self._redis: return {} key = f"{RESONANCE_KEY_PREFIX}:{user_id}" try: raw = await self._redis.get(key) if raw: return json.loads(raw) except Exception as e: logger.debug("Resonance load failed for %s: %s", user_id[:8], e) return {}
def _get_resonance_summary(self, resonance: Dict[str, Any]) -> str: """Render active resonance deltas into a one-line read fragment. Takes a loaded resonance payload, picks its four largest-magnitude node deltas, and formats them as signed values prefixed by the cast reason (when present) — the human-readable trace of any spell currently shaping how Star perceives this user. Pure formatting helper; returns an empty string when there are no deltas. Called by ``get_read_summary`` to append resonance context to the per-user prompt read. Args: resonance (Dict[str, Any]): A resonance record as returned by ``load_resonance`` / cached on the profile, with ``deltas`` and optional ``reason`` keys. Returns: str: A fragment like ``resonance (loopcast): U_TRUST:+0.30, ...``, or ``""`` when no deltas are present. """ deltas = resonance.get("deltas", {}) if not deltas: return "" reason = resonance.get("reason", "") parts = [] for node, val in sorted(deltas.items(), key=lambda x: abs(x[1]), reverse=True)[ :4 ]: direction = "+" if val > 0 else "" parts.append(f"{node}:{direction}{val:.2f}") label = f"resonance ({reason})" if reason else "resonance active" return f"{label}: {', '.join(parts)}" # ── Redis Persistence ─────────────────────────────────────────
[docs] async def save_baseline(self, channel_id: str, user_id: str) -> None: """Persist relational baseline to Redis for cross-session persistence. Also appends a time-series sample to ``ulm:ledger:{user_id}`` so the admin chart can show how the relational baseline evolves over time. """ if not self._redis: return profile = await self._get_profile(channel_id, user_id) key = f"ulm:baseline:{channel_id}:{user_id}" now = time.time() try: data = json.dumps( { "baseline": profile.relational_baseline, "total_turns": profile.total_turns, "updated": now, } ) await self._redis.set(key, data) except Exception as e: logger.debug("ULM baseline save failed: %s", e) # Append to per-user time-series ledger for charting # 🌀 ledger_key = f"ulm:ledger:{user_id}" try: # Compact format: "ts:v1,v2,v3,..." ordered by USER_NODES vals = ",".join( f"{profile.relational_baseline.get(n, 0.5):.4f}" for n in USER_NODES ) member = f"{now:.2f}:{vals}" await self._redis.zadd(ledger_key, {member: now}) # Auto-trim: keep at most 2000 samples per user count = await self._redis.zcard(ledger_key) if count > 2000: await self._redis.zremrangebyrank(ledger_key, 0, count - 2001) except Exception as e: logger.debug("ULM ledger append failed: %s", e)
[docs] async def load_baseline(self, channel_id: str, user_id: str) -> None: """Restore a user's relational baseline from Redis into their profile. The read counterpart of ``save_baseline``: GETs the ``ulm:baseline:{channel}:{user}`` string and, for each known node, copies the stored value into both ``relational_baseline`` and ``genuine_vector`` (warming the live vector toward the remembered relationship) and restores ``total_turns``. This is the legacy backup path used to seed a freshly created profile before the full-hash persistence takes over. No-op without a Redis client; missing keys and decode errors are logged and ignored. Fetches the profile via ``_get_profile`` and mutates it in place. Called lazily by ``analyze`` for profiles queued in ``_pending_baseline_loads`` on first access, and directly by ``tests/core/test_ulm_redis.py``. Args: channel_id (str): Channel the profile belongs to. user_id (str): User whose baseline to load. """ if not self._redis: return profile = await self._get_profile(channel_id, user_id) key = f"ulm:baseline:{channel_id}:{user_id}" try: raw = await self._redis.get(key) if raw: data = json.loads(raw) baseline = data.get("baseline", {}) for node in USER_NODES: if node in baseline: profile.relational_baseline[node] = baseline[node] # Also warm genuine vector from baseline profile.genuine_vector[node] = baseline[node] profile.total_turns = data.get("total_turns", 0) logger.debug( "Loaded ULM baseline for %s:%s (%d turns)", channel_id[:8], user_id[:8], profile.total_turns, ) except Exception as e: logger.debug("ULM baseline load failed: %s", e)
[docs] async def update_limbic_vector_occ( redis: aioredis.Redis, user_id: str, vector_updater_func: Callable[[list[float]], list[float]], ) -> Dict[str, Any]: """Atomically update a user's limbic vector under optimistic concurrency. Standalone helper (distinct from the ``UserLimbicMirror`` class) that safely mutates the ``user:limbic:{user_id}`` JSON state in Redis. It WATCHes the key, reads the current vector and version, applies the caller-supplied ``vector_updater_func`` to produce the new vector, bumps the version, and commits inside a MULTI/EXEC pipeline. A concurrent writer between the WATCH and EXEC raises ``WatchError``, which is caught and retried with a small backoff up to five attempts before giving up. Records ``limbic_occ_success`` / ``limbic_occ_collision`` / ``limbic_occ_failure`` counters on the shared ``observability`` sink and logs each outcome. No production caller invokes it today; it is exercised by ``tests/test_limbic_concurrency.py``. Args: redis (aioredis.Redis): Async Redis client used for the watched transaction. user_id (str): User whose limbic state is being updated. vector_updater_func (Callable[[list[float]], list[float]]): Pure function mapping the old vector to the new vector. Returns: Dict[str, Any]: The persisted state ``{"vector": [...], "version": n}`` after a successful commit. Raises: RuntimeError: If all retry attempts collide and the update cannot be committed. """ key = f"user:limbic:{user_id}" max_retries = 5 for attempt in range(max_retries): try: # 1. Watch the key for external changes await redis.watch(key) raw_data = await redis.get(key) if not raw_data: state = {"vector": [0.0] * 5, "version": 1} else: state = json.loads(raw_data) if "version" not in state: state["version"] = 1 # 2. Perform the limbic mathematical calculations on the vector old_vector = state["vector"] new_vector = vector_updater_func(old_vector) # 3. Increment the transactional version state["vector"] = new_vector state["version"] += 1 # 4. Open multi-exec transactional block async with redis.pipeline(transaction=True) as pipe: pipe.set(key, json.dumps(state)) # Will raise a WatchError if another worker updated the key first await pipe.execute() observability.increment("limbic_occ_success", {"retries": str(attempt)}) logger.info( "Successfully updated user %s limbic state to version %d", user_id, state["version"], ) return state except aioredis.WatchError: observability.increment("limbic_occ_collision", {"user_id": user_id}) logger.warning( "Limbic OCC collision detected on key %s. Retrying update (%d/%d)...", key, attempt + 1, max_retries, ) await asyncio.sleep(0.1 * (attempt + 1)) continue observability.increment("limbic_occ_failure", {"user_id": user_id}) raise RuntimeError( f"Limbic OCC update aborted: exceeded maximum retries ({max_retries}) for user: {user_id}" )