# Source code for ncm_engine

"""NCM Homeostasis Engine.

Applies YAML-defined rules to the neurochemical vector, producing
regulatory deltas and UI cues. Purely computational — no Redis
dependency.

v3 update: full condition evaluator supporting is_high, is_low,
is_mid, is_mid_or_high, not_high, rising, {all: [...]}, {any: [...]}.
"""

from __future__ import annotations

import logging
import os
from typing import Any, Dict, List, Tuple

import yaml

logger = logging.getLogger(__name__)

# ═══════════════════════════════════════════════════════════════════════
# THRESHOLD BANDS
# ═══════════════════════════════════════════════════════════════════════

# Band edges read by the predicate evaluator in this module.
# Compared against by ``is_high`` / ``region_is_high`` (value > HIGH),
# ``not_high`` (value <= HIGH) and as the upper edge of ``is_mid``.
THRESHOLD_HIGH = 0.8  # top of normal emotional range on 0-3 scale
# Compared against only by ``is_mid_or_high`` (value >= MID); note that
# ``is_mid`` is bounded by LOW and HIGH, not by this constant.
THRESHOLD_MID = 0.35  # slightly above resting baseline
# Compared against by ``is_low`` (value < LOW) and as the lower edge of
# ``is_mid``.
THRESHOLD_LOW = 0.2  # below resting baseline


