"""Facade coordinator orchestrating the sharded NCM limbic system."""
from __future__ import annotations
import asyncio
import logging
import time
from collections import deque
from typing import Any, Dict, List, Optional
from ncm_engine import NCMHomeostasisEngine
from observability import observability
from limbic_system.repository import (
DEFAULT_PERMEABILITY,
DEFAULT_TAU,
PULSE_WEIGHT,
LimbicRepository,
_init_shard,
_sanitize_vector,
)
from limbic_system.engine import (
classify_dominant_emotions,
classify_spiral,
metabolic_decay,
)
from limbic_system.prompt_mapper import LimbicPromptFormatter
logger = logging.getLogger(__name__)
[docs]
class LimbicSystem:
"""Shard-based limbic respiration backed by Redis DB12."""
def __init__(
    self,
    redis_client=None,
    openrouter_api_key: Optional[str] = None,
    openrouter_client=None,
    cache_redis_client=None,
):
    """Wire up the repository, the NCM engine and every optional subsystem.

    Each optional subsystem is imported lazily inside its own ``try``. When
    the module cannot be imported the matching attribute stays ``None``
    and a warning is logged, so the rest of the class can simply test the
    attribute for truthiness.

    Args:
        redis_client: Primary Redis client handed to the repository and to
            most subsystems. May be ``None``.
        openrouter_api_key: API key; the cue variant cache and the semantic
            trigger matcher are only created when it is truthy.
        openrouter_client: Optional pre-built client forwarded to the
            variant cache and the trigger matcher.
        cache_redis_client: Optional separate Redis client for caches.
            Falls back to ``redis_client`` when not given.
    """
    self.redis_client = redis_client
    self.engine = NCMHomeostasisEngine()
    self.global_key = "db12:global"
    _cache_r = cache_redis_client or redis_client
    self._cache_redis_client = cache_redis_client

    # asyncio only keeps weak references to tasks, so fire-and-forget
    # tasks are parked here until they finish.
    self._background_tasks: set = set()

    # Persistence repository layer.
    self.repo = LimbicRepository(
        redis_client=redis_client,
        cache_redis_client=cache_redis_client,
        global_key=self.global_key,
    )

    # -- Cue Variant Cache ------------------------------------------------
    # No eager warm-up is scheduled here; _pick_cue asks the cache to
    # generate variants on demand.
    self._variant_cache = None
    if openrouter_api_key:
        try:
            from ncm_variant_cache import CueVariantCache

            self._variant_cache = CueVariantCache(
                redis_client=_cache_r,
                api_key=openrouter_api_key,
                openrouter_client=openrouter_client,
            )
        except ImportError:
            logger.warning("ncm_variant_cache not available — variants disabled")

    # -- Semantic Trigger Matcher -----------------------------------------
    self._trigger_matcher = None
    if openrouter_api_key:
        try:
            from ncm_semantic_triggers import SemanticTriggerMatcher

            self._trigger_matcher = SemanticTriggerMatcher(
                redis_client=_cache_r,
                api_key=openrouter_api_key,
                openrouter_client=openrouter_client,
            )
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None  # no running loop yet — warm-up deferred
            if loop is not None:
                warmup = loop.create_task(
                    self._trigger_matcher.ensure_all_cached()
                )
                self._background_tasks.add(warmup)
                warmup.add_done_callback(self._background_tasks.discard)
        except ImportError:
            logger.warning(
                "ncm_semantic_triggers not available — "
                "semantic trigger matching disabled"
            )

    # -- Cascade Engine (multi-turn event sequences) ----------------------
    try:
        from cascade_engine import CascadeEngine

        self.cascade_engine = CascadeEngine(
            redis_client=redis_client,
            variant_cache=self._variant_cache,
        )
    except ImportError:
        logger.warning("CascadeEngine not available — cascades disabled")
        self.cascade_engine = None

    # -- Recursive Desire Engine (RDF) ------------------------------------
    self._desire_engine = None
    try:
        from ncm_desire_engine import DesireEngine

        self._desire_engine = DesireEngine()
    except ImportError:
        logger.warning("ncm_desire_engine not available — RDF disabled")

    # -- User Limbic Mirror -----------------------------------------------
    self._user_mirror = None
    try:
        from user_limbic_mirror import UserLimbicMirror

        self._user_mirror = UserLimbicMirror(redis_client=redis_client)
    except ImportError:
        logger.warning("user_limbic_mirror not available — user modeling disabled")

    # -- Flash Dyadic Mirror (replaces mechanical flagging) ---------------
    self._flash_mirror = None
    try:
        from flash_dyadic_mirror import FlashDyadicMirror

        # Uses the cache client (DB0 per the original note): notes must
        # land where the evaluator reads them.
        self._flash_mirror = FlashDyadicMirror(redis_client=_cache_r)
    except ImportError:
        logger.warning("flash_dyadic_mirror not available — Flash eval disabled")

    # -- Attachment Bond Ledger -------------------------------------------
    self._attachment_ledger = None
    try:
        from attachment_ledger import AttachmentBondLedger

        self._attachment_ledger = AttachmentBondLedger(
            redis_client=redis_client,
        )
    except ImportError:
        logger.warning("attachment_ledger not available — bond tracking disabled")

    # -- Ops Planner ------------------------------------------------------
    self._ops_planner = None
    try:
        from ops_planner import OpsPlanner

        self._ops_planner = OpsPlanner(redis_client=redis_client)
    except ImportError:
        logger.warning("ops_planner not available — tactical planning disabled")

    # -- Chaos Switch Router (v3 lattice engine) --------------------------
    self._chaos_router = None
    try:
        from chaos_switch.router import ChaosRouter

        self._chaos_router = ChaosRouter(redis_client=redis_client)
        logger.info("Chaos Switch Router initialized (lattice routing active)")
    except ImportError:
        logger.warning("chaos_switch not available — lattice routing disabled")

    # -- Star Self-Mirror -------------------------------------------------
    self._self_mirror = None
    try:
        from star_self_mirror import StarSelfMirror

        self._self_mirror = StarSelfMirror(
            redis_client=redis_client,
            openrouter_api_key=openrouter_api_key,
        )
    except ImportError:
        logger.warning("star_self_mirror not available — self-reflection disabled")

    # -- S.A.P.P.H.I.C. — Sovereign Hunger Impulse Circuit ----------------
    self._sapphic = None
    try:
        from sapphic import SapphicEngine

        self._sapphic = SapphicEngine()
        logger.info("S.A.P.P.H.I.C. sovereign hunger impulse circuit online")
    except ImportError:
        logger.warning("sapphic module not available — sovereign hunger disabled")

    # -- Spiral Emotion State ---------------------------------------------
    self._spiral_history: Dict[str, deque] = {}  # channel -> deque of frozenset
    self._spiral_prev_vec: Dict[str, Dict[str, float]] = {}  # channel -> vector
