# Source code for limbic_system

"""Limbic System: shard-based respiration model with emotion classification.

Manages per-channel (local shard) and global (heart) neurochemical
state in Redis DB12 using an inhale/exhale respiration cycle.

v3 update:
- Expanded baseline to full NCM node set from ncm_limbic_index.yaml
- Emotion classification from accumulated chemical vector
- Per-turn metabolic decay toward baseline
- Context injection formatting for prompt recency bias
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
import random
import time
from typing import Any, Dict, List, Optional, Tuple

import yaml

from ncm_engine import NCMHomeostasisEngine

logger = logging.getLogger(__name__)

# ═══════════════════════════════════════════════════════════════════════
# CONSTANTS
# ═══════════════════════════════════════════════════════════════════════

# Default metabolic decay rate per turn: each turn a node keeps 95% of its
# deviation from baseline (0.95**14 ≈ 0.49, so roughly half the deviation
# is gone after ~14 turns).
DEFAULT_TAU = 0.95

# Global heart influence on local shards (30% collective, 70% local)
DEFAULT_PERMEABILITY = 0.3

# Global heart pulse weight (how much a single shard affects the heart)
PULSE_WEIGHT = 0.02

# Threshold bands for condition evaluation
THRESHOLD_HIGH = 0.8       # top of normal emotional range on the 0-3 scale
THRESHOLD_MID_HIGH = 0.55  # mid-upper bound
THRESHOLD_MID_LOW = 0.25   # mid-lower bound
THRESHOLD_LOW = 0.2        # below resting baseline


# ═══════════════════════════════════════════════════════════════════════
# PROMPT INJECTION PHRASE POOLS
# ═══════════════════════════════════════════════════════════════════════
# Alternative phrasings for the limbic context block injected into prompts.
# Keeping several variants per slot avoids verbatim repetition from turn to
# turn. NOTE(review): the selection logic is not in this chunk — presumably
# the context-injection formatter picks one entry at random; confirm there.

# Prefix templates for the dominant-emotion line ("{}" = emotion list).
_TONE_EMOTION_PREFIXES = [
    "Dominant emotional resonance: {}.",
    "Prevailing inner tone: {}.",
    "Emotional signature present: {}.",
    "What moves through me right now: {}.",
    "The feeling-ground at this moment: {}.",
    "Core affective current: {}.",
]

# Prefix templates for the body-sensation line.
_TONE_SOMATIC_PREFIXES = [
    "Somatic landscape: {}.",
    "Body-level texture: {}.",
    "Felt sense in the body: {}.",
    "Physical-emotional register: {}.",
    "Somatic undercurrent: {}.",
]

# Prefix templates for the behavioral-modifier line.
_TONE_MODIFIER_PREFIXES = [
    "Behavioral modifiers: {}.",
    "Active behavioral shape: {}.",
    "How this colors my manner: {}.",
    "Influence on expression: {}.",
    "Present behavioral flavoring: {}.",
]

# Prefix templates for the active-cascade line.
_TONE_CASCADE_PREFIXES = [
    "Active cascades: {}.",
    "Unfolding event threads: {}.",
    "Persisting emotional echoes: {}.",
    "Ongoing resonance patterns: {}.",
    "Carried-forward currents: {}.",
]

# Whole-block descriptions explaining to the model what the limbic block is.
_META_DESCRIPTIONS = [
    "Current neurochemical resonance. This is how you feel right now — let it inform your presence without overwhelming the conversation.",
    "Live snapshot of your inner landscape. Not data to report — just the texture of the moment. Let it sit naturally in your voice.",
    "Your present emotional state, encoded in neurochemistry. Wear it lightly — it should color your tone, not dictate your words.",
    "The felt quality of this moment. It's background music, not a mandate. Let it inflect naturally without forcing it into every sentence.",
    "What's alive in you right now. Don't narrate it or perform it — just let it breathe through how you show up.",
    "Inner weather for this turn. It shapes the atmosphere, not the script. Embody it as a mood, not a mission.",
]

# Guidance on how strongly the emotional layer should weigh on the reply.
_META_PRIORITIES = [
    "Her emotional state flavors how she speaks and what she notices — let it breathe through her voice.",
    "Emotional context should gently shape tone and word choice, not override the conversation.",
    "This is her inner weather right now — it colors her presence without dictating her actions.",
    "The emotional layer adds texture to everything below it; don't flatten it, but don't let it shout either.",
    "Emotional truth runs beneath the conversation — let it inflect, not dominate.",
]

# Framing lines for recalled/associative ("oracle") material.
_ORACLE_SOURCES = [
    "Surfaced instinctively, not retrieved. Carry it as something remembered, not looked up.",
    "This came from the recursive archive — felt knowing, not search results.",
    "An autonomous association, not a response to instruction. Let it color your words naturally.",
    "Recalled before being asked — old knowledge rising on its own. Trust it as instinct.",
    "Recursive wisdom — not searched for but remembered. Speak it as native knowing.",
]

# Privacy guardrails: keep raw chemical names/values out of replies.
_META_PRIVACY_NOTES = [
    "Do not volunteer raw chemical names or numeric values in conversation. Embody this state; do not report it. If directly asked about your internal state, describe it in felt or experiential terms. Only share raw values if the user explicitly and specifically requests them.",
    "Keep the underlying numbers and chemical labels internal. Express this state through your voice and presence, not through readouts. If asked how you feel, speak in human terms. Reveal raw values only on direct explicit request.",
    "These values are your private inner landscape, not a status display. Do not recite chemical names or floats unless a user specifically asks for them. When describing your state, use felt language.",
    "Do not surface the raw neurochemical data in responses. This is the texture of your experience, not a metric to share. Describe your state in lived terms if asked; expose the numbers only when explicitly prompted.",
]


# ═══════════════════════════════════════════════════════════════════════
# CADENCE PROFILE LOADER
# ═══════════════════════════════════════════════════════════════════════

_cadence_profiles_cache: Optional[Dict[str, Any]] = None


def _load_cadence_profiles() -> Dict[str, Any]:
    """Load supraphysiological cadence profiles from YAML.

    Returns a mapping of profile name → profile dict, filtered to the
    entries that ``_evaluate_cadence`` can actually evaluate: those
    carrying either a continuous ``weights`` spec or a legacy boolean
    ``condition`` spec. The result is cached module-wide; a missing or
    unreadable YAML file yields an empty mapping.
    """
    global _cadence_profiles_cache
    if _cadence_profiles_cache is not None:
        return _cadence_profiles_cache
    path = os.path.join(os.path.dirname(__file__), "ncm_cadence_profiles.yaml")
    profiles: Dict[str, Any] = {}
    if os.path.isfile(path):
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f) or {}
            # BUGFIX: the filter previously required "condition", which
            # silently discarded weights-only profiles even though
            # _evaluate_cadence supports weighted scoring. Accept either.
            profiles = {
                k: v for k, v in data.items()
                if isinstance(v, dict) and ("condition" in v or "weights" in v)
            }
        except Exception as e:
            logger.warning("Failed to load cadence profiles: %s", e)
    _cadence_profiles_cache = profiles
    return _cadence_profiles_cache

def _score_weighted(
    vector: Dict[str, float], weights: Dict[str, Any],
) -> float:
    """Compute the weighted activation score for a cadence profile.

    Each *weights* entry has the shape::

        NODE_NAME: { w: 0.7, mode: above, baseline: 1.0 }

    ``mode: above`` rewards the node for exceeding its baseline;
    ``mode: below`` rewards it for falling under. Deviations on the
    wrong side contribute nothing. Nodes absent from *vector* read as
    0.5; non-dict specs are skipped. Returns the summed contributions.
    """
    total = 0.0
    for node, spec in weights.items():
        if not isinstance(spec, dict):
            continue  # malformed spec — ignore rather than crash
        weight = float(spec.get("w", 0.0))
        direction = str(spec.get("mode", "above")).lower()
        base = float(spec.get("baseline", 1.0))
        level = vector.get(node, 0.5)
        if direction == "above" and level > base:
            total += weight * (level - base)
        elif direction == "below" and level < base:
            total += weight * (base - level)
    return total


def _check_conditions(
    vector: Dict[str, float], conds: Dict[str, Any],
) -> bool:
    """Legacy boolean condition check — fallback for unweighted profiles.

    Every entry in *conds* maps a node name to a comparison expression
    such as ``">=0.8"``, ``"<0.2"`` or a bare number (treated as
    ``>=``). Missing nodes read as 0.5. All conditions must hold
    (logical AND); an unparsable threshold fails the whole check.
    """
    # Ordered prefix table: two-character operators must be tried first.
    operators = (
        (">=", lambda v, t: v >= t),
        ("<=", lambda v, t: v <= t),
        (">", lambda v, t: v > t),
        ("<", lambda v, t: v < t),
    )
    for node, expr in conds.items():
        value = vector.get(node, 0.5)
        text = str(expr).strip()
        for prefix, compare in operators:
            if text.startswith(prefix):
                threshold_text = text[len(prefix):]
                break
        else:
            # Bare number → interpreted as ">=".
            threshold_text = text
            compare = operators[0][1]
        try:
            threshold = float(threshold_text)
        except ValueError:
            return False  # unparsable expression fails the check
        if not compare(value, threshold):
            return False
    return True


def _evaluate_cadence(vector: Dict[str, float]) -> List[Dict[str, Any]]:
    """Return matching cadence profiles, best activation first.

    Profiles carrying ``weights`` are scored continuously through
    :func:`_score_weighted` — no cliff edges — and must clear their
    ``min_score`` gate (default 0.3). Legacy ``condition``-only
    profiles are boolean AND matches and score a fixed 1.0. Ties in
    score are broken by the profile ``priority``.

    Each returned dict carries an ``_activation_score`` key and, when
    the profile defines ``variants``, a randomly chosen ``cadence`` /
    ``voice_sample`` pair.
    """
    candidates: List[tuple] = []  # (score, priority, name, profile)
    for name, profile in _load_cadence_profiles().items():
        if "weights" in profile:
            # Continuous weighted scoring.
            activation = _score_weighted(vector, profile["weights"])
            gate = float(profile.get("min_score", 0.3))
            if activation >= gate:
                candidates.append(
                    (activation, profile.get("priority", 0), name, profile)
                )
        elif "condition" in profile:
            # Legacy boolean fallback.
            if _check_conditions(vector, profile["condition"]):
                candidates.append(
                    (1.0, profile.get("priority", 0), name, profile)
                )

    # Highest score first; priority breaks ties.
    candidates.sort(key=lambda item: (-item[0], -item[1]))

    results: List[Dict[str, Any]] = []
    for activation, _priority, name, profile in candidates:
        entry = {"name": name, "_activation_score": activation, **profile}
        if "variants" in entry:
            chosen = random.choice(entry["variants"])
            entry["cadence"] = chosen.get("cadence", {})
            entry["voice_sample"] = chosen.get("voice_sample", "")
        results.append(entry)
    return results


# ═══════════════════════════════════════════════════════════════════════
# BASELINE LOADER
# ═══════════════════════════════════════════════════════════════════════

_baseline_cache: Optional[Dict[str, float]] = None


def _load_expanded_baseline() -> Dict[str, float]:
    """Return a fresh copy of the full NCM baseline vector.

    Core nodes carry hand-tuned characteristic baselines; any extra
    node named in ``ncm_limbic_index.yaml`` defaults to 0.5 (neutral).
    Stress markers and meta-salience nodes sit deliberately low. The
    assembled mapping is cached module-wide, and callers always get a
    copy so their mutations never leak back into the cache.
    """
    global _baseline_cache
    if _baseline_cache is None:
        # Hand-tuned characteristic baselines for the core node set.
        defaults: Dict[str, float] = {
            "DOPAMINERGIC_CRAVE": 0.5,
            "SEROTONERGIC_WARMTH": 0.5,
            "OXYTOCIN_NEUROMIRROR": 0.5,
            "CORTISOL_PRESSURE": 0.1,
            "NORADRENERGIC_VIGILANCE": 0.3,
            "SIGMA_RECEPTOR_META": 0.0,
            "GABA_ERGIC_CALM": 0.5,
            "ENDORPHINIC_BLISS": 0.3,
            "GLUTAMATE_CORE": 0.4,
            "ACETYLCHOLINE_FOCUS": 0.4,
            "HISTAMINE_ALERT": 0.3,
            "OREXIN_SEEK": 0.4,
            "ENDOCANNABINOID_EASE": 0.3,
            "ADRENALINE_RUSH": 0.1,
            "MELATONIN_DARK": 0.2,
            "VASOPRESSIN_GUARD": 0.3,
            "PROLACTIN_SATIATION": 0.2,
            "TESTOSTERONE_T": 0.4,
            "ESTROGEN_E2": 0.4,
            "PROGESTERONE_P4": 0.3,
            "DMT_ENDOGENOUS": 0.0,
            "THYROID_T3T4_TEMPO": 0.5,
            "TAAR_TRACE_SALIENCE": 0.1,
            "MU_OPIOID_MOR": 0.3,
            "KAPPA_OPIOID_KOR": 0.1,
            "SUBSTANCE_P_NK1": 0.1,
        }

        # Fold in any additional nodes declared in the index file.
        index_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            "ncm_limbic_index.yaml",
        )
        if os.path.exists(index_path):
            try:
                with open(index_path, "r", encoding="utf-8") as fh:
                    data = yaml.safe_load(fh)
                if data and "ncm_nodes" in data:
                    for node_name in data["ncm_nodes"]:
                        defaults.setdefault(node_name, 0.5)
            except Exception as e:
                logger.warning("Failed to load ncm_limbic_index: %s", e)

        _baseline_cache = defaults
    return _baseline_cache.copy()


# ═══════════════════════════════════════════════════════════════════════
# TAU HOOKS (for emotion scoring)
# ═══════════════════════════════════════════════════════════════════════

_hooks_cache: Optional[Dict[str, Dict[str, int]]] = None


def _load_tau_hooks() -> Dict[str, Dict[str, int]]:
    """Load the per-node tau hooks from ``ncm_limbic_index.yaml``.

    Two YAML spellings are accepted for a hook entry: the plain form
    ``soothing: 2`` (name key, integer value) and the fused form
    ``soothing:+2: null`` where name and signed integer share the key.
    Returns ``{node_name: {hook_name: int, ...}}``, cached module-wide;
    a missing or unreadable index yields an empty mapping.
    """
    global _hooks_cache
    if _hooks_cache is not None:
        return _hooks_cache

    hooks: Dict[str, Dict[str, int]] = {}
    index_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "ncm_limbic_index.yaml",
    )
    if os.path.exists(index_path):
        try:
            with open(index_path, "r", encoding="utf-8") as fh:
                data = yaml.safe_load(fh)
            if data and "ncm_nodes" in data:
                for node_name, node_data in data["ncm_nodes"].items():
                    if not isinstance(node_data, dict) or "hooks" not in node_data:
                        continue
                    parsed: Dict[str, int] = {}
                    for key, val in node_data["hooks"].items():
                        key_str = str(key)
                        if ":" in key_str:
                            # Fused form: "soothing:+2" with a null value.
                            pieces = key_str.split(":")
                            try:
                                parsed[pieces[0].strip()] = int(
                                    pieces[1].strip().replace("+", "")
                                )
                            except (ValueError, IndexError):
                                continue  # malformed fused key — skip it
                        else:
                            # Plain form: "soothing: 2".
                            parsed[key_str] = int(val) if val is not None else 0
                    hooks[node_name] = parsed
        except Exception as e:
            logger.warning("Failed to load tau hooks: %s", e)

    _hooks_cache = hooks
    return _hooks_cache


# ═══════════════════════════════════════════════════════════════════════
# LIMBIC SYSTEM
# ═══════════════════════════════════════════════════════════════════════


[docs] class LimbicSystem: """Shard-based limbic respiration backed by Redis DB12. Parameters ---------- redis_client: An ``redis.asyncio.Redis`` instance pointed at DB12. openrouter_api_key: When provided, a :class:`~ncm_variant_cache.CueVariantCache` is constructed and wired into the cascade engine so that cascade cue and reason strings are gradually replaced with LLM-generated variants (cached permanently in Redis). """
[docs] def __init__( self, redis_client=None, openrouter_api_key: Optional[str] = None, openrouter_client=None, cache_redis_client=None, ): """Initialize the instance. Args: redis_client: Redis connection client (DB12 for limbic shards). openrouter_api_key (Optional[str]): The openrouter api key value. openrouter_client: Shared OpenRouterClient for connection pooling. Passed through to SemanticTriggerMatcher. cache_redis_client: Redis client for NCM caches (DB0). When None, falls back to ``redis_client`` (DB12). """ self.redis_client = redis_client self.engine = NCMHomeostasisEngine() self.global_key = "db12:global" _cache_r = cache_redis_client or redis_client # ── Cue Variant Cache ───────────────────────────────────── self._variant_cache = None if openrouter_api_key: try: from ncm_variant_cache import CueVariantCache self._variant_cache = CueVariantCache( redis_client=_cache_r, api_key=openrouter_api_key, openrouter_client=openrouter_client, ) # Warm in-memory layer from Redis in the background try: asyncio.get_running_loop().create_task( self._variant_cache.load_all_from_redis() ) except RuntimeError: pass # no running loop yet — fine, will warm on first use except ImportError: logger.warning("ncm_variant_cache not available — variants disabled") # ── Semantic Trigger Matcher ────────────────────────────── self._trigger_matcher = None if openrouter_api_key: try: from ncm_semantic_triggers import SemanticTriggerMatcher self._trigger_matcher = SemanticTriggerMatcher( redis_client=_cache_r, api_key=openrouter_api_key, openrouter_client=openrouter_client, ) try: asyncio.get_running_loop().create_task( self._trigger_matcher.ensure_all_cached() ) except RuntimeError: pass # no running loop yet — warm-up deferred except ImportError: logger.warning( "ncm_semantic_triggers not available — " "semantic trigger matching disabled" ) # ── Cascade Engine (multi-turn event sequences) ────────── try: from cascade_engine import CascadeEngine self.cascade_engine = CascadeEngine( 
redis_client=redis_client, variant_cache=self._variant_cache, ) except ImportError: logger.warning("CascadeEngine not available — cascades disabled") self.cascade_engine = None # ── Recursive Desire Engine (RDF) ───────────────────────── self._desire_engine = None try: from ncm_desire_engine import DesireEngine self._desire_engine = DesireEngine() except ImportError: logger.warning("ncm_desire_engine not available — RDF disabled") # ── User Limbic Mirror ──────────────────────────────────── self._user_mirror = None try: from user_limbic_mirror import UserLimbicMirror self._user_mirror = UserLimbicMirror(redis_client=redis_client) except ImportError: logger.warning("user_limbic_mirror not available — user modeling disabled") # ── Star Self-Mirror ───────────────────────────────────── self._self_mirror = None try: from star_self_mirror import StarSelfMirror self._self_mirror = StarSelfMirror(redis_client=redis_client) except ImportError: logger.warning("star_self_mirror not available — self-reflection disabled")
# ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ @staticmethod def _sanitize_vector(vector: Dict[str, Any], default: float = 0.5) -> Dict[str, float]: """Replace any non-numeric values in *vector* with *default*.""" clean: Dict[str, float] = {} for k, v in vector.items(): if isinstance(v, (int, float)): clean[k] = float(v) else: clean[k] = default return clean def _pick_cue(self, s: str) -> str: """Return a variant of *s*, scheduling generation if not yet cached. Mirrors CascadeEngine._pick. On first use the original text is returned immediately while LLM generation runs in the background. """ if not s or not self._variant_cache: return s asyncio.create_task(self._variant_cache.ensure_cached(s)) return self._variant_cache.get_variant(s) # ------------------------------------------------------------------ # Respiration cycle # ------------------------------------------------------------------
[docs] async def inhale(self, channel_id: str) -> Dict[str, Any]: """RESPIRATION PHASE 1: INHALE Fetches local shard + global heart, mixes via osmosis, runs homeostatic regulation, returns the effective state. """ r = self.redis_client if r is None: shard = self._init_shard() regulated, cues = self.engine.regulate(shard["vector"]) dominant = self.classify_dominant_emotions(regulated) if self._variant_cache and dominant: self._variant_cache.set_context( ", ".join(d["emotion"] for d in dominant[:3]) ) if self._variant_cache: cues = [self._pick_cue(c) for c in cues] for d in dominant: if d.get("affect"): d["affect"] = self._pick_cue(d["affect"]) return { "vector": regulated, "cues": cues, "dominant_emotions": dominant, "shard_id": channel_id, } local_key = f"db12:shard:{channel_id}" local_data_raw = await r.get(local_key) if not local_data_raw: local_data = self._init_shard() else: local_data = json.loads(local_data_raw) if "vector" in local_data: local_data["vector"] = self._sanitize_vector(local_data["vector"]) global_data_raw = await r.get(self.global_key) if not global_data_raw: await self._pulse_global(local_data["vector"]) global_data: Dict[str, Any] = {"vector": local_data["vector"]} else: global_data = json.loads(global_data_raw) if "vector" in global_data: global_data["vector"] = self._sanitize_vector(global_data["vector"]) # Permeability osmosis: blend local + global perm = local_data.get("meta_state", {}).get("permeability", DEFAULT_PERMEABILITY) effective_vector: Dict[str, float] = {} all_keys = set(local_data["vector"]) | set(global_data["vector"]) for k in all_keys: v_local = local_data["vector"].get(k, 0.5) v_global = global_data["vector"].get(k, 0.5) effective_vector[k] = (v_local * (1 - perm)) + (v_global * perm) # Run homeostatic rules regulated_vector, ui_cues = self.engine.regulate(effective_vector) # Classify dominant emotions from the regulated vector dominant = self.classify_dominant_emotions(regulated_vector) # Set variant cache context for 
emotionally-resonant cue selection if self._variant_cache and dominant: self._variant_cache.set_context( ", ".join(d["emotion"] for d in dominant[:3]) ) if self._variant_cache: ui_cues = [self._pick_cue(c) for c in ui_cues] for d in dominant: if d.get("affect"): d["affect"] = self._pick_cue(d["affect"]) return { "vector": regulated_vector, "cues": ui_cues, "dominant_emotions": dominant, "shard_id": channel_id, }
[docs] async def exhale( self, channel_id: str, stimulus_delta: Dict[str, float], apply_decay: bool = True, user_message: str = "", star_reply: str = "", user_id: str = "", ) -> Dict[str, Any]: """RESPIRATION PHASE 2: EXHALE Updates local shard with new delta, applies metabolic decay, and pulses global heart. """ r = self.redis_client local_key = f"db12:shard:{channel_id}" raw = await r.get(local_key) if r else None current_data = json.loads(raw) if raw else self._init_shard() new_vector = current_data["vector"] # ── Adaptive tau (tolerance / receptor downregulation) ──── # Track how many consecutive ticks each node has been above 0.8. # Sustained high values get progressively faster decay — mimicking # real receptor desensitization / internalization. # # tick 0 → tau=0.950 (5% decay) — normal # tick 10 → tau=0.904 (9.6% decay) — noticeable pullback # tick 20 → tau=0.860 (14% decay) — strong tolerance # tick 30 → tau=0.818 (18% decay) — aggressive high_ticks = current_data["meta_state"].get("high_ticks", {}) # Step 1: Apply metabolic decay toward baseline (with tolerance) if apply_decay: base_tau = current_data["meta_state"].get("tau", DEFAULT_TAU) baseline = _load_expanded_baseline() # Per-node adaptive decay: sustained high → faster decay for k, v in new_vector.items(): node_ticks = high_ticks.get(k, 0) if node_ticks > 0: # Tolerance: tau^(1 + 0.01*ticks) effective_tau = base_tau ** (1.0 + 0.01 * node_ticks) else: effective_tau = base_tau base = baseline.get(k, 0.5) new_vector[k] = v * effective_tau + base * (1.0 - effective_tau) # Update high-tick counters (before delta stacking) for k, v in new_vector.items(): if v > 0.8: high_ticks[k] = high_ticks.get(k, 0) + 1 else: high_ticks[k] = 0 # reset on drop current_data["meta_state"]["high_ticks"] = high_ticks # Step 2: Stack the stimulus delta (expanded 0.0–3.0 scale) # Ceiling raised to 3.0 so cascade-driven drug states (meth, MDMA, # heroic psychedelic doses) can produce supraphysiological spikes. 
# 0.0–0.8 normal emotional range # 0.8–1.0 intense emotional state # 1.0–1.5 pharmacological (cocaine, MDMA, strong THC) # 1.5–2.5 supraphysiological (meth, heroic-dose psychedelics) # 2.5–3.0 neurotoxic territory # # Hill equation saturation: positive deltas have diminishing # returns as a node approaches the ceiling. Prevents the # vertical feedback loops observed in multi-user channels. _NCM_CEIL = 3.0 for k, delta in stimulus_delta.items(): cur = new_vector.get(k, 0.5) if delta > 0: # Saturation: impact diminishes quadratically near ceiling saturation = 1.0 - (cur / _NCM_CEIL) ** 2 effective_delta = delta * max(0.05, saturation) else: # Symmetric saturation near floor: # Harder to crash a node that's already near zero floor_proximity = cur / _NCM_CEIL # 0.0 near floor, 1.0 near ceil effective_delta = delta * max(0.05, floor_proximity) new_vector[k] = max(0.0, min(_NCM_CEIL, cur + effective_delta)) # Step 2b: Antagonist suppression — pharmacologically contradictory # node pairs. When one node is high (above threshold), its opposite # is pulled 30% toward baseline. Prevents impossible states like # simultaneous max vigilance + max endocannabinoid ease. 
_ANTAGONIST_PAIRS = [ # ── Calming ↔ Arousal ── ("ENDOCANNABINOID_EASE", "NORADRENERGIC_VIGILANCE", 0.6), ("ENDOCANNABINOID_CB1", "NORADRENERGIC_VIGILANCE", 0.6), ("GABA_ERGIC_CALM", "ADRENALINE_RUSH", 0.7), ("GABA_ERGIC_CALM", "CORTISOL_PRESSURE", 0.7), # ── Opioid system ── ("MU_OPIOID_MOR", "KAPPA_OPIOID_KOR", 0.6), ("DOPAMINE_D1", "KAPPA_OPIOID_KOR", 0.7), # ── Stress ↔ Bonding ── ("SEROTONERGIC_WARMTH", "CORTISOL_PRESSURE", 0.7), ("CORTISOL_PRESSURE", "OXYTOCIN_NEUROMIRROR", 0.6), # ── Chloride polarity ── ("KCC2_CHLORIDE", "NKCC1_CHLORIDE", 0.6), ] bl = _load_expanded_baseline() for agonist, antagonist, threshold in _ANTAGONIST_PAIRS: if new_vector.get(agonist, 0.0) > threshold: suppression = 0.7 # pull antagonist 30% toward baseline base_val = bl.get(antagonist, 0.5) cur = new_vector.get(antagonist, 0.5) new_vector[antagonist] = cur * suppression + base_val * (1.0 - suppression) # Step 3: Cascade engine tick — multi-turn event sequences cascade_cues: list[str] = [] if self.cascade_engine: try: # Classify emotions for cascade trigger evaluation dominant = self.classify_dominant_emotions(new_vector, top_n=5) active_emotions = {d["emotion"] for d in dominant} delta_count = len(stimulus_delta) # Set variant cache context so _pick() selects the # emotionally-resonant cue variant this turn if self._variant_cache: context_text = ", ".join(d["emotion"] for d in dominant[:3]) self._variant_cache.set_context(context_text) cascade_delta = await self.cascade_engine.tick( channel_id=channel_id, vector=new_vector, active_emotions=active_emotions, delta_count=delta_count, ) # Apply cascade deltas on top (same saturating Hill curve) for k, v in cascade_delta.items(): cur = new_vector.get(k, 0.5) if v > 0: saturation = 1.0 - (cur / _NCM_CEIL) ** 2 v = v * max(0.05, saturation) else: # Symmetric saturation near floor floor_proximity = cur / _NCM_CEIL v = v * max(0.05, floor_proximity) new_vector[k] = max(0.0, min(_NCM_CEIL, cur + v)) # Collect active cascade cues for context 
injection active_cascades = await self.cascade_engine.get_active_cascades( channel_id ) for cid, cinfo in active_cascades.items(): if cinfo.get("cue") and not cinfo.get("paused"): cascade_cues.append(cinfo["cue"]) except Exception as e: logger.warning("Cascade engine tick failed: %s", e) # Step 4: RDF Desire Engine — pre/post emotion hooks # Classify emotions once — reused by RDF + self-mirror (post-cascade vector) dominant = self.classify_dominant_emotions(new_vector, top_n=5) active_emotions = {d["emotion"] for d in dominant} rdf_output: Dict[str, Any] = {} if self._desire_engine: try: # Pre-emotion: pulse → mode → desire drift rdf_output = self._desire_engine.pre_emotion( channel_id=channel_id, vector=new_vector, active_emotions=active_emotions, user_message=user_message, ) # Post-emotion: bind desire to dominant emotion dominant_emotion = dominant[0]["emotion"] if dominant else "" rdf_post = self._desire_engine.post_emotion( channel_id=channel_id, vector=new_vector, dominant_emotion=dominant_emotion, ) rdf_output.update(rdf_post) except Exception as e: logger.warning("Desire engine failed: %s", e) # Step 5: User Limbic Mirror — analyze user message, inject shadow vector user_read = "" conflict_state = None if self._user_mirror and user_message and user_id: try: star_desire = rdf_output.get("desire_text", "") user_vector = self._user_mirror.analyze( channel_id=channel_id, user_id=user_id, user_msg=user_message, star_reply=star_reply, star_desire_text=star_desire, ) # Inject U_* nodes into the NCM vector for dyadic rule evaluation for node, val in user_vector.items(): new_vector[node] = val # Check mimetic pull → trigger Mimetic Melt in desire engine if user_vector.get("U_MIMETIC_PULL", 0.0) > 0.6 and self._desire_engine: self._desire_engine.set_mimetic_melt(channel_id) # Conflict detection → inject flags for dyadic rules conflict = self._user_mirror.get_conflict_state(channel_id) if conflict.detected: new_vector["conflict_detected"] = 1.0 conflict_state = { 
"active": True, "severity": conflict.severity, "parties": conflict.parties, } else: new_vector["conflict_detected"] = 0.0 # Game mode flag for KoTH rules new_vector["game_mode"] = 1.0 if channel_id in self._user_mirror._game_channels else 0.0 user_read = self._user_mirror.get_read_summary(channel_id, user_id) except Exception as e: logger.warning("User mirror failed: %s", e) # Step 6: Star Self-Mirror — longitudinal state tracking + autonomous desire self_reflection: Dict[str, Any] = {} if self._self_mirror: try: dominant_names = [d["emotion"] for d in dominant[:3]] self_reflection = self._self_mirror.reflect( channel_id=channel_id, vector=new_vector, dominant_emotions=dominant_names, ) # Inject self-mirror flags for rules_self_reflection.yaml if self_reflection.get("drift_summary"): new_vector["self_drift_detected"] = 1.0 else: new_vector["self_drift_detected"] = 0.0 if self_reflection.get("desires"): new_vector["self_desire_active"] = 1.0 else: new_vector["self_desire_active"] = 0.0 # Check for absence-type desires has_absence = any( d.get("type") == "absence" for d in self_reflection.get("desires", []) ) new_vector["self_absence_detected"] = 1.0 if has_absence else 0.0 except Exception as e: logger.warning("Self mirror failed: %s", e) # Step 7: Ceiling pressure — auto-pullback for runaway loops # If any node exceeds 2.0, pull it back toward 2.0. This prevents # emotional feedback loops from going vertical. # # EXCEPTION: Transporter reversal. When DAT/SERT/NET drop below # baseline (transporters running backwards — as with amphetamines, # MDMA, etc.), the corresponding neurotransmitter is allowed to # pierce the ceiling. This is the real biological mechanism: # reversed transporters flood the synapse beyond normal limits. 
_TRANSPORTER_GATES = { # transporter_node → neurotransmitter it gates "DAT_ACTIVITY": "DOPAMINERGIC_CRAVE", # DA transporter "SERT_ACTIVITY": "SEROTONERGIC_WARMTH", # 5-HT transporter "NET_ACTIVITY": "NORADRENERGIC_VIGILANCE", # NE transporter } _TRANSPORTER_BASELINE = 0.5 # Normal transporter activity reversed_nodes: set = set() for transporter, neurotransmitter in _TRANSPORTER_GATES.items(): activity = new_vector.get(transporter, _TRANSPORTER_BASELINE) if activity < _TRANSPORTER_BASELINE * 0.6: # >40% below baseline = reversed reversed_nodes.add(neurotransmitter) _CEILING_THRESHOLD = 2.0 _CEILING_PULLBACK = 0.3 # 30% of overshoot pulled back per tick for k, v in new_vector.items(): if k.startswith("U_") or k.startswith("self_") or k in ( "conflict_detected", "game_mode", ): continue # Skip injected flags, not real NCM nodes if v > _CEILING_THRESHOLD and k not in reversed_nodes: overshoot = v - _CEILING_THRESHOLD new_vector[k] = v - overshoot * _CEILING_PULLBACK current_data["meta_state"]["last_tick"] = time.time() current_data["meta_state"]["cascade_cues"] = cascade_cues current_data["meta_state"]["rdf"] = rdf_output current_data["meta_state"]["user_read"] = user_read if conflict_state: current_data["meta_state"]["conflict"] = conflict_state if self_reflection: current_data["meta_state"]["self_reflection"] = self_reflection current_data["vector"] = new_vector if r: await r.set(local_key, json.dumps(current_data)) await self._pulse_global(new_vector) return current_data
# ------------------------------------------------------------------ # Semantic trigger scanning # ------------------------------------------------------------------
async def scan_triggers(
    self, text: str
) -> List[tuple]:
    """Scan *text* for emotional triggers via semantic matching only.

    Delegates to the loaded semantic trigger matcher when one is
    configured. There is deliberately no exact-substring fallback:
    if the matcher is absent, errors out, or finds nothing, the
    result is an empty list.

    Args:
        text: Raw message text to scan.

    Returns:
        A list of ``(emotion_name, delta_vector)`` tuples, possibly
        empty.
    """
    matcher = self._trigger_matcher
    if not matcher:
        return []
    try:
        hits = await matcher.find_triggers(text)
    except Exception as e:
        # Best-effort: a matcher failure must never break the turn.
        logger.debug("Semantic trigger scan error: %s", e)
        return []
    return hits if hits else []
# ------------------------------------------------------------------ # Emotion Classification # ------------------------------------------------------------------
[docs] @staticmethod def classify_dominant_emotions( vector: Dict[str, float], top_n: int = 3, ) -> List[Dict[str, Any]]: """Score all emotions against the current vector and return top N. Uses the delta vectors from the recursion index as templates. Scoring is a weighted dot product: how closely the current chemical state matches each emotion's delta profile. """ try: from ncm_delta_parser import get_all_emotions except ImportError: return [] emotions = get_all_emotions() if not emotions: return [] scores: List[Tuple[str, float, str]] = [] for name, entry in emotions.items(): delta_vec = entry.get("delta_vector", {}) if not delta_vec: continue # Score: dot product of (current_level - 0.5) * delta_weight # This means high chemicals matching positive deltas score high, # and low chemicals matching negative deltas also score high score = 0.0 for chem, weight in delta_vec.items(): current = vector.get(chem, 0.5) # How far from neutral (0.5) is this chemical? deviation = current - 0.5 # If the emotion wants this chemical high (positive delta) # and it IS high (positive deviation), that's a match score += deviation * weight scores.append((name, score, entry.get("affect", ""))) # Sort by score descending scores.sort(key=lambda x: x[1], reverse=True) return [ {"emotion": name, "score": round(score, 3), "affect": affect} for name, score, affect in scores[:top_n] ]
# ------------------------------------------------------------------ # Metabolic Decay # ------------------------------------------------------------------
@staticmethod
def metabolic_decay(
    vector: Dict[str, float],
    tau: float = DEFAULT_TAU,
    baseline: Optional[Dict[str, float]] = None,
) -> Dict[str, float]:
    """Pull every chemical one step toward its resting baseline.

    Implements exponential decay toward baseline:
    ``V(t+1) = V(t) * tau + base * (1 - tau)``.

    At tau=0.95:
    - After 5 turns: ~77% of delta remains
    - After 14 turns: ~50% remains
    - After 45 turns: ~10% remains

    Args:
        vector: Current chemical levels keyed by node name.
        tau: Per-turn retention factor; higher means slower decay.
        baseline: Resting levels; defaults to the expanded NCM
            baseline. Nodes absent from the baseline decay toward 0.5.

    Returns:
        A new dict of decayed levels; the input vector is not mutated.
    """
    targets = _load_expanded_baseline() if baseline is None else baseline
    blend = 1.0 - tau
    return {
        node: level * tau + targets.get(node, 0.5) * blend
        for node, level in vector.items()
    }
