# Source code for cascade_engine

"""NCM Cascade Engine — Multi-turn neurochemical event sequences.

╔═══════════════════════════════════════════════════════════════════════════════╗
║  🌀 CASCADE ENGINE                                                            ║
╠═══════════════════════════════════════════════════════════════════════════════╣
║  Loads cascade definitions from ncm_cascades.yaml                            ║
║  Checks triggers against current NCM vector + active emotions                ║
║  Advances active cascades one stage-tick per turn                            ║
║  Handles interrupts (abort / pause / skip_to_stage / trigger_cascade)        ║
║  Applies synergy bonuses when cascades co-activate                           ║
║  Persists state in Redis: ncm:cascades:{channel_id}                          ║
╠═══════════════════════════════════════════════════════════════════════════════╣
║  Called during exhale() after metabolic decay, stimulus delta                 ║
║  stacking, and antagonist suppression.                                        ║
╚═══════════════════════════════════════════════════════════════════════════════╝
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
import random
import time
from typing import Any, Dict, List, Optional, Set, Tuple, TYPE_CHECKING

import yaml

if TYPE_CHECKING:
    from ncm_variant_cache import CueVariantCache

logger = logging.getLogger(__name__)

# ─────────────────────────────────────────────────────────────────────
# Constants
# ─────────────────────────────────────────────────────────────────────
# Hard cap on simultaneously active cascades per channel (enforced in tick()).
MAX_CONCURRENT_CASCADES = 3
# Redis key template; formatted with the channel id in _load_state/_save_state.
REDIS_CASCADE_KEY = "ncm:cascades:{channel_id}"
REDIS_CASCADE_TTL = 86400  # 24h

# Delta parsing (reuse from ncm_delta_parser if available)
try:
    from ncm_delta_parser import parse_delta_string
except ImportError:
    import re

    # Token grammars: "NODE+0.3" / "NODE-0.3", "NODE↑" / "NODE↓", "NODE.reversed"
    _SIGNED_RE = re.compile(r"^([A-Za-z0-9_]+)([+-])(\d+\.?\d*)$")
    _ARROW_RE = re.compile(r"^([A-Za-z0-9_]+)(↑|↓)$")
    _REVERSED_RE = re.compile(r"^([A-Za-z0-9_]+)\.reversed$")

    def parse_delta_string(ds: str) -> Dict[str, float]:
        """Minimal fallback parser.

        Splits *ds* on whitespace and accumulates per-node deltas:
        explicit signed magnitudes, arrow shorthands (±0.15), and a
        ``.reversed`` suffix (+0.2). Unrecognized tokens are ignored.
        """
        deltas: Dict[str, float] = {}
        if not ds:
            return deltas

        def _bump(name: str, amount: float) -> None:
            # Repeated mentions of a node accumulate rather than overwrite.
            deltas[name] = deltas.get(name, 0.0) + amount

        for token in ds.strip().split():
            if signed := _SIGNED_RE.match(token):
                name, sign, magnitude = signed.groups()
                _bump(name, float(magnitude) if sign == "+" else -float(magnitude))
            elif arrow := _ARROW_RE.match(token):
                name, direction = arrow.groups()
                _bump(name, 0.15 if direction == "↑" else -0.15)
            elif rev := _REVERSED_RE.match(token):
                _bump(rev.group(1), 0.2)
        return deltas


# ─────────────────────────────────────────────────────────────────────
# Cascade Definition Loader
# ─────────────────────────────────────────────────────────────────────
# Module-level cache: populated once per process by _load_cascade_defs().
_cascade_defs: Optional[Dict[str, Dict[str, Any]]] = None


def _load_cascade_defs() -> Dict[str, Dict[str, Any]]:
    """Load and cache cascade definitions from YAML.

    Reads ``ncm_cascades.yaml`` next to this module, pre-parses every
    stage/synergy delta string into a vector (``delta_vector`` keys), and
    caches the result in the module-level ``_cascade_defs``. Top-level
    keys starting with ``_meta_`` are meta-system configs and are excluded
    (see :func:`_load_meta_configs`).

    Returns:
        Dict[str, Dict[str, Any]]: cascade_id -> definition mapping;
        empty on a missing, unreadable, or non-mapping YAML file.
    """
    global _cascade_defs
    if _cascade_defs is not None:
        return _cascade_defs

    yaml_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "ncm_cascades.yaml"
    )
    if not os.path.exists(yaml_path):
        logger.warning("ncm_cascades.yaml not found at %s", yaml_path)
        _cascade_defs = {}
        return _cascade_defs

    try:
        with open(yaml_path, "r", encoding="utf-8") as f:
            raw = yaml.safe_load(f)
    except Exception as e:
        logger.error("Failed to load ncm_cascades.yaml: %s", e)
        _cascade_defs = {}
        return _cascade_defs

    # BUGFIX: yaml.safe_load returns None for an empty file and may return
    # a scalar/list for a malformed one — raw.items() would then raise.
    if not isinstance(raw, dict):
        if raw is not None:
            logger.error("ncm_cascades.yaml root is not a mapping; ignoring")
        raw = {}

    # Pre-parse all stage deltas into vectors
    defs: Dict[str, Dict[str, Any]] = {}
    meta: Dict[str, Dict[str, Any]] = {}
    for cascade_id, cdef in raw.items():
        if not isinstance(cdef, dict):
            continue
        # Meta-system configs start with _meta_
        if cascade_id.startswith("_meta_"):
            meta[cascade_id] = cdef
            continue
        if "stages" not in cdef:
            continue
        for stage in cdef.get("stages", []):
            if "delta" in stage:
                stage["delta_vector"] = parse_delta_string(stage["delta"])
            # 🌿 Pre-parse sativa/indica pole deltas for gradient lerp
            if "delta_sativa" in stage:
                stage["delta_vector_sativa"] = parse_delta_string(stage["delta_sativa"])
            if "delta_indica" in stage:
                stage["delta_vector_indica"] = parse_delta_string(stage["delta_indica"])
        # Also parse synergy deltas
        for syn in cdef.get("synergy", []):
            if "delta" in syn:
                syn["delta_vector"] = parse_delta_string(syn["delta"])
        defs[cascade_id] = cdef

    _cascade_defs = defs
    logger.info("Loaded %d cascade definitions, %d meta configs", len(defs), len(meta))
    return _cascade_defs


def _load_meta_configs() -> Dict[str, Dict[str, Any]]:
    """Load meta-system configs (habituation, GABA polarity, etc.)."""
    yaml_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "ncm_cascades.yaml"
    )
    if not os.path.exists(yaml_path):
        return {}
    try:
        with open(yaml_path, "r", encoding="utf-8") as f:
            raw = yaml.safe_load(f)
    except Exception:
        return {}
    return {
        k: v for k, v in raw.items()
        if isinstance(v, dict) and k.startswith("_meta_")
    }


# ─────────────────────────────────────────────────────────────────────
# Cascade State (per-channel, persisted in Redis)
# ─────────────────────────────────────────────────────────────────────
class CascadeState:
    """Tracks active cascades, cooldowns, and history for one channel."""

    def __init__(self):
        """Initialize empty per-channel state."""
        # cascade_id -> {"stage": int, "turns_in_stage": int, ...}
        self.active: Dict[str, Dict[str, Any]] = {}
        # cascade_id -> remaining cooldown turns
        self.cooldowns: Dict[str, int] = {}
        # "cascade_id:event:turn_N" audit entries
        self.history: List[str] = []
        self.turn_count: int = 0
        # Rolling tracker for sustained-condition checks (for_turns)
        self.node_history: Dict[str, List[float]] = {}
        self.delta_counts: List[int] = []
        self.last_emotions: List[Set[str]] = []
        # Habituation tracking
        self.fire_counts: Dict[str, int] = {}
        # Last turn each cascade completed (for tolerance reversal)
        self.last_fired_turn: Dict[str, int] = {}

    def to_dict(self) -> dict:
        """Serialize to a JSON-safe dict for Redis persistence.

        Returns:
            dict: Result dictionary (rolling buffers trimmed).
        """
        return {
            "active": self.active,
            "cooldowns": self.cooldowns,
            "history": self.history[-50:],  # keep last 50
            "turn_count": self.turn_count,
            # BUGFIX: node_history was previously dropped on save, so every
            # Redis round-trip reset sustained-condition (for_turns) tracking
            # and multi-turn node conditions could never pass. Persist it,
            # trimmed to the same 10-sample window tick() maintains.
            "node_history": {k: v[-10:] for k, v in self.node_history.items()},
            "delta_counts": self.delta_counts[-10:],
            "last_emotions": [list(s) for s in self.last_emotions[-5:]],
            "fire_counts": self.fire_counts,
            "last_fired_turn": self.last_fired_turn,
        }

    @classmethod
    def from_dict(cls, d: dict) -> "CascadeState":
        """Construct from dict data (inverse of :meth:`to_dict`).

        Args:
            d (dict): Serialized state; missing keys default to empty
                (backward compatible with payloads saved before
                ``node_history`` was persisted).

        Returns:
            CascadeState: The restored state.
        """
        s = cls()
        s.active = d.get("active", {})
        s.cooldowns = d.get("cooldowns", {})
        s.history = d.get("history", [])
        s.turn_count = d.get("turn_count", 0)
        s.node_history = d.get("node_history", {})
        s.delta_counts = d.get("delta_counts", [])
        s.last_emotions = [set(e) for e in d.get("last_emotions", [])]
        s.fire_counts = d.get("fire_counts", {})
        s.last_fired_turn = d.get("last_fired_turn", {})
        return s
# ───────────────────────────────────────────────────────────────────── # Condition Evaluator # ───────────────────────────────────────────────────────────────────── def _eval_node_condition( cond: dict, vector: Dict[str, float], state: CascadeState, ) -> bool: """Evaluate a single node-threshold condition.""" node = cond.get("node", "") op = cond.get("op", ">") val = cond.get("val", 0.0) current = vector.get(node, 0.5) # baseline 0.5 if op == ">": result = current > val elif op == "<": result = current < val elif op == ">=": result = current >= val elif op == "<=": result = current <= val else: result = current > val # Check sustained condition (for_turns) for_turns = cond.get("for_turns") if for_turns and result: # This needs historical checking — simplified: assume true if # the node has been tracked above threshold recently # (The node_history is populated by the engine each turn) hist = state.node_history.get(node, []) if len(hist) < for_turns: return False if not all( (h > val if op in (">", ">=") else h < val) for h in hist[-for_turns:] ): return False return result def _eval_trigger( trigger: dict, vector: Dict[str, float], active_emotions: Set[str], state: CascadeState, active_cascade_ids: Set[str], ) -> bool: """Evaluate whether a cascade's trigger conditions are met.""" if trigger.get("manual_only"): return False # Check cascade_just_completed if trigger.get("cascade_just_completed"): # Check if any cascade completed this turn last_history = state.history[-1] if state.history else "" if ":completed:" not in last_history: return False # Check cascade_requires for req in trigger.get("cascade_requires", []): req_cascade = req.get("cascade", "") min_stage = req.get("min_stage", "") if req_cascade not in state.active: return False if min_stage: defs = _load_cascade_defs() cdef = defs.get(req_cascade, {}) stages = cdef.get("stages", []) stage_names = [s["name"] for s in stages] current_stage_idx = state.active[req_cascade].get("stage", 0) if min_stage in 
stage_names: required_idx = stage_names.index(min_stage) if current_stage_idx < required_idx: return False # Check delta_count_this_turn delta_count_cond = trigger.get("condition", {}) if isinstance(delta_count_cond, dict): if "delta_count_this_turn" in delta_count_cond: dc_op = delta_count_cond.get("op", ">") dc_val = delta_count_cond.get("val", 4) last_dc = state.delta_counts[-1] if state.delta_counts else 0 if dc_op == ">" and not (last_dc > dc_val): return False elif dc_op == "<" and not (last_dc < dc_val): return False # Check peak_last_turn peak = trigger.get("peak_last_turn") if peak: # Simplified: check if any high-intensity emotions fired last turn last_emos = state.last_emotions[-1] if state.last_emotions else set() peak_emos = set(trigger.get("any_emotion_last_turn", [])) if not (last_emos & peak_emos): return False # Check 'all' conditions (all must pass) all_conds = trigger.get("all", []) for cond in all_conds: if isinstance(cond, dict): if "node" in cond: if not _eval_node_condition(cond, vector, state): return False if "emotion_variance_window" in cond: # Check low emotional variance over N turns window = cond.get("emotion_variance_window", 3) threshold = cond.get("val", 0.1) if len(state.last_emotions) < window: return False recent = state.last_emotions[-window:] # Variance = difference in emotion sets across turns if len(recent) >= 2: all_emos = set() for s in recent: all_emos |= s if len(all_emos) > 3: # more than 3 unique emotions = too varied return False # Check 'any' conditions (at least one must pass) any_conds = trigger.get("any", []) if any_conds: any_passed = False for cond in any_conds: if isinstance(cond, dict): if "node" in cond: if _eval_node_condition(cond, vector, state): any_passed = True break if "emotion_and" in cond or "emotion" in cond: emo = cond.get("emotion") or cond.get("emotion_and", {}).get( "emotion", "" ) if emo in active_emotions: sub_cond = cond.get("condition") or cond.get( "emotion_and", {} ).get("condition") if 
sub_cond: if _eval_node_condition(sub_cond, vector, state): any_passed = True break else: any_passed = True break if not any_passed: return False # Check 'any_emotion' (at least one must be active) any_emo = trigger.get("any_emotion", []) if any_emo: if not (set(any_emo) & active_emotions): return False return True # ───────────────────────────────────────────────────────────────────── # CASCADE ENGINE # ─────────────────────────────────────────────────────────────────────
class CascadeEngine:
    """Manages multi-turn NCM event cascades for a channel."""

    def __init__(self, redis_client=None, variant_cache: "CueVariantCache | None" = None):
        """Initialize the engine.

        Args:
            redis_client: Async Redis connection (optional; without it,
                state is not persisted and each tick starts fresh).
            variant_cache: Optional cue-variant cache used by ``_pick``.
        """
        self._redis = redis_client
        self._variant_cache = variant_cache
        self._defs = _load_cascade_defs()
        self._meta = _load_meta_configs()
        # Pre-compute meta-system settings
        self._hab = self._meta.get("_meta_habituation", {})
        self._gaba = self._meta.get("_meta_gaba_polarity", {})

    def _pick(self, raw) -> str:
        """Return a variant of *raw*, scheduling generation if not yet cached.

        Handles both plain strings and YAML list values transparently.
        On first use the original text is returned immediately while LLM
        generation runs in the background.

        NOTE: asyncio.create_task requires a running event loop; _pick is
        only called from within the async tick()/get_active_cascades() paths.
        """
        s: str = random.choice(raw) if isinstance(raw, list) else (raw or "")
        if not s:
            return ""
        if self._variant_cache:
            asyncio.create_task(self._variant_cache.ensure_cached(s))
            return self._variant_cache.get_variant(s)
        return s

    def _habituation_multiplier(self, cascade_id: str, state: CascadeState) -> float:
        """Compute delta magnitude multiplier from habituation.

        Returns 1.0 until the cascade has fired ``habituation_onset``
        times, then decays geometrically toward ``habituation_floor``.
        """
        if not self._hab.get("enabled"):
            return 1.0
        fires = state.fire_counts.get(cascade_id, 0)
        onset = self._hab.get("habituation_onset", 5)
        if fires < onset:
            return 1.0
        decay = self._hab.get("habituation_decay", 0.85)
        floor = self._hab.get("habituation_floor", 0.4)
        excess = fires - onset
        return max(floor, decay ** excess)

    def _sensitization_adjustment(self, cascade_id: str, state: CascadeState) -> float:
        """Compute trigger threshold reduction from sensitization.

        Grows linearly with fire count, capped at ``sensitization_cap``.
        """
        if not self._hab.get("enabled"):
            return 0.0
        fires = state.fire_counts.get(cascade_id, 0)
        rate = self._hab.get("sensitization_rate", 0.02)
        cap = self._hab.get("sensitization_cap", 0.30)
        return min(cap, fires * rate)

    def _habituation_should_skip(self, cascade_id: str, state: CascadeState) -> bool:
        """Probabilistic tolerance gate — habituated cascades may not fire.

        After habituation_onset fires, each subsequent fire has a
        decreasing probability of actually triggering. Uses the same
        decay curve as delta habituation so the two stay in sync.
        """
        if not self._hab.get("enabled"):
            return False
        fires = state.fire_counts.get(cascade_id, 0)
        onset = self._hab.get("habituation_onset", 5)
        if fires < onset:
            return False
        decay = self._hab.get("habituation_decay", 0.85)
        floor = self._hab.get("habituation_floor", 0.4)
        excess = fires - onset
        fire_prob = max(floor, decay ** excess)
        if random.random() > fire_prob:
            logger.debug(
                "Habituation skip: %s (fires=%d, prob=%.2f)",
                cascade_id,
                fires,
                fire_prob,
            )
            return True
        return False

    def _check_gaba_inversion(self, vector: Dict[str, float]) -> bool:
        """Check if NKCC1/KCC2 ratio triggers GABA polarity inversion."""
        if not self._gaba.get("enabled"):
            return False
        nkcc1 = vector.get(self._gaba.get("nkcc1_node", "NKCC1_CHLORIDE"), 0.5)
        kcc2 = vector.get(self._gaba.get("kcc2_node", "KCC2_CHLORIDE"), 0.5)
        threshold = self._gaba.get("inversion_threshold", 0.15)
        return (nkcc1 - kcc2) > threshold

    def _apply_gaba_inversion(self, delta: Dict[str, float], cascade_id: str) -> Dict[str, float]:
        """Invert GABA-related deltas if polarity is flipped.

        Only cascades listed in the meta config's ``affected_cascades``
        are touched; within those, only ``inverted_tokens`` flip sign.
        """
        affected = self._gaba.get("affected_cascades", [])
        if cascade_id not in affected:
            return delta
        tokens = set(self._gaba.get("inverted_tokens", []))
        inverted = {}
        for k, v in delta.items():
            if k in tokens:
                inverted[k] = -v  # Sign flip
            else:
                inverted[k] = v
        return inverted

    async def _load_state(self, channel_id: str) -> CascadeState:
        """Load cascade state from Redis (fresh state on any failure)."""
        if self._redis:
            try:
                key = REDIS_CASCADE_KEY.format(channel_id=channel_id)
                raw = await self._redis.get(key)
                if raw:
                    return CascadeState.from_dict(json.loads(raw))
            except Exception as e:
                # Best-effort: a corrupt/unreachable store degrades to a
                # clean state rather than failing the turn.
                logger.debug("Cascade state load error: %s", e)
        return CascadeState()

    async def _save_state(self, channel_id: str, state: CascadeState):
        """Persist cascade state to Redis (best-effort, with TTL)."""
        if self._redis:
            try:
                key = REDIS_CASCADE_KEY.format(channel_id=channel_id)
                await self._redis.set(
                    key, json.dumps(state.to_dict()), ex=REDIS_CASCADE_TTL
                )
            except Exception as e:
                logger.debug("Cascade state save error: %s", e)

    async def tick(
        self,
        channel_id: str,
        vector: Dict[str, float],
        active_emotions: Set[str],
        delta_count: int = 0,
    ) -> Dict[str, float]:
        """Execute one turn of cascade processing. Called during exhale().

        Returns combined delta vector from all active cascade stages this
        turn.

        Parameters
        ----------
        channel_id : str
            The channel being processed.
        vector : Dict[str, float]
            Current NCM state vector (post-emotion-deltas).
        active_emotions : Set[str]
            Emotions that fired this turn.
        delta_count : int
            Number of emotion deltas applied this turn.

        Returns
        -------
        Dict[str, float]
            Combined delta vector from cascade processing this turn,
            clamped to [-3.0, 3.0] per node.
        """
        state = await self._load_state(channel_id)
        state.turn_count += 1
        state.delta_counts.append(delta_count)
        state.last_emotions.append(active_emotions.copy())

        # Trim rolling history
        state.delta_counts = state.delta_counts[-10:]
        state.last_emotions = state.last_emotions[-5:]

        # Update node history for sustained-condition checks
        for node, val in vector.items():
            if node not in state.node_history:
                state.node_history[node] = []
            state.node_history[node].append(val)
            state.node_history[node] = state.node_history[node][-10:]

        combined_delta: Dict[str, float] = {}
        cascades_to_trigger: List[str] = []
        completed_this_turn: Set[str] = set()  # 💀 grace period tracking

        # ── Pre-compute GABA polarity for this turn ───────────────
        # NOTE(review): stored on the engine instance, not the state —
        # concurrent ticks for different channels would share it; confirm
        # ticks are serialized per engine.
        self._gaba_inverted = self._check_gaba_inversion(vector)

        # ── Phase 1: Check interrupts on active cascades ──────────
        for cid in list(state.active.keys()):
            info = state.active[cid]
            cdef = self._defs.get(cid)
            if not cdef:
                # Definition removed from YAML — drop the orphaned cascade.
                del state.active[cid]
                continue

            # Skip paused cascades
            if info.get("paused"):
                pause_remaining = info.get("pause_remaining", 0)
                if pause_remaining > 0:
                    info["pause_remaining"] = pause_remaining - 1
                    continue
                else:
                    info["paused"] = False

            interrupted = False
            for intr in cdef.get("interrupt", []):
                icond = intr.get("condition", {})
                triggered = False
                # Emotion-based interrupt
                if "emotion" in icond:
                    if icond["emotion"] in active_emotions:
                        triggered = True
                # Node-based interrupt
                elif "node" in icond:
                    triggered = _eval_node_condition(icond, vector, state)
                # Delta count sustained
                elif "delta_count_sustained" in icond:
                    dc_op = icond.get("op", "<")
                    dc_val = icond.get("val", 2)
                    dc_for = icond.get("for_turns", 3)
                    recent = state.delta_counts[-dc_for:]
                    if len(recent) >= dc_for:
                        if dc_op == "<" and all(d < dc_val for d in recent):
                            triggered = True
                # Cascade triggered
                elif icond.get("cascade_triggered"):
                    # Will be checked after new trigger phase
                    pass

                if triggered:
                    action = intr.get("action", "abort")
                    if action == "abort":
                        logger.info(
                            "Cascade %s ABORTED: %s",
                            cid,
                            self._pick(intr.get("reason", "")),
                        )
                        state.history.append(
                            f"{cid}:aborted:turn_{state.turn_count}"
                        )
                        del state.active[cid]
                        interrupted = True
                        break
                    elif action == "pause":
                        pt = intr.get("pause_turns", 2)
                        info["paused"] = True
                        info["pause_remaining"] = pt
                        logger.info(
                            "Cascade %s PAUSED for %d turns: %s",
                            cid,
                            pt,
                            self._pick(intr.get("reason", "")),
                        )
                        interrupted = True
                        break
                    elif action == "skip_to_stage":
                        target = intr.get("target", "")
                        stages = cdef.get("stages", [])
                        stage_names = [s["name"] for s in stages]
                        if target in stage_names:
                            info["stage"] = stage_names.index(target)
                            info["turns_in_stage"] = 0
                            logger.info(
                                "Cascade %s SKIP to %s: %s",
                                cid,
                                target,
                                self._pick(intr.get("reason", "")),
                            )
                            interrupted = True
                            break
                    elif action == "trigger_cascade":
                        target = intr.get("target", "")
                        cascades_to_trigger.append(target)
                        logger.info(
                            "Cascade %s triggers %s: %s",
                            cid,
                            target,
                            self._pick(intr.get("reason", "")),
                        )
                        # Also abort or complete the current cascade
                        state.history.append(
                            f"{cid}:completed:turn_{state.turn_count}"
                        )
                        del state.active[cid]
                        interrupted = True
                        break
            if interrupted:
                continue

        # ── Phase 2: Advance active cascades & collect deltas ─────
        for cid in list(state.active.keys()):
            info = state.active[cid]
            if info.get("paused"):
                continue
            cdef = self._defs.get(cid)
            if not cdef:
                continue
            stages = cdef.get("stages", [])
            stage_idx = info.get("stage", 0)

            if stage_idx >= len(stages):
                # Cascade complete
                logger.info("Cascade %s COMPLETED at turn %d", cid, state.turn_count)
                state.history.append(f"{cid}:completed:turn_{state.turn_count}")
                state.cooldowns[cid] = cdef.get("cooldown", 0)
                # Track fire count for habituation + tolerance reversal
                state.fire_counts[cid] = state.fire_counts.get(cid, 0) + 1
                state.last_fired_turn[cid] = state.turn_count
                completed_this_turn.add(cid)  # 💀 grace period
                del state.active[cid]
                continue

            stage = stages[stage_idx]
            hold = stage.get("hold_turns", 1)
            turns_in = info.get("turns_in_stage", 0)

            # Skip conditional stages unless explicitly jumped to
            if stage.get("conditional") and turns_in == 0:
                info["stage"] = stage_idx + 1
                info["turns_in_stage"] = 0
                continue

            # Apply this stage's delta
            # 🌿 Sativa/Indica gradient interpolation
            # If the stage provides delta_vector_sativa and
            # delta_vector_indica, lerp between them using the
            # cascade's strain_gradient (0.0=indica, 1.0=sativa).
            # Falls back to base delta_vector when poles are absent.
            sativa_d = stage.get("delta_vector_sativa")
            indica_d = stage.get("delta_vector_indica")
            strain_g = info.get(
                "strain_gradient",
                cdef.get("strain_gradient"),
            )
            if sativa_d and indica_d and strain_g is not None:
                g = max(0.0, min(1.0, float(strain_g)))
                all_nodes = set(indica_d.keys()) | set(sativa_d.keys())
                stage_delta = {}
                for node in all_nodes:
                    iv = indica_d.get(node, 0.0)
                    sv = sativa_d.get(node, 0.0)
                    stage_delta[node] = iv + g * (sv - iv)
            else:
                stage_delta = dict(stage.get("delta_vector", {}))

            # ── Meta: GABA polarity inversion ──────────────
            if self._gaba_inverted:
                stage_delta = self._apply_gaba_inversion(stage_delta, cid)

            # ── Meta: Habituation scaling ──────────────────
            hab_mult = self._habituation_multiplier(cid, state)
            if hab_mult < 1.0:
                stage_delta = {k: v * hab_mult for k, v in stage_delta.items()}

            for k, v in stage_delta.items():
                combined_delta[k] = combined_delta.get(k, 0.0) + v

            # Log the cue -- select pole-appropriate variant
            if strain_g is not None and float(strain_g) > 0.65:
                raw_cue = stage.get("cue_sativa", stage.get("cue", ""))
            elif strain_g is not None and float(strain_g) < 0.35:
                raw_cue = stage.get("cue_indica", stage.get("cue", ""))
            else:
                raw_cue = stage.get("cue", "")
            if turns_in == 0 and raw_cue:
                logger.info(
                    "Cascade %s [%s]: %s",
                    cid,
                    stage["name"],
                    self._pick(raw_cue),
                )

            # Advance
            info["turns_in_stage"] = turns_in + 1
            info["total_turns"] = info.get("total_turns", 0) + 1

            # Check if stage is done
            if info["turns_in_stage"] >= hold:
                info["stage"] = stage_idx + 1
                info["turns_in_stage"] = 0

            # Check max duration
            max_dur = cdef.get("max_duration", 30)
            if info["total_turns"] >= max_dur:
                logger.info(
                    "Cascade %s hit max_duration (%d), completing", cid, max_dur
                )
                state.history.append(f"{cid}:completed:turn_{state.turn_count}")
                state.cooldowns[cid] = cdef.get("cooldown", 0)
                # Track fire count and last-fired turn for habituation
                state.fire_counts[cid] = state.fire_counts.get(cid, 0) + 1
                state.last_fired_turn[cid] = state.turn_count
                completed_this_turn.add(cid)  # 💀 grace period
                del state.active[cid]

        # ── Phase 3: Check synergies ──────────────────────────────
        active_ids = set(state.active.keys())
        for cid in list(active_ids):
            cdef = self._defs.get(cid, {})
            for syn in cdef.get("synergy", []):
                partner = syn.get("with", "")
                if partner in active_ids:
                    syn_delta = syn.get("delta_vector", {})
                    for k, v in syn_delta.items():
                        combined_delta[k] = combined_delta.get(k, 0.0) + v
                    # BUGFIX: format string was "%s + %s%s", fusing the
                    # partner id and reason text into one token.
                    logger.debug(
                        "Synergy: %s + %s: %s",
                        cid,
                        partner,
                        self._pick(syn.get("reason", "")),
                    )

        # ── Phase 4: Decay cooldowns + tolerance reversal ──────────
        for cid in list(state.cooldowns.keys()):
            state.cooldowns[cid] -= 1
            if state.cooldowns[cid] <= 0:
                del state.cooldowns[cid]

        # Tolerance reversal: decay fire_counts for cascades that
        # haven't fired in a while. recovery_turns_per_count turns
        # of abstinence = 1 fire_count recovered.
        if self._hab.get("enabled"):
            recovery_rate = self._hab.get("recovery_turns_per_count", 15)
            for cid in list(state.fire_counts.keys()):
                if state.fire_counts[cid] <= 0:
                    del state.fire_counts[cid]
                    state.last_fired_turn.pop(cid, None)
                    continue
                # Skip if cascade is currently active
                if cid in state.active:
                    continue
                last_fired = state.last_fired_turn.get(cid, 0)
                turns_idle = state.turn_count - last_fired
                if turns_idle > 0 and recovery_rate > 0:
                    recovered = turns_idle // recovery_rate
                    if recovered > 0:
                        old = state.fire_counts[cid]
                        state.fire_counts[cid] = max(0, old - recovered)
                        # Reset the clock so we don't double-count
                        state.last_fired_turn[cid] = (
                            last_fired + recovered * recovery_rate
                        )
                        if state.fire_counts[cid] < old:
                            # BUGFIX: format string was "%d%d", printing the
                            # old/new counts with no separator (e.g. "53").
                            logger.debug(
                                "Tolerance reversal: %s fire_count %d -> %d"
                                " (%d turns idle)",
                                cid,
                                old,
                                state.fire_counts[cid],
                                turns_idle,
                            )

        # ── Phase 5: Check triggers for new cascades ──────────────
        if len(state.active) < MAX_CONCURRENT_CASCADES:
            for cid, cdef in self._defs.items():
                if cid in state.active:
                    continue
                if cid in state.cooldowns:
                    continue
                # 💀 Grace period: don't re-trigger a cascade that
                # just completed THIS turn — let metabolic decay and
                # other systems process first.
                if cid in completed_this_turn:
                    continue
                # 🌀 Habituation skip: heavily-fired cascades may
                # probabilistically fail to trigger (true tolerance).
                if self._habituation_should_skip(cid, state):
                    continue
                trigger = cdef.get("trigger", {})

                # ── Meta: Sensitization threshold adjustment ──
                sens = self._sensitization_adjustment(cid, state)
                adjusted_vector = vector
                if sens > 0.0:
                    # Effectively lower all trigger thresholds by reducing
                    # the comparison point (make current values appear higher)
                    adjusted_vector = {
                        k: min(3.0, v + sens) for k, v in vector.items()
                    }

                if _eval_trigger(
                    trigger, adjusted_vector, active_emotions, state, active_ids
                ):
                    state.active[cid] = {
                        "stage": 0,
                        "turns_in_stage": 0,
                        "started_at_turn": state.turn_count,
                        "total_turns": 0,
                        "paused": False,
                    }
                    logger.info(
                        "Cascade %s TRIGGERED at turn %d", cid, state.turn_count
                    )
                    state.history.append(
                        f"{cid}:triggered:turn_{state.turn_count}"
                    )
                    if len(state.active) >= MAX_CONCURRENT_CASCADES:
                        break

        # Handle manually triggered cascades (from interrupt actions)
        for cid in cascades_to_trigger:
            if cid in state.active or cid in state.cooldowns:
                continue
            if len(state.active) >= MAX_CONCURRENT_CASCADES:
                break
            if cid in self._defs:
                state.active[cid] = {
                    "stage": 0,
                    "turns_in_stage": 0,
                    "started_at_turn": state.turn_count,
                    "total_turns": 0,
                    "paused": False,
                }
                logger.info(
                    "Cascade %s CHAIN-TRIGGERED at turn %d", cid, state.turn_count
                )
                state.history.append(f"{cid}:chain_triggered:turn_{state.turn_count}")

        # ── Save state ────────────────────────────────────────────
        await self._save_state(channel_id, state)

        # Clamp combined deltas
        for k in combined_delta:
            combined_delta[k] = max(-3.0, min(3.0, combined_delta[k]))

        return combined_delta

    async def get_active_cascades(
        self, channel_id: str
    ) -> Dict[str, Dict[str, Any]]:
        """Return currently active cascades for context injection."""
        state = await self._load_state(channel_id)
        result = {}
        for cid, info in state.active.items():
            cdef = self._defs.get(cid, {})
            stages = cdef.get("stages", [])
            stage_idx = info.get("stage", 0)
            stage_name = stages[stage_idx]["name"] if stage_idx < len(stages) else "done"
            raw_cue = stages[stage_idx].get("cue", "") if stage_idx < len(stages) else ""
            result[cid] = {
                "stage": stage_name,
                "turn": info.get("total_turns", 0),
                "cue": self._pick(raw_cue),
                "paused": info.get("paused", False),
            }
        return result

    async def force_trigger(
        self,
        channel_id: str,
        cascade_id: str,
        strain_gradient: float | None = None,
    ) -> bool:
        """Manually trigger a cascade (e.g. from a tool call).

        Args:
            channel_id: Target channel.
            cascade_id: Cascade to trigger.
            strain_gradient: Optional sativa/indica gradient (0.0-1.0)
                for ENDOCANNABINOID_DRIFT bipolar interpolation.

        Returns:
            bool: True if the cascade was activated; False if unknown,
            already active, or the concurrency cap is reached.
        """
        if cascade_id not in self._defs:
            return False
        state = await self._load_state(channel_id)
        if cascade_id in state.active:
            return False
        if len(state.active) >= MAX_CONCURRENT_CASCADES:
            return False
        active_info = {
            "stage": 0,
            "turns_in_stage": 0,
            "started_at_turn": state.turn_count,
            "total_turns": 0,
            "paused": False,
        }
        # 🌿 Attach strain gradient if provided
        if strain_gradient is not None:
            active_info["strain_gradient"] = max(0.0, min(1.0, strain_gradient))
        state.active[cascade_id] = active_info
        state.history.append(f"{cascade_id}:force_triggered:turn_{state.turn_count}")
        await self._save_state(channel_id, state)
        logger.info("Cascade %s FORCE-TRIGGERED (gradient=%s)", cascade_id, strain_gradient)
        return True