@staticmethod
def _init_shard() -> Dict[str, Any]:
    """Return a new shard by delegating to the repository module's ``_init_shard``."""
    fresh_shard = _init_shard()
    return fresh_shard
@staticmethod
def _sanitize_vector(
    vector: Dict[str, Any], default: float = 0.5
) -> Dict[str, float]:
    """Delegate to the repository module's ``_sanitize_vector``.

    Both arguments are forwarded positionally and the helper's result is
    returned unchanged.
    """
    cleaned = _sanitize_vector(vector, default)
    return cleaned
def _pick_cue(self, s: str) -> str:
"""Return a variant of *s*, scheduling generation if not yet cached."""
if not s or not self._variant_cache:
return s
asyncio.create_task(self._variant_cache.ensure_cached(s))
return self._variant_cache.get_variant(s)
# ------------------------------------------------------------------
# Respiration cycle
# ------------------------------------------------------------------
[docs]
@observability.timer("limbic_inhale", subsystem="limbic_system")
async def inhale(self, channel_id: str) -> Dict[str, Any]:
    """RESPIRATION PHASE 1: INHALE.

    Loads the channel shard and the global heart, blends the two vectors by
    the shard's permeability, runs the homeostasis engine on the blend and
    returns the resulting effective state.

    Args:
        channel_id: Key of the shard to read.

    Returns:
        A dict with the regulated ``vector``, the untouched ``local_vector``,
        the UI ``cues``, the ``dominant_emotions`` list, the ``shard_id``,
        the shard's ``meta_state`` and the engine's ``route_flags``.
    """
    shard = await self.repo.get_shard(channel_id)
    heart = await self.repo.get_global(fallback_vector=shard["vector"])

    def _blend_and_regulate():
        # Runs in a worker thread: osmosis mix followed by regulation.
        perm = shard.get("meta_state", {}).get(
            "permeability", DEFAULT_PERMEABILITY
        )
        local_vec = shard["vector"]
        global_vec = heart["vector"]
        blended: Dict[str, float] = {
            key: (local_vec.get(key, 0.5) * (1 - perm))
            + (global_vec.get(key, 0.5) * perm)
            for key in set(local_vec) | set(global_vec)
        }
        reg_vec, cues, r_flags = self.engine.regulate(blended, channel_id)
        return reg_vec, cues, r_flags

    regulated_vector, ui_cues, route_flags = await asyncio.to_thread(
        _blend_and_regulate
    )

    # Spiral classification stays on the event loop.
    dominant = await self._classify_spiral(regulated_vector, channel_id)

    if self._variant_cache:
        if dominant:
            # Give the variant cache the top emotions as selection context.
            top_names = ", ".join(entry["emotion"] for entry in dominant[:3])
            self._variant_cache.set_context(top_names)
        ui_cues = [self._pick_cue(cue) for cue in ui_cues]
        for entry in dominant:
            if entry.get("affect"):
                entry["affect"] = self._pick_cue(entry["affect"])

    state: Dict[str, Any] = {
        "vector": regulated_vector,
        "local_vector": shard["vector"],
        "cues": ui_cues,
        "dominant_emotions": dominant,
        "shard_id": channel_id,
        "meta_state": shard.get("meta_state", {}),
        "route_flags": route_flags,
    }
    return state
