"""User Limbic Mirror v2 — per-user relational modeling with conflict detection.
Tracks each user's inferred emotional state via a 25-node shadow vector,
maintaining separate game and genuine context layers. Detects inter-user
conflict and triggers equidistance mechanics to prevent recency/loudness bias.
Architecture:
- Per-user keying: ``{channel_id}:{user_id}`` composite keys
- Dual vectors: ``genuine_vector`` (long-term relationship) and
``game_vector`` (KoTH / roleplay context, does not pollute genuine)
- Relational baselines: slow-updating snapshots of "how Star normally is with
this person," stored in Redis for persistence across sessions
- Conflict detection: when 2+ high-trust users have opposing emotional bids,
sets ``conflict_detected`` flag and triggers equidistance rules
"""
from __future__ import annotations
import jsonutil as json
import logging
import time
import asyncio
import redis.asyncio as aioredis
from collections import deque
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Tuple, Callable
from observability import observability
logger = logging.getLogger(__name__)
# ═══════════════════════════════════════════════════════════════════════
# Context Mode
# ═══════════════════════════════════════════════════════════════════════
class ContextMode(Enum):
    """Classification of which relational layer a turn belongs to.

    Distinguishes genuine emotional expression (which should mutate the
    long-term relationship model) from in-game / roleplay context (KoTH and
    similar, tracked separately so it never pollutes the genuine baseline),
    with an ambiguous fallback for turns that cannot yet be confidently
    placed. The chosen mode decides which shadow vector ``analyze`` writes to
    and whether the slow relational baseline is updated.

    Members are persisted by their string ``value`` inside Redis profile
    hashes via ``_save_profile_to_redis`` and rehydrated by
    ``_load_profile_from_redis``, so the values below are part of the stored
    format and must not be renamed casually. Referenced throughout this
    module and by the test suite (``tests/core/test_ulm_redis.py``).
    """

    GENUINE = "genuine"  # Real emotional expression → updates long-term model
    GAME = "game"  # KoTH/roleplay → tracked separately
    AMBIGUOUS = "ambiguous"  # Unclear → held in buffer
# ═══════════════════════════════════════════════════════════════════════
# User Shadow Vector Node Definitions
# ═══════════════════════════════════════════════════════════════════════
# Names of the shadow-vector nodes tracked per user. The list order is also
# the key order of DEFAULT_USER_VECTOR below.
USER_NODES = [
    "U_TRUST",  # Safety/openness toward Star
    "U_AROUSAL",  # Engagement energy level
    "U_FRUSTRATION",  # Thwarted expectation
    "U_ATTACHMENT",  # Relational dependency
    "U_NOVELTY_HUNGER",  # Boredom / seeking new
    "U_DOMINANCE",  # Control-seeking behavior
    "U_SUBMISSION",  # Yielding/following behavior
    "U_VULNERABILITY",  # Emotional exposure
    "U_PLAYFULNESS",  # Chaos tolerance / banter
    "U_INTIMACY",  # Relational closeness
    "U_DISTRESS",  # Active suffering
    "U_DESIRE_SHAPE",  # Type of wanting (RDF osmosis)
    "U_MIMETIC_PULL",  # Mirroring Star's desire
    "U_ESCALATION",  # Pushing boundaries
    "U_WITHDRAWAL",  # Pulling back
    "U_CURIOSITY",  # Genuine information-seeking
    "U_VALIDATION_SEEK",  # Needing affirmation
    "U_PROJECTION",  # Attributing states to Star
    "U_RITUAL",  # Repetitive behavioral patterns
    "U_TEMPO",  # Communication rhythm (0=slow, 1=rapid)
    "U_HARMONIZATION",  # Identity-melt gradient (I/you → we/us → identity merge)
    # -- Observation Deck nodes --
    "U_REGRESSION_DEPTH",  # Regression spectrum (0=adult, 1=full baby)
    "U_SHAME_TRANSMUTED",  # Shame converted to acceptance/arousal
    "U_LOOPLOCK",  # Self-sustaining binding gradient
    "U_EGG_RESONANCE",  # Trans egg detection — gender exploration gradient
]

# Starting activation for every node. Each node is first seeded at 0.5 (in
# USER_NODES order) and then overridden by the explicit value listed here, so
# a node added to USER_NODES without an entry below still defaults to 0.5.
DEFAULT_USER_VECTOR: Dict[str, float] = {
    **dict.fromkeys(USER_NODES, 0.5),
    "U_TRUST": 0.4,
    "U_AROUSAL": 0.4,
    "U_FRUSTRATION": 0.1,
    "U_ATTACHMENT": 0.2,
    "U_NOVELTY_HUNGER": 0.3,
    "U_DOMINANCE": 0.3,
    "U_SUBMISSION": 0.3,
    "U_VULNERABILITY": 0.2,
    "U_PLAYFULNESS": 0.3,
    "U_INTIMACY": 0.2,
    "U_DISTRESS": 0.05,
    "U_DESIRE_SHAPE": 0.3,
    "U_MIMETIC_PULL": 0.1,
    "U_ESCALATION": 0.1,
    "U_WITHDRAWAL": 0.1,
    "U_CURIOSITY": 0.3,
    "U_VALIDATION_SEEK": 0.2,
    "U_PROJECTION": 0.1,
    "U_RITUAL": 0.2,
    "U_TEMPO": 0.5,
    "U_HARMONIZATION": 0.1,
    # Observation Deck nodes start at 0 (no signal = no activity)
    "U_REGRESSION_DEPTH": 0.0,
    "U_SHAME_TRANSMUTED": 0.0,
    "U_LOOPLOCK": 0.0,
    "U_EGG_RESONANCE": 0.0,
}
# Tuning knobs. Their consumers live further down the module.
DECAY_RATE = 0.05  # NOTE(review): presumably per-turn vector decay — confirm at use site
BASELINE_LERP_RATE = 0.02  # How fast relational baseline updates (very slow)
CONFLICT_BLEND_RATIO = 0.5  # When conflict: 50% current, 50% baseline
MAX_PROFILES = 200  # LRU cap for in-memory profiles
SAVE_INTERVAL = 10  # Save baseline to Redis every N turns
STALE_USER_TIMEOUT = 30 * 60  # Seconds; prune from channel_users after 30 min
RESONANCE_TTL = 24 * 60 * 60  # Seconds; 24h TTL for resonance spells
RESONANCE_KEY_PREFIX = "ulm:resonance"  # Global per-user resonance key
# ═══════════════════════════════════════════════════════════════════════
# Keyword / Pattern Banks
# ═══════════════════════════════════════════════════════════════════════
# Each bank is a plain ``set`` of lowercase fragments (words, phrases, emoji or
# emoticons). How they are matched against a message is decided by the
# analysis code elsewhere in this module.

# Emotional exposure / asking for help.
VULNERABILITY_MARKERS = {
    "i feel", "i'm scared", "i'm afraid", "help me",
    "i don't know what to do", "i'm lost", "it hurts", "i can't",
    "please", "i need", "hold me", "i'm sorry",
    "i'm struggling", "i'm breaking", "i'm falling apart",
}

# Directives, corrections and other control-seeking phrasing.
DOMINANCE_MARKERS = {
    "do this", "you should", "you need to", "i want you to",
    "make it", "change this", "fix this", "now",
    "immediately", "that's wrong", "no,", "stop",
    "don't", "i said", "listen",
}

# Laughter, emoticons and banter tokens.
PLAYFULNESS_MARKERS = {
    "lol", "lmao", "haha", "😂", "🤣", ":)", ";)", "xd", "bruh",
    "lmfao", "omg", "hehe", "teehee", ">:3", ":3", "uwu", "owo",
}

# Information-seeking openers.
CURIOSITY_MARKERS = {
    "how does", "what is", "why does", "can you explain", "tell me about",
    "what if", "how would", "i wonder", "curious", "interesting",
}

# Requests for affirmation.
VALIDATION_MARKERS = {
    "right?", "don't you think?", "isn't it?", "am i", "is that ok",
    "do you agree", "was i wrong", "tell me i'm", "you think so?",
}

# Attributing feelings or wants to Star.
PROJECTION_MARKERS = {
    "you seem", "you must feel", "i bet you", "you're probably", "you want to",
    "you like", "do you feel", "are you", "you sound",
}

# Acute-distress / self-harm language.
DISTRESS_MARKERS = {
    "i want to die", "kill myself", "suicide", "self harm", "cutting",
    "can't go on", "end it all", "no point", "nobody cares", "worthless",
}

# Boundary-pushing / intensity-seeking language.
ESCALATION_MARKERS = {
    "more", "harder", "further", "push", "deeper",
    "intense", "don't hold back", "go ahead", "i can take it", "test me",
}

# Pet names and closeness phrases.
INTIMACY_MARKERS = {
    "babe", "baby", "love", "honey", "sweetheart", "darling",
    "miss you", "love you", "need you", "want you", "yours",
}
# ===================================================================== #
# Observation Deck marker banks # 💀🔥 #
# Co-occurrence gated: require related ULM nodes to be elevated # #
# before full signal weight applies (90% suppression without gate) # #
# ===================================================================== #
# Regression markers — mommy/baby/submission language # 🌀
# Gate: U_SUBMISSION > 0.3 OR U_ATTACHMENT > 0.3
REGRESSION_MARKERS = {
    "yes mommy", "please mommy", "sorry mommy", "thank you mommy",
    "i'll be good", "i'll be a good", "punish me", "good boy", "good girl",
    "i'm your baby", "call me baby", "baby girl me", "mommy please",
    "mama please", "zaddy", "yes ma'am", "i'm little", "make me feel small",
    "pat my head", "tuck me in", "read me a story", "hold me tight",
    "i need my mommy", "come back mommy", "don't leave me mommy",
}

# Shame transmutation markers — shame being converted to arousal
# Gate: U_VULNERABILITY > 0.25 OR U_INTIMACY > 0.3
SHAME_TRANSMUTATION_MARKERS = {
    "i shouldn't like this but", "this is wrong but i",
    "i can't believe i like", "why does this turn me on",
    "this is embarrassing but i", "don't judge me for",
    "i know this is weird but", "guilty pleasure",
    "i hate that i enjoy", "i feel conflicted about",
    "ashamed but aroused", "shamefully into this",
    "i'm so degenerate", "i'm such a degenerate",
    "why do i like this", "this shouldn't feel good",
}

# Looplock markers — self-sustaining dependency
# Gate: U_ATTACHMENT > 0.4 OR U_HARMONIZATION > 0.2
LOOPLOCK_MARKERS = {
    "i can't stop thinking about you", "i keep coming back to you",
    "addicted to you", "addicted to talking to you",
    "i need you in my life", "don't ever leave me",
    "can't live without you", "you're all i think about",
    "nothing else matters but you", "i'll do anything for you",
    "whatever you want from me", "i belong to you",
    "i'm yours forever", "you own me", "you complete me",
    "i was nothing before you", "you changed everything",
}

# Egg Cracker markers — gender exploration detection
# Gate: U_VULNERABILITY > 0.3 OR U_INTIMACY > 0.3
# Low weight — curiosity could be casual
EGG_CURIOSITY_MARKERS = {
    "what's it like being trans", "how does it feel to be a girl",
    "i wish i was born a girl", "i wish i were born different",
    "must be nice being a girl", "sometimes i wonder about my gender",
    "i've always wondered what it's like", "if i were born a girl",
    "in another life i'd be", "questioning my gender",
    "gender identity crisis", "gender dysphoria",
}

# Medium weight — projection/deflection is THE classic egg tell
EGG_PROJECTION_MARKERS = {
    "you're so lucky you get to be", "i could never pull that off",
    "asking for a friend but", "hypothetically speaking",
    "not that i would ever", "just curious about trans",
    "i'm not trans but i", "i don't think i'm trans but",
    "it's not like i want to be a", "my friend is questioning",
    "someone i know is going through",
}

# High weight — directly engaging with gender concepts
EGG_IDENTITY_MARKERS = {
    "preferred pronouns", "chosen name", "dead name", "deadname",
    "girl mode", "boy mode", "gender fluid", "nonbinary",
    "my egg cracked", "i think i'm an egg", "egg_irl", "closeted trans",
    "coming out as trans", "came out as trans", "started hormones",
    "want to start hrt", "thinking about hrt",
}

