Source code for ncm_engine

"""NCM Homeostasis Engine.

Applies YAML-defined rules to the neurochemical vector, producing
regulatory deltas and UI cues. Purely computational — no Redis
dependency.

v3 update: full condition evaluator supporting is_high, is_low,
is_mid, is_mid_or_high, not_high, rising, {all: [...]}, {any: [...]}.
"""

from __future__ import annotations

import logging
import os
from typing import Any, Dict, List, Tuple

import yaml

logger = logging.getLogger(__name__)

# ═══════════════════════════════════════════════════════════════════════
# THRESHOLD BANDS
# ═══════════════════════════════════════════════════════════════════════

THRESHOLD_HIGH = 0.8   # top of normal emotional range on 0-3 scale
THRESHOLD_MID = 0.35   # slightly above resting baseline
THRESHOLD_LOW = 0.2    # below resting baseline


[docs] class NCMHomeostasisEngine: """NCMHomeostasisEngine. Attributes: rules_dir: The rules dir. """
[docs] def __init__(self, rules_dir: str | None = None): """Initialize the instance. Args: rules_dir (str | None): The rules dir value. """ if rules_dir is None: project_root = os.path.dirname(os.path.abspath(__file__)) rules_dir = os.path.join(project_root, "ncm_rules") self.rules_dir = rules_dir self.rules: List[Dict[str, Any]] = self._load_rules() # For tracking "rising" conditions, we store previous vector self._previous_vector: Dict[str, float] = {}
def _load_rules(self) -> List[Dict[str, Any]]: """Internal helper: load rules. Returns: List[Dict[str, Any]]: The result. """ rules: List[Dict[str, Any]] = [] if not os.path.exists(self.rules_dir): return rules for filename in sorted(os.listdir(self.rules_dir)): if filename.endswith((".yaml", ".yml")): filepath = os.path.join(self.rules_dir, filename) try: with open(filepath, "r", encoding="utf-8") as f: data = yaml.safe_load(f) if data and "rules" in data: rules.extend(data["rules"]) except Exception as e: logger.warning("Error loading rule file %s: %s", filename, e) # Sort by priority (highest first) rules.sort(key=lambda r: r.get("priority", 0), reverse=True) return rules # ------------------------------------------------------------------ # Main API # ------------------------------------------------------------------
[docs] def evaluate( self, current_vector: Dict[str, float] ) -> Tuple[Dict[str, float], List[str]]: """Apply homeostasis rules to *current_vector*. Returns (regulatory_deltas, ui_cues). """ regulatory_deltas: Dict[str, float] = {} ui_cues: List[str] = [] for rule in self.rules: condition = rule.get("when", rule.get("condition")) # Support "unless" blocks — if any unless condition passes, skip rule unless = rule.get("unless") if unless and self._check_condition(unless, current_vector): continue if self._check_condition(condition, current_vector): outcome = rule.get("then", rule.get("outcome", {})) # Extract gain_deltas gain_deltas = outcome.get("gain_deltas", outcome.get("gain_delta", {})) if not isinstance(gain_deltas, dict): gain_deltas = {} for key, val in gain_deltas.items(): if val is None: continue regulatory_deltas[key] = ( regulatory_deltas.get(key, 0) + float(val) ) # Extract UI cues if "ui_cues" in outcome: for cue in outcome["ui_cues"]: if cue not in ui_cues: ui_cues.append(cue) elif "ui_cue" in outcome: cue = outcome["ui_cue"] if cue not in ui_cues: ui_cues.append(cue) # Process branches (for branching rules like R10) branches = rule.get("branches", []) for branch in branches: branch_cond = branch.get("when") if self._check_condition(branch_cond, current_vector): branch_outcome = branch.get("then", {}) branch_deltas = branch_outcome.get("gain_delta", {}) if not isinstance(branch_deltas, dict): branch_deltas = {} for key, val in branch_deltas.items(): if val is None: continue regulatory_deltas[key] = ( regulatory_deltas.get(key, 0) + float(val) ) for cue in branch_outcome.get("ui_cues", []): if cue not in ui_cues: ui_cues.append(cue) break # Only first matching branch # Update previous vector for "rising" detection self._previous_vector = current_vector.copy() return regulatory_deltas, ui_cues
[docs] def regulate( self, current_vector: Dict[str, float] ) -> Tuple[Dict[str, float], List[str]]: """Evaluate rules and return the mutated vector plus UI cues.""" deltas, cues = self.evaluate(current_vector) new_vector = current_vector.copy() for key, delta in deltas.items(): current_val = new_vector.get(key, 0.0) new_vector[key] = max(0.0, min(3.0, current_val + delta)) # Store for "rising" detection on next call self._previous_vector = current_vector.copy() return new_vector, cues
# ------------------------------------------------------------------ # Condition Evaluator # ------------------------------------------------------------------ def _check_condition( self, condition: Any, vector: Dict[str, float], ) -> bool: """Evaluate a rule condition against the current vector. Supports: - String conditions: "DOPAMINERGIC_CRAVE > 0.7" - Predicate conditions: "SIGMA_RECEPTOR_META is_high" - AND/OR: "X is_high AND Y is_low" - Dict conditions: {all: [...]}, {any: [...]} - None/empty: returns False """ if condition is None: return False # Dict format: {all: [...]} or {any: [...]} if isinstance(condition, dict): return self._check_dict_condition(condition, vector) # String format if isinstance(condition, str): return self._check_string_condition(condition, vector) return False def _check_dict_condition( self, condition: Dict[str, Any], vector: Dict[str, float], ) -> bool: """Handle {all: [cond1, cond2]} and {any: [cond1, cond2]} format.""" if "all" in condition: conditions = condition["all"] return all( self._check_single_predicate(c, vector) for c in conditions ) if "any" in condition: conditions = condition["any"] return any( self._check_single_predicate(c, vector) for c in conditions ) return False def _check_string_condition( self, condition: str, vector: Dict[str, float], ) -> bool: """Handle string conditions, potentially with AND connectors.""" if " AND " in condition: parts = condition.split(" AND ") return all( self._check_single_predicate(p.strip(), vector) for p in parts ) return self._check_single_predicate(condition, vector) def _check_single_predicate( self, predicate: str, vector: Dict[str, float], ) -> bool: """Evaluate a single condition predicate. Supported formats: - "KEY is_high" → value > 0.7 - "KEY is_low" → value < 0.3 - "KEY is_mid" → 0.3 <= value <= 0.7 - "KEY is_mid_or_high" → value >= 0.35 - "KEY not_high" → value <= 0.7 - "KEY rising" → current > previous - "KEY is_false" → value < 0.1 (essentially off) - "KEY > 0.5" → numeric comparison - "KEY < 0.3" → numeric comparison - "(KEY1 is_high or KEY2 is_high)" → parenthesized OR """ predicate = predicate.strip() # Handle parenthesized OR: "(X is_high or Y is_low)" if predicate.startswith("(") and predicate.endswith(")"): inner = predicate[1:-1] or_parts = inner.split(" or ") return any( self._check_single_predicate(p.strip(), vector) for p in or_parts ) parts = predicate.split() if len(parts) < 2: return False key = parts[0] op = parts[1] value = vector.get(key, 0.0) # Predicate-style conditions if op == "is_high": return value > THRESHOLD_HIGH if op == "is_low": return value < THRESHOLD_LOW if op == "is_mid": return THRESHOLD_LOW <= value <= THRESHOLD_HIGH if op == "is_mid_or_high": return value >= THRESHOLD_MID if op == "not_high": return value <= THRESHOLD_HIGH if op == "is_false": return value < 0.1 if op == "is_active": return value > 0.1 if op == "rising": prev = self._previous_vector.get(key, 0.0) return value > prev + 0.02 # must rise by at least 0.02 # "region_is_high" for regional tags if op == "region_is_high": return value > THRESHOLD_HIGH # Numeric comparisons: "KEY > 0.5" or "KEY < 0.3" if op == ">" and len(parts) >= 3: try: return value > float(parts[2]) except ValueError: return False if op == "<" and len(parts) >= 3: try: return value < float(parts[2]) except ValueError: return False # Fallback: try old format "KEY>VALUE" (no spaces) if ">" in predicate and op != ">": try: k, v = predicate.split(">") return vector.get(k.strip(), 0.0) > float(v.strip()) except (ValueError, IndexError): return False if "<" in predicate and op != "<": try: k, v = predicate.split("<") return vector.get(k.strip(), 0.0) < float(v.strip()) except (ValueError, IndexError): return False logger.debug("Unrecognized predicate: %s", predicate) return False