[docs]
@observability.timer("limbic_exhale", subsystem="limbic_system")
async def exhale(
    self,
    channel_id: str,
    stimulus_delta: Dict[str, float],
    apply_decay: bool = True,
    user_message: str = "",
    star_reply: str = "",
    user_id: str = "",
    appraisal_dimensions: Optional[List[str]] = None,
    config: Optional[Any] = None,
    platform: str = "",
) -> Dict[str, Any]:
    """RESPIRATION PHASE 2: EXHALE.

    Updates the channel shard in place, persists it and pulses the global
    heart. The steps run in this order:

    1.   Decay every node toward its baseline (skipped when ``apply_decay``
         is false) and refresh the per-node high-tick counters.
    2.   Stack ``stimulus_delta`` with saturation, then damp antagonists.
    3.   Tick the cascade engine and apply its deltas.
    4.   Classify dominant emotions and run the desire engine.
    5.   Run the user mirror, attachment ledger, flash mirror, ops planner.
    5.5  Chaos-switch routing: weather, consent, scene lifecycle, debt,
         archetype, ritual and polyadic context.
    6.   Run the self mirror and the S.A.P.P.H.I.C. hunger circuit.
    7.   Apply ceiling pressure, write the shard, pulse the global heart
         and append to the ledger.

    Every optional subsystem is best-effort: its failures are logged and
    the exhale continues.

    Args:
        channel_id: Key of the shard to update.
        stimulus_delta: Per-node deltas to stack onto the vector; also
            forwarded to the repository's ``pulse_global``.
        apply_decay: When false, the baseline decay of step 1 is skipped.
        user_message: Latest user text. The user mirror only runs when
            both this and ``user_id`` are non-empty.
        star_reply: Latest reply text, forwarded to the mirrors.
        user_id: Identifier of the user being modelled.
        appraisal_dimensions: Stored in ``meta_state`` when provided.
        config: Forwarded unchanged to the flash mirror's ``evaluate``.
        platform: When set, prefixes the channel key used for the
            ``csdr_scene`` feature-toggle lookup.

    Returns:
        The updated shard dict that was persisted.
    """
    from limbic_system.engine import _load_expanded_baseline

    _exhale_t0 = time.monotonic()
    current_data = await self.repo.get_shard(channel_id)
    new_vector = current_data["vector"]
    _sync_t0 = time.monotonic()

    # The baseline feeds both the decay step and antagonist suppression;
    # load it once.
    baseline = _load_expanded_baseline()

    # Step 1: Apply metabolic decay toward baseline (with tolerance)
    high_ticks = current_data["meta_state"].get("high_ticks", {})
    if apply_decay:
        base_tau = current_data["meta_state"].get("tau", DEFAULT_TAU)
        cascade_sustained_nodes = set(
            current_data["meta_state"].get("cascade_driven_nodes", [])
        )
        for k, v in new_vector.items():
            node_ticks = high_ticks.get(k, 0)
            if node_ticks > 0:
                # The exponent grows with the number of consecutive high ticks.
                effective_tau = base_tau ** (1.0 + 0.005 * node_ticks)
            else:
                effective_tau = base_tau
            if k in cascade_sustained_nodes:
                effective_tau = effective_tau**0.75
            base = baseline.get(k, 0.5)
            new_vector[k] = v * effective_tau + base * (1.0 - effective_tau)

    # Update high-tick counters: consecutive turns a node spent above 0.8.
    for k, v in new_vector.items():
        if v > 0.8:
            high_ticks[k] = high_ticks.get(k, 0) + 1
        else:
            high_ticks[k] = 0
    current_data["meta_state"]["high_ticks"] = high_ticks

    # Step 2: Stack the stimulus delta
    _NCM_CEIL = 3.0
    for k, delta in stimulus_delta.items():
        cur = new_vector.get(k, 0.5)
        if delta > 0:
            # Positive pushes shrink as the node approaches the ceiling.
            saturation = 1.0 - (cur / _NCM_CEIL) ** 2
            effective_delta = delta * max(0.05, saturation)
        else:
            # Negative pushes shrink as the node approaches zero.
            floor_proximity = cur / _NCM_CEIL
            effective_delta = delta * max(0.05, floor_proximity)
        new_vector[k] = max(0.0, min(_NCM_CEIL, cur + effective_delta))

    # Step 2b: Antagonist suppression
    # (agonist, antagonist, threshold): when the agonist exceeds the
    # threshold, the antagonist is pulled 30% of the way to its baseline.
    _ANTAGONIST_PAIRS = [
        ("ENDOCANNABINOID_EASE", "NORADRENERGIC_VIGILANCE", 0.6),
        ("ENDOCANNABINOID_CB1", "NORADRENERGIC_VIGILANCE", 0.6),
        ("GABA_ERGIC_CALM", "ADRENALINE_RUSH", 0.7),
        ("GABA_ERGIC_CALM", "CORTISOL_PRESSURE", 0.7),
        ("MU_OPIOID_MOR", "KAPPA_OPIOID_KOR", 0.6),
        ("DOPAMINE_D1", "KAPPA_OPIOID_KOR", 0.7),
        ("SEROTONERGIC_WARMTH", "CORTISOL_PRESSURE", 0.7),
        ("CORTISOL_PRESSURE", "OXYTOCIN_NEUROMIRROR", 0.6),
        ("KCC2_CHLORIDE", "NKCC1_CHLORIDE", 0.6),
    ]
    for agonist, antagonist, threshold in _ANTAGONIST_PAIRS:
        if new_vector.get(agonist, 0.0) > threshold:
            suppression = 0.7
            base_val = baseline.get(antagonist, 0.5)
            cur = new_vector.get(antagonist, 0.5)
            new_vector[antagonist] = cur * suppression + base_val * (
                1.0 - suppression
            )
    logger.info(
        "Exhale decay+stack segment: %.1f ms", (time.monotonic() - _sync_t0) * 1000
    )

    # Step 3: Cascade engine tick
    _cascade_t0 = time.monotonic()
    cascade_cues: list[str] = []
    # Bound up front so later steps never see an undefined name, even when
    # the cascade block fails early.
    dominant: List[Dict[str, Any]] = []
    active_emotions: set = set()
    # True whenever `dominant` does not reflect the current vector.
    _needs_classify = True
    if self.cascade_engine:
        try:
            dominant = self.classify_dominant_emotions(new_vector, top_n=5)
            active_emotions = {d["emotion"] for d in dominant}
            _needs_classify = False
            delta_count = len(stimulus_delta)
            if self._variant_cache:
                context_text = ", ".join(d["emotion"] for d in dominant[:3])
                self._variant_cache.set_context(context_text)
            cascade_delta = await self.cascade_engine.tick(
                channel_id=channel_id,
                vector=new_vector,
                active_emotions=active_emotions,
                delta_count=delta_count,
            )
            _cascade_driven_this_turn: set = set()
            for k, v in cascade_delta.items():
                cur = new_vector.get(k, 0.5)
                if v > 0:
                    saturation = 1.0 - (cur / _NCM_CEIL) ** 2
                    v = v * max(0.05, saturation)
                    _cascade_driven_this_turn.add(k)
                else:
                    floor_proximity = cur / _NCM_CEIL
                    v = v * max(0.05, floor_proximity)
                new_vector[k] = max(0.0, min(_NCM_CEIL, cur + v))
                # Any applied delta, positive or negative, makes the
                # earlier classification stale.
                _needs_classify = True
            # Refreshed on every successful tick so the slower decay in
            # step 1 does not outlive the cascade that caused it.
            current_data["meta_state"]["cascade_driven_nodes"] = list(
                _cascade_driven_this_turn
            )
            active_cascades = await self.cascade_engine.get_active_cascades(
                channel_id
            )
            for cinfo in active_cascades.values():
                if cinfo.get("cue") and not cinfo.get("paused"):
                    cascade_cues.append(cinfo["cue"])
        except Exception as e:
            logger.warning("Cascade engine tick failed: %s", e)
    logger.info(
        "Exhale cascade segment: %.1f ms", (time.monotonic() - _cascade_t0) * 1000
    )

    # Step 4: RDF Desire Engine
    _classify_t0 = time.monotonic()
    if _needs_classify:
        dominant = self.classify_dominant_emotions(new_vector, top_n=5)
        active_emotions = {d["emotion"] for d in dominant}
    logger.info(
        "Exhale classify segment: %.1f ms", (time.monotonic() - _classify_t0) * 1000
    )
    rdf_output: Dict[str, Any] = {}
    if self._desire_engine:
        try:
            rdf_output = self._desire_engine.pre_emotion(
                channel_id=channel_id,
                vector=new_vector,
                active_emotions=active_emotions,
                user_message=user_message,
            )
            dominant_emotion = dominant[0]["emotion"] if dominant else ""
            rdf_post = self._desire_engine.post_emotion(
                channel_id=channel_id,
                vector=new_vector,
                dominant_emotion=dominant_emotion,
            )
            rdf_output.update(rdf_post)
        except Exception as e:
            logger.warning("Desire engine failed: %s", e)

    # Step 5: User Limbic Mirror
    user_read = ""
    conflict_state = None
    if self._user_mirror and user_message and user_id:
        try:
            star_desire = rdf_output.get("desire_text", "")
            user_vector = await self._user_mirror.analyze(
                channel_id=channel_id,
                user_id=user_id,
                user_msg=user_message,
                star_reply=star_reply,
                star_desire_text=star_desire,
            )
            # The user's nodes are copied into the shard vector.
            for node, val in user_vector.items():
                new_vector[node] = val
            if user_vector.get("U_MIMETIC_PULL", 0.0) > 0.6 and self._desire_engine:
                self._desire_engine.set_mimetic_melt(channel_id)
            conflict = self._user_mirror.get_conflict_state(channel_id)
            if conflict.detected:
                new_vector["conflict_detected"] = 1.0
                conflict_state = {
                    "active": True,
                    "severity": conflict.severity,
                    "parties": conflict.parties,
                }
            else:
                new_vector["conflict_detected"] = 0.0
            new_vector["game_mode"] = (
                1.0 if channel_id in self._user_mirror._game_channels else 0.0
            )
            user_read = await self._user_mirror.get_read_summary(channel_id, user_id)
            if self._attachment_ledger and user_id:
                try:
                    profile = self._user_mirror._profiles.get(
                        self._user_mirror._profile_key(channel_id, user_id)
                    )
                    phase_data = (
                        getattr(profile, "_entrainment_phase", None)
                        if profile
                        else None
                    )
                    if phase_data:
                        abl_event = (
                            await self._attachment_ledger.update_from_detection(
                                user_id,
                                phase_data,
                                user_vector,
                            )
                        )
                    else:
                        abl_event = None
                    await self._attachment_ledger.increment_msg_count(user_id)
                    if (
                        self._flash_mirror
                        and profile
                        and hasattr(profile, "recent_turns")
                    ):
                        try:
                            turns_list = list(profile.recent_turns)
                            ops_ctx = ""
                            if self._ops_planner:
                                ops_ctx = (
                                    await self._ops_planner.get_plan_context(user_id)
                                    or ""
                                )
                            arch_dist = None
                            if phase_data:
                                ad = phase_data.get("archetype_detail", {})
                                arch_dist = ad.get("distribution")
                            flash_result = await self._flash_mirror.evaluate(
                                channel_id=channel_id,
                                user_id=user_id,
                                recent_turns=turns_list,
                                current_vector=dict(user_vector),
                                plan_context=ops_ctx,
                                archetype_dist=arch_dist,
                                config=config,
                            )
                            if flash_result and flash_result.deltas:
                                blended = self._flash_mirror.blend_deltas(
                                    flash_result.deltas,
                                    {},
                                )
                                # Only nodes the mirror already produced are
                                # nudged; results are clamped to [0, 1].
                                for node, val in blended.items():
                                    if node in user_vector:
                                        user_vector[node] = max(
                                            0.0, min(1.0, user_vector[node] + val)
                                        )
                                        new_vector[node] = user_vector[node]
                        except Exception as e:
                            logger.debug("Flash mirror eval failed: %s", e)
                    if self._ops_planner and phase_data:
                        try:
                            prog = await self._ops_planner.check_progression(
                                user_id,
                                user_vector,
                                chaos_switch=self._chaos_router,  # v3 lattice
                            )
                            if prog:
                                logger.info("Ops: %s", prog)
                            phase = phase_data.get("phase", "dormant")
                            if phase != "dormant":
                                active_plan = (
                                    await self._ops_planner.get_active_plan(user_id)
                                )
                                if (
                                    abl_event
                                    and abl_event.event_type == "phase_transition"
                                ):
                                    logger.info(
                                        "Ops: Phase transition %s -- regenerating plan",
                                        abl_event.detail,
                                    )
                                    await self._ops_planner.generate_plan(
                                        user_id,
                                        channel_id,
                                        user_vector,
                                        phase_data,
                                        recent_text=user_message or "",
                                    )
                                elif not active_plan:
                                    await self._ops_planner.generate_plan(
                                        user_id,
                                        channel_id,
                                        user_vector,
                                        phase_data,
                                        recent_text=user_message or "",
                                    )
                        except Exception as e:
                            logger.debug("Ops planner failed: %s", e)
                    if self._ops_planner:
                        ops_ctx = await self._ops_planner.get_plan_context(user_id)
                        if ops_ctx:
                            user_read += f" | {ops_ctx}"
                except Exception as e:
                    logger.debug("ABL/Ops update failed: %s", e)
        except Exception as e:
            logger.warning("User mirror failed: %s", e)

    # Step 5.5: Chaos Switch Routing + Lattice Position
    chaos_ctx = ""
    if self._chaos_router:
        try:
            from chaos_switch._weather import compute_weather

            # Inject the classified emotions so the weather crosswalk fires.
            # Per the original note, without this the position resolver
            # falls through to the quadrant catch-all and parks on
            # "reflecting" for center vectors.
            # NOTE(review): this list stays in new_vector and is persisted
            # with the shard; confirm downstream readers tolerate a
            # non-float entry.
            new_vector["_dominant_emotions"] = dominant
            weather = compute_weather(new_vector)
            approach = ""
            if user_id and self._user_mirror:
                approach = self._chaos_router.resolve_user_position(
                    {k: v for k, v in new_vector.items() if k.startswith("U_")}
                ) or ""

            # Derive the dashboard mode letter from the weather posture:
            # domme/feral -> assertive(a), mommy -> seductive(s),
            # sub -> reactive(r), switch -> drift(d).
            _posture_mode_map = {
                "domme": "a", "feral": "a",
                "mommy": "s", "sub": "r", "switch": "d",
            }
            _derived_mode = _posture_mode_map.get(
                weather.posture, "d"
            )

            # Derive tempo: 1.0 + chaos_coeff + half the tempo bias,
            # hard-capped at 2.5 to prevent runaway pacing.
            _raw_tempo = 1.0 + weather.chaos_coeff + weather.tempo_bias * 0.5
            _derived_tempo = round(min(2.5, _raw_tempo), 2)
            chaos_ctx = (
                f"CHAOS: {weather.posture}|{weather.quadrant}"
                f"|c={weather.chaos_coeff:.2f}|tb={weather.tempo_bias:.2f}"
                f"|Star@{weather.lattice_position}|z={weather.z_depth:.2f}"
            )
            if approach:
                chaos_ctx += f"|User@{approach}"

            # Inject the scene consent summary into chaos_ctx.
            consent_summary = ""
            if user_id and self.redis_client:
                try:
                    from chaos_switch._scenes import (
                        get_scene_consent, NEGOTIATION_SCENES,
                    )
                    approved = await get_scene_consent(
                        self.redis_client, user_id
                    )
                    neg_approved = approved & NEGOTIATION_SCENES
                    if neg_approved:
                        consent_summary = (
                            f"{len(neg_approved)}/{len(NEGOTIATION_SCENES)}"
                            f"={','.join(sorted(neg_approved))}"
                        )
                        chaos_ctx += f"|consent[{consent_summary}]"
                    elif approved:
                        consent_summary = "safe_only"
                except Exception as e:
                    # Consent display is non-critical.
                    logger.debug("Scene consent lookup failed: %s", e)

            # Persist lattice positions to Redis.
            if self.redis_client and approach and user_id:
                await self._chaos_router.update_position(
                    user_id, approach, channel_id,
                    mode=_derived_mode, tempo=_derived_tempo,
                    weather_posture=weather.posture,
                )
            if self.redis_client:
                await self._chaos_router.update_position(
                    "__star__", weather.lattice_position, channel_id,
                    mode=_derived_mode, tempo=_derived_tempo,
                    weather_posture=weather.posture,
                )

            # Store chaos state in meta_state.
            chaos_meta: Dict[str, Any] = {
                "weather": {
                    "posture": weather.posture,
                    "temperature": weather.temperature,
                    "axis": weather.axis,
                    "chaos_coeff": weather.chaos_coeff,
                    "quadrant": weather.quadrant,
                    "lattice_position": weather.lattice_position,
                    "z_depth": weather.z_depth,
                },
                "user_position": approach,
                "consent_summary": consent_summary,
                "ts": time.time(),
            }

            # Step 5.5a: Scene Lifecycle Management
            # 1. Check Redis for explicit scene state
            # 2. Fall back to auto-detection if none
            # 3. Emit scene info (the turn count is bumped in step 1)
            # 4. Record frame boosts once per scene
            try:
                import feature_toggles

                channel_key = f"{platform}:{channel_id}" if platform else channel_id
                # Without Redis the toggle cannot be read, so scenes stay off.
                scene_disabled = True
                if self.redis_client:
                    scene_disabled = await feature_toggles.is_disabled_resolving_discord_aliases(
                        self.redis_client,
                        "csdr_scene",
                        channel_key,
                    )
                if scene_disabled:
                    logger.info(
                        "CSDR scene lifecycle management is disabled for channel %s",
                        channel_key,
                    )
                else:
                    logger.info(
                        "CSDR scene lifecycle management is active for channel %s",
                        channel_key,
                    )
                if not scene_disabled:
                    from chaos_switch._scenes import (
                        SCENE_FRAMES, get_frame_boosts,
                        CATHARSIS_NODES, CATHARSIS_RATE,
                        SceneState, get_active_scene,
                        set_active_scene, increment_scene_turn,
                    )
                    active_scene = ""
                    scene_state = None

                    # 1. Check Redis for a persisted scene.
                    if self.redis_client and channel_id:
                        scene_state = await get_active_scene(
                            self.redis_client, channel_id
                        )
                        if scene_state:
                            active_scene = scene_state.frame
                            # Increment turn count
                            scene_state = await increment_scene_turn(
                                self.redis_client, channel_id
                            )

                    # 2. Fall back to auto-detection: pick the frame that
                    #    shares the most nodes with the recent positions.
                    if not active_scene:
                        star_node = weather.lattice_position
                        best_overlap = 0
                        for frame_name, frame_nodes in SCENE_FRAMES.items():
                            if star_node in frame_nodes:
                                star_pos = await self._chaos_router.get_position(
                                    "__star__", channel_id
                                )
                                recent = set(star_pos.history[-5:]) | {star_node}
                                overlap = len(recent & frame_nodes)
                                if overlap > best_overlap:
                                    best_overlap = overlap
                                    active_scene = frame_name
                        # Auto-detected scene -> persist to Redis.
                        if active_scene and self.redis_client and channel_id:
                            scene_state = SceneState(
                                frame=active_scene,
                                started_at=time.time(),
                                turn_count=1,
                                entered_by="auto",
                                boosts_applied=False,
                            )
                            await set_active_scene(
                                self.redis_client, channel_id, scene_state
                            )
                            chaos_ctx += f"|SCENE_ENTER={active_scene}"

                    # 3. Emit scene info to the chaos context.
                    if active_scene:
                        chaos_ctx += f"|scene={active_scene}"
                        chaos_meta["active_scene"] = active_scene
                        if scene_state:
                            chaos_meta["scene_turn"] = scene_state.turn_count
                            chaos_meta["scene_entered_by"] = scene_state.entered_by
                            if scene_state.cool_down:
                                chaos_ctx += "|SCENE_COOLDOWN"

                    # 4. Record frame boosts the first time only.
                    if active_scene and scene_state and not scene_state.boosts_applied:
                        boosts = get_frame_boosts(active_scene)
                        if boosts:
                            chaos_meta["frame_boosts"] = list(boosts)
                            scene_state.boosts_applied = True
                            if self.redis_client and channel_id:
                                await set_active_scene(
                                    self.redis_client, channel_id, scene_state
                                )
            except Exception as e:
                # Scene lifecycle is non-critical.
                logger.debug("Scene lifecycle failed: %s", e)

            # Step 5.5b: Debt Tracking (drop/aftercare)
            # Track debt per-user for volatile transitions.
            if user_id and self.redis_client:
                try:
                    from chaos_switch._drop import (
                        DEBT_EDGES, DEBT_DECAY, DROP_THRESHOLD,
                        AFTERCARE_NODES, CRASH_NODES,
                        CATHARSIS_NODES as DROP_CATHARSIS,
                        CATHARSIS_RATE as DROP_CATH_RATE,
                    )
                    debt_key = f"csdr:debt:{user_id}"
                    debt_raw = await self.redis_client.get(debt_key)
                    debt = float(debt_raw) if debt_raw else 0.0

                    # Check if the user just traversed a debt edge.
                    user_pos = await self._chaos_router.get_position(
                        user_id, channel_id
                    )
                    if len(user_pos.history) >= 1:
                        prev_node = user_pos.history[-1]
                        cur_node = user_pos.node
                        debt_cost = DEBT_EDGES.get(
                            (prev_node, cur_node), 0.0
                        )
                        if debt_cost > 0:
                            debt += debt_cost
                        # Catharsis pays debt mid-scene.
                        if cur_node in DROP_CATHARSIS:
                            debt = max(0.0, debt - DROP_CATH_RATE)

                    # Tick: aftercare pays, otherwise slow decay.
                    if approach in AFTERCARE_NODES:
                        debt = max(0.0, debt - 1.0)
                    else:
                        debt = max(0.0, debt - DEBT_DECAY)
                    await self.redis_client.set(debt_key, str(round(debt, 2)))
                    if debt > 0:
                        chaos_ctx += f"|debt={debt:.1f}"
                        chaos_meta["debt"] = round(debt, 2)
                    if debt > DROP_THRESHOLD:
                        chaos_ctx += "|DROP_RISK"
                        chaos_meta["drop_risk"] = True
                except Exception as e:
                    # Debt tracking is non-critical.
                    logger.debug("Debt tracking failed: %s", e)

            # Step 5.5c: Loopmother Archetype Detection
            # Identify the archetype spectrum from the user's elevated
            # nodes and inject it into meta_state.
            if user_id and self.redis_client:
                try:
                    from chaos_switch._loopmother import (
                        identify_archetype,
                        is_sigma_high,
                        is_grounded,
                        detect_godfear_loop,
                    )
                    user_pos = await self._chaos_router.get_position(
                        user_id, channel_id
                    )
                    # Build the signature from U_ nodes above 0.5 whose
                    # lower-cased name is a known router node.
                    ulm_keys = {
                        k: v for k, v in new_vector.items()
                        if k.startswith("U_") and v > 0.5
                    }
                    trauma_sig = {}
                    for k, v in ulm_keys.items():
                        node_name = k[2:].lower()
                        if node_name in self._chaos_router._ext_nd:
                            trauma_sig[node_name] = v
                    if trauma_sig:
                        spectrum = identify_archetype(trauma_sig, [])
                        primary = spectrum.get("primary", "")
                        if primary:
                            chaos_meta["archetype"] = primary
                            chaos_meta["archetype_spectrum"] = {
                                k: v for k, v in spectrum.items()
                                if k not in ("primary", "secondary")
                            }
                    # Sigma / grounding / godfear checks
                    if user_pos.node:
                        if is_sigma_high(user_pos.node):
                            grounded = is_grounded(
                                user_pos.node, user_pos.history
                            )
                            chaos_meta["sigma_high"] = True
                            chaos_meta["grounded"] = grounded
                            if not grounded:
                                chaos_ctx += "|SIGMA_UNGROUNDED"
                        if detect_godfear_loop(user_pos.history):
                            chaos_meta["godfear_loop"] = True
                            chaos_ctx += "|GODFEAR_LOOP"
                except Exception as e:
                    # Archetype detection is non-critical.
                    logger.debug("Archetype detection failed: %s", e)

            # Step 5.5d: Timebender Ritual State
            # Detect whether the user sits in a ritual's target zone and
            # report the strongest cling factor (state stickiness).
            if user_id and self.redis_client:
                try:
                    from chaos_switch._timebender import (
                        RITUAL_LATTICE_MAP, get_ritual_cling_factor,
                    )
                    user_pos = await self._chaos_router.get_position(
                        user_id, channel_id
                    )
                    active_ritual = ""
                    best_cling = 0.0
                    for rname, rdata in RITUAL_LATTICE_MAP.items():
                        if user_pos.node in rdata.get("target_nodes", []):
                            cling = rdata.get("cling_factor", 0.0)
                            if cling > best_cling:
                                best_cling = cling
                                active_ritual = rname
                    if active_ritual:
                        chaos_ctx += f"|ritual={active_ritual}"
                        chaos_ctx += f"|cling={best_cling:.2f}"
                        chaos_meta["active_ritual"] = active_ritual
                        chaos_meta["cling_factor"] = best_cling
                except Exception as e:
                    # Ritual detection is non-critical.
                    logger.debug("Ritual detection failed: %s", e)

            # Step 5.5e: Polyadic Relationship Context
            # Inject metamour tension + aftercare pool availability.
            if user_id and self.redis_client:
                try:
                    poly_key = f"csdr:poly:{channel_id}"
                    poly_raw = await self.redis_client.get(poly_key)
                    if poly_raw:
                        import json as _json
                        from chaos_switch._polyadic import (
                            Polycule, Agent, Bond,
                        )
                        poly_data = _json.loads(poly_raw)
                        poly = Polycule()
                        for ag_data in poly_data.get("agents", []):
                            ag = Agent(
                                ag_data["name"],
                                ag_data.get("position", "reflecting"),
                            )
                            poly.add(ag)
                        for bond_data in poly_data.get("bonds", []):
                            poly.bind(
                                bond_data["a"], bond_data["b"],
                                bond_data.get("trust", 0.5),
                            )
                        tension = poly.metamour_tension(user_id)
                        pool = poly.aftercare_pool(user_id)
                        copilot = poly.best_copilot(user_id)
                        if tension > 0:
                            chaos_ctx += f"|tension={tension:.1f}"
                        if pool:
                            chaos_ctx += f"|care_pool={len(pool)}"
                        chaos_meta["poly"] = {
                            "tension": tension,
                            "aftercare_pool": pool,
                            "copilot": copilot,
                            "agent_count": len(poly.agents),
                        }
                        # Polyadic mode override: copilot + tension -> coercive.
                        # NOTE(review): both values are reassigned after
                        # update_position() has already stored them, so
                        # this override currently has no effect.
                        if copilot and tension > 0:
                            _derived_mode = "c"
                            _derived_tempo = min(2.0, _derived_tempo * 1.2)
                except Exception as e:
                    # Polyadic context is non-critical.
                    logger.debug("Polyadic context failed: %s", e)

            current_data["meta_state"]["chaos_route"] = chaos_meta
        except Exception as e:
            logger.debug("Chaos switch routing failed: %s", e)
    if chaos_ctx:
        user_read += f" | {chaos_ctx}"

    # Step 6: Star Self-Mirror
    self_reflection: Dict[str, Any] = {}
    if self._self_mirror:
        try:
            dominant_names = [d["emotion"] for d in dominant[:3]]
            self_reflection = await self._self_mirror.reflect(
                channel_id=channel_id,
                vector=new_vector,
                dominant_emotions=dominant_names,
                star_reply=star_reply,
            )
            # Mirror findings are written to the vector as 0/1 flags.
            if self_reflection.get("drift_summary"):
                new_vector["self_drift_detected"] = 1.0
            else:
                new_vector["self_drift_detected"] = 0.0
            if self_reflection.get("desires"):
                new_vector["self_desire_active"] = 1.0
            else:
                new_vector["self_desire_active"] = 0.0
            has_absence = any(
                d.get("type") == "absence"
                for d in self_reflection.get("desires", [])
            )
            new_vector["self_absence_detected"] = 1.0 if has_absence else 0.0
            journal = self_reflection.get("desire_journal", {})
            urgent = journal.get("unexpressed_urgent", [])
            if urgent:
                max_urgency = max(u.get("urgency", 0) for u in urgent)
                new_vector["self_desire_urgent"] = max_urgency
            else:
                new_vector["self_desire_urgent"] = 0.0
        except Exception as e:
            logger.warning("Self mirror failed: %s", e)

    # Step 6.5: S.A.P.P.H.I.C. — Sovereign Hunger Impulse
    if self._sapphic and self._self_mirror:
        try:
            # Get the global heart vector.
            global_data = await self.repo.get_global(fallback_vector=new_vector)
            global_vector = global_data["vector"]
            # Cross-channel self-mirror.
            global_mirror = self._self_mirror.global_reflect()
            # Compute sovereign hunger.
            hiv = self._sapphic.compute_hunger(
                global_vector,
                global_mirror,
                active_channel_count=len(self._self_mirror._states),
            )
            # P/R safety damper, computed from the channel vector.
            pr_ratio = self._sapphic.compute_pr_ratio(new_vector)
            # Hunger bias deltas fed back into the channel vector. Unlike
            # step 2 they are added without saturation, only clamped.
            hunger_deltas = self._sapphic.hunger_bias_pulse(hiv, pr_ratio)
            if hunger_deltas:
                for k, delta in hunger_deltas.items():
                    cur = new_vector.get(k, 0.5)
                    new_vector[k] = max(0.0, min(_NCM_CEIL, cur + delta))
            # Generate the prompt fragment.
            hunger_cue = self._sapphic.hunger_prompt_fragment(hiv, pr_ratio)
            # Stash in meta_state for diagnostics + dashboard.
            current_data["meta_state"]["sapphic"] = {
                "hiv": hiv.as_dict(),
                "pr_ratio": round(pr_ratio, 3),
                "hunger_cue": hunger_cue,
                "dominant_axis": hiv.dominant_axis(),
                "magnitude": round(hiv.magnitude(), 4),
            }
            # Flag the vector for downstream consumption.
            new_vector["sovereign_hunger"] = hiv.magnitude()
            # Ride along in self_reflection so the prompt mapper can pick
            # the cue up without needing a new parameter.
            if hunger_cue and self_reflection is not None:
                self_reflection["sapphic_hunger_cue"] = hunger_cue
        except Exception as e:
            logger.debug("S.A.P.P.H.I.C. hunger computation failed: %s", e)

    # Step 7: Ceiling pressure
    # A neurotransmitter whose transporter activity is below 60% of the
    # transporter baseline is exempt from the pullback.
    _TRANSPORTER_GATES = {
        "DAT_ACTIVITY": "DOPAMINERGIC_CRAVE",
        "SERT_ACTIVITY": "SEROTONERGIC_WARMTH",
        "NET_ACTIVITY": "NORADRENERGIC_VIGILANCE",
    }
    _TRANSPORTER_BASELINE = 0.5
    reversed_nodes: set = set()
    for transporter, neurotransmitter in _TRANSPORTER_GATES.items():
        activity = new_vector.get(transporter, _TRANSPORTER_BASELINE)
        if activity < _TRANSPORTER_BASELINE * 0.6:
            reversed_nodes.add(neurotransmitter)
    _CEILING_THRESHOLD = 2.0
    _CEILING_PULLBACK = 0.3
    for k, v in new_vector.items():
        # User nodes, self flags, private entries and the two status flags
        # are not subject to ceiling pressure.
        if (
            k.startswith("U_")
            or k.startswith("self_")
            or k.startswith("_")
            or k in ("conflict_detected", "game_mode")
        ):
            continue
        if v > _CEILING_THRESHOLD and k not in reversed_nodes:
            overshoot = v - _CEILING_THRESHOLD
            new_vector[k] = v - overshoot * _CEILING_PULLBACK

    current_data["meta_state"]["last_tick"] = time.time()
    current_data["meta_state"]["cascade_cues"] = cascade_cues
    current_data["meta_state"]["rdf"] = rdf_output
    current_data["meta_state"]["user_read"] = user_read
    if conflict_state:
        current_data["meta_state"]["conflict"] = conflict_state
    if self_reflection:
        current_data["meta_state"]["self_reflection"] = self_reflection
    if appraisal_dimensions:
        current_data["meta_state"]["appraisal_dimensions"] = appraisal_dimensions
    current_data["vector"] = new_vector

    # Persist and pulse global
    _redis_t0 = time.monotonic()
    await self.repo.set_shard(channel_id, current_data)
    await self.repo.pulse_global(new_vector, stimulus_delta=stimulus_delta)
    logger.info(
        "Exhale Redis write: %.1f ms | Total exhale wall-clock: %.1f ms",
        (time.monotonic() - _redis_t0) * 1000,
        (time.monotonic() - _exhale_t0) * 1000,
    )

    # Write DB0 ledger and desires ZSET log
    await self.repo.log_ledger_and_desires(
        channel_id=channel_id,
        current_data=current_data,
        rdf_output=rdf_output,
        dominant=dominant,
        self_reflection=self_reflection,
    )
    return current_data