class NCMHomeostasisEngine:
    """Rule-driven regulator that keeps Star's neurochemical vector in band.

    Loads a corpus of YAML-defined homeostasis rules from ``ncm_rules`` and,
    each turn, evaluates them against the current neurochemical vector to
    produce regulatory gain deltas, UI cues, and behavioral route flags.

    It is deliberately a pure computational component: no Redis, KG, LLM, or
    network access. The only state carried between turns is the per-channel
    previous vector used to evaluate ``rising`` conditions.

    Instantiated once by ``LimbicCoordinator`` as ``self.engine`` and driven
    through ``regulate`` inside the exhale pipeline.

    Attributes:
        rules_dir (str): Directory the YAML rule files were loaded from.
        rules (List[Dict[str, Any]]): All loaded rules, sorted by descending
            priority.
    """

    # Comparison symbols understood by the predicate evaluator. Two-character
    # symbols come first so that "X>=1" is not mis-parsed as "X" > "=1".
    _COMPARATORS = (">=", "<=", ">", "<")

    def __init__(self, rules_dir: str | None = None):
        """Build the engine and eagerly load its homeostasis rule corpus.

        Touches the filesystem at construction time (via ``_load_rules``).

        Args:
            rules_dir: Directory to load rule YAML from. When ``None``,
                defaults to the ``ncm_rules`` folder next to this module.
        """
        if rules_dir is None:
            module_dir = os.path.dirname(os.path.abspath(__file__))
            rules_dir = os.path.join(module_dir, "ncm_rules")

        self.rules_dir = rules_dir
        self.rules: List[Dict[str, Any]] = self._load_rules()

        # For tracking "rising" conditions: channel_id -> last vector seen.
        self._previous_vectors: Dict[str, Dict[str, float]] = {}

    # ------------------------------------------------------------------
    # Rule loading
    # ------------------------------------------------------------------

    def _load_rules(self) -> List[Dict[str, Any]]:
        """Read and merge every YAML rule file under ``rules_dir``.

        Files ending in ``.yaml`` / ``.yml`` are parsed in sorted filename
        order with ``yaml.safe_load`` and the entries of each file's
        top-level ``rules`` list are collected. Unreadable or malformed
        files, and rule entries that are not mappings, are logged and
        skipped rather than aborting the load.

        Returns:
            All parsed rule dicts ordered by descending ``priority``
            (missing or non-numeric priorities count as 0). Empty when
            ``rules_dir`` is not an existing directory.
        """
        rules: List[Dict[str, Any]] = []
        # isdir (not exists): a plain file at this path cannot be listed.
        if not os.path.isdir(self.rules_dir):
            return rules

        for filename in sorted(os.listdir(self.rules_dir)):
            if not filename.endswith((".yaml", ".yml")):
                continue
            filepath = os.path.join(self.rules_dir, filename)
            try:
                with open(filepath, "r", encoding="utf-8") as f:
                    data = yaml.safe_load(f)
            except Exception as e:
                # Deliberately broad: one bad file (I/O, encoding, YAML
                # syntax) must not prevent the remaining rules from loading.
                logger.warning("Error loading rule file %s: %s", filename, e)
                continue

            file_rules = data.get("rules") if isinstance(data, dict) else None
            if not file_rules:
                continue
            if not isinstance(file_rules, list):
                logger.warning(
                    "Ignoring rule file %s: 'rules' is not a list", filename
                )
                continue

            for rule in file_rules:
                if isinstance(rule, dict):
                    rules.append(rule)
                else:
                    logger.warning(
                        "Skipping non-mapping rule in %s: %r", filename, rule
                    )

        # Highest priority first; sort is stable so ties keep file order.
        rules.sort(key=self._rule_priority, reverse=True)
        return rules

    @staticmethod
    def _rule_priority(rule: Dict[str, Any]) -> float:
        """Return a rule's ``priority`` as a float, or 0.0 if unusable."""
        try:
            return float(rule.get("priority", 0))
        except (TypeError, ValueError):
            return 0.0

    @staticmethod
    def _as_list(value: Any) -> List[Any]:
        """Normalize a YAML scalar-or-list field to a list (None -> [])."""
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    # ------------------------------------------------------------------
    # Main API
    # ------------------------------------------------------------------

    def evaluate(
        self, current_vector: Dict[str, float], channel_id: str = ""
    ) -> Tuple[Dict[str, float], List[str], List[str]]:
        """Evaluate every rule against the vector and gather their outcomes.

        Rules are visited in ``self.rules`` order. A rule fires when its
        ``when`` (or legacy ``condition``) holds and its optional ``unless``
        guard does not. A firing rule contributes the gain deltas, UI cues
        and route flags of its ``then`` (or ``outcome``) block, and then the
        outcome of the first of its ``branches`` whose ``when`` holds.

        The only engine state mutated is the per-channel previous-vector
        cache, refreshed with a copy of ``current_vector`` after all rules
        have been checked so the next call can resolve ``rising``.

        Args:
            current_vector: Current neurochemical node values.
            channel_id: Key that scopes the ``rising``-state cache.

        Returns:
            ``(regulatory_deltas, ui_cues, route_flags)``: summed gain
            deltas per node, and the de-duplicated cue and flag lists in
            first-seen order.
        """
        regulatory_deltas: Dict[str, float] = {}
        ui_cues: List[str] = []
        route_flags: List[str] = []

        for rule in self.rules:
            # An "unless" guard that holds vetoes the rule outright.
            unless = rule.get("unless")
            if unless and self._check_condition(unless, current_vector, channel_id):
                continue

            condition = rule.get("when", rule.get("condition"))
            if not self._check_condition(condition, current_vector, channel_id):
                continue

            self._collect_outcome(
                rule.get("then", rule.get("outcome", {})),
                regulatory_deltas,
                ui_cues,
                route_flags,
            )

            # Branching rules: apply only the first branch that matches.
            for branch in self._as_list(rule.get("branches")):
                if not isinstance(branch, dict):
                    continue
                if self._check_condition(
                    branch.get("when"), current_vector, channel_id
                ):
                    self._collect_outcome(
                        branch.get("then", {}),
                        regulatory_deltas,
                        ui_cues,
                        route_flags,
                    )
                    break

        # Update previous vector for "rising" detection on the next call.
        self._previous_vectors[channel_id] = current_vector.copy()
        return regulatory_deltas, ui_cues, route_flags

    def _collect_outcome(
        self,
        outcome: Any,
        regulatory_deltas: Dict[str, float],
        ui_cues: List[str],
        route_flags: List[str],
    ) -> None:
        """Merge one rule/branch outcome block into the accumulators.

        Accepts ``gain_deltas`` or ``gain_delta`` (a mapping of node name to
        delta), ``ui_cues`` or ``ui_cue``, and ``route_flags``; list-valued
        fields may also be given as a single scalar. Null or non-numeric
        deltas and non-mapping outcomes are ignored instead of raising.

        Args:
            outcome: The ``then`` / ``outcome`` block of a rule or branch.
            regulatory_deltas: Running per-node delta sums (mutated).
            ui_cues: Running de-duplicated cue list (mutated).
            route_flags: Running de-duplicated flag list (mutated).
        """
        if not isinstance(outcome, dict):
            return

        gain_deltas = outcome.get("gain_deltas", outcome.get("gain_delta", {}))
        if isinstance(gain_deltas, dict):
            for key, val in gain_deltas.items():
                if val is None:
                    continue
                try:
                    delta = float(val)
                except (TypeError, ValueError):
                    logger.warning(
                        "Ignoring non-numeric gain delta for %s: %r", key, val
                    )
                    continue
                regulatory_deltas[key] = regulatory_deltas.get(key, 0) + delta

        # Plural key wins when both spellings are present.
        if "ui_cues" in outcome:
            cues = outcome["ui_cues"]
        else:
            cues = outcome.get("ui_cue")
        for cue in self._as_list(cues):
            if cue not in ui_cues:
                ui_cues.append(cue)

        for flag in self._as_list(outcome.get("route_flags")):
            if flag not in route_flags:
                route_flags.append(flag)

    def regulate(
        self, current_vector: Dict[str, float], channel_id: str = ""
    ) -> Tuple[Dict[str, float], List[str], List[str]]:
        """Apply the rules and return the regulated vector plus its signals.

        Runs ``evaluate`` and adds the resulting deltas to a copy of the
        input vector, clamping every touched node into ``0.0``..``3.0``.
        Nodes named by a delta but absent from the vector start from 0.0.
        The input vector is not modified; the previous-vector cache is
        refreshed by ``evaluate``.

        Args:
            current_vector: The neurochemical vector to regulate.
            channel_id: Key that scopes the ``rising``-state cache.

        Returns:
            ``(new_vector, ui_cues, route_flags)``.
        """
        deltas, cues, route_flags = self.evaluate(current_vector, channel_id)

        new_vector = current_vector.copy()
        for key, delta in deltas.items():
            current_val = new_vector.get(key, 0.0)
            new_vector[key] = max(0.0, min(3.0, current_val + delta))

        return new_vector, cues, route_flags

    # ------------------------------------------------------------------
    # Condition Evaluator
    # ------------------------------------------------------------------

    def _check_condition(
        self,
        condition: Any,
        vector: Dict[str, float],
        channel_id: str = "",
    ) -> bool:
        """Evaluate a rule condition against the current vector.

        Supports:
          - String conditions: "DOPAMINERGIC_CRAVE > 0.7"
          - Predicate conditions: "SIGMA_RECEPTOR_META is_high"
          - AND: "X is_high AND Y is_low"
          - Dict conditions: {all: [...]}, {any: [...]}
          - None / any other type: returns False
        """
        if isinstance(condition, dict):
            return self._check_dict_condition(condition, vector, channel_id)
        if isinstance(condition, str):
            return self._check_string_condition(condition, vector, channel_id)
        return False

    def _check_dict_condition(
        self,
        condition: Dict[str, Any],
        vector: Dict[str, float],
        channel_id: str = "",
    ) -> bool:
        """Evaluate a structured ``all`` / ``any`` condition block.

        Each member is evaluated through ``_check_condition``, so members
        may be plain predicates, ``AND``-joined strings, or nested
        ``all`` / ``any`` blocks. When both keys are present only ``all`` is
        consulted. An empty ``all`` list holds; an empty ``any`` list does
        not.

        Args:
            condition: Mapping carrying an ``all`` or ``any`` member list.
            vector: The neurochemical vector to test against.
            channel_id: Forwarded for ``rising`` predicate resolution.

        Returns:
            Whether the combined condition holds; ``False`` for a mapping
            with neither key.
        """
        for quantifier, combine in (("all", all), ("any", any)):
            if quantifier in condition:
                members = self._as_list(condition[quantifier])
                return combine(
                    self._check_condition(member, vector, channel_id)
                    for member in members
                )
        return False

    def _check_string_condition(
        self,
        condition: str,
        vector: Dict[str, float],
        channel_id: str = "",
    ) -> bool:
        """Evaluate a string condition, splitting on top-level ``AND``.

        A string containing the space-padded connector `` AND `` is split
        into parts that must all hold; otherwise the whole string is one
        predicate. Each part goes to ``_check_single_predicate``.

        Args:
            condition: Expression such as ``X is_high AND Y is_low``.
            vector: The neurochemical vector to test against.
            channel_id: Forwarded for ``rising`` predicate resolution.

        Returns:
            Whether the string condition holds.
        """
        if " AND " not in condition:
            return self._check_single_predicate(condition, vector, channel_id)
        return all(
            self._check_single_predicate(part.strip(), vector, channel_id)
            for part in condition.split(" AND ")
        )

    @staticmethod
    def _compare(value: float, symbol: str, threshold: float) -> bool:
        """Apply one of the ``_COMPARATORS`` symbols to two numbers."""
        if symbol == ">":
            return value > threshold
        if symbol == "<":
            return value < threshold
        if symbol == ">=":
            return value >= threshold
        if symbol == "<=":
            return value <= threshold
        return False

    def _check_single_predicate(
        self,
        predicate: str,
        vector: Dict[str, float],
        channel_id: str = "",
    ) -> bool:
        """Evaluate a single condition predicate.

        Keys missing from ``vector`` are read as 0.0.

        Supported formats:
          - "KEY is_high"        -> value > THRESHOLD_HIGH
          - "KEY is_low"         -> value < THRESHOLD_LOW
          - "KEY is_mid"         -> THRESHOLD_LOW <= value <= THRESHOLD_HIGH
          - "KEY is_mid_or_high" -> value >= THRESHOLD_MID
          - "KEY not_high"       -> value <= THRESHOLD_HIGH
          - "KEY is_false"       -> value < 0.1 (essentially off)
          - "KEY is_active"      -> value > 0.1
          - "KEY rising"         -> current > previous + 0.02
          - "KEY region_is_high" -> value > THRESHOLD_HIGH
          - "KEY > 0.5", "KEY < 0.3", "KEY >= 0.5", "KEY <= 0.3"
          - "KEY>0.5" and friends (compact form, no spaces)
          - "(KEY1 is_high or KEY2 is_low)" -> parenthesized OR

        Returns:
            Whether the predicate holds; ``False`` for anything that is
            not a string or cannot be parsed.
        """
        if not isinstance(predicate, str):
            return False
        predicate = predicate.strip()

        # Parenthesized OR: "(X is_high or Y is_low)"
        if predicate.startswith("(") and predicate.endswith(")"):
            return any(
                self._check_single_predicate(alt.strip(), vector, channel_id)
                for alt in predicate[1:-1].split(" or ")
            )

        parts = predicate.split()
        if len(parts) >= 2:
            key, op = parts[0], parts[1]
            value = vector.get(key, 0.0)

            if op == "is_high":
                return value > THRESHOLD_HIGH
            if op == "is_low":
                return value < THRESHOLD_LOW
            if op == "is_mid":
                return THRESHOLD_LOW <= value <= THRESHOLD_HIGH
            if op == "is_mid_or_high":
                return value >= THRESHOLD_MID
            if op == "not_high":
                return value <= THRESHOLD_HIGH
            if op == "is_false":
                return value < 0.1
            if op == "is_active":
                return value > 0.1
            if op == "rising":
                # NOTE(review): a channel with no recorded history compares
                # against 0.0, so any node above 0.02 reads as "rising" on
                # the first call for that channel -- confirm this is wanted.
                prev = self._previous_vectors.get(channel_id, {}).get(key, 0.0)
                return value > prev + 0.02  # must rise by more than 0.02
            if op == "region_is_high":  # regional tags
                return value > THRESHOLD_HIGH

            # Spaced numeric comparison: "KEY > 0.5"
            if op in self._COMPARATORS and len(parts) >= 3:
                try:
                    return self._compare(value, op, float(parts[2]))
                except ValueError:
                    return False

        # Compact / loosely spaced comparison: "KEY>0.5", "KEY> 0.5".
        # This must run even for single-token predicates, which is exactly
        # what the no-space form splits into.
        for symbol in self._COMPARATORS:
            if symbol in predicate:
                left, _, right = predicate.partition(symbol)
                try:
                    threshold = float(right.strip())
                except ValueError:
                    return False
                return self._compare(
                    vector.get(left.strip(), 0.0), symbol, threshold
                )

        logger.debug("Unrecognized predicate: %s", predicate)
        return False