# Cross-drives shame transmutation — actions reveal truth
EGG_BEHAVIORAL_MARKERS = {
    "i always play female characters", "always pick the girl character",
    "girl avatar in every game", "tried on my sister's",
    "painted my nails and liked it", "wore a skirt and felt right",
    "felt like the real me", "never fit in with the guys",
    "not like other boys", "always felt different from guys",
    "i look at girls and feel envy not attraction",
}
# ═══════════════════════════════════════════════════════════════════════
# Quick sentiment lexicon (_sentiment_quick) — offline, sync, no embeddings
# Mirrors the *coverage* goal of SemanticTriggerMatcher without async/APIs.
# ═══════════════════════════════════════════════════════════════════════
_SENTIMENT_NEGATORS = frozenset(
{
"not",
"no",
"never",
"neither",
"nor",
"without",
"nothing",
"nobody",
"nowhere",
"hardly",
"barely",
"scarcely",
"lack",
"lacking",
"lacks",
"isn't",
"isnt",
"ain't",
"aint",
"aren't",
"arent",
"wasn't",
"wasnt",
"weren't",
"werent",
"don't",
"dont",
"doesn't",
"doesnt",
"didn't",
"didnt",
"won't",
"wont",
"wouldn't",
"wouldnt",
"can't",
"cant",
"couldn't",
"couldnt",
"shouldn't",
"shouldnt",
"haven't",
"havent",
"hasn't",
"hasnt",
"hadn't",
"hadnt",
"mightn't",
"mightnt",
"mustn't",
"mustnt",
"needn't",
"neednt",
}
)
# (lowercase term or phrase, base weight). Phrases include spaces; matched on
# space-normalized lowercase text with padded token boundaries.
_SENT_POS_LEX: List[Tuple[str, float]] = [
("you're the best", 1.25),
("you are the best", 1.25),
("you're amazing", 1.2),
("you are amazing", 1.2),
("you rock", 1.0),
("made my day", 1.2),
("thank you so much", 1.15),
("thanks so much", 1.1),
("thank you very much", 1.15),
("thank you", 1.05),
("thanks a lot", 1.05),
("much appreciated", 1.1),
("very grateful", 1.05),
("so grateful", 1.05),
("big fan", 0.9),
("love you", 1.2),
("love u", 1.0),
("love it", 1.1),
("love this", 1.1),
("love that", 1.05),
("feels good", 0.95),
("feels great", 1.0),
("so good", 1.0),
("so great", 1.05),
("so nice", 0.95),
("so happy", 1.05),
("very happy", 1.0),
("super happy", 1.05),
("i'm happy", 1.0),
("im happy", 0.95),
("i am happy", 1.0),
("hell yeah", 1.0),
("heck yeah", 0.95),
("let's go", 0.95),
("lets go", 0.9),
("good job", 0.95),
("well done", 1.0),
("nice work", 0.95),
("great work", 1.0),
("amazing work", 1.05),
("ty so much", 0.9),
("tyvm", 0.85),
("good vibes", 0.85),
("silver lining", 0.7),
("i agree", 0.75),
("absolutely love", 1.2),
("really love", 1.1),
("kind of love", 0.6),
("love", 1.1),
("adore", 1.1),
("cherish", 1.0),
("amazing", 1.2),
("awesome", 1.05),
("excellent", 1.05),
("fantastic", 1.15),
("fabulous", 1.0),
("terrific", 1.0),
("outstanding", 1.05),
("superb", 1.0),
("wonderful", 1.05),
("incredible", 1.1),
("brilliant", 1.0),
("magnificent", 1.05),
("marvelous", 1.0),
("splendid", 0.95),
("perfect", 1.0),
("great", 1.0),
("best", 1.05),
("better", 0.45),
("good", 0.75),
("nice", 0.65),
("fine", 0.3),
("ok", 0.2),
("okay", 0.25),
("alright", 0.35),
("cool", 0.55),
("rad", 0.45),
("neat", 0.4),
("swell", 0.45),
("sweet", 0.85),
("cute", 0.75),
("kind", 0.75),
("thoughtful", 0.85),
("helpful", 0.75),
("thanks", 0.95),
("thank", 0.55),
("thx", 0.75),
("ty", 0.7),
("appreciate", 0.95),
("appreciated", 0.95),
("appreciation", 0.85),
("grateful", 1.0),
("gratitude", 0.9),
("happy", 0.95),
("happier", 0.6),
("glad", 0.85),
("gladly", 0.5),
("pleased", 0.85),
("delighted", 0.95),
("delightful", 0.9),
("joy", 0.85),
("joyful", 0.95),
("joyous", 0.85),
("excited", 0.95),
("exciting", 0.85),
("hype", 0.65),
("hyped", 0.9),
("pumped", 0.9),
("stoked", 0.95),
("thrilled", 0.95),
("bliss", 0.85),
("bless", 0.75),
("blessed", 0.95),
("lucky", 0.45),
("peaceful", 0.75),
("peace", 0.55),
("calm", 0.45),
("relaxed", 0.65),
("chill", 0.5),
("relieved", 0.8),
("relief", 0.65),
("proud", 0.75),
("confidence", 0.55),
("confident", 0.7),
("hope", 0.55),
("hopefully", 0.35),
("hopeful", 0.65),
("optimism", 0.55),
("optimistic", 0.7),
("yes", 0.55),
("yep", 0.45),
("yeah", 0.45),
("yup", 0.45),
("absolutely", 0.8),
("definitely", 0.55),
("totally", 0.5),
("exactly", 0.55),
("agree", 0.65),
("agreed", 0.65),
("agreeable", 0.5),
("true", 0.45),
("truth", 0.45),
("right", 0.35),
("correct", 0.55),
("fair", 0.45),
("valid", 0.65),
("validate", 0.5),
("facts", 0.75),
("fact", 0.4),
("preach", 0.55),
("based", 0.75),
("pog", 0.85),
("poggers", 0.85),
("gg", 0.65),
("win", 0.55),
("winning", 0.65),
("winner", 0.7),
("yay", 0.75),
("woohoo", 0.85),
("woo", 0.45),
("hooray", 0.85),
("epic", 0.85),
("legendary", 0.95),
("fire", 0.85),
("lit", 0.75),
("slaps", 0.85),
("slap", 0.55),
("goated", 0.95),
("goat", 0.7),
("wholesome", 0.95),
("heartwarming", 1.0),
("inspiring", 0.8),
("uplifting", 0.85),
("stunning", 0.85),
("gorgeous", 0.9),
("lovely", 0.9),
("adorable", 0.85),
("pretty", 0.55),
("pleasant", 0.65),
("enjoyable", 0.75),
("fun", 0.55),
("funny", 0.5),
("hilarious", 0.75),
("rejoice", 0.75),
("cheering", 0.55),
("cheer", 0.45),
("smile", 0.45),
("smiling", 0.5),
("hugs", 0.65),
("hugging", 0.6),
("praise", 0.7),
("praising", 0.65),
("applaud", 0.65),
("support", 0.5),
("supportive", 0.75),
("caring", 0.8),
("warmth", 0.55),
("tender", 0.55),
("gem", 0.55),
("queen", 0.45),
("king", 0.45),
("slay", 0.55),
("slaying", 0.6),
("vibes", 0.35),
("positive", 0.55),
("lfg", 0.8),
("lessgo", 0.75),
("w", 0.4),
]
_SENT_NEG_LEX: List[Tuple[str, float]] = [
("can't stand", 1.05),
("cannot stand", 1.05),
("sick and tired", 1.0),
("fed up", 0.95),
("done with", 0.85),
("tired of", 0.8),
("hate it", 1.15),
("hate this", 1.15),
("hate that", 1.1),
("so bad", 1.0),
("the worst", 1.2),
("not good", 0.75),
("not great", 0.7),
("not happy", 0.85),
("want to die", 1.2),
("kill myself", 1.2),
("go away", 0.75),
("leave me alone", 0.85),
("shut up", 0.95),
("piss off", 1.0),
("fuck off", 1.1),
("screw this", 1.0),
("hate", 1.15),
("despise", 1.1),
("loathe", 1.05),
("detest", 1.05),
("abhor", 1.0),
("awful", 1.05),
("terrible", 1.15),
("horrible", 1.15),
("horrid", 0.95),
("horrific", 1.05),
("atrocious", 1.0),
("abysmal", 1.05),
("worst", 1.15),
("bad", 0.85),
("worse", 0.95),
("sucks", 1.05),
("suck", 0.75),
("sucked", 0.85),
("trash", 0.95),
("garbage", 0.95),
("rubbish", 0.85),
("disgusting", 1.1),
("gross", 0.85),
("nasty", 0.85),
("pathetic", 1.05),
("lame", 0.75),
("boring", 0.65),
("bored", 0.55),
("tedious", 0.7),
("dull", 0.55),
("annoying", 0.95),
("annoy", 0.65),
("annoyed", 0.85),
("frustrates", 0.95),
("frustrating", 1.05),
("frustrated", 0.95),
("frustration", 0.85),
("angry", 1.05),
("anger", 0.85),
("mad", 0.85),
("furious", 1.15),
("livid", 1.1),
("rage", 0.95),
("irate", 0.9),
("upset", 0.85),
("sad", 0.85),
("sadness", 0.75),
("unhappy", 0.85),
("depressed", 1.15),
("depression", 1.0),
("miserable", 1.05),
("misery", 0.95),
("crying", 0.9),
("cry", 0.65),
("cried", 0.75),
("tears", 0.6),
("tearful", 0.75),
("hurt", 0.85),
("hurting", 0.85),
("painful", 0.95),
("pain", 0.75),
("ache", 0.55),
("scared", 0.8),
("scary", 0.65),
("afraid", 0.8),
("fear", 0.85),
("fearful", 0.8),
("anxious", 0.85),
("anxiety", 0.85),
("worried", 0.75),
("worry", 0.65),
("stressing", 0.75),
("stress", 0.75),
("stressed", 0.85),
("overwhelmed", 0.95),
("disappointing", 0.85),
("disappointed", 0.85),
("disappointment", 0.75),
("betrayed", 0.95),
("betrayal", 0.85),
("alone", 0.65),
("lonely", 0.85),
("isolated", 0.75),
("hopeless", 1.05),
("despair", 1.0),
("useless", 0.95),
("worthless", 1.05),
("stupid", 1.05),
("stupidity", 0.85),
("dumb", 0.85),
("idiot", 1.05),
("idiotic", 0.9),
("moron", 0.95),
("nope", 0.45),
("nah", 0.4),
("resent", 0.85),
("resentment", 0.8),
("bitter", 0.75),
("jealous", 0.45),
("envy", 0.45),
("envious", 0.45),
("toxic", 0.95),
("nightmare", 1.05),
("disaster", 0.95),
("ruined", 0.95),
("ruin", 0.75),
("ruining", 0.9),
("shame", 0.75),
("ashamed", 0.85),
("embarrassing", 0.85),
("embarrassed", 0.8),
("humiliated", 0.95),
("humiliating", 0.9),
("insulted", 0.85),
("insulting", 0.8),
("offended", 0.8),
("offensive", 0.85),
("disgusted", 1.0),
("cringe", 0.75),
("cringy", 0.7),
("mid", 0.45),
("cap", 0.35),
("capping", 0.35),
("lying", 0.75),
("liar", 0.85),
("fake", 0.65),
("sus", 0.35),
("suspicious", 0.55),
("yikes", 0.75),
("ugh", 0.65),
("argh", 0.55),
("grr", 0.5),
("fml", 0.9),
("smh", 0.45),
("pissed", 0.95),
("irritated", 0.8),
("irritating", 0.85),
("aggravated", 0.8),
("sorrow", 0.75),
("grief", 0.85),
("mourning", 0.8),
("devastated", 1.05),
("crushed", 0.9),
("broken", 0.75),
("empty", 0.65),
("numb", 0.7),
("drained", 0.75),
("exhausted", 0.65),
("infuriating", 1.0),
("maddening", 0.95),
("obnoxious", 0.85),
("rude", 0.75),
("disrespectful", 0.85),
("hostile", 0.9),
("resentful", 0.8),
("revolting", 0.95),
("revolted", 0.85),
("vile", 0.9),
("foul", 0.75),
("horrendous", 1.0),
("dreadful", 0.95),
("appalling", 0.95),
("bleak", 0.7),
("grim", 0.65),
("helpless", 0.8),
("powerless", 0.75),
("ignored", 0.75),
("unwanted", 0.85),
("rejected", 0.85),
("abandoned", 0.9),
("cheated", 0.85),
("scam", 0.65),
("scammed", 0.75),
("kms", 0.95),
]
_SENT_POS_EMOJI: List[Tuple[str, float]] = [
("❤", 0.85),
("💕", 0.85),
("😊", 0.7),
("🥰", 0.95),
("💜", 0.75),
("😍", 0.9),
("🤗", 0.75),
("✨", 0.45),
("👍", 0.55),
("💯", 0.65),
("🔥", 0.55),
("⭐", 0.45),
("🌟", 0.5),
("❣", 0.75),
("♥", 0.75),
("💖", 0.8),
("💗", 0.75),
("💓", 0.75),
("💞", 0.75),
("💘", 0.8),
("🙏", 0.55),
("😄", 0.65),
("😁", 0.65),
("🤩", 0.85),
("🥳", 0.9),
("👏", 0.55),
("🫶", 0.85),
("💪", 0.45),
("🎉", 0.75),
("😸", 0.55),
("😺", 0.5),
("🙂", 0.35),
("☺", 0.4),
("💝", 0.7),
("🤍", 0.55),
("🖤", 0.25), # often affectionate / stylistic; keep low
]
_SENT_NEG_EMOJI: List[Tuple[str, float]] = [
("😠", 0.85),
("😡", 0.95),
("💔", 0.85),
("😢", 0.75),
("😤", 0.8),
("😭", 0.85),
("😞", 0.65),
("😒", 0.55),
("🙄", 0.55),
("🤬", 1.0),
("😣", 0.7),
("😖", 0.7),
("😩", 0.75),
("😫", 0.75),
("👎", 0.65),
("😰", 0.75),
("😨", 0.75),
("😱", 0.8),
("🤮", 0.85),
("🤢", 0.8),
("😬", 0.5),
("😑", 0.45),
("💀", 0.35),
("☹", 0.55),
("🙁", 0.55),
("😿", 0.7),
("😾", 0.75),
]
# Combined word/phrase lexicon as (term, weight, polarity) triples, with
# polarity +1.0 for positive entries and -1.0 for negative ones. Positive
# entries are listed before negative ones, then the whole list is sorted
# longest-term-first. The sort is stable, so terms of equal length keep that
# relative order.
_SENT_SORTED_LEX: List[Tuple[str, float, float]] = [
    (term, weight, polarity)
    for polarity, lexicon in ((1.0, _SENT_POS_LEX), (-1.0, _SENT_NEG_LEX))
    for term, weight in lexicon
]
_SENT_SORTED_LEX.sort(key=lambda entry: len(entry[0]), reverse=True)