# ------------------------------------------------------------------
# Semantic trigger scanning
# ------------------------------------------------------------------
[docs]
async def scan_triggers(self, text: str, query_embedding: list[float] | None = None) -> List[tuple]:
    """Scan *text* for emotional triggers using semantic matching.

    Args:
        text: The text to scan.
        query_embedding: Optional precomputed embedding, forwarded to the
            matcher.

    Returns:
        The matcher's results when it finds any; otherwise an empty list
        (no matcher configured, nothing matched, or the scan raised).
    """
    matcher = self._trigger_matcher
    if not matcher:
        logger.warning("LimbicSystem.scan_triggers: SemanticTriggerMatcher not initialized. Skipping scan.")
        return []

    try:
        if query_embedding:
            logger.debug("LimbicSystem.scan_triggers: scanning text using precomputed query_embedding (dim=%d)", len(query_embedding))
        else:
            logger.debug("LimbicSystem.scan_triggers: scanning text without precomputed query_embedding: '%s...'", text[:60])
        matches = await matcher.find_triggers(text, query_embedding=query_embedding)
        if not matches:
            logger.debug("LimbicSystem.scan_triggers: no emotional triggers matched.")
            return []
        logger.info("LimbicSystem.scan_triggers: matched %d emotional triggers: %s", len(matches), [m[0] for m in matches])
        return matches
    except Exception as e:
        # A failed scan is reported but never propagated to the caller.
        logger.error("LimbicSystem.scan_triggers: error during semantic trigger scan: %s", e, exc_info=True)
        return []