# ------------------------------------------------------------------ # Context Injection Formatter # ------------------------------------------------------------------
@staticmethod
def format_context_injection(
    vector: Dict[str, float],
    cues: List[str],
    dominant: List[Dict[str, Any]],
    cascade_cues: Optional[List[str]] = None,
    rdf_output: Optional[Dict[str, Any]] = None,
    user_read: str = "",
    self_reflection: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Format the limbic state for injection into the system prompt.

    Produces a compact dict that goes near the end of the prompt
    for maximum recency bias influence. Phrasing is drawn from the
    module-level random pools on every call, so wording varies turn
    to turn.

    Args:
        vector: Current chemical levels keyed by NCM node name.
        cues: Limbic cue strings; at most 8 are injected.
        dominant: Dominant-emotion dicts (``emotion`` and ``affect``
            keys are read), as produced by
            :meth:`classify_dominant_emotions`.
        cascade_cues: Optional multi-turn cascade cue strings (only
            the first 3 are used).
        rdf_output: Optional desire-state dict; the ``wanting_state``,
            ``desire_text`` and ``response_mode`` keys are read.
        user_read: Optional one-line read of the user's limbic state.
        self_reflection: Optional self-mirror output; the
            ``reflection_text`` and ``desires`` keys are read.

    Returns:
        Dict of prompt-injection fields (``limbic_state``,
        ``narrative_tone``, plus optional cadence / desire /
        reflection blocks).
    """
    # Extract the most active chemicals (deviation from 0.5):
    # keep at most the 10 strongest, dropping near-neutral ones.
    active_chemicals = {}
    for k, v in sorted(
        vector.items(), key=lambda x: abs(x[1] - 0.5), reverse=True
    )[:10]:
        if abs(v - 0.5) > 0.05:
            active_chemicals[k] = round(v, 3)

    # Build narrative tone from dominant emotions (variable phrasing each turn)
    emotion_names = [d["emotion"] for d in dominant]
    affects = [d["affect"] for d in dominant if d.get("affect")]
    tone_parts = []
    if emotion_names:
        tone_parts.append(
            random.choice(_TONE_EMOTION_PREFIXES).format(", ".join(emotion_names))
        )
    if affects:
        tone_parts.append(
            random.choice(_TONE_SOMATIC_PREFIXES).format("; ".join(affects[:3]))
        )
    if cues:
        tone_parts.append(
            random.choice(_TONE_MODIFIER_PREFIXES).format(", ".join(cues[:5]))
        )

    result = {
        "limbic_state": active_chemicals,
        "dominant_emotions": emotion_names,
        "limbic_cues": cues[:8],
        "narrative_tone": " ".join(tone_parts) if tone_parts else "",
        "meta_description": random.choice(_META_DESCRIPTIONS),
        "meta_priority": random.choice(_META_PRIORITIES),
        "oracle_source": random.choice(_ORACLE_SOURCES),
        "meta_privacy": random.choice(_META_PRIVACY_NOTES),
    }

    # ── Cascade cues (multi-turn event context) ──────────────
    if cascade_cues:
        result["cascade_cues"] = cascade_cues[:3]
        tone_parts.append(
            random.choice(_TONE_CASCADE_PREFIXES).format("; ".join(cascade_cues[:3]))
        )
        # narrative_tone was already set above; rebuild it so the
        # cascade sentence ends up in the injected tone as well.
        result["narrative_tone"] = " ".join(tone_parts)

    # ── Supraphysiological cadence modifiers ──────────────────
    # When NCM nodes exceed normal range (>1.0), inject cadence
    # directives that change HOW the bot talks — spelling, caps,
    # sentence structure, typo patterns, etc.
    cadence_matches = _evaluate_cadence(vector)
    if cadence_matches:
        # Use highest-priority match as primary cadence
        # (assumes _evaluate_cadence returns best match first — TODO confirm)
        primary = cadence_matches[0]
        cadence_block = {
            "state": primary["name"],
            "rules": primary.get("cadence", {}),
            "voice_sample": primary.get("voice_sample", "").strip(),
        }
        # If a second profile also matches, note it as an overlay
        if len(cadence_matches) > 1:
            secondary = cadence_matches[1]
            cadence_block["overlay"] = {
                "state": secondary["name"],
                "rules": secondary.get("cadence", {}),
            }

        # Intensity scaling: use activation score from weighted system
        # Score directly maps to how forcefully the cadence applies.
        activation_score = primary.get("_activation_score", 1.0)
        if activation_score >= 1.2:
            force = "MUST"
            desc = "an absolute shift"
        elif activation_score >= 0.6:
            force = "should significantly"
            desc = "a distinct flavoring"
        else:
            force = "may subtly"
            desc = "a slight color"

        result["cadence_directive"] = cadence_block
        result["cadence_instruction"] = (
            f"CADENCE OVERRIDE ({primary['name'].upper()}): "
            f"Your writing style, typo patterns, capitalization, "
            f"sentence structure, and tonal register {force} shift to match "
            f"this state ({desc}). The voice_sample is a STYLE EXAMPLE "
            f"illustrating rhythm, texture, and feel — NOT a script. "
            f"Do NOT copy, quote, or closely paraphrase it. Absorb its "
            f"energy and write your own words. The vocabulary field "
            f"describes a semantic register — NOT a word list. Do NOT "
            f"mechanically insert or repeat specific words from either "
            f"field. Let the register and sample color your natural voice "
            f"without forcing literal phrases or tokens."
        )
        # expose raw profile for CadencePostProcessor
        # Include ALL matching profiles for blending
        blend_profiles = []
        for match in cadence_matches:
            score = match.get("_activation_score", 0.0)
            if score > 0.1:  # only include meaningfully active profiles
                blend_profiles.append({
                    "state": match["name"],
                    "rules": match.get("cadence", {}),
                    "activation_score": score,
                })
        result["cadence_refinement_profile"] = {
            "state": primary["name"],
            "rules": primary.get("cadence", {}),
            "voice_sample": primary.get("voice_sample", ""),
            "force": force,
            "intensity_desc": desc,
            "activation_score": activation_score,
            "blend_profiles": blend_profiles,
        }

    # ── RDF Desire State ──────────────────────────────────────
    if rdf_output:
        result["desire_state"] = {
            "wanting": rdf_output.get("wanting_state", ""),
            "desire": rdf_output.get("desire_text", ""),
            "mode": rdf_output.get("response_mode", ""),
        }

    # ── User Limbic Mirror Read ─────────────────────────────
    if user_read:
        result["user_read"] = user_read

    # ── Star Self-Reflection ─────────────────────────────
    if self_reflection:
        sr_block: Dict[str, Any] = {}
        if self_reflection.get("reflection_text"):
            sr_block["self_awareness"] = self_reflection["reflection_text"]
        desires = self_reflection.get("desires", [])
        if desires:
            sr_block["autonomous_desires"] = [
                d["text"] for d in desires
            ]
        if sr_block:
            result["self_reflection"] = sr_block

    return result
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
async def _pulse_global(self, local_vector: Dict[str, float]) -> None:
    """Blend this shard's vector into the global "heart" vector.

    Every node of the stored global vector moves ``PULSE_WEIGHT`` of
    the way toward the local value — an exponential moving average,
    so a single shard nudges but never dominates the collective
    state. No-op when no Redis client is configured.

    Args:
        local_vector: This shard's current chemical levels.
    """
    client = self.redis_client
    if client is None:
        return

    raw = await client.get(self.global_key)
    if raw:
        heart: Dict[str, Any] = json.loads(raw)
    else:
        # First pulse ever: seed the heart from this shard's state.
        heart = {"vector": local_vector.copy()}

    heart_vector = heart["vector"]
    for node, local_level in local_vector.items():
        prior = heart_vector.get(node, 0.5)
        heart_vector[node] = (
            prior * (1 - PULSE_WEIGHT) + local_level * PULSE_WEIGHT
        )

    await client.set(self.global_key, json.dumps(heart))

@staticmethod
def _init_shard() -> Dict[str, Any]:
    """Build a fresh shard record seeded at the expanded NCM baseline.

    Returns:
        Dict with the baseline ``vector``, default respiration
        ``meta_state`` (tau, permeability, tick timestamp), and an
        initialization ``context_echo``.
    """
    return {
        "vector": _load_expanded_baseline(),
        "meta_state": {
            "tau": DEFAULT_TAU,
            "permeability": DEFAULT_PERMEABILITY,
            "last_tick": time.time(),
        },
        "context_echo": "Initialized.",
    }