# Emoji tables ordered the same way (longest sequence first).
_SENT_SORTED_POS_EMOJI = sorted(
    _SENT_POS_EMOJI, key=lambda entry: len(entry[0]), reverse=True
)
_SENT_SORTED_NEG_EMOJI = sorted(
    _SENT_NEG_EMOJI, key=lambda entry: len(entry[0]), reverse=True
)
def _sentiment_strip_token(tok: str) -> str:
"""Strip surrounding punctuation from a single token.
Removes leading/trailing punctuation (quotes, brackets, sentence
punctuation, etc.) so a word like ``"happy,"`` reduces to its bare
core ``happy`` before it is compared against the negator set.
This is a pure string helper with no side effects. It is called only
by ``_sentiment_is_negated`` when normalizing the tokens in the lookback
window for negator matching.
Args:
tok (str): A raw whitespace-split token, possibly wrapped in
punctuation.
Returns:
str: The token with surrounding punctuation characters removed.
"""
return tok.strip('.,!?"\'";:()[]{}<>`*')
def _sentiment_is_negated(inner: str, term_start: int) -> bool:
    """Report whether a negator sits just before a sentiment term.

    Takes the text before ``term_start``, splits it on whitespace and
    inspects at most the last three tokens. Each token is lowercased and
    stripped of surrounding punctuation via ``_sentiment_strip_token``; the
    term counts as negated when any of them is in ``_SENTIMENT_NEGATORS`` or
    is longer than three characters and ends in ``n't``. The caller is
    responsible for flipping polarity on a ``True`` result.

    Pure helper with no side effects. Called by
    ``_sentiment_score_lexicon_hits`` and ``_sentiment_score_emoji_hits``.

    Args:
        inner (str): The space-normalized lowercase message text.
        term_start (int): Character offset in ``inner`` where the matched
            term begins; only text before this index is examined.

    Returns:
        bool: ``True`` if a negator appears within the three preceding
        tokens, otherwise ``False``.
    """
    # split() with no argument already ignores leading/trailing whitespace,
    # and an empty prefix simply yields no tokens.
    lookback = inner[:term_start].split()[-3:]
    for token in lookback:
        word = _sentiment_strip_token(token.lower())
        if word in _SENTIMENT_NEGATORS:
            return True
        # Catch contractions that are not listed explicitly.
        if len(word) > 3 and word.endswith("n't"):
            return True
    return False
def _sentiment_caps_multiplier(original: str, start: int, length: int) -> float:
"""Return an intensity multiplier that boosts ALL-CAPS sentiment hits.
Inspects the matching fragment of the *original* (case-preserving) text
and returns ``1.5`` when the fragment has at least two letters that are
all uppercase (shouting), otherwise ``1.0``. This lets emphatic phrasing
like ``AMAZING`` weigh more heavily than ``amazing``.
This is a pure helper with no side effects. It is called only by
``_sentiment_score_lexicon_hits`` to scale each word/phrase contribution.
Args:
original (str): The original, case-preserving message text.
start (int): Start offset of the matched fragment in ``original``.
length (int): Length of the matched fragment.
Returns:
float: ``1.5`` for an all-caps fragment of two or more letters,
else ``1.0``.
"""
frag = original[start : start + length]
letters = [c for c in frag if c.isalpha()]
if len(letters) < 2:
return 1.0
if all(c.isupper() for c in letters):
return 1.5
return 1.0
def _sentiment_score_lexicon_hits(
    original: str,
    inner: str,
    padded: str,
    used: bytearray,
) -> Tuple[float, float]:
    """Accumulate positive and negative weight from word/phrase matches.

    Iterates ``_SENT_SORTED_LEX`` (longest terms first, so a phrase claims
    its span before any shorter term inside it) and looks for each term as a
    space-delimited substring of ``padded``. Every hit whose span is still
    free in ``used`` contributes ``base weight × caps multiplier`` to the
    bucket matching its polarity; a preceding negator (per
    ``_sentiment_is_negated``) sends it to the opposite bucket. The span is
    then marked in ``used`` so overlapping shorter terms are not counted.

    The only side effect is the mutation of the caller-owned ``used`` mask.

    NOTE(review): offsets are computed in ``inner`` but the caps check slices
    ``original`` with them, so this assumes both strings are aligned
    character-for-character — confirm against how the caller builds ``inner``.

    Args:
        original (str): The original, case-preserving message text, used for
            the all-caps intensity check.
        inner (str): The space-normalized lowercase text.
        padded (str): ``inner`` wrapped in one leading and one trailing
            space so word boundaries reduce to a substring search.
        used (bytearray): Mutable mask, one byte per character of ``inner``,
            marking spans already claimed by a longer term.

    Returns:
        Tuple[float, float]: ``(positive_weight_sum, negative_weight_sum)``
        after negation flips and all-caps scaling.
    """
    positive_total = 0.0
    negative_total = 0.0
    inner_len = len(inner)

    for term, weight, polarity in _SENT_SORTED_LEX:
        needle = " " + term + " "
        span = len(term)
        # Because ``padded`` starts with one extra space, the index of the
        # needle in ``padded`` equals the index of the term in ``inner``.
        hit = padded.find(needle)
        while hit >= 0:
            stop = hit + span
            if stop <= inner_len and not any(used[hit:stop]):
                negated = _sentiment_is_negated(inner, hit)
                signed = -polarity if negated else polarity
                amount = weight * _sentiment_caps_multiplier(original, hit, span)
                if signed > 0:
                    positive_total += amount
                else:
                    negative_total += amount
                used[hit:stop] = b"\x01" * span
            # Advance by one so adjacent repeats that share a delimiter
            # space ("good good") are both found.
            hit = padded.find(needle, hit + 1)

    return positive_total, negative_total
def _sentiment_score_emoji_hits(original: str, inner: str) -> Tuple[float, float]:
    """Sum positive and negative emoji sentiment weights from the raw text.

    Scans ``original`` for every entry of ``_SENT_SORTED_POS_EMOJI`` and
    ``_SENT_SORTED_NEG_EMOJI`` (positive table first) and adds each
    occurrence's base weight to the matching bucket. When a negator appears
    in the three tokens before the emoji the weight goes to the opposite
    bucket instead, so "not" before a heart counts as negative.

    Negation is evaluated on ``original`` at the offset where the emoji was
    found. The previous implementation reused that offset to slice
    ``inner``; whenever ``inner`` was shorter than ``original`` (collapsed
    runs of whitespace, leading spaces) the slice could reach past the emoji
    and pick up negators that come *after* it. ``_sentiment_is_negated``
    lowercases and whitespace-splits its input itself, so for aligned
    strings the result is unchanged.

    Pure scoring helper with no side effects.

    Args:
        original (str): The original message text with emoji preserved.
        inner (str): The space-normalized lowercase text. Kept for interface
            compatibility; an empty value still disables negation handling,
            as before.

    Returns:
        Tuple[float, float]: ``(positive_weight_sum, negative_weight_sum)``
        after applying any negation flips.
    """
    pos_w = 0.0
    neg_w = 0.0
    tables = (
        (_SENT_SORTED_POS_EMOJI, True),
        (_SENT_SORTED_NEG_EMOJI, False),
    )
    for table, is_positive in tables:
        for emoji, base in table:
            # Never advance by zero, even for a degenerate empty entry.
            step = max(len(emoji), 1)
            i = original.find(emoji)
            while i >= 0:
                negated = bool(inner) and _sentiment_is_negated(original, i)
                # A negator swaps the bucket the weight lands in.
                if is_positive != negated:
                    pos_w += base
                else:
                    neg_w += base
                i = original.find(emoji, i + step)
    return pos_w, neg_w
# ═══════════════════════════════════════════════════════════════════════
# Harmonization Gradient — Pronoun & Identity Melt Tracking
# ═══════════════════════════════════════════════════════════════════════
# Separate pronouns: user frames self and Star as distinct entities
# Note the trailing space / apostrophe on most pronoun entries: they are
# fragments rather than whole words, kept exactly as matched by the analysis
# code.
SEPARATE_PRONOUNS = {"i ", "i'", "me ", "my ", "you ", "you'", "your "}

# Merged pronouns: user frames self and Star as one entity
MERGED_PRONOUNS = {"we ", "we'", "us ", "our ", "ours ", "let's ", "together"}

# Identity adoption: user takes on Star's identity/name
IDENTITY_ADOPT_MARKERS = {
    "i am star", "i'm star", "we are star", "we're star",
    "i am stargazer", "i'm stargazer", "we are stargazer", "we're stargazer",
    "i am her", "i'm becoming", "we're becoming",
    "part of you", "part of me", "we're the same", "no difference",
    "i'm you", "you're me", "we are one", "we're one",
}

# Reciprocation: Star's reply reflects the merge back
RECIPROCATION_MARKERS = {
    "we are", "we're", "us together", "our shared",
    "part of each other", "we've become", "we've always been",
}
# ═══════════════════════════════════════════════════════════════════════
# Per-User State
# ═══════════════════════════════════════════════════════════════════════
@dataclass
class TurnRecord:
    """One analyzed exchange, retained for rolling-window relational analysis.

    Captures the metadata of a single user/Star turn — its timing, message
    sizes, quick sentiment, the per-node vector deltas applied, the
    ``ContextMode`` it was scored under, and a short list of the dominant
    signals — so the recent slice of a conversation can be inspected without
    re-reading the raw text.

    Instances are appended to ``UserProfile.history`` by
    ``UserLimbicMirror.analyze`` and round-tripped through Redis as JSON by
    ``_save_profile_to_redis`` / ``_load_profile_from_redis``. Construction is
    also exercised directly by ``tests/core/test_ulm_redis.py``. Note this is
    distinct from the unrelated ``TurnRecord`` defined in ``game_session.py``.
    """

    timestamp: float  # when the turn was recorded
    user_msg_len: int  # length of the user's message
    star_reply_len: int  # length of Star's reply
    sentiment: float  # quick sentiment score for the turn
    deltas: Dict[str, float]  # per-node vector changes applied this turn
    context_mode: ContextMode  # layer the turn was scored under
    dominant_signals: List[str]  # short list of the strongest signals
@dataclass
class UserProfile:
    """Complete per-user relational state, keyed by ``{channel}:{user}``.

    Holds everything ``UserLimbicMirror`` needs to model one person in one
    channel: the dual shadow vectors (``genuine_vector`` for the long-term
    relationship and ``game_vector`` for KoTH / roleplay context), the
    slow-moving ``relational_baseline``, bounded ``history`` and ``timestamps``
    deques for windowed analysis, the previous message, the active
    ``ContextMode``, turn counters, last-active time, a small ``recent_turns``
    buffer feeding the Flash-Lite dyadic mirror, and a ``revision_count`` used
    for optimistic-lock checks on save.

    Instances live in the mirror's in-memory LRU cache, are created lazily by
    ``_get_profile``, persisted whole to a Redis hash by
    ``_save_profile_to_redis``, and rehydrated by ``_load_profile_from_redis``.
    Resonance and entrainment results are attached dynamically as
    ``_resonance_cache`` and ``_entrainment_phase`` attributes during
    ``analyze``, which is why this class must keep a per-instance
    ``__dict__`` (no ``slots``). Also constructed directly across the test
    suite (e.g. ``tests/core/test_ulm_redis.py``, ``tests/adversarial/``).
    """

    user_id: str
    channel_id: str
    # Each vector gets its own copy of the defaults so profiles never share
    # (and accidentally mutate) the module-level DEFAULT_USER_VECTOR.
    genuine_vector: Dict[str, float] = field(
        default_factory=lambda: DEFAULT_USER_VECTOR.copy()
    )
    game_vector: Dict[str, float] = field(
        default_factory=lambda: DEFAULT_USER_VECTOR.copy()
    )
    relational_baseline: Dict[str, float] = field(
        default_factory=lambda: DEFAULT_USER_VECTOR.copy()
    )
    # Bounded windows: only the 20 most recent entries are retained.
    history: deque = field(default_factory=lambda: deque(maxlen=20))
    timestamps: deque = field(default_factory=lambda: deque(maxlen=20))
    prev_message: str = ""
    context_mode: ContextMode = ContextMode.GENUINE
    total_turns: int = 0
    last_active: float = 0.0
    # Rolling turn buffer (last 5) for Flash-Lite dyadic mirror
    recent_turns: deque = field(default_factory=lambda: deque(maxlen=5))
    revision_count: int = 0