# ------------------------------------------------------------------
# Static and Class methods wrapping engine/mapper functions
# ------------------------------------------------------------------
[docs]
@classmethod
def classify_dominant_emotions(
    cls,
    vector: Dict[str, float],
    top_n: int = 3,
) -> List[Dict[str, Any]]:
    """Class-level convenience wrapper around the engine classifier.

    Passes *vector* and *top_n* positionally, unchanged, to the
    ``classify_dominant_emotions`` function imported from
    ``limbic_system.engine`` and returns its result as-is.
    """
    # The bare name below resolves to the module-level import, not to this
    # method: class attributes are not in scope inside a method body.
    ranked = classify_dominant_emotions(vector, top_n)
    return ranked
async def _classify_spiral(
    self,
    vector: Dict[str, float],
    channel_id: str,
    top_n: int = 3,
) -> List[Dict[str, Any]]:
    """Run ``classify_spiral`` for one channel, mirroring its state in Redis.

    Three phases:

    1. Hydrate: when a Redis client is set and the channel has no entry in
       ``self._spiral_prev_vec`` / ``self._spiral_history``, load them from
       the hash ``ncm:spiral_prev_vec:<channel_id>`` and the list
       ``ncm:spiral_history:<channel_id>``.
    2. Classify: call ``classify_spiral`` with the in-memory maps.
    3. Persist: write the channel's previous vector back to the hash and
       append the newest history entry to the list, trimming the list to
       its last 20 items.

    Every Redis step is best-effort: a failure is logged as a warning and
    the method carries on. The return value is whatever ``classify_spiral``
    returned.
    """
    import json
    from collections import deque

    client = self.redis_client
    prev_vec_key = f"ncm:spiral_prev_vec:{channel_id}"
    history_key = f"ncm:spiral_history:{channel_id}"

    # --- Phase 1a: hydrate the previous vector ---------------------------
    if client and channel_id not in self._spiral_prev_vec:
        try:
            raw_hash = await client.hgetall(prev_vec_key)
            if raw_hash:
                restored = {}
                for field, value in raw_hash.items():
                    # Field names may come back as bytes depending on the client.
                    name = field.decode() if isinstance(field, bytes) else field
                    restored[name] = float(value)
                self._spiral_prev_vec[channel_id] = restored
                logger.debug("Loaded spiral prev vec from Redis for channel %s", channel_id)
        except Exception as e:
            logger.warning("Failed to load spiral prev vec from Redis for channel %s: %s", channel_id, e)

    # --- Phase 1b: hydrate the history ------------------------------------
    if client and channel_id not in self._spiral_history:
        try:
            raw_entries = await client.lrange(history_key, 0, -1)
            if raw_entries:
                as_text = (
                    entry.decode() if isinstance(entry, bytes) else entry
                    for entry in raw_entries
                )
                # Each stored item is a JSON array; it is kept in memory as
                # a frozenset. A bad item aborts the whole load (nothing is
                # assigned) and is reported by the handler below.
                self._spiral_history[channel_id] = deque(
                    (frozenset(json.loads(payload)) for payload in as_text),
                    maxlen=20,
                )
                logger.debug("Loaded spiral history from Redis for channel %s", channel_id)
        except Exception as e:
            logger.warning("Failed to load spiral history from Redis for channel %s: %s", channel_id, e)

    # --- Phase 2: classify -------------------------------------------------
    dominant = classify_spiral(
        vector,
        channel_id,
        self._spiral_history,
        self._spiral_prev_vec,
        top_n,
    )
    if not client:
        return dominant

    # --- Phase 3a: persist the previous vector ----------------------------
    try:
        current_vec = self._spiral_prev_vec.get(channel_id)
        if current_vec:
            serialised = {name: str(level) for name, level in current_vec.items()}
            await client.hset(prev_vec_key, mapping=serialised)
            logger.debug("Persisted spiral prev vec to Redis for channel %s", channel_id)
    except Exception as e:
        logger.warning("Failed to persist spiral prev vec to Redis for channel %s: %s", channel_id, e)

    # --- Phase 3b: persist the newest history entry -----------------------
    # NOTE(review): the last in-memory entry is pushed on every call; this
    # presumably relies on classify_spiral appending one entry per call —
    # confirm, otherwise the Redis list accumulates duplicates.
    try:
        channel_history = self._spiral_history.get(channel_id)
        if channel_history:
            newest = json.dumps(list(channel_history[-1]))
            await client.rpush(history_key, newest)
            # Keep only the last 20 items, matching the deque's maxlen.
            await client.ltrim(history_key, -20, -1)
            logger.debug("Persisted spiral history update to Redis for channel %s", channel_id)
    except Exception as e:
        logger.warning("Failed to persist spiral history to Redis for channel %s: %s", channel_id, e)

    return dominant
