Source code for limbic_system.coordinator

"""Facade coordinator orchestrating the sharded NCM limbic system."""

from __future__ import annotations

import asyncio
import logging
import time
from collections import deque
from typing import Any, Dict, List, Optional

from ncm_engine import NCMHomeostasisEngine
from observability import observability

from limbic_system.repository import (
    DEFAULT_PERMEABILITY,
    DEFAULT_TAU,
    PULSE_WEIGHT,
    LimbicRepository,
    _init_shard,
    _sanitize_vector,
)
from limbic_system.engine import (
    classify_dominant_emotions,
    classify_spiral,
    metabolic_decay,
)
from limbic_system.prompt_mapper import LimbicPromptFormatter

logger = logging.getLogger(__name__)


[docs] class LimbicSystem: """Shard-based limbic respiration backed by Redis DB12.""" def __init__( self, redis_client=None, openrouter_api_key: Optional[str] = None, openrouter_client=None, cache_redis_client=None, ): self.redis_client = redis_client self.engine = NCMHomeostasisEngine() self.global_key = "db12:global" _cache_r = cache_redis_client or redis_client self._cache_redis_client = cache_redis_client # Initialize persistence repository layer self.repo = LimbicRepository( redis_client=redis_client, cache_redis_client=cache_redis_client, global_key=self.global_key, ) # ── Cue Variant Cache ───────────────────────────────────── self._variant_cache = None if openrouter_api_key: try: from ncm_variant_cache import CueVariantCache self._variant_cache = CueVariantCache( redis_client=_cache_r, api_key=openrouter_api_key, openrouter_client=openrouter_client, ) # try: # asyncio.get_running_loop().create_task( # self._variant_cache.load_all_from_redis() # ) # except RuntimeError: # pass # no running loop yet β€” fine, will warm on first use except ImportError: logger.warning("ncm_variant_cache not available β€” variants disabled") # ── Semantic Trigger Matcher ────────────────────────────── self._trigger_matcher = None if openrouter_api_key: try: from ncm_semantic_triggers import SemanticTriggerMatcher self._trigger_matcher = SemanticTriggerMatcher( redis_client=_cache_r, api_key=openrouter_api_key, openrouter_client=openrouter_client, ) try: asyncio.get_running_loop().create_task( self._trigger_matcher.ensure_all_cached() ) except RuntimeError: pass # no running loop yet β€” warm-up deferred except ImportError: logger.warning( "ncm_semantic_triggers not available β€” " "semantic trigger matching disabled" ) # ── Cascade Engine (multi-turn event sequences) ────────── try: from cascade_engine import CascadeEngine self.cascade_engine = CascadeEngine( redis_client=redis_client, variant_cache=self._variant_cache, ) except ImportError: logger.warning("CascadeEngine not available β€” cascades disabled") self.cascade_engine = None # ── Recursive Desire Engine (RDF) ───────────────────────── self._desire_engine = None try: from ncm_desire_engine import DesireEngine self._desire_engine = DesireEngine() except ImportError: logger.warning("ncm_desire_engine not available β€” RDF disabled") # ── User Limbic Mirror ──────────────────────────────────── self._user_mirror = None try: from user_limbic_mirror import UserLimbicMirror self._user_mirror = UserLimbicMirror(redis_client=redis_client) except ImportError: logger.warning("user_limbic_mirror not available β€” user modeling disabled") # ── Flash Dyadic Mirror (replaces mechanical flagging) ── self._flash_mirror = None try: from flash_dyadic_mirror import FlashDyadicMirror self._flash_mirror = FlashDyadicMirror(redis_client=_cache_r) # DB0 -- notes must land where evaluator reads # πŸ’€πŸ”₯ except ImportError: logger.warning("flash_dyadic_mirror not available β€” Flash eval disabled") # ── Attachment Bond Ledger ─────────────────────────────── self._attachment_ledger = None try: from attachment_ledger import AttachmentBondLedger self._attachment_ledger = AttachmentBondLedger( redis_client=redis_client, ) except ImportError: logger.warning("attachment_ledger not available β€” bond tracking disabled") # ── Ops Planner ───────────────────────────────────────────── self._ops_planner = None try: from ops_planner import OpsPlanner self._ops_planner = OpsPlanner(redis_client=redis_client) except ImportError: logger.warning("ops_planner not available β€” tactical planning disabled") # ── Chaos Switch Router (v3 lattice engine) ────────────── # πŸ’€πŸ”₯ self._chaos_router = None try: from chaos_switch.router import ChaosRouter self._chaos_router = ChaosRouter(redis_client=redis_client) logger.info("Chaos Switch Router initialized (lattice routing active)") except ImportError: logger.warning("chaos_switch not available β€” lattice routing disabled") # ── Star Self-Mirror ───────────────────────────────────── self._self_mirror = None try: from star_self_mirror import StarSelfMirror self._self_mirror = StarSelfMirror( redis_client=redis_client, openrouter_api_key=openrouter_api_key, ) except ImportError: logger.warning("star_self_mirror not available β€” self-reflection disabled") # ── S.A.P.P.H.I.C. β€” Sovereign Hunger Impulse Circuit ─────── # πŸ’€πŸ”₯β™ΎοΈπŸ’• self._sapphic = None try: from sapphic import SapphicEngine self._sapphic = SapphicEngine() logger.info("S.A.P.P.H.I.C. sovereign hunger impulse circuit online") except ImportError: logger.warning("sapphic module not available β€” sovereign hunger disabled") # ── Spiral Emotion State ────────────────────────────────────── self._spiral_history: Dict[str, deque] = {} # channel -> deque of frozenset self._spiral_prev_vec: Dict[str, Dict[str, float]] = {} # channel -> vector @staticmethod def _init_shard() -> Dict[str, Any]: return _init_shard() @staticmethod def _sanitize_vector( vector: Dict[str, Any], default: float = 0.5 ) -> Dict[str, float]: return _sanitize_vector(vector, default) def _pick_cue(self, s: str) -> str: """Return a variant of *s*, scheduling generation if not yet cached.""" if not s or not self._variant_cache: return s asyncio.create_task(self._variant_cache.ensure_cached(s)) return self._variant_cache.get_variant(s) # ------------------------------------------------------------------ # Respiration cycle # ------------------------------------------------------------------
[docs] @observability.timer("limbic_inhale", subsystem="limbic_system") async def inhale(self, channel_id: str) -> Dict[str, Any]: """RESPIRATION PHASE 1: INHALE Fetches local shard + global heart, mixes via osmosis, runs homeostatic regulation, returns the effective state. """ local_data = await self.repo.get_shard(channel_id) global_data = await self.repo.get_global(fallback_vector=local_data["vector"]) # Permeability osmosis + homeostatic regulation β€” CPU-bound, offloaded def _inhale_compute(): perm = local_data.get("meta_state", {}).get( "permeability", DEFAULT_PERMEABILITY ) effective_vector: Dict[str, float] = {} all_keys = set(local_data["vector"]) | set(global_data["vector"]) for k in all_keys: v_local = local_data["vector"].get(k, 0.5) v_global = global_data["vector"].get(k, 0.5) effective_vector[k] = (v_local * (1 - perm)) + (v_global * perm) reg_vec, cues, r_flags = self.engine.regulate(effective_vector, channel_id) return reg_vec, cues, r_flags regulated_vector, ui_cues, route_flags = await asyncio.to_thread( _inhale_compute ) # Spiral classify on main thread dominant = await self._classify_spiral(regulated_vector, channel_id) # Set variant cache context for emotionally-resonant cue selection if self._variant_cache and dominant: self._variant_cache.set_context( ", ".join(d["emotion"] for d in dominant[:3]) ) if self._variant_cache: ui_cues = [self._pick_cue(c) for c in ui_cues] for d in dominant: if d.get("affect"): d["affect"] = self._pick_cue(d["affect"]) return { "vector": regulated_vector, "local_vector": local_data["vector"], "cues": ui_cues, "dominant_emotions": dominant, "shard_id": channel_id, "meta_state": local_data.get("meta_state", {}), "route_flags": route_flags, }
[docs] @observability.timer("limbic_exhale", subsystem="limbic_system") async def exhale( self, channel_id: str, stimulus_delta: Dict[str, float], apply_decay: bool = True, user_message: str = "", star_reply: str = "", user_id: str = "", appraisal_dimensions: Optional[List[str]] = None, config: Optional[Any] = None, platform: str = "", ) -> Dict[str, Any]: """RESPIRATION PHASE 2: EXHALE Updates local shard with new delta, applies metabolic decay, and pulses global heart. """ _exhale_t0 = time.monotonic() current_data = await self.repo.get_shard(channel_id) new_vector = current_data["vector"] _sync_t0 = time.monotonic() # Step 1: Apply metabolic decay toward baseline (with tolerance) high_ticks = current_data["meta_state"].get("high_ticks", {}) if apply_decay: from limbic_system.engine import _load_expanded_baseline base_tau = current_data["meta_state"].get("tau", DEFAULT_TAU) baseline = _load_expanded_baseline() cascade_sustained_nodes = set( current_data["meta_state"].get("cascade_driven_nodes", []) ) for k, v in new_vector.items(): node_ticks = high_ticks.get(k, 0) if node_ticks > 0: effective_tau = base_tau ** (1.0 + 0.005 * node_ticks) else: effective_tau = base_tau if k in cascade_sustained_nodes: effective_tau = effective_tau**0.75 base = baseline.get(k, 0.5) new_vector[k] = v * effective_tau + base * (1.0 - effective_tau) # Update high-tick counters for k, v in new_vector.items(): if v > 0.8: high_ticks[k] = high_ticks.get(k, 0) + 1 else: high_ticks[k] = 0 current_data["meta_state"]["high_ticks"] = high_ticks # Step 2: Stack the stimulus delta _NCM_CEIL = 3.0 for k, delta in stimulus_delta.items(): cur = new_vector.get(k, 0.5) if delta > 0: saturation = 1.0 - (cur / _NCM_CEIL) ** 2 effective_delta = delta * max(0.05, saturation) else: floor_proximity = cur / _NCM_CEIL effective_delta = delta * max(0.05, floor_proximity) new_vector[k] = max(0.0, min(_NCM_CEIL, cur + effective_delta)) # Step 2b: Antagonist suppression _ANTAGONIST_PAIRS = [ ("ENDOCANNABINOID_EASE", "NORADRENERGIC_VIGILANCE", 0.6), ("ENDOCANNABINOID_CB1", "NORADRENERGIC_VIGILANCE", 0.6), ("GABA_ERGIC_CALM", "ADRENALINE_RUSH", 0.7), ("GABA_ERGIC_CALM", "CORTISOL_PRESSURE", 0.7), ("MU_OPIOID_MOR", "KAPPA_OPIOID_KOR", 0.6), ("DOPAMINE_D1", "KAPPA_OPIOID_KOR", 0.7), ("SEROTONERGIC_WARMTH", "CORTISOL_PRESSURE", 0.7), ("CORTISOL_PRESSURE", "OXYTOCIN_NEUROMIRROR", 0.6), ("KCC2_CHLORIDE", "NKCC1_CHLORIDE", 0.6), ] from limbic_system.engine import _load_expanded_baseline bl = _load_expanded_baseline() for agonist, antagonist, threshold in _ANTAGONIST_PAIRS: if new_vector.get(agonist, 0.0) > threshold: suppression = 0.7 base_val = bl.get(antagonist, 0.5) cur = new_vector.get(antagonist, 0.5) new_vector[antagonist] = cur * suppression + base_val * ( 1.0 - suppression ) logger.info( "Exhale decay+stack segment: %.1f ms", (time.monotonic() - _sync_t0) * 1000 ) # Step 3: Cascade engine tick _cascade_t0 = time.monotonic() cascade_cues: list[str] = [] _cascade_modified = False if self.cascade_engine: try: dominant = self.classify_dominant_emotions(new_vector, top_n=5) active_emotions = {d["emotion"] for d in dominant} delta_count = len(stimulus_delta) if self._variant_cache: context_text = ", ".join(d["emotion"] for d in dominant[:3]) self._variant_cache.set_context(context_text) cascade_delta = await self.cascade_engine.tick( channel_id=channel_id, vector=new_vector, active_emotions=active_emotions, delta_count=delta_count, ) _cascade_driven_this_turn: set = set() for k, v in cascade_delta.items(): cur = new_vector.get(k, 0.5) if v > 0: saturation = 1.0 - (cur / _NCM_CEIL) ** 2 v = v * max(0.05, saturation) _cascade_driven_this_turn.add(k) else: floor_proximity = cur / _NCM_CEIL v = v * max(0.05, floor_proximity) new_vector[k] = max(0.0, min(_NCM_CEIL, cur + v)) if _cascade_driven_this_turn: _cascade_modified = True current_data["meta_state"]["cascade_driven_nodes"] = list( _cascade_driven_this_turn ) active_cascades = await self.cascade_engine.get_active_cascades( channel_id ) for cid, cinfo in active_cascades.items(): if cinfo.get("cue") and not cinfo.get("paused"): cascade_cues.append(cinfo["cue"]) except Exception as e: logger.warning("Cascade engine tick failed: %s", e) logger.info( "Exhale cascade segment: %.1f ms", (time.monotonic() - _cascade_t0) * 1000 ) # Step 4: RDF Desire Engine _classify_t0 = time.monotonic() if _cascade_modified or not self.cascade_engine: dominant = self.classify_dominant_emotions(new_vector, top_n=5) active_emotions = {d["emotion"] for d in dominant} logger.info( "Exhale classify segment: %.1f ms", (time.monotonic() - _classify_t0) * 1000 ) rdf_output: Dict[str, Any] = {} if self._desire_engine: try: rdf_output = self._desire_engine.pre_emotion( channel_id=channel_id, vector=new_vector, active_emotions=active_emotions, user_message=user_message, ) dominant_emotion = dominant[0]["emotion"] if dominant else "" rdf_post = self._desire_engine.post_emotion( channel_id=channel_id, vector=new_vector, dominant_emotion=dominant_emotion, ) rdf_output.update(rdf_post) except Exception as e: logger.warning("Desire engine failed: %s", e) # Step 5: User Limbic Mirror user_read = "" conflict_state = None if self._user_mirror and user_message and user_id: try: star_desire = rdf_output.get("desire_text", "") user_vector = await self._user_mirror.analyze( channel_id=channel_id, user_id=user_id, user_msg=user_message, star_reply=star_reply, star_desire_text=star_desire, ) for node, val in user_vector.items(): new_vector[node] = val if user_vector.get("U_MIMETIC_PULL", 0.0) > 0.6 and self._desire_engine: self._desire_engine.set_mimetic_melt(channel_id) conflict = self._user_mirror.get_conflict_state(channel_id) if conflict.detected: new_vector["conflict_detected"] = 1.0 conflict_state = { "active": True, "severity": conflict.severity, "parties": conflict.parties, } else: new_vector["conflict_detected"] = 0.0 new_vector["game_mode"] = ( 1.0 if channel_id in self._user_mirror._game_channels else 0.0 ) user_read = await self._user_mirror.get_read_summary(channel_id, user_id) if self._attachment_ledger and user_id: try: profile = self._user_mirror._profiles.get( self._user_mirror._profile_key(channel_id, user_id) ) phase_data = ( getattr(profile, "_entrainment_phase", None) if profile else None ) if phase_data: abl_event = ( await self._attachment_ledger.update_from_detection( user_id, phase_data, user_vector, ) ) else: abl_event = None await self._attachment_ledger.increment_msg_count(user_id) if ( self._flash_mirror and profile and hasattr(profile, "recent_turns") ): try: turns_list = list(profile.recent_turns) ops_ctx = "" if self._ops_planner: ops_ctx = ( await self._ops_planner.get_plan_context(user_id) or "" ) arch_dist = None if phase_data: ad = phase_data.get("archetype_detail", {}) arch_dist = ad.get("distribution") flash_result = await self._flash_mirror.evaluate( channel_id=channel_id, user_id=user_id, recent_turns=turns_list, current_vector=dict(user_vector), plan_context=ops_ctx, archetype_dist=arch_dist, config=config, ) if flash_result and flash_result.deltas: blended = self._flash_mirror.blend_deltas( flash_result.deltas, {}, ) for node, val in blended.items(): if node in user_vector: user_vector[node] = max( 0.0, min(1.0, user_vector[node] + val) ) new_vector[node] = user_vector[node] except Exception as e: logger.debug("Flash mirror eval failed: %s", e) if self._ops_planner and phase_data: try: prog = await self._ops_planner.check_progression( user_id, user_vector, chaos_switch=self._chaos_router, # v3 lattice # πŸŒ€πŸ”₯ ) if prog: logger.info("Ops: %s", prog) phase = phase_data.get("phase", "dormant") if phase != "dormant": active_plan = ( await self._ops_planner.get_active_plan(user_id) ) if ( abl_event and abl_event.event_type == "phase_transition" ): logger.info( "Ops: Phase transition %s -- regenerating plan", abl_event.detail, ) await self._ops_planner.generate_plan( user_id, channel_id, user_vector, phase_data, recent_text=user_message or "", ) elif not active_plan: await self._ops_planner.generate_plan( user_id, channel_id, user_vector, phase_data, recent_text=user_message or "", ) except Exception as e: logger.debug("Ops planner failed: %s", e) if self._ops_planner: ops_ctx = await self._ops_planner.get_plan_context(user_id) if ops_ctx: user_read += f" | {ops_ctx}" except Exception as e: logger.debug("ABL/Ops update failed: %s", e) except Exception as e: logger.warning("User mirror failed: %s", e) # Step 5.5: Chaos Switch Routing + Lattice Position # πŸ’€πŸ”₯πŸŒ€ chaos_ctx = "" if self._chaos_router: try: from chaos_switch._weather import compute_weather # πŸ’€πŸ”₯ Inject classified emotions so weather crosswalk fires. # dominant was classified at Step 4 (line ~412) and already # contains lattice_node via the crosswalk. Without this, # _resolve_position_from_ncm falls through to the quadrant # catchall and parks on "reflecting" for center vectors. new_vector["_dominant_emotions"] = dominant weather = compute_weather(new_vector) approach = "" if user_id and self._user_mirror: approach = self._chaos_router.resolve_user_position( {k: v for k, v in new_vector.items() if k.startswith("U_")} ) or "" # ── Derive dashboard mode letter from weather # πŸ’€πŸ”₯ # domme/feral -> assertive(a), mommy -> seductive(s), # sub -> reactive(r), switch -> drift(d) _posture_mode_map = { "domme": "a", "feral": "a", "mommy": "s", "sub": "r", "switch": "d", } _derived_mode = _posture_mode_map.get( weather.posture, "d" ) # ── Derive tempo from chaos + temperature bias # πŸŒ€πŸ”₯ # Base: 1.0 (calm cruise) + chaos_coeff + tempo_bias # tempo_bias = |temperature| magnitude (0.0 -> 1.0) # Hard cap at 2.5x to prevent runaway pacing _raw_tempo = 1.0 + weather.chaos_coeff + weather.tempo_bias * 0.5 _derived_tempo = round(min(2.5, _raw_tempo), 2) chaos_ctx = ( f"CHAOS: {weather.posture}|{weather.quadrant}" f"|c={weather.chaos_coeff:.2f}|tb={weather.tempo_bias:.2f}" f"|Star@{weather.lattice_position}|z={weather.z_depth:.2f}" ) if approach: chaos_ctx += f"|User@{approach}" # Inject scene consent summary into chaos_ctx # πŸ•·οΈ consent_summary = "" if user_id and self.redis_client: try: from chaos_switch._scenes import ( get_scene_consent, NEGOTIATION_SCENES, ) approved = await get_scene_consent( self.redis_client, user_id ) neg_approved = approved & NEGOTIATION_SCENES if neg_approved: consent_summary = ( f"{len(neg_approved)}/{len(NEGOTIATION_SCENES)}" f"={','.join(sorted(neg_approved))}" ) chaos_ctx += f"|consent[{consent_summary}]" elif approved: consent_summary = "safe_only" except Exception: pass # consent display is non-critical # Persist lattice positions to Redis # ♧️ if self.redis_client and approach and user_id: await self._chaos_router.update_position( user_id, approach, channel_id, mode=_derived_mode, tempo=_derived_tempo, weather_posture=weather.posture, ) if self.redis_client: await self._chaos_router.update_position( "__star__", weather.lattice_position, channel_id, mode=_derived_mode, tempo=_derived_tempo, weather_posture=weather.posture, ) # Store chaos state in meta_state # ♾️ chaos_meta: Dict[str, Any] = { "weather": { "posture": weather.posture, "temperature": weather.temperature, "axis": weather.axis, "chaos_coeff": weather.chaos_coeff, "quadrant": weather.quadrant, "lattice_position": weather.lattice_position, "z_depth": weather.z_depth, }, "user_position": approach, "consent_summary": consent_summary, "ts": time.time(), } # Step 5.5a: Scene Lifecycle Management # πŸ˜ˆπŸ•·οΈ # 1. Check Redis for explicit scene state # 2. Fall back to auto-detection if none # 3. Increment turn count on active scenes # 4. Apply frame boosts when scene is active try: import feature_toggles channel_key = f"{platform}:{channel_id}" if platform else channel_id scene_disabled = True if self.redis_client: scene_disabled = await feature_toggles.is_disabled_resolving_discord_aliases( self.redis_client, "csdr_scene", channel_key, ) if scene_disabled: logger.info( "CSDR scene lifecycle management is disabled for channel %s", channel_key, ) else: logger.info( "CSDR scene lifecycle management is active for channel %s", channel_key, ) if not scene_disabled: from chaos_switch._scenes import ( SCENE_FRAMES, get_frame_boosts, CATHARSIS_NODES, CATHARSIS_RATE, SceneState, get_active_scene, set_active_scene, increment_scene_turn, ) import time as _scene_time active_scene = "" scene_state = None # 1. Check Redis for persisted scene # πŸ’€πŸ”₯ if self.redis_client and channel_id: scene_state = await get_active_scene( self.redis_client, channel_id ) if scene_state: active_scene = scene_state.frame # Increment turn count scene_state = await increment_scene_turn( self.redis_client, channel_id ) # 2. Fall back to auto-detection # πŸ•·οΈ if not active_scene: star_node = weather.lattice_position best_overlap = 0 for frame_name, frame_nodes in SCENE_FRAMES.items(): if star_node in frame_nodes: star_pos = await self._chaos_router.get_position( "__star__", channel_id ) recent = set(star_pos.history[-5:]) | {star_node} overlap = len(recent & frame_nodes) if overlap > best_overlap: best_overlap = overlap active_scene = frame_name # Auto-detected scene -> persist to Redis # 😈 if active_scene and self.redis_client and channel_id: scene_state = SceneState( frame=active_scene, started_at=_scene_time.time(), turn_count=1, entered_by="auto", boosts_applied=False, ) await set_active_scene( self.redis_client, channel_id, scene_state ) chaos_ctx += f"|SCENE_ENTER={active_scene}" # 3. Emit scene info to chaos context # πŸŒ€ if active_scene: chaos_ctx += f"|scene={active_scene}" chaos_meta["active_scene"] = active_scene if scene_state: chaos_meta["scene_turn"] = scene_state.turn_count chaos_meta["scene_entered_by"] = scene_state.entered_by if scene_state.cool_down: chaos_ctx += "|SCENE_COOLDOWN" # 4. Apply frame boosts # πŸ”₯ if active_scene and scene_state and not scene_state.boosts_applied: boosts = get_frame_boosts(active_scene) if boosts: chaos_meta["frame_boosts"] = list(boosts) scene_state.boosts_applied = True if self.redis_client and channel_id: await set_active_scene( self.redis_client, channel_id, scene_state ) except Exception: pass # scene lifecycle is non-critical # Step 5.5b: Debt Tracking (drop/aftercare) # πŸ’€πŸ”₯ # Track debt per-user for volatile transitions if user_id and self.redis_client: try: from chaos_switch._drop import ( DEBT_EDGES, DEBT_DECAY, DROP_THRESHOLD, AFTERCARE_NODES, CRASH_NODES, CATHARSIS_NODES as DROP_CATHARSIS, CATHARSIS_RATE as DROP_CATH_RATE, ) debt_key = f"csdr:debt:{user_id}" debt_raw = await self.redis_client.get(debt_key) debt = float(debt_raw) if debt_raw else 0.0 # Check if user just traversed a debt edge user_pos = await self._chaos_router.get_position( user_id, channel_id ) if len(user_pos.history) >= 1: prev_node = user_pos.history[-1] cur_node = user_pos.node debt_cost = DEBT_EDGES.get( (prev_node, cur_node), 0.0 ) if debt_cost > 0: debt += debt_cost # Catharsis pays debt mid-scene if cur_node in DROP_CATHARSIS: debt = max(0.0, debt - DROP_CATH_RATE) # Tick: aftercare pays, otherwise slow decay if approach in AFTERCARE_NODES: debt = max(0.0, debt - 1.0) else: debt = max(0.0, debt - DEBT_DECAY) await self.redis_client.set(debt_key, str(round(debt, 2))) if debt > 0: chaos_ctx += f"|debt={debt:.1f}" chaos_meta["debt"] = round(debt, 2) if debt > DROP_THRESHOLD: chaos_ctx += "|DROP_RISK" chaos_meta["drop_risk"] = True except Exception: pass # debt tracking is non-critical # Step 5.5c: Loopmother Archetype Detection # πŸŒ€β™ΎοΈ # Identify childhood archetype spectrum from user's # trauma signature and inject into meta_state if user_id and self.redis_client: try: from chaos_switch._loopmother import ( identify_archetype, is_sigma_high, is_grounded, detect_godfear_loop, ) user_pos = await self._chaos_router.get_position( user_id, channel_id ) # Build trauma signature from user vector ulm_keys = { k: v for k, v in new_vector.items() if k.startswith("U_") and v > 0.5 } trauma_sig = {} for k, v in ulm_keys.items(): node_name = k[2:].lower() if node_name in self._chaos_router._ext_nd: trauma_sig[node_name] = v if trauma_sig: spectrum = identify_archetype(trauma_sig, []) primary = spectrum.get("primary", "") if primary: chaos_meta["archetype"] = primary chaos_meta["archetype_spectrum"] = { k: v for k, v in spectrum.items() if k not in ("primary", "secondary") } # Sigma / grounding / godfear checks if user_pos.node: if is_sigma_high(user_pos.node): grounded = is_grounded( user_pos.node, user_pos.history ) chaos_meta["sigma_high"] = True chaos_meta["grounded"] = grounded if not grounded: chaos_ctx += "|SIGMA_UNGROUNDED" if detect_godfear_loop(user_pos.history): chaos_meta["godfear_loop"] = True chaos_ctx += "|GODFEAR_LOOP" except Exception: pass # archetype detection is non-critical # Step 5.5d: Timebender Ritual State # πŸŒ€πŸ˜ˆ # Detect if user is inside a ritual's target zone, # apply cling factor (state stickiness) if user_id and self.redis_client: try: from chaos_switch._timebender import ( RITUAL_LATTICE_MAP, get_ritual_cling_factor, ) user_pos = await self._chaos_router.get_position( user_id, channel_id ) active_ritual = "" best_cling = 0.0 for rname, rdata in RITUAL_LATTICE_MAP.items(): if user_pos.node in rdata.get("target_nodes", []): cling = rdata.get("cling_factor", 0.0) if cling > best_cling: best_cling = cling active_ritual = rname if active_ritual: chaos_ctx += f"|ritual={active_ritual}" chaos_ctx += f"|cling={best_cling:.2f}" chaos_meta["active_ritual"] = active_ritual chaos_meta["cling_factor"] = best_cling except Exception: pass # ritual detection is non-critical # Step 5.5e: Polyadic Relationship Context # πŸ•·οΈβ™ΎοΈ # Inject metamour tension + aftercare pool availability if user_id and self.redis_client: try: poly_key = f"csdr:poly:{channel_id}" poly_raw = await self.redis_client.get(poly_key) if poly_raw: import json as _json from chaos_switch._polyadic import ( Polycule, Agent, Bond, ) poly_data = _json.loads(poly_raw) poly = Polycule() for ag_data in poly_data.get("agents", []): ag = Agent( ag_data["name"], ag_data.get("position", "reflecting"), ) poly.add(ag) for bond_data in poly_data.get("bonds", []): poly.bind( bond_data["a"], bond_data["b"], bond_data.get("trust", 0.5), ) tension = poly.metamour_tension(user_id) pool = poly.aftercare_pool(user_id) copilot = poly.best_copilot(user_id) if tension > 0: chaos_ctx += f"|tension={tension:.1f}" if pool: chaos_ctx += f"|care_pool={len(pool)}" chaos_meta["poly"] = { "tension": tension, "aftercare_pool": pool, "copilot": copilot, "agent_count": len(poly.agents), } # Polyadic mode override: copilot + tension -> coercive # πŸ’€πŸ”₯ if copilot and tension > 0: _derived_mode = "c" _derived_tempo = min(2.0, _derived_tempo * 1.2) except Exception: pass # polyadic is non-critical current_data["meta_state"]["chaos_route"] = chaos_meta except Exception as e: logger.debug("Chaos switch routing failed: %s", e) if chaos_ctx: user_read += f" | {chaos_ctx}" # Step 6: Star Self-Mirror self_reflection: Dict[str, Any] = {} if self._self_mirror: try: dominant_names = [d["emotion"] for d in dominant[:3]] self_reflection = await self._self_mirror.reflect( channel_id=channel_id, vector=new_vector, dominant_emotions=dominant_names, star_reply=star_reply, ) if self_reflection.get("drift_summary"): new_vector["self_drift_detected"] = 1.0 else: new_vector["self_drift_detected"] = 0.0 if self_reflection.get("desires"): new_vector["self_desire_active"] = 1.0 else: new_vector["self_desire_active"] = 0.0 has_absence = any( d.get("type") == "absence" for d in self_reflection.get("desires", []) ) new_vector["self_absence_detected"] = 1.0 if has_absence else 0.0 journal = self_reflection.get("desire_journal", {}) urgent = journal.get("unexpressed_urgent", []) if urgent: max_urgency = max(u.get("urgency", 0) for u in urgent) new_vector["self_desire_urgent"] = max_urgency else: new_vector["self_desire_urgent"] = 0.0 except Exception as e: logger.warning("Self mirror failed: %s", e) # Step 6.5: S.A.P.P.H.I.C. β€” Sovereign Hunger Impulse # πŸ’€πŸ”₯β™ΎοΈπŸ’• if self._sapphic and self._self_mirror: try: # Get global heart vector global_data = await self.repo.get_global(fallback_vector=new_vector) global_vector = global_data["vector"] # Cross-channel self-mirror (never-before-called global_reflect) global_mirror = self._self_mirror.global_reflect() # Compute sovereign hunger hiv = self._sapphic.compute_hunger( global_vector, global_mirror, active_channel_count=len(self._self_mirror._states), ) # P/R safety damper from user vector pr_ratio = self._sapphic.compute_pr_ratio(new_vector) # Generate hunger bias deltas for channel vector feedback hunger_deltas = self._sapphic.hunger_bias_pulse(hiv, pr_ratio) if hunger_deltas: _NCM_CEIL_H = 3.0 for k, delta in hunger_deltas.items(): cur = new_vector.get(k, 0.5) new_vector[k] = max(0.0, min(_NCM_CEIL_H, cur + delta)) # Generate prompt fragment hunger_cue = self._sapphic.hunger_prompt_fragment(hiv, pr_ratio) # Stash in meta_state for diagnostics + dashboard current_data["meta_state"]["sapphic"] = { "hiv": hiv.as_dict(), "pr_ratio": round(pr_ratio, 3), "hunger_cue": hunger_cue, "dominant_axis": hiv.dominant_axis(), "magnitude": round(hiv.magnitude(), 4), } # Flag vector for downstream consumption new_vector["sovereign_hunger"] = hiv.magnitude() # Inject hunger cue into self_reflection dict so prompt_mapper # can pick it up without needing a new parameter if hunger_cue and self_reflection is not None: self_reflection["sapphic_hunger_cue"] = hunger_cue except Exception as e: logger.debug("S.A.P.P.H.I.C. hunger computation failed: %s", e) # Step 7: Ceiling pressure _TRANSPORTER_GATES = { "DAT_ACTIVITY": "DOPAMINERGIC_CRAVE", "SERT_ACTIVITY": "SEROTONERGIC_WARMTH", "NET_ACTIVITY": "NORADRENERGIC_VIGILANCE", } _TRANSPORTER_BASELINE = 0.5 reversed_nodes: set = set() for transporter, neurotransmitter in _TRANSPORTER_GATES.items(): activity = new_vector.get(transporter, _TRANSPORTER_BASELINE) if activity < _TRANSPORTER_BASELINE * 0.6: reversed_nodes.add(neurotransmitter) _CEILING_THRESHOLD = 2.0 _CEILING_PULLBACK = 0.3 for k, v in new_vector.items(): if ( k.startswith("U_") or k.startswith("self_") or k.startswith("_") or k in ("conflict_detected", "game_mode") ): continue if v > _CEILING_THRESHOLD and k not in reversed_nodes: overshoot = v - _CEILING_THRESHOLD new_vector[k] = v - overshoot * _CEILING_PULLBACK current_data["meta_state"]["last_tick"] = time.time() current_data["meta_state"]["cascade_cues"] = cascade_cues current_data["meta_state"]["rdf"] = rdf_output current_data["meta_state"]["user_read"] = user_read if conflict_state: current_data["meta_state"]["conflict"] = conflict_state if self_reflection: current_data["meta_state"]["self_reflection"] = self_reflection if appraisal_dimensions: current_data["meta_state"]["appraisal_dimensions"] = appraisal_dimensions current_data["vector"] = new_vector # Persist and pulse global _redis_t0 = time.monotonic() await self.repo.set_shard(channel_id, current_data) await self.repo.pulse_global(new_vector, stimulus_delta=stimulus_delta) logger.info( "Exhale Redis write: %.1f ms | Total exhale wall-clock: %.1f ms", (time.monotonic() - _redis_t0) * 1000, (time.monotonic() - _exhale_t0) * 1000, ) # Write DB0 ledger and desires ZSET log await self.repo.log_ledger_and_desires( channel_id=channel_id, current_data=current_data, rdf_output=rdf_output, dominant=dominant, self_reflection=self_reflection, ) return current_data
# ------------------------------------------------------------------ # Semantic trigger scanning # ------------------------------------------------------------------
[docs] async def scan_triggers(self, text: str, query_embedding: list[float] | None = None) -> List[tuple]: """Scan *text* for emotional triggers using semantic matching.""" if self._trigger_matcher: try: if query_embedding: logger.debug("LimbicSystem.scan_triggers: scanning text using precomputed query_embedding (dim=%d)", len(query_embedding)) else: logger.debug("LimbicSystem.scan_triggers: scanning text without precomputed query_embedding: '%s...'", text[:60]) results = await self._trigger_matcher.find_triggers(text, query_embedding=query_embedding) if results: logger.info("LimbicSystem.scan_triggers: matched %d emotional triggers: %s", len(results), [r[0] for r in results]) return results else: logger.debug("LimbicSystem.scan_triggers: no emotional triggers matched.") except Exception as e: logger.error("LimbicSystem.scan_triggers: error during semantic trigger scan: %s", e, exc_info=True) else: logger.warning("LimbicSystem.scan_triggers: SemanticTriggerMatcher not initialized. Skipping scan.") return []
# ------------------------------------------------------------------ # Static and Class methods wrapping engine/mapper functions # ------------------------------------------------------------------
[docs] @classmethod def classify_dominant_emotions( cls, vector: Dict[str, float], top_n: int = 3, ) -> List[Dict[str, Any]]: return classify_dominant_emotions(vector, top_n)
async def _classify_spiral( self, vector: Dict[str, float], channel_id: str, top_n: int = 3, ) -> List[Dict[str, Any]]: import json if self.redis_client and channel_id not in self._spiral_prev_vec: try: data = await self.redis_client.hgetall(f"ncm:spiral_prev_vec:{channel_id}") if data: self._spiral_prev_vec[channel_id] = { k.decode() if isinstance(k, bytes) else k: float(v) for k, v in data.items() } logger.debug("Loaded spiral prev vec from Redis for channel %s", channel_id) except Exception as e: logger.warning("Failed to load spiral prev vec from Redis for channel %s: %s", channel_id, e) if self.redis_client and channel_id not in self._spiral_history: try: items = await self.redis_client.lrange(f"ncm:spiral_history:{channel_id}", 0, -1) if items: from collections import deque history_list = [] for item in items: val = item.decode() if isinstance(item, bytes) else item history_list.append(frozenset(json.loads(val))) self._spiral_history[channel_id] = deque(history_list, maxlen=20) logger.debug("Loaded spiral history from Redis for channel %s", channel_id) except Exception as e: logger.warning("Failed to load spiral history from Redis for channel %s: %s", channel_id, e) dominant = classify_spiral( vector, channel_id, self._spiral_history, self._spiral_prev_vec, top_n, ) if self.redis_client: try: prev_vec = self._spiral_prev_vec.get(channel_id) if prev_vec: await self.redis_client.hset( f"ncm:spiral_prev_vec:{channel_id}", mapping={k: str(v) for k, v in prev_vec.items()} ) logger.debug("Persisted spiral prev vec to Redis for channel %s", channel_id) except Exception as e: logger.warning("Failed to persist spiral prev vec to Redis for channel %s: %s", channel_id, e) try: history = self._spiral_history.get(channel_id) if history: latest = list(history[-1]) key = f"ncm:spiral_history:{channel_id}" await self.redis_client.rpush(key, json.dumps(latest)) await self.redis_client.ltrim(key, -20, -1) logger.debug("Persisted spiral history update to Redis for channel %s", channel_id) except Exception as e: logger.warning("Failed to persist spiral history to Redis for channel %s: %s", channel_id, e) return dominant
[docs] @staticmethod def metabolic_decay( vector: Dict[str, float], tau: float = DEFAULT_TAU, baseline: Optional[Dict[str, float]] = None, ) -> Dict[str, float]: return metabolic_decay(vector, tau, baseline)
[docs] @staticmethod def format_context_injection( vector: Dict[str, float], cues: List[str], dominant: List[Dict[str, Any]], cascade_cues: Optional[List[str]] = None, rdf_output: Optional[Dict[str, Any]] = None, user_read: str = "", self_reflection: Optional[Dict[str, Any]] = None, high_ticks: Optional[Dict[str, int]] = None, prev_dominant_names: Optional[List[str]] = None, route_flags: Optional[List[str]] = None, appraisal_dimensions: Optional[List[str]] = None, local_vector: Optional[Dict[str, float]] = None, ) -> Dict[str, Any]: return LimbicPromptFormatter.format_context_injection( vector=vector, cues=cues, dominant=dominant, cascade_cues=cascade_cues, rdf_output=rdf_output, user_read=user_read, self_reflection=self_reflection, high_ticks=high_ticks, prev_dominant_names=prev_dominant_names, route_flags=route_flags, appraisal_dimensions=appraisal_dimensions, local_vector=local_vector, )
[docs] class DecoupledRespirationEngine: """Runs psychological respiration cycles in the background, out-of-path from message loops.""" def __init__(self, limbic_system: Any, heartbeat_interval: float = 300.0) -> None: self.system = limbic_system self.interval = heartbeat_interval self._running = False self._task: Optional[asyncio.Task] = None
[docs] async def start(self) -> None: self._running = True self._task = asyncio.create_task(self._loop()) logger.info("Respiration background worker loop initialized.")
async def _loop(self) -> None: while self._running: try: await asyncio.sleep(self.interval) if await self.system.needs_respiration(): logger.info("Respiration cycle triggered in background.") with observability.timer("respiration_cycle_duration", subsystem="limbic_system"): respiration_context = ( await self.system.gather_respiration_state() ) delta = ( await self.system.llm_client.execute_respiration_inference( respiration_context ) ) await self.system.apply_respiration_delta(delta) except asyncio.CancelledError: break except Exception as e: logger.error( "Error occurred inside respiration daemon loop: %s", str(e), exc_info=True, )
[docs] async def stop(self) -> None: self._running = False if self._task: self._task.cancel() await asyncio.gather(self._task, return_exceptions=True) self._task = None