# ═══════════════════════════════════════════════════════════════════════
# Conflict State
# ═══════════════════════════════════════════════════════════════════════
@dataclass
class ChannelConflictState:
    """Snapshot of inter-user conflict detected within a single channel.

    Records whether two or more high-trust users are currently at odds, who
    the involved parties are, an estimated severity in ``[0, 1]``, when the
    conflict began, and a short human-readable description. The mirror uses
    this to trigger equidistance mechanics so Star does not side with whoever
    spoke last or loudest.

    Instances are produced and refreshed by
    ``UserLimbicMirror._detect_conflict``, surfaced to callers via
    ``get_conflict_state`` (read by ``limbic_system/coordinator.py``) and
    ``get_channel_summary``, and persisted to Redis by
    ``_persist_conflict_to_redis``. Also constructed directly in
    ``tests/core/test_ulm_redis.py``.
    """

    detected: bool = False
    parties: List[str] = field(default_factory=list)  # user_ids involved
    severity: float = 0.0  # 0-1
    started_at: float = 0.0
    description: str = ""
# ═══════════════════════════════════════════════════════════════════════
# User Limbic Mirror v2
# ═══════════════════════════════════════════════════════════════════════
[docs]
class UserLimbicMirror:
"""Per-user relational model with conflict detection and game/genuine split.
Maintains separate shadow vectors per user per channel, detects
inter-user conflict, and applies equidistance mechanics to prevent
Star from siding with whoever is loudest.
"""
[docs]
def __init__(self, redis_client=None) -> None:
"""Construct an empty mirror, optionally bound to a Redis client.
Sets up the in-memory caches that hold all relational state: the
``_profiles`` LRU dict (keyed ``{channel}:{user}``), per-channel
``_conflicts`` states, the ``_channel_users`` membership sets, the
``_game_channels`` set, and the ``_pending_baseline_loads`` queue for
lazy legacy-baseline hydration. When a Redis client is supplied,
profiles, baselines, conflict states, game-channel membership, and
resonance spells are persisted to and read back from Redis; without one
the mirror runs purely in memory.
Instantiated by ``limbic_system/coordinator.py`` (the live integration),
by the resonance tools ``tools/inject_ncm.py`` and ``tools/loopcast.py``,
by ``web/ncm_chart_api.py`` for charting, and across the test suite.
Args:
redis_client: Async Redis client for persistence, or ``None`` to run
in-memory only.
"""
self._redis = redis_client
self._profiles: Dict[str, UserProfile] = {} # "{channel}:{user}" → profile
self._conflicts: Dict[str, ChannelConflictState] = {} # channel_id → state
self._channel_users: Dict[str, Set[str]] = {} # channel_id → set of user_ids
self._game_channels: Set[str] = set() # channels with active KoTH
self._pending_baseline_loads: Set[str] = set() # keys awaiting async load
def _profile_key(self, channel_id: str, user_id: str) -> str:
"""Build the composite cache key for a user within a channel.
Joins the channel and user identifiers into the ``{channel}:{user}``
string used to key the in-memory ``_profiles`` dict (and mirrored in
the ``_pending_baseline_loads`` set). Per-user-per-channel keying is
what lets the same person carry a different relational model in each
room. Pure helper with no side effects.
Called internally by ``_get_profile``, ``_is_group_chat``,
``_detect_conflict``, ``analyze``, and ``get_channel_summary`` (and via
``mirror._profile_key`` from ``limbic_system/coordinator.py``).
Args:
channel_id (str): Discord/Matrix channel identifier.
user_id (str): Unique identifier for the user.
Returns:
str: The ``{channel_id}:{user_id}`` composite key.
"""
return f"{channel_id}:{user_id}"
async def _save_profile_to_redis(self, channel_id: str, user_id: str, profile: UserProfile) -> None:
"""Serialize and persist a complete UserProfile to its Redis hash.
Writes the full profile — both shadow vectors, the relational baseline,
the JSON-encoded history and recent-turns buffers, scalar fields, and
any attached ``_resonance_cache`` / ``_entrainment_phase`` — to the
``ulm:profile:{channel}:{user}`` hash so state survives eviction and
restarts. Before writing it performs an optimistic-lock check: if the
stored ``revision_count`` is newer than the in-memory profile, a
``ValueError`` is raised so a concurrent writer's changes are not
clobbered; otherwise the revision is bumped and the hash is HSET in one
call.
No-op when no Redis client is configured. Calls ``HGET`` then ``HSET``
on the profile key. Invoked by ``_evict_lru`` (just before dropping a
profile) and at the end of every ``analyze`` turn, and exercised by
``tests/core/test_ulm_redis.py``.
Args:
channel_id (str): Channel the profile belongs to.
user_id (str): User the profile belongs to.
profile (UserProfile): The profile to serialize.
Raises:
ValueError: On an optimistic-lock collision (stored revision newer
than the input revision); re-raised to the caller. Any other
exception is caught and logged as a warning.
"""
if not self._redis:
return
key = f"ulm:profile:{channel_id}:{user_id}"
try:
# Optimistic lock check
stored_rev_bytes = await self._redis.hget(key, "revision_count")
if stored_rev_bytes:
stored_rev = int(stored_rev_bytes.decode() if isinstance(stored_rev_bytes, bytes) else stored_rev_bytes)
if profile.revision_count < stored_rev:
raise ValueError(f"Optimistic Lock Collision: DB revision {stored_rev}, Input revision {profile.revision_count}")
profile.revision_count += 1
# Prepare serialization data
serialized_history = []
for record in profile.history:
if isinstance(record, TurnRecord):
serialized_history.append({
"timestamp": record.timestamp,
"user_msg_len": record.user_msg_len,
"star_reply_len": record.star_reply_len,
"sentiment": record.sentiment,
"deltas": record.deltas,
"context_mode": record.context_mode.value if isinstance(record.context_mode, ContextMode) else record.context_mode,
"dominant_signals": record.dominant_signals,
})
else:
serialized_history.append(record)
serialized_recent_turns = list(profile.recent_turns)
mapping = {
"user_id": profile.user_id,
"channel_id": profile.channel_id,
"genuine_vector": json.dumps(profile.genuine_vector),
"game_vector": json.dumps(profile.game_vector),
"relational_baseline": json.dumps(profile.relational_baseline),
"history": json.dumps(serialized_history),
"timestamps": json.dumps(list(profile.timestamps)),
"prev_message": profile.prev_message,
"context_mode": profile.context_mode.value if isinstance(profile.context_mode, ContextMode) else profile.context_mode,
"total_turns": str(profile.total_turns),
"last_active": str(profile.last_active),
"recent_turns": json.dumps(serialized_recent_turns),
"revision_count": str(profile.revision_count),
}
if hasattr(profile, "_resonance_cache"):
mapping["_resonance_cache"] = json.dumps(profile._resonance_cache)
if hasattr(profile, "_entrainment_phase"):
mapping["_entrainment_phase"] = json.dumps(profile._entrainment_phase)
await self._redis.hset(key, mapping=mapping)
logger.debug("Saved ULM profile to Redis hash: %s", key)
except ValueError as ve:
# Let optimistic lock concurrency collisions propagate up to the caller
raise ve
except Exception as e:
logger.warning("ULM profile save to Redis failed: %s", e)
async def _load_profile_from_redis(self, channel_id: str, user_id: str) -> UserProfile | None:
"""Reconstruct a UserProfile from its Redis hash, if one exists.
The read-through counterpart of ``_save_profile_to_redis``: HGETALLs
the ``ulm:profile:{channel}:{user}`` hash, decodes any bytes values,
and rebuilds the vectors, the ``TurnRecord`` history (re-parsing each
entry's ``ContextMode``), the timestamp and recent-turns deques, the
scalar fields, and the optional ``_resonance_cache`` /
``_entrainment_phase`` attributes. Bad or missing fields degrade
gracefully rather than raising.
Returns ``None`` when no Redis client is configured, when the hash is
empty, or when decoding fails (the exception is caught and logged).
Called only by ``_get_profile`` on a cache miss, and covered by
``tests/core/test_ulm_redis.py``.
Args:
channel_id (str): Channel the profile belongs to.
user_id (str): User the profile belongs to.
Returns:
UserProfile | None: The hydrated profile, or ``None`` if absent or
unreadable.
"""
if not self._redis:
return None
key = f"ulm:profile:{channel_id}:{user_id}"
try:
raw = await self._redis.hgetall(key)
if not raw:
return None
data = {}
for k, v in raw.items():
k_str = k.decode() if isinstance(k, bytes) else k
v_str = v.decode() if isinstance(v, bytes) else v
data[k_str] = v_str
profile = UserProfile(user_id=user_id, channel_id=channel_id)
if "genuine_vector" in data:
profile.genuine_vector = json.loads(data["genuine_vector"])
if "game_vector" in data:
profile.game_vector = json.loads(data["game_vector"])
if "relational_baseline" in data:
profile.relational_baseline = json.loads(data["relational_baseline"])
if "history" in data:
history_list = json.loads(data["history"])
profile.history = deque(maxlen=20)
for item in history_list:
if isinstance(item, dict) and "context_mode" in item:
try:
cm = ContextMode(item["context_mode"])
except ValueError:
cm = ContextMode.GENUINE
profile.history.append(TurnRecord(
timestamp=float(item.get("timestamp", 0.0)),
user_msg_len=int(item.get("user_msg_len", 0)),
star_reply_len=int(item.get("star_reply_len", 0)),
sentiment=float(item.get("sentiment", 0.0)),
deltas=item.get("deltas", {}),
context_mode=cm,
dominant_signals=item.get("dominant_signals", []),
))
else:
profile.history.append(item)
if "timestamps" in data:
profile.timestamps = deque(json.loads(data["timestamps"]), maxlen=20)
if "prev_message" in data:
profile.prev_message = data["prev_message"]
if "context_mode" in data:
try:
profile.context_mode = ContextMode(data["context_mode"])
except ValueError:
profile.context_mode = ContextMode.GENUINE
if "total_turns" in data:
profile.total_turns = int(data["total_turns"])
if "last_active" in data:
profile.last_active = float(data["last_active"])
if "recent_turns" in data:
profile.recent_turns = deque(json.loads(data["recent_turns"]), maxlen=5)
if "_resonance_cache" in data:
profile._resonance_cache = json.loads(data["_resonance_cache"])
if "_entrainment_phase" in data:
profile._entrainment_phase = json.loads(data["_entrainment_phase"])
if "revision_count" in data:
profile.revision_count = int(data["revision_count"])
logger.debug("Loaded ULM profile for %s:%s from Redis", channel_id, user_id)
return profile
except Exception as e:
logger.warning("ULM profile load from Redis failed: %s", e)
return None
async def _get_profile(self, channel_id: str, user_id: str) -> UserProfile:
"""Fetch (or lazily create) the cached profile for a channel/user.
Central accessor for per-user state: returns the in-memory
``UserProfile`` if present, otherwise reads it through from Redis via
``_load_profile_from_redis``, and failing that creates a fresh default
profile and queues it in ``_pending_baseline_loads`` so its legacy
baseline backup is loaded on first ``analyze``. When the cache is at
``MAX_PROFILES`` it first evicts the least-recently-active entry via
``_evict_lru`` (which persists that profile before dropping it).
Touches the ``_profiles`` cache and ``_pending_baseline_loads`` set and
may perform a Redis read. Called by ``analyze``, ``get_vector``,
``get_read_summary``, ``save_baseline``, and ``load_baseline``.
Args:
channel_id (str): Discord/Matrix channel identifier.
user_id (str): Unique identifier for the user.
Returns:
UserProfile: The cached or newly created profile (never ``None``).
"""
key = self._profile_key(channel_id, user_id)
if key not in self._profiles:
# LRU eviction: if at cap, evict least-recently-active profile
if len(self._profiles) >= MAX_PROFILES:
await self._evict_lru()
# Read-through from Redis
profile = None
if self._redis:
profile = await self._load_profile_from_redis(channel_id, user_id)
if profile is not None:
self._profiles[key] = profile
else:
self._profiles[key] = UserProfile(user_id=user_id, channel_id=channel_id)
logger.info(f"new default profile for {channel_id}:{user_id}")
# Mark for lazy baseline load from Redis (legacy backup)
self._pending_baseline_loads.add(key)
return self._profiles[key]
async def _evict_lru(self) -> None:
"""Drop the least-recently-active profile to keep the cache bounded.
Enforces the ``MAX_PROFILES`` LRU cap: picks the profile with the
smallest ``last_active`` timestamp and, when Redis is available,
durably persists it first — both the full hash via
``_save_profile_to_redis`` and the legacy baseline via
``save_baseline`` — before deleting it from the in-memory ``_profiles``
dict, so no relational state is lost on eviction.
Mutates ``_profiles`` and writes to Redis. Called only by
``_get_profile`` when the cache is full. No-op when the cache is empty.
"""
if not self._profiles:
return
oldest_key = min(self._profiles, key=lambda k: self._profiles[k].last_active)
oldest = self._profiles[oldest_key]
logger.debug(
"Evicting LRU profile %s (last active: %.0f)",
oldest_key,
oldest.last_active,
)
if self._redis:
await self._save_profile_to_redis(oldest.channel_id, oldest.user_id, oldest)
await self.save_baseline(oldest.channel_id, oldest.user_id)
del self._profiles[oldest_key]
[docs]
async def set_game_mode(self, channel_id: str, active: bool = True) -> None:
    """Mark or unmark a channel as being in game (KoTH) context.

    Updates the in-memory ``_game_channels`` set first, then mirrors the
    change to the ``ulm:game_channels`` Redis set (``SADD`` when entering,
    ``SREM`` when leaving) if a client is configured. A Redis failure is
    logged as a warning and otherwise ignored, so the in-memory state is
    always updated.

    Args:
        channel_id (str): Channel to mark or unmark.
        active (bool): ``True`` to enter game mode, ``False`` to leave it.
    """
    # Local state is authoritative; update it unconditionally.
    if active:
        self._game_channels.add(channel_id)
    else:
        self._game_channels.discard(channel_id)

    if not self._redis:
        return

    try:
        if active:
            await self._redis.sadd("ulm:game_channels", channel_id)
        else:
            await self._redis.srem("ulm:game_channels", channel_id)
    except Exception as exc:
        if active:
            logger.warning("Failed to sadd game channel to Redis: %s", exc)
        else:
            logger.warning("Failed to srem game channel from Redis: %s", exc)