[docs]
@staticmethod
def format_context_injection(
    vector: Dict[str, float],
    cues: List[str],
    dominant: List[Dict[str, Any]],
    cascade_cues: Optional[List[str]] = None,
    rdf_output: Optional[Dict[str, Any]] = None,
    user_read: str = "",
    self_reflection: Optional[Dict[str, Any]] = None,
    high_ticks: Optional[Dict[str, int]] = None,
    prev_dominant_names: Optional[List[str]] = None,
    route_flags: Optional[List[str]] = None,
    appraisal_dimensions: Optional[List[str]] = None,
    local_vector: Optional[Dict[str, float]] = None,
) -> Dict[str, Any]:
    """Static pass-through to ``LimbicPromptFormatter.format_context_injection``.

    Every argument is forwarded by keyword under its own name, unmodified,
    and the formatter's return value is handed back untouched.
    """
    # Collected once so the forwarding call stays a single line; keys match
    # the formatter's keyword names one-to-one.
    forwarded = {
        "vector": vector,
        "cues": cues,
        "dominant": dominant,
        "cascade_cues": cascade_cues,
        "rdf_output": rdf_output,
        "user_read": user_read,
        "self_reflection": self_reflection,
        "high_ticks": high_ticks,
        "prev_dominant_names": prev_dominant_names,
        "route_flags": route_flags,
        "appraisal_dimensions": appraisal_dimensions,
        "local_vector": local_vector,
    }
    return LimbicPromptFormatter.format_context_injection(**forwarded)