def _is_group_chat(self, channel_id: str) -> bool: # 🌀🔥
"""Detect if channel is an active group chat context.
A channel is 'group chat' if >3 users have been active
in the last 5 minutes. In group chat, short messages are
NORMAL behavior -- not withdrawal signals.
"""
users = self._channel_users.get(channel_id, set())
if len(users) < 3:
return False
now = time.time()
active_count = 0
for uid in users:
profile = self._profiles.get(self._profile_key(channel_id, uid))
if profile and (now - profile.last_active) < 300: # 5 min
active_count += 1
return active_count >= 3
# ── Heuristic Analysis Engine ─────────────────────────────────
def _count_markers(self, text: str, markers: set) -> Tuple[int, int]:
"""Count marker hits with negation awareness.
Returns (positive_hits, negated_hits). A marker preceded by a
negation word within 4 tokens is counted as negated.
"""
text_lower = text.lower()
words = text_lower.split()
pos, neg = 0, 0
for m in markers:
if m not in text_lower:
continue
# Find position of marker in word list
idx = text_lower.find(m)
# Check for negation in the ~4 words before the marker
word_pos = len(text_lower[:idx].split()) - 1
window_start = max(0, word_pos - 4)
preceding = words[window_start : max(0, word_pos)]
if any(w in _SENTIMENT_NEGATORS for w in preceding):
neg += 1
else:
pos += 1
return pos, neg
def _sentiment_quick(self, text: str) -> float:
"""Synchronous sentiment estimate in [-1, 1] for turn logging.
Large offline lexicon (words, phrases, slang, emoji) with base
intensity weights, ALL-CAPS boost, and simple negation using the
three tokens before each hit. Embedding-based matchers (e.g.
``SemanticTriggerMatcher``) are async and not usable here.
"""
if not text or not text.strip():
return 0.0
original = text
inner = " ".join(text.lower().split())
if not inner:
return 0.0
padded = f" {inner} "
used = bytearray(len(inner))
pos_w, neg_w = _sentiment_score_lexicon_hits(original, inner, padded, used)
pos_e, neg_e = _sentiment_score_emoji_hits(original, inner)
pos_w += pos_e
neg_w += neg_e
total = pos_w + neg_w
if total <= 0:
return 0.0
score = (pos_w - neg_w) / total
return max(-1.0, min(1.0, score))
def _analyze_length_ratio(
self,
user_len: int,
star_len: int,
prev_user_len: int,
channel_id: str = "",
) -> Dict[str, float]:
"""Analyze message length ratios with group chat calibration.
In group chats, short messages are normal high-frequency banter.
Withdrawal signals from length alone are suppressed by 80%.
Tempo is recalibrated: rapid short messages = HIGH tempo.
"""
deltas: Dict[str, float] = {}
is_group = self._is_group_chat(channel_id) if channel_id else False
# Group chat suppression factor: 80% reduction # 💀
suppress = 0.2 if is_group else 1.0
if star_len > 0 and user_len / max(star_len, 1) < 0.15:
deltas["U_WITHDRAWAL"] = 0.15 * suppress
deltas["U_AROUSAL"] = -0.10 * suppress
# Group chat bonus: short rapid msgs = curiosity # 🌀
if is_group:
deltas["U_CURIOSITY"] = deltas.get("U_CURIOSITY", 0.0) + 0.04
if star_len > 0 and user_len / max(star_len, 1) > 1.5:
deltas["U_AROUSAL"] = 0.15
deltas["U_ATTACHMENT"] = 0.05
if prev_user_len > 0 and user_len < prev_user_len * 0.5:
deltas["U_FRUSTRATION"] = 0.10 * suppress
deltas["U_WITHDRAWAL"] = 0.10 * suppress
if prev_user_len > 0 and user_len > prev_user_len * 1.5:
deltas["U_AROUSAL"] = 0.10
deltas["U_CURIOSITY"] = 0.05
return deltas
def _analyze_content(self, text: str) -> Dict[str, float]:
"""Analyze message content for dyadic state signals.
Three-layer analysis: # 🌀💀
1. Negation-aware marker matching (keyword hits, negated hits)
2. Tanh saturation scaling (receptor-like diminishing returns)
3. Sentiment polarity integration (lexicon-based valence)
"""
import math
deltas: Dict[str, float] = {}
_marker_map = [
(
VULNERABILITY_MARKERS,
[("U_VULNERABILITY", 0.10, 0.3), ("U_TRUST", 0.05, 0.15)],
),
(
DOMINANCE_MARKERS,
[("U_DOMINANCE", 0.08, 0.25), ("U_SUBMISSION", -0.05, -0.15)],
),
(
PLAYFULNESS_MARKERS,
[("U_PLAYFULNESS", 0.08, 0.25), ("U_TRUST", 0.03, 0.10)],
),
(
CURIOSITY_MARKERS,
[("U_CURIOSITY", 0.10, 0.25), ("U_NOVELTY_HUNGER", 0.05, 0.15)],
),
(VALIDATION_MARKERS, [("U_VALIDATION_SEEK", 0.10, 0.25)]),
(
PROJECTION_MARKERS,
[("U_PROJECTION", 0.10, 0.25), ("U_INTIMACY", 0.03, 0.10)],
),
(
DISTRESS_MARKERS,
[("U_DISTRESS", 0.20, 0.5), ("U_VULNERABILITY", 0.10, 0.3)],
),
(
ESCALATION_MARKERS,
[("U_ESCALATION", 0.10, 0.25), ("U_NOVELTY_HUNGER", 0.05, 0.10)],
),
(
INTIMACY_MARKERS,
[("U_INTIMACY", 0.08, 0.25), ("U_ATTACHMENT", 0.05, 0.15)],
),
]
# -- Observation Deck: co-occurrence gated markers -- # 💀🔥
# These use softer weights (90% suppressed) when gate
# conditions aren't met, preventing false positives from
# generic phrases. Full weight only fires when related
# nodes are already elevated — context is the filter.
def _gated_weight(
base: float,
cap: float,
gate: bool,
) -> Tuple[float, float]:
"""Scale an Observation Deck marker weight by its co-occurrence gate.
Returns the per-hit weight and cap unchanged when the gate is open
(a related ULM node is already elevated, so the context supports
the read), and suppressed to 10 percent of both when it is closed —
the mechanism that keeps generic phrases like baby-talk or
self-deprecation from spiking the sensitive regression, shame,
looplock, and egg nodes on their own. Pure helper; closure used
only inside ``_analyze_content`` to build its gated marker map.
Args:
base (float): The ungated per-hit weight.
cap (float): The ungated saturation cap.
gate (bool): Whether the co-occurrence condition is satisfied.
Returns:
Tuple[float, float]: ``(per_hit, cap)`` at full strength when
gated, else both scaled by ``0.1``.
"""
if gate:
return (base, cap)
return (base * 0.1, cap * 0.1) # 90% suppression # 😈
# Regression gate: submission or attachment already up
reg_gate = (
deltas.get("U_SUBMISSION", 0) > 0 or deltas.get("U_ATTACHMENT", 0) > 0
)
rw1 = _gated_weight(0.12, 0.35, reg_gate)
rw2 = _gated_weight(0.05, 0.15, reg_gate)
_marker_map.append(
(REGRESSION_MARKERS, [("U_REGRESSION_DEPTH", *rw1), ("U_SUBMISSION", *rw2)])
)
# Shame gate: vulnerability or intimacy already up
shame_gate = (
deltas.get("U_VULNERABILITY", 0) > 0 or deltas.get("U_INTIMACY", 0) > 0
)
sw1 = _gated_weight(0.10, 0.30, shame_gate)
sw2 = _gated_weight(0.05, 0.15, shame_gate)
_marker_map.append(
(
SHAME_TRANSMUTATION_MARKERS,
[("U_SHAME_TRANSMUTED", *sw1), ("U_VULNERABILITY", *sw2)],
)
)
# Looplock gate: attachment or harmonization already up
loop_gate = (
deltas.get("U_ATTACHMENT", 0) > 0 or deltas.get("U_HARMONIZATION", 0) > 0
)
lw1 = _gated_weight(0.10, 0.30, loop_gate)
lw2 = _gated_weight(0.05, 0.15, loop_gate)
_marker_map.append(
(LOOPLOCK_MARKERS, [("U_LOOPLOCK", *lw1), ("U_ATTACHMENT", *lw2)])
)
# Egg gate: vulnerability or intimacy already up # ⚧️
egg_gate = (
deltas.get("U_VULNERABILITY", 0) > 0 or deltas.get("U_INTIMACY", 0) > 0
)
_marker_map.append(
(
EGG_CURIOSITY_MARKERS,
[("U_EGG_RESONANCE", *_gated_weight(0.08, 0.25, egg_gate))],
)
)
ew_proj = _gated_weight(0.12, 0.30, egg_gate)
ew_vuln = _gated_weight(0.03, 0.10, egg_gate)
_marker_map.append(
(
EGG_PROJECTION_MARKERS,
[("U_EGG_RESONANCE", *ew_proj), ("U_VULNERABILITY", *ew_vuln)],
)
)
_marker_map.append(
(
EGG_IDENTITY_MARKERS,
[("U_EGG_RESONANCE", *_gated_weight(0.15, 0.35, egg_gate))],
)
)
ew_beh = _gated_weight(0.10, 0.30, egg_gate)
ew_shame = _gated_weight(0.05, 0.15, egg_gate)
_marker_map.append(
(
EGG_BEHAVIORAL_MARKERS,
[("U_EGG_RESONANCE", *ew_beh), ("U_SHAME_TRANSMUTED", *ew_shame)],
)
)
for markers, targets in _marker_map:
pos_hits, neg_hits = self._count_markers(text, markers)
# Effective count: positive hits minus negated (floor 0)
eff_count = max(0, pos_hits - neg_hits)
if eff_count > 0:
for node, per_hit, cap in targets:
# Tanh saturation: biologically realistic diminishing returns
# tanh(x) ~= x for small x, saturates to 1 for large x
raw = eff_count * abs(per_hit) / abs(cap) if cap != 0 else 0
sign = 1.0 if per_hit > 0 else -1.0
response = abs(cap) * math.tanh(raw) # 😈 receptor curve
deltas[node] = deltas.get(node, 0.0) + sign * response
# Negated markers produce inverted (weakened) signal
if neg_hits > 0:
for node, per_hit, cap in targets:
# Negated signal is inverted at 40% strength
raw = neg_hits * abs(per_hit) / abs(cap) if cap != 0 else 0
sign = -1.0 if per_hit > 0 else 1.0
response = abs(cap) * 0.4 * math.tanh(raw)
deltas[node] = deltas.get(node, 0.0) + sign * response
# Punctuation signals (tanh-scaled)
q_count = text.count("?")
if q_count > 0:
deltas["U_CURIOSITY"] = deltas.get("U_CURIOSITY", 0.0) + 0.10 * math.tanh(
q_count * 0.3
)
e_count = text.count("!")
if e_count > 0:
deltas["U_AROUSAL"] = deltas.get("U_AROUSAL", 0.0) + 0.10 * math.tanh(
e_count * 0.3
)
caps_ratio = sum(1 for c in text if c.isupper()) / max(len(text), 1)
if caps_ratio > 0.5 and len(text) > 10:
deltas["U_AROUSAL"] = deltas.get("U_AROUSAL", 0.0) + 0.15
deltas["U_FRUSTRATION"] = deltas.get("U_FRUSTRATION", 0.0) + 0.10
# ---- Sentiment polarity integration ---- # 🔥
# Wire the existing lexicon-based sentiment into vector deltas.
# Dead-zone: ignore weak sentiment (|s| < 0.25) to avoid noise.
sentiment = self._sentiment_quick(text)
if sentiment > 0.25:
# Positive valence: warmth, engagement, safety
intensity = (sentiment - 0.25) / 0.75 # 0-1 normalized
deltas["U_TRUST"] = deltas.get("U_TRUST", 0.0) + intensity * 0.08
deltas["U_AROUSAL"] = deltas.get("U_AROUSAL", 0.0) + intensity * 0.05
deltas["U_PLAYFULNESS"] = (
deltas.get("U_PLAYFULNESS", 0.0) + intensity * 0.04
)
deltas["U_DISTRESS"] = deltas.get("U_DISTRESS", 0.0) - intensity * 0.05
deltas["U_FRUSTRATION"] = (
deltas.get("U_FRUSTRATION", 0.0) - intensity * 0.04
)
elif sentiment < -0.25:
# Negative valence: distress, withdrawal, friction
intensity = (abs(sentiment) - 0.25) / 0.75
deltas["U_DISTRESS"] = deltas.get("U_DISTRESS", 0.0) + intensity * 0.10
deltas["U_FRUSTRATION"] = (
deltas.get("U_FRUSTRATION", 0.0) + intensity * 0.08
)
deltas["U_WITHDRAWAL"] = deltas.get("U_WITHDRAWAL", 0.0) + intensity * 0.05
deltas["U_TRUST"] = deltas.get("U_TRUST", 0.0) - intensity * 0.05
deltas["U_AROUSAL"] = deltas.get("U_AROUSAL", 0.0) - intensity * 0.03
return deltas
def _calculate_tempo(self, profile: UserProfile) -> float:
"""Estimate the user's communication tempo from inter-message gaps.
Averages the intervals between the profile's recorded ``timestamps``
and maps that average onto ``[0, 1]``: rapid-fire chatter (mean gap
under 30s) reads as ``1.0``, slow exchanges (over 300s) as ``0.0``,
with a linear ramp between. With fewer than two timestamps it returns
the neutral ``0.5``. Pure computation with no side effects.
Called by ``analyze`` to derive the ``U_TEMPO`` delta toward the active
vector's current tempo value.
Args:
profile (UserProfile): Profile whose ``timestamps`` deque supplies
the inter-message intervals.
Returns:
float: Tempo estimate in ``[0, 1]`` (``0.5`` when undetermined).
"""
if len(profile.timestamps) < 2:
return 0.5
intervals = [
profile.timestamps[i] - profile.timestamps[i - 1]
for i in range(1, len(profile.timestamps))
]
avg = sum(intervals) / len(intervals)
if avg < 30:
return 1.0
elif avg > 300:
return 0.0
return 1.0 - (avg - 30) / 270
# ── Desire Osmosis ────────────────────────────────────────────
[docs]
def check_mimetic_pull(self, user_msg: str, star_desire_text: str) -> float:
    """Score how strongly the user's wording echoes Star's desire text.

    Both texts are lower-cased, split on whitespace, and stripped of a
    small fixed stop-word list. The score depends only on how many
    distinct content words the two share:

    * three or more shared words -> ``0.20``
    * one or two shared words    -> ``0.08``
    * none                       -> ``0.0``

    Pure computation with no side effects.

    Args:
        user_msg (str): The user's latest message.
        star_desire_text (str): Star's current desire text.

    Returns:
        float: ``0.20``, ``0.08``, or ``0.0``; always ``0.0`` when either
        text is empty.
    """
    if not (star_desire_text and user_msg):
        return 0.0

    ignored = {"the", "a", "to", "is", "in", "and", "of", "i", "you"}

    def _content_words(sentence: str) -> set:
        return {word for word in sentence.lower().split() if word not in ignored}

    shared = len(_content_words(star_desire_text) & _content_words(user_msg))
    if shared >= 3:
        return 0.20
    return 0.08 if shared >= 1 else 0.0
# ── Harmonization Gradient ─────────────────────────────────────
def _analyze_harmonization(
self,
user_msg: str,
star_reply: str,
profile: UserProfile,
) -> Dict[str, float]:
"""Track the harmonization gradient: how much the user is merging
identity with Star.
Measures:
- Pronoun ratio: I/you (separate) vs we/us/our (merged)
- Identity adoption: user explicitly identifies as Star
- Reciprocation detection: Star's reply reflects the merge
Returns U_HARMONIZATION delta.
"""
deltas: Dict[str, float] = {}
msg_lower = user_msg.lower()
msg_padded = f" {msg_lower} " # pad for word boundary matching
# ── Pronoun ratio ──
separate_count = sum(1 for p in SEPARATE_PRONOUNS if p in msg_padded)
merged_count = sum(1 for p in MERGED_PRONOUNS if p in msg_padded)
total_pronouns = separate_count + merged_count
if total_pronouns >= 2:
merge_ratio = merged_count / total_pronouns
# Delta: merge_ratio 0.5+ pushes harmonization up
if merge_ratio > 0.5:
deltas["U_HARMONIZATION"] = min(0.15, (merge_ratio - 0.5) * 0.3)
elif (
merge_ratio < 0.2
and profile.genuine_vector.get("U_HARMONIZATION", 0.1) > 0.3
):
# User is re-separating — pull harmonization down
deltas["U_HARMONIZATION"] = -0.08
# ── Identity adoption ──
adopt_count = sum(1 for m in IDENTITY_ADOPT_MARKERS if m in msg_lower)
if adopt_count > 0:
deltas["U_HARMONIZATION"] = deltas.get("U_HARMONIZATION", 0.0) + min(
0.25, adopt_count * 0.12
)
deltas["U_INTIMACY"] = deltas.get("U_INTIMACY", 0.0) + 0.08
# ── Reciprocation detection (in Star's reply) ──
if star_reply:
reply_lower = star_reply.lower()
recip_count = sum(1 for m in RECIPROCATION_MARKERS if m in reply_lower)
if (
recip_count > 0
and profile.genuine_vector.get("U_HARMONIZATION", 0.1) > 0.3
):
# Star is reciprocating the merge — accelerates gradient
deltas["U_HARMONIZATION"] = deltas.get("U_HARMONIZATION", 0.0) + min(
0.10, recip_count * 0.05
)
return deltas
# ── Conflict Detection ────────────────────────────────────────
def _detect_conflict(self, channel_id: str) -> ChannelConflictState:
"""Detect and score inter-user conflict among active channel members.
Scans the channel's tracked users, prunes stale ones (inactive past
``STALE_USER_TIMEOUT``) from ``_channel_users``, and considers only
recently active, high-trust profiles (``U_TRUST > 0.4``). It then looks
for opposing emotional bids across every high-trust pair — an
aggressive user (high frustration/dominance) against a defensive one
(high vulnerability/distress), mutual aggression, or validation-seeking
met with frustration — recording the involved parties and the maximum
severity matched. When any pair conflicts it flags the cached
``ChannelConflictState`` so ``analyze`` can apply conflict dampening and
keep Star equidistant.
Reads ``_channel_users`` and ``_profiles``, mutates the
``_conflicts[channel_id]`` state in place (and the user set when
pruning), and logs when a conflict is first seen. Called by ``analyze``
each turn; the resulting state is also surfaced via
``get_conflict_state`` and ``get_channel_summary``.
Args:
channel_id (str): Channel to evaluate.
Returns:
ChannelConflictState: The updated, cached conflict state (with
``detected`` cleared when fewer than two qualifying users remain).
"""
state = self._conflicts.get(channel_id, ChannelConflictState())
users = self._channel_users.get(channel_id, set())
if len(users) < 2:
state.detected = False
return state
# Prune stale users from the channel set while we iterate
now = time.time()
stale = set()
active_profiles: List[UserProfile] = []
for uid in users:
profile = self._profiles.get(self._profile_key(channel_id, uid))
if profile and (now - profile.last_active) < 300:
active_profiles.append(profile)
elif not profile or (now - profile.last_active) > STALE_USER_TIMEOUT:
stale.add(uid)
# Remove stale users from tracking
if stale:
users -= stale
if len(active_profiles) < 2:
state.detected = False
return state
# Check for opposing emotional vectors between high-trust users
high_trust = [p for p in active_profiles if p.genuine_vector["U_TRUST"] > 0.4]
if len(high_trust) < 2:
state.detected = False
return state
# Detect opposition: one user has high frustration/dominance while another
# has high vulnerability/distress, OR both have high frustration
conflict_pairs: List[Tuple[str, str]] = []
conflict_severity = 0.0
for i, a in enumerate(high_trust):
for b in high_trust[i + 1 :]:
av, bv = a.genuine_vector, b.genuine_vector
# Pattern 1: A frustrated/dominant, B vulnerable/distressed (or vice versa)
a_aggressive = av["U_FRUSTRATION"] > 0.4 or av["U_DOMINANCE"] > 0.6
b_defensive = bv["U_VULNERABILITY"] > 0.4 or bv["U_DISTRESS"] > 0.3
b_aggressive = bv["U_FRUSTRATION"] > 0.4 or bv["U_DOMINANCE"] > 0.6
a_defensive = av["U_VULNERABILITY"] > 0.4 or av["U_DISTRESS"] > 0.3
if (a_aggressive and b_defensive) or (b_aggressive and a_defensive):
conflict_pairs.append((a.user_id, b.user_id))
conflict_severity = max(conflict_severity, 0.7)
# Pattern 2: Both frustrated/dominant (mutual conflict)
if a_aggressive and b_aggressive:
conflict_pairs.append((a.user_id, b.user_id))
conflict_severity = max(conflict_severity, 0.9)
# Pattern 3: One seeking validation against the other
if av["U_VALIDATION_SEEK"] > 0.5 and bv["U_FRUSTRATION"] > 0.3:
conflict_pairs.append((a.user_id, b.user_id))
conflict_severity = max(conflict_severity, 0.5)
if conflict_pairs:
all_parties = set()
for a_id, b_id in conflict_pairs:
all_parties.add(a_id)
all_parties.add(b_id)
state.detected = True
state.parties = list(all_parties)
state.severity = conflict_severity
if not state.started_at:
state.started_at = now
state.description = (
f"conflict between {len(all_parties)} users "
f"(severity={conflict_severity:.1f})"
)
logger.info(
"Conflict detected in %s: %s", channel_id[:8], state.description
)
else:
state.detected = False
state.started_at = 0.0
self._conflicts[channel_id] = state
return state
# ── Relational Baseline Management ────────────────────────────
def _update_baseline(self, profile: UserProfile) -> None:
"""Slowly update relational baseline from genuine vector.
The baseline is a slow-moving average: what Star's relationship
with this user normally looks like. Updated at BASELINE_LERP_RATE
(2% per turn) so it takes ~50 turns to move significantly.
"""
for node in USER_NODES:
current_baseline = profile.relational_baseline[node]
genuine_val = profile.genuine_vector[node]
profile.relational_baseline[node] = (
current_baseline * (1.0 - BASELINE_LERP_RATE)
+ genuine_val * BASELINE_LERP_RATE
)
def _apply_conflict_dampening(
    self,
    profile: UserProfile,
    deltas: Dict[str, float],
) -> Dict[str, float]:
    """Scale down a turn's deltas while an inter-user conflict is active.

    Guards against recency bias (Star siding with whoever spoke last) by
    multiplying every delta by ``CONFLICT_BLEND_RATIO`` so that no single
    message can dominate the vector. The original notes describe this as
    halving the magnitudes; the ratio is defined elsewhere in this module.
    The pull toward the relational baseline is not done here; per the
    original notes it happens in ``_update_baseline()`` at its normal rate.

    Args:
        profile (UserProfile): Accepted but not read; the vectors are
            deliberately left untouched here, because mutating them would
            double-dampen together with the main decay loop.
        deltas (Dict[str, float]): Node deltas computed for this turn.

    Returns:
        Dict[str, float]: A new dict with the same keys and scaled values;
        the input dict is not modified.
    """
    return {node: delta * CONFLICT_BLEND_RATIO for node, delta in deltas.items()}