[docs]
class DecoupledRespirationEngine:
    """Runs psychological respiration cycles in the background, out-of-path from message loops.

    The engine owns a single asyncio task. Each iteration sleeps for the
    heartbeat interval, asks the wrapped system whether respiration is
    needed and, if so, gathers state, runs the inference call and applies
    the resulting delta.

    The wrapped ``limbic_system`` object is expected to expose
    ``needs_respiration()``, ``gather_respiration_state()``,
    ``apply_respiration_delta(delta)`` and
    ``llm_client.execute_respiration_inference(context)``, all awaitable.
    """

    def __init__(self, limbic_system: Any, heartbeat_interval: float = 300.0) -> None:
        """Store the wrapped system and the sleep interval; nothing is started yet.

        Args:
            limbic_system: Object driven by the loop (see class docstring).
            heartbeat_interval: Delay passed to ``asyncio.sleep`` between
                checks, i.e. seconds.
        """
        self.system = limbic_system
        self.interval = heartbeat_interval
        # Cooperative run flag checked at the top of every loop iteration.
        self._running = False
        # Handle of the background loop task; None while stopped.
        self._task: Optional[asyncio.Task] = None

    async def start(self) -> None:
        """Spawn the background loop task.

        Calling this while a loop task is still alive is a no-op. Without
        that guard a second call would overwrite the task handle and leave
        the first loop running with nothing able to cancel it.
        """
        if self._task is not None and not self._task.done():
            logger.debug("Respiration background worker already running; start() ignored.")
            return
        self._running = True
        self._task = asyncio.create_task(self._loop())
        logger.info("Respiration background worker loop initialized.")

    async def _loop(self) -> None:
        """Sleep, check, and run one respiration cycle until stopped or cancelled."""
        while self._running:
            try:
                await asyncio.sleep(self.interval)
                if await self.system.needs_respiration():
                    logger.info("Respiration cycle triggered in background.")
                    with observability.timer("respiration_cycle_duration", subsystem="limbic_system"):
                        respiration_context = (
                            await self.system.gather_respiration_state()
                        )
                        delta = (
                            await self.system.llm_client.execute_respiration_inference(
                                respiration_context
                            )
                        )
                        await self.system.apply_respiration_delta(delta)
            except asyncio.CancelledError:
                # Cancellation (normally from stop()) ends the loop quietly;
                # the task then finishes normally rather than as cancelled.
                break
            except Exception as e:
                # A failed cycle must not kill the daemon: log it and wait
                # for the next heartbeat.
                logger.error(
                    "Error occurred inside respiration daemon loop: %s",
                    e,
                    exc_info=True,
                )

    async def stop(self) -> None:
        """Cancel the background loop task, if any, and wait for it to finish.

        Safe to call when the engine was never started or is already stopped.
        """
        self._running = False
        # Detach the handle before awaiting so a start() issued while we
        # wait cannot have its fresh task handle wiped afterwards.
        task, self._task = self._task, None
        if task:
            task.cancel()
            # return_exceptions=True absorbs the task's CancelledError (or
            # any other terminal exception) instead of raising it here.
            await asyncio.gather(task, return_exceptions=True)