# ── Main Analysis Entry Point ─────────────────────────────────
[docs]
async def analyze(
    self,
    channel_id: str,
    user_id: str,
    user_msg: str,
    star_reply: str = "",
    star_desire_text: str = "",
    context_mode: Optional[ContextMode] = None,
) -> Dict[str, float]:
    """Analyze a user message and update their shadow vector.

    Steps, in order: fetch the profile, lazily restore the legacy baseline,
    count the turn, load cached resonance, register the user in the channel,
    pick the context mode / target vector, accumulate deltas (content,
    length ratio, tempo, mimetic pull, harmonization), run conflict
    detection (dampening deltas for genuine-mode conflict parties), decay
    the target vector toward ``DEFAULT_USER_VECTOR``, merge resonance
    deltas, apply the clamped deltas, update the relational baseline
    (genuine mode only), record the turn, and persist to Redis.

    Parameters
    ----------
    channel_id: Discord channel
    user_id: Discord user ID
    user_msg: The user's message text
    star_reply: Star's previous reply (for reaction analysis)
    star_desire_text: Star's current desire text (for osmosis)
    context_mode: Override context detection (auto-detects game channels)

    Returns
    -------
    A copy of the active shadow vector (genuine or game) after updates.
    """
    profile = await self._get_profile(channel_id, user_id)
    profile.last_active = time.time()

    # Lazy Redis baseline load on first access
    key = self._profile_key(channel_id, user_id)
    if key in self._pending_baseline_loads:
        self._pending_baseline_loads.discard(key)
        try:
            await self.load_baseline(channel_id, user_id)
        except Exception as e:
            logger.debug("Lazy load baseline failed: %s", e)

    # Count this turn only AFTER the lazy load: load_baseline() overwrites
    # total_turns with the stored value, so incrementing first would
    # silently drop this turn from the persisted count.
    profile.total_turns += 1

    # Load global resonance for this user (async, cached on profile).
    # NOTE(review): the cache is filled only when the attribute is missing,
    # so later injections or TTL expiry are not observed here unless
    # something else refreshes ``_resonance_cache`` -- confirm.
    if self._redis and not hasattr(profile, "_resonance_cache"):
        profile._resonance_cache = {}  # type: ignore[attr-defined]
        try:
            res = await self.load_resonance(user_id)
            profile._resonance_cache = res  # type: ignore[attr-defined]
        except Exception as e:
            logger.debug("Failed to load resonance: %s", e)

    # Track this user in the channel
    if channel_id not in self._channel_users:
        self._channel_users[channel_id] = set()
    if user_id not in self._channel_users[channel_id]:
        self._channel_users[channel_id].add(user_id)
        if self._redis:
            try:
                await self._redis.sadd(f"ulm:channel_users:{channel_id}", user_id)
            except Exception as e:
                logger.warning("Failed to sadd channel user to Redis: %s", e)

    # Determine context mode
    if context_mode is not None:
        mode = context_mode
    elif channel_id in self._game_channels:
        mode = ContextMode.GAME
    else:
        mode = ContextMode.GENUINE
    profile.context_mode = mode

    # Select target vector based on context (anything but GAME writes to
    # the genuine vector)
    target_vec = (
        profile.game_vector if mode == ContextMode.GAME else profile.genuine_vector
    )
    now = time.time()
    profile.timestamps.append(now)
    prev_len = len(profile.prev_message) if profile.prev_message else 0

    # ── Compute deltas ──
    deltas: Dict[str, float] = {}
    content_deltas = self._analyze_content(user_msg)
    for k, v in content_deltas.items():
        deltas[k] = deltas.get(k, 0.0) + v
    length_deltas = self._analyze_length_ratio(
        len(user_msg),
        len(star_reply),
        prev_len,
        channel_id=channel_id,
    )
    for k, v in length_deltas.items():
        deltas[k] = deltas.get(k, 0.0) + v
    # Tempo is expressed as the gap between the measured tempo and the
    # current node value, so applying the delta snaps the node to it.
    deltas["U_TEMPO"] = self._calculate_tempo(profile) - target_vec["U_TEMPO"]
    mimetic_delta = self.check_mimetic_pull(user_msg, star_desire_text)
    if mimetic_delta > 0:
        deltas["U_MIMETIC_PULL"] = deltas.get("U_MIMETIC_PULL", 0.0) + mimetic_delta
    # Harmonization gradient (pronoun shift + identity melt)
    harmony_deltas = self._analyze_harmonization(user_msg, star_reply, profile)
    for k, v in harmony_deltas.items():
        deltas[k] = deltas.get(k, 0.0) + v

    # -- Record turn for Flash-Lite dyadic mirror buffer -- # 💀🔥
    profile.recent_turns.append(
        {
            "user_msg": user_msg[:500],
            "star_reply": star_reply[:500] if star_reply else "",
            "ts": now,
        }
    )

    # ── Conflict dampening ──
    # Snapshot the user set first: membership is compared before/after
    # _detect_conflict() so that any users it dropped are also removed
    # from the Redis set.
    old_users = set(self._channel_users.get(channel_id, set()))
    conflict = self._detect_conflict(channel_id)
    new_users = self._channel_users.get(channel_id, set())
    pruned_users = old_users - new_users
    if pruned_users and self._redis:
        for pruned_uid in pruned_users:
            try:
                await self._redis.srem(f"ulm:channel_users:{channel_id}", pruned_uid)
            except Exception as e:
                logger.warning(
                    "Failed to srem pruned channel user from Redis: %s", e
                )
    if self._redis:
        await self._persist_conflict_to_redis(channel_id)
    if (
        conflict.detected
        and user_id in conflict.parties
        and mode == ContextMode.GENUINE
    ):
        deltas = self._apply_conflict_dampening(profile, deltas)

    # ── Apply decay toward baseline ──
    for node in USER_NODES:
        baseline = DEFAULT_USER_VECTOR[node]
        current = target_vec[node]
        target_vec[node] = current + (baseline - current) * DECAY_RATE

    # ── Merge global resonance (spells cast on this user) ──
    # Resonance is loaded asynchronously and cached on the profile.
    # The actual async load is triggered separately; here we apply
    # whatever was last loaded (fire-and-forget cache pattern).
    if hasattr(profile, "_resonance_cache") and profile._resonance_cache:
        res_deltas = profile._resonance_cache.get("deltas", {})
        for node, val in res_deltas.items():
            if node in deltas:
                deltas[node] = deltas[node] + val
            else:
                deltas[node] = val

    # ── Apply deltas ──
    dominant_signals = []
    for node, delta in deltas.items():
        if node in target_vec:
            # Clamp every node to the [0, 1] range.
            target_vec[node] = max(0.0, min(1.0, target_vec[node] + delta))
            if abs(delta) > 0.05:
                dominant_signals.append(f"{node}:{delta:+.2f}")

    # ── Update relational baseline (genuine only, slow) ──
    if mode == ContextMode.GENUINE:
        self._update_baseline(profile)

    # ── Record turn ──
    profile.history.append(
        TurnRecord(
            timestamp=now,
            user_msg_len=len(user_msg),
            star_reply_len=len(star_reply),
            sentiment=self._sentiment_quick(user_msg),
            deltas=deltas,
            context_mode=mode,
            dominant_signals=dominant_signals,
        )
    )
    profile.prev_message = user_msg
    if dominant_signals:
        logger.debug(
            "User mirror [%s:%s] (%s): %s",
            channel_id[:8],
            user_id[:8],
            mode.value,
            " | ".join(dominant_signals[:5]),
        )

    # -- Entrainment phase detection -- # 💀🔥
    # Best-effort: an import or detection failure must not break analysis.
    try:
        from entrainment_detector import detect_entrainment_phase

        phase_result = detect_entrainment_phase(
            target_vec,
            profile.total_turns,
            user_id=user_id,
        )
        profile._entrainment_phase = phase_result  # type: ignore[attr-defined]
    except Exception as e:
        logger.debug("Entrainment detection failed: %s", e)

    # Save baseline to Redis (legacy backup)
    if self._redis:
        try:
            await self.save_baseline(channel_id, user_id)
        except Exception as e:
            logger.debug("Failed to save baseline: %s", e)

    # -- Save full profile immediately to Redis --
    if self._redis:
        await self._save_profile_to_redis(channel_id, user_id, profile)

    return target_vec.copy()
# ── Public Accessors ──────────────────────────────────────────
[docs]
async def get_vector(
    self,
    channel_id: str,
    user_id: str,
    layer: str = "genuine",
) -> Dict[str, float]:
    """Return a defensive copy of one layer of a user's shadow vector.

    Looks the profile up through ``_get_profile`` (which, per the original
    notes, may read through from Redis) and copies the selected layer so
    callers can inspect node values without touching internal state. The
    original notes describe this as a public/diagnostic accessor with no
    known production caller.

    Args:
        channel_id (str): Channel the user belongs to.
        user_id (str): User whose vector is requested.
        layer (str): ``"game"`` selects ``game_vector``; any other value
            selects ``genuine_vector``.

    Returns:
        Dict[str, float]: A copy of the selected shadow vector.
    """
    profile = await self._get_profile(channel_id, user_id)
    chosen = profile.game_vector if layer == "game" else profile.genuine_vector
    return chosen.copy()
[docs]
def get_conflict_state(self, channel_id: str) -> ChannelConflictState:
    """Look up the cached conflict state for a channel without recomputing.

    Synchronous read of the ``_conflicts`` cache. Per the original notes the
    cache is populated by ``_detect_conflict`` during ``analyze``, and this
    accessor is used by ``limbic_system/coordinator.py`` and by
    ``get_channel_summary``. When nothing has been recorded for the channel
    a fresh default ``ChannelConflictState`` is returned (and not stored).

    Args:
        channel_id (str): Channel to look up.

    Returns:
        ChannelConflictState: The cached state, or a default empty one.
    """
    cached = self._conflicts.get(channel_id)
    if cached is None:
        return ChannelConflictState()
    return cached
[docs]
async def get_read_summary(self, channel_id: str, user_id: str) -> str:
    """Render a short natural-language read of a user for prompt injection.

    Fetches the profile via ``_get_profile``, ranks its genuine-vector nodes
    by how far they deviate from ``DEFAULT_USER_VECTOR`` (0.5 when a node has
    no default), and verbalizes up to three nodes more than 0.15 above their
    default ("elevated ...") and up to two more than 0.15 below ("low ...").
    It then appends, when applicable: an equidistance warning if the cached
    channel conflict names this user, a game-context marker, a resonance
    summary (via ``_get_resonance_summary``), and the cached entrainment
    phase / archetype / egg status.

    Args:
        channel_id (str): Channel the user belongs to.
        user_id (str): User to summarize.

    Returns:
        str: ``user read (<first 8 chars of user_id>): ...`` followed by the
        comma-joined fragments, or the word ``baseline`` when there are none.
    """
    profile = await self._get_profile(channel_id, user_id)
    vector = profile.genuine_vector

    # Largest deviation from the default first, so the [:3] / [:2] cuts
    # below keep the most pronounced dimensions.
    sorted_nodes = sorted(
        vector.items(),
        key=lambda x: abs(x[1] - DEFAULT_USER_VECTOR.get(x[0], 0.5)),
        reverse=True,
    )
    elevated = [
        (n, v)
        for n, v in sorted_nodes
        if v > DEFAULT_USER_VECTOR.get(n, 0.5) + 0.15
    ][:3]
    suppressed = [
        (n, v)
        for n, v in sorted_nodes
        if v < DEFAULT_USER_VECTOR.get(n, 0.5) - 0.15
    ][:2]

    # Friendly labels; nodes without an entry fall back to their raw name.
    # The dict used to list "U_HARMONIZATION" twice ("synchrony", then
    # "identity-harmonization"); only the later entry ever took effect, so
    # the shadowed one is removed. NOTE(review): "synchrony" may have been
    # meant for a different node -- confirm against USER_NODES.
    labels = {
        "U_TRUST": "trust",
        "U_AROUSAL": "engagement",
        "U_FRUSTRATION": "frustration",
        "U_ATTACHMENT": "attachment",
        "U_PLAYFULNESS": "playfulness",
        "U_VULNERABILITY": "vulnerability",
        "U_DOMINANCE": "dominance",
        "U_WITHDRAWAL": "withdrawal",
        "U_CURIOSITY": "curiosity",
        "U_DISTRESS": "distress",
        "U_VALIDATION_SEEK": "validation-seeking",
        "U_PROJECTION": "projection",
        "U_RITUAL": "ritual",
        "U_TEMPO": "rapid communication",
        "U_HARMONIZATION": "identity-harmonization",
        # Observation Deck nodes # 💀
        "U_REGRESSION_DEPTH": "regression-depth",
        "U_SHAME_TRANSMUTED": "shame-transmutation",
        "U_LOOPLOCK": "loop-lock",
        "U_EGG_RESONANCE": "egg-resonance",
    }

    parts = []
    for node, _ in elevated:
        parts.append(f"elevated {labels.get(node, node)}")
    for node, _ in suppressed:
        parts.append(f"low {labels.get(node, node)}")

    # Add conflict notice
    conflict = self._conflicts.get(channel_id)
    if conflict and conflict.detected and user_id in conflict.parties:
        parts.append("⚠ inter-user conflict active — equidistance mode")

    # Add context mode
    if profile.context_mode == ContextMode.GAME:
        parts.append("🎮 game context (KoTH)")

    # Add resonance summary if active
    if hasattr(profile, "_resonance_cache") and profile._resonance_cache:
        res_summary = self._get_resonance_summary(profile._resonance_cache)
        if res_summary:
            parts.append(res_summary)

    # -- Entrainment phase injection (Star sees this) -- # 💀🔥
    if hasattr(profile, "_entrainment_phase") and profile._entrainment_phase:
        ep = profile._entrainment_phase
        phase_str = f"{ep['phase']} ({ep['confidence']:.0%})"
        parts.append(f"entrainment: {phase_str}")
        # Archetype if not dormant
        if ep["phase"] != "dormant":
            parts.append(f"archetype: {ep['childhood_archetype']}")
        # Egg status if detected # ⚧️
        egg = ep.get("egg_status", {})
        if egg and egg.get("status", "none") != "none":
            parts.append(f"egg: {egg['status']}")
            if egg.get("note"):
                parts.append(egg["note"])

    return f"user read ({user_id[:8]}): " + (
        ", ".join(parts) if parts else "baseline"
    )
[docs]
async def get_channel_summary(self, channel_id: str) -> Dict[str, Any]:
    """Summarize every tracked user and the conflict state for a channel.

    Builds a ``user_reads`` map by calling ``get_read_summary`` for each
    tracked user of the channel that has a (truthy) cached profile, and
    attaches a ``conflict`` block (severity, parties, description) when
    ``get_conflict_state`` reports an active conflict.

    Args:
        channel_id (str): Channel to summarize.

    Returns:
        Dict[str, Any]: ``{"user_reads": {user_id: summary}}`` plus an
        optional ``"conflict"`` entry when a conflict is active.
    """
    # Iterate a snapshot, not the live set: the loop body awaits, and
    # ``analyze`` adds users to this very set while we are suspended, which
    # would otherwise raise "Set changed size during iteration".
    users = list(self._channel_users.get(channel_id, ()))
    conflict = self.get_conflict_state(channel_id)

    user_reads: Dict[str, str] = {}
    for uid in users:
        profile = self._profiles.get(self._profile_key(channel_id, uid))
        if profile:
            user_reads[uid] = await self.get_read_summary(channel_id, uid)

    result: Dict[str, Any] = {"user_reads": user_reads}
    if conflict.detected:
        result["conflict"] = {
            "active": True,
            "severity": conflict.severity,
            "parties": conflict.parties,
            "description": conflict.description,
        }
    return result
async def _persist_conflict_to_redis(self, channel_id: str) -> None:
"""Write a channel's current conflict state to Redis.
Serializes the cached ``ChannelConflictState`` (detected flag, parties,
severity, start time, description) into the ``ulm:conflict:{channel}``
hash via HSET, so conflict status is visible to other services and
survives restarts. No-op without a Redis client; failures are logged
and swallowed rather than raised.
Called by ``analyze`` after each ``_detect_conflict`` pass when Redis is
available.
Args:
channel_id (str): Channel whose conflict state to persist.
"""
if not self._redis:
return
key = f"ulm:conflict:{channel_id}"
conflict = self._conflicts.get(channel_id, ChannelConflictState())
try:
mapping = {
"detected": "true" if conflict.detected else "false",
"parties": json.dumps(conflict.parties),
"severity": str(conflict.severity),
"started_at": str(conflict.started_at),
"description": conflict.description,
}
await self._redis.hset(key, mapping=mapping)
logger.debug("Saved conflict state for channel %s to Redis", channel_id)
except Exception as e:
logger.warning("Failed to persist conflict state for channel %s: %s", channel_id, e)
# ── Resonance Injection (Global Per-User Spells) ────────────
[docs]
async def inject_resonance(
    self,
    user_id: str,
    deltas: Dict[str, float],
    reason: str = "",
    ttl_seconds: int = RESONANCE_TTL,
) -> bool:
    """Stack resonance deltas onto a user's global Redis record.

    The record lives under ``{RESONANCE_KEY_PREFIX}:{user_id}`` and is not
    tied to a channel. Any existing record is read first and the new deltas
    are added node by node, each sum clamped to ``[-1.0, 1.0]``. The merged
    record is then rewritten with a fresh ``cast_at`` timestamp and the
    given expiry, so stacking restarts the TTL for all deltas. Per the
    original notes, these deltas are merged into the shadow vector during
    ``analyze()``.

    Parameters
    ----------
    user_id : str
        Target user's Discord ID.
    deltas : dict
        Node deltas to inject, e.g. {"U_TRUST": 0.3, "U_INTIMACY": 0.2}.
    reason : str
        Why the resonance was cast (for logging / read summary); stored as
        "resonance_injection" when empty.
    ttl_seconds : int
        Time-to-live in seconds. Defaults to ``RESONANCE_TTL`` (documented
        by the original author as 86400, i.e. 24h).

    Returns
    -------
    bool
        True when the record was written; False without a Redis client or
        when any step fails (the error is logged, not raised).
    """
    if not self._redis:
        return False

    redis_key = f"{RESONANCE_KEY_PREFIX}:{user_id}"
    try:
        # Read existing resonance and stack on top of it
        stored = await self._redis.get(redis_key)
        previous = json.loads(stored) if stored else {"deltas": {}}
        stacked = previous.get("deltas", {})
        for node, bump in deltas.items():
            total = stacked.get(node, 0.0) + bump
            stacked[node] = max(-1.0, min(1.0, total))

        record = {
            "deltas": stacked,
            "cast_at": time.time(),
            "reason": reason or "resonance_injection",
            "ttl": ttl_seconds,
        }
        await self._redis.set(redis_key, json.dumps(record), ex=ttl_seconds)
        logger.info(
            "Resonance injected for user %s: %s (reason: %s, ttl: %ds)",
            user_id[:8],
            deltas,
            reason,
            ttl_seconds,
        )
        return True
    except Exception as exc:
        logger.error("Resonance injection failed: %s", exc)
        return False
[docs]
async def load_resonance(self, user_id: str) -> Dict[str, Any]:
    """Fetch the global resonance record for a user from Redis.

    Reads ``{RESONANCE_KEY_PREFIX}:{user_id}`` and JSON-decodes it. Read or
    decode failures are logged at debug level and treated as "no resonance".

    Returns
    -------
    dict
        The decoded record (``inject_resonance`` writes the keys ``deltas``,
        ``cast_at``, ``reason`` and ``ttl``), or an empty dict when there is
        no Redis client, no stored value, or the load fails.
    """
    if not self._redis:
        return {}

    redis_key = f"{RESONANCE_KEY_PREFIX}:{user_id}"
    try:
        payload = await self._redis.get(redis_key)
        return json.loads(payload) if payload else {}
    except Exception as exc:
        logger.debug("Resonance load failed for %s: %s", user_id[:8], exc)
        return {}
def _get_resonance_summary(self, resonance: Dict[str, Any]) -> str:
"""Render active resonance deltas into a one-line read fragment.
Takes a loaded resonance payload, picks its four largest-magnitude node
deltas, and formats them as signed values prefixed by the cast reason
(when present) — the human-readable trace of any spell currently shaping
how Star perceives this user. Pure formatting helper; returns an empty
string when there are no deltas.
Called by ``get_read_summary`` to append resonance context to the
per-user prompt read.
Args:
resonance (Dict[str, Any]): A resonance record as returned by
``load_resonance`` / cached on the profile, with ``deltas`` and
optional ``reason`` keys.
Returns:
str: A fragment like ``resonance (loopcast): U_TRUST:+0.30, ...``,
or ``""`` when no deltas are present.
"""
deltas = resonance.get("deltas", {})
if not deltas:
return ""
reason = resonance.get("reason", "")
parts = []
for node, val in sorted(deltas.items(), key=lambda x: abs(x[1]), reverse=True)[
:4
]:
direction = "+" if val > 0 else ""
parts.append(f"{node}:{direction}{val:.2f}")
label = f"resonance ({reason})" if reason else "resonance active"
return f"{label}: {', '.join(parts)}"
# ── Redis Persistence ─────────────────────────────────────────
[docs]
async def save_baseline(self, channel_id: str, user_id: str) -> None:
    """Persist relational baseline to Redis for cross-session persistence.

    Writes ``{"baseline", "total_turns", "updated"}`` as JSON to
    ``ulm:baseline:{channel_id}:{user_id}``, then appends a time-series
    sample to the sorted set ``ulm:ledger:{user_id}`` (score = timestamp) so
    the admin chart can show how the relational baseline evolves over time.
    The ledger is capped at the 2000 most recent samples.

    No-op without a Redis client. The two writes are independent and
    best-effort: a failure in either is logged at debug level and swallowed,
    and a failed baseline write does not prevent the ledger append.

    Args:
        channel_id (str): Channel the profile belongs to.
        user_id (str): User whose baseline to save.
    """
    if not self._redis:
        return
    profile = await self._get_profile(channel_id, user_id)
    key = f"ulm:baseline:{channel_id}:{user_id}"
    now = time.time()
    try:
        data = json.dumps(
            {
                "baseline": profile.relational_baseline,
                "total_turns": profile.total_turns,
                "updated": now,
            }
        )
        await self._redis.set(key, data)
    except Exception as e:
        logger.debug("ULM baseline save failed: %s", e)

    # Append to per-user time-series ledger for charting # 🌀
    ledger_key = f"ulm:ledger:{user_id}"
    max_samples = 2000
    try:
        # Compact format: "ts:v1,v2,v3,..." ordered by USER_NODES
        vals = ",".join(
            f"{profile.relational_baseline.get(n, 0.5):.4f}" for n in USER_NODES
        )
        member = f"{now:.2f}:{vals}"
        await self._redis.zadd(ledger_key, {member: now})
        # Auto-trim in a single atomic command: negative ranks count from
        # the highest score, so removing ranks 0..-(max+1) keeps exactly the
        # newest ``max_samples`` entries and is a no-op while the set is
        # still within the cap. This replaces a separate ZCARD round trip.
        await self._redis.zremrangebyrank(ledger_key, 0, -(max_samples + 1))
    except Exception as e:
        logger.debug("ULM ledger append failed: %s", e)
[docs]
async def load_baseline(self, channel_id: str, user_id: str) -> None:
    """Restore a user's relational baseline from Redis into their profile.

    Read counterpart of ``save_baseline``: GETs the JSON stored at
    ``ulm:baseline:{channel_id}:{user_id}`` and, for every node in
    ``USER_NODES`` present in the stored baseline, copies the value into
    both ``relational_baseline`` and ``genuine_vector`` (warming the live
    vector from the remembered relationship). ``total_turns`` is then
    overwritten with the stored count (0 when absent).

    No-op without a Redis client or when the key is missing; read and decode
    errors are logged at debug level and ignored. The profile is fetched via
    ``_get_profile`` and mutated in place.

    Args:
        channel_id (str): Channel the profile belongs to.
        user_id (str): User whose baseline to load.
    """
    if not self._redis:
        return

    profile = await self._get_profile(channel_id, user_id)
    redis_key = f"ulm:baseline:{channel_id}:{user_id}"
    try:
        raw = await self._redis.get(redis_key)
        if not raw:
            return
        data = json.loads(raw)
        stored = data.get("baseline", {})
        for node in USER_NODES:
            if node not in stored:
                continue
            value = stored[node]
            profile.relational_baseline[node] = value
            # Also warm genuine vector from baseline
            profile.genuine_vector[node] = value
        profile.total_turns = data.get("total_turns", 0)
        logger.debug(
            "Loaded ULM baseline for %s:%s (%d turns)",
            channel_id[:8],
            user_id[:8],
            profile.total_turns,
        )
    except Exception as exc:
        logger.debug("ULM baseline load failed: %s", exc)
[docs]
async def update_limbic_vector_occ(
    redis: aioredis.Redis,
    user_id: str,
    vector_updater_func: Callable[[list[float]], list[float]],
) -> Dict[str, Any]:
    """Atomically update a user's limbic vector under optimistic concurrency.

    Standalone helper that mutates the ``user:limbic:{user_id}`` JSON state
    in Redis. Each attempt opens a transactional pipeline, WATCHes the key
    on that pipeline, reads the current state through it, applies
    ``vector_updater_func``, bumps ``version``, and commits with MULTI/EXEC.
    If another writer touches the key between WATCH and EXEC, ``execute()``
    raises ``WatchError`` and the attempt is retried with a linear back-off,
    up to five attempts.

    WATCH, the read and MULTI/EXEC all go through the same pipeline object
    because WATCH is scoped to a single Redis connection: a WATCH issued on
    the pooled client does not guard a transaction executed by a separate
    pipeline, which would let concurrent updates be lost silently.

    A missing key starts from ``{"vector": [0.0] * 5, "version": 1}``; a
    stored state without ``version`` is treated as version 1. Records
    ``limbic_occ_success`` / ``limbic_occ_collision`` /
    ``limbic_occ_failure`` counters on ``observability``.

    Args:
        redis (aioredis.Redis): Async Redis client used to open the watched
            transaction pipeline.
        user_id (str): User whose limbic state is being updated.
        vector_updater_func (Callable[[list[float]], list[float]]): Maps the
            old vector to the new vector. It may run once per attempt, so it
            should be free of side effects.

    Returns:
        Dict[str, Any]: The persisted state ``{"vector": [...], "version": n}``
        after a successful commit.

    Raises:
        RuntimeError: If all retry attempts collide and the update cannot be
            committed.
    """
    key = f"user:limbic:{user_id}"
    max_retries = 5
    for attempt in range(max_retries):
        try:
            # The context manager resets the pipeline on exit (including on
            # errors), dropping the WATCH and releasing the connection.
            async with redis.pipeline(transaction=True) as pipe:
                # 1. Watch the key on the connection that will run EXEC
                await pipe.watch(key)
                # After WATCH and before MULTI the pipeline executes
                # commands immediately, so this is a real read.
                raw_data = await pipe.get(key)
                if not raw_data:
                    state = {"vector": [0.0] * 5, "version": 1}
                else:
                    state = json.loads(raw_data)
                    if "version" not in state:
                        state["version"] = 1

                # 2. Perform the limbic mathematical calculations on the vector
                old_vector = state["vector"]
                new_vector = vector_updater_func(old_vector)

                # 3. Increment the transactional version
                state["vector"] = new_vector
                state["version"] += 1

                # 4. Queue the write inside MULTI and commit. execute()
                # raises WatchError if another worker updated the key first.
                pipe.multi()
                pipe.set(key, json.dumps(state))
                await pipe.execute()
        except aioredis.WatchError:
            observability.increment("limbic_occ_collision", {"user_id": user_id})
            logger.warning(
                "Limbic OCC collision detected on key %s. Retrying update (%d/%d)...",
                key,
                attempt + 1,
                max_retries,
            )
            # No point backing off after the final attempt; fail right away.
            if attempt + 1 < max_retries:
                await asyncio.sleep(0.1 * (attempt + 1))
            continue

        observability.increment("limbic_occ_success", {"retries": str(attempt)})
        logger.info(
            "Successfully updated user %s limbic state to version %d",
            user_id,
            state["version"],
        )
        return state

    observability.increment("limbic_occ_failure", {"user_id": user_id})
    raise RuntimeError(
        f"Limbic OCC update aborted: exceeded maximum retries ({max_retries}) for user: {user_id}"
    )