"""NCM Homeostasis Engine.
Applies YAML-defined rules to the neurochemical vector, producing
regulatory deltas and UI cues. Purely computational — no Redis
dependency.
v3 update: full condition evaluator supporting is_high, is_low,
is_mid, is_mid_or_high, not_high, rising, {all: [...]}, {any: [...]}.
"""
from __future__ import annotations
import logging
import os
from typing import Any, Dict, List, Tuple
import yaml
logger = logging.getLogger(__name__)
# ═══════════════════════════════════════════════════════════════════════
# THRESHOLD BANDS
# ═══════════════════════════════════════════════════════════════════════
# Band edges consulted by the named predicates (is_high, is_low, is_mid, ...).
# Upper edge: top of the normal emotional range on the 0-3 scale.
THRESHOLD_HIGH: float = 0.8
# Middle edge: slightly above the resting baseline.
THRESHOLD_MID: float = 0.35
# Lower edge: below the resting baseline.
THRESHOLD_LOW: float = 0.2
[docs]
class NCMHomeostasisEngine:
"""Rule-driven regulator that keeps Star's neurochemical vector in band.
Loads a corpus of YAML-defined homeostasis rules from ``ncm_rules`` and,
each turn, evaluates them against the current neurochemical vector to
produce regulatory gain deltas, UI cues, and behavioral route flags --
the negative-feedback machinery that pulls runaway nodes back toward
their resting band. It is deliberately a pure computational component:
no Redis, KG, LLM, or network access; the only state it carries between
turns is the per-channel previous vector used to evaluate ``rising``
conditions. Instantiated once by ``LimbicCoordinator`` as ``self.engine``
and driven through ``regulate`` inside the exhale pipeline.
Attributes:
rules_dir (str): Directory the YAML rule files were loaded from.
rules (List[Dict[str, Any]]): All loaded rules, sorted by descending
priority.
"""
[docs]
def __init__(self, rules_dir: str | None = None):
"""Build the engine and eagerly load its homeostasis rule corpus.
Resolves the rules directory (defaulting to the ``ncm_rules`` folder
beside this module), reads every rule file through ``_load_rules`` into
``self.rules``, and initializes the empty per-channel ``rising``-state
cache. Touches the filesystem at construction time to load YAML;
called by ``LimbicCoordinator`` when it stands up ``self.engine``.
Args:
rules_dir (str | None): Directory to load rule YAML from. When
``None``, defaults to ``ncm_rules`` next to this module.
"""
if rules_dir is None:
project_root = os.path.dirname(os.path.abspath(__file__))
rules_dir = os.path.join(project_root, "ncm_rules")
self.rules_dir = rules_dir
self.rules: List[Dict[str, Any]] = self._load_rules()
# For tracking "rising" conditions, per-channel previous vectors
self._previous_vectors: Dict[str, Dict[str, float]] = {}
def _load_rules(self) -> List[Dict[str, Any]]:
"""Read and merge every YAML rule file under ``rules_dir``.
Scans the rules directory in sorted filename order, parsing each
``.yaml`` or ``.yml`` file with ``yaml.safe_load`` and extending the
combined rule list with that file's ``rules`` block; malformed files
are logged and skipped rather than aborting the load. The merged list
is then sorted by descending ``priority`` so higher-priority rules are
evaluated first. Touches the filesystem; called once by ``__init__`` to
populate ``self.rules``. Returns an empty list if the directory does
not exist.
Returns:
List[Dict[str, Any]]: All parsed rule dicts, ordered by descending
priority.
"""
rules: List[Dict[str, Any]] = []
if not os.path.exists(self.rules_dir):
return rules
for filename in sorted(os.listdir(self.rules_dir)):
if filename.endswith((".yaml", ".yml")):
filepath = os.path.join(self.rules_dir, filename)
try:
with open(filepath, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
if data and "rules" in data:
rules.extend(data["rules"])
except Exception as e:
logger.warning("Error loading rule file %s: %s", filename, e)
# Sort by priority (highest first)
rules.sort(key=lambda r: r.get("priority", 0), reverse=True)
return rules
# ------------------------------------------------------------------
# Main API
# ------------------------------------------------------------------
[docs]
def evaluate(
self, current_vector: Dict[str, float], channel_id: str = ""
) -> Tuple[Dict[str, float], List[str], List[str]]:
"""Evaluate every rule against the vector and gather their outcomes.
Walks the priority-ordered rule corpus and, for each rule whose
``when``/``condition`` matches (and whose optional ``unless`` guard does
not), accumulates its ``gain_deltas`` into a combined delta map,
collects any UI cues, and harvests behavioral ``route_flags``; branching
rules additionally evaluate their nested ``branches`` and apply the
first matching branch. Condition matching is delegated to
``_check_condition``. The only mutation of engine state is recording the
``current_vector`` into the per-channel previous-vector cache so the
next call can resolve ``rising`` predicates. Called by ``regulate`` (and
usable directly when only the deltas, not the mutated vector, are
wanted).
Args:
current_vector (Dict[str, float]): The current neurochemical node
values to test rules against.
channel_id (str): Channel identifier, used to scope the
``rising``-state cache.
Returns:
Tuple[Dict[str, float], List[str], List[str]]: A 3-tuple of
``(regulatory_deltas, ui_cues, route_flags)`` -- the summed gain
deltas per node, the de-duplicated UI cue list, and the
de-duplicated behavioral route-flag list.
"""
regulatory_deltas: Dict[str, float] = {}
ui_cues: List[str] = []
route_flags: List[str] = [] # 💀 behavioral routing signals from rules
for rule in self.rules:
condition = rule.get("when", rule.get("condition"))
# Support "unless" blocks — if any unless condition passes, skip rule
unless = rule.get("unless")
if unless and self._check_condition(unless, current_vector, channel_id):
continue
if self._check_condition(condition, current_vector, channel_id):
outcome = rule.get("then", rule.get("outcome", {}))
# Extract gain_deltas
gain_deltas = outcome.get("gain_deltas", outcome.get("gain_delta", {}))
if not isinstance(gain_deltas, dict):
gain_deltas = {}
for key, val in gain_deltas.items():
if val is None:
continue
regulatory_deltas[key] = regulatory_deltas.get(key, 0) + float(val)
# Extract UI cues
if "ui_cues" in outcome:
for cue in outcome["ui_cues"]:
if cue not in ui_cues:
ui_cues.append(cue)
elif "ui_cue" in outcome:
cue = outcome["ui_cue"]
if cue not in ui_cues:
ui_cues.append(cue)
# 🔥 Extract route_flags — behavioral directives from rule outcomes
for flag in outcome.get("route_flags", []):
if flag not in route_flags:
route_flags.append(flag)
# Process branches (for branching rules like R10)
branches = rule.get("branches", [])
for branch in branches:
branch_cond = branch.get("when")
if self._check_condition(branch_cond, current_vector, channel_id):
branch_outcome = branch.get("then", {})
branch_deltas = branch_outcome.get("gain_delta", {})
if not isinstance(branch_deltas, dict):
branch_deltas = {}
for key, val in branch_deltas.items():
if val is None:
continue
regulatory_deltas[key] = regulatory_deltas.get(
key, 0
) + float(val)
for cue in branch_outcome.get("ui_cues", []):
if cue not in ui_cues:
ui_cues.append(cue)
# 💀 route_flags from branches too
for flag in branch_outcome.get("route_flags", []):
if flag not in route_flags:
route_flags.append(flag)
break # Only first matching branch
# Update previous vector for "rising" detection
self._previous_vectors[channel_id] = current_vector.copy()
return regulatory_deltas, ui_cues, route_flags
[docs]
def regulate(
self, current_vector: Dict[str, float], channel_id: str = ""
) -> Tuple[Dict[str, float], List[str], List[str]]:
"""Apply the rules and return the regulated vector plus its signals.
The primary entry point of the homeostasis engine: it runs
``evaluate`` to collect regulatory deltas, then applies those deltas to
a copy of the input vector, clamping each node into the legal ``0.0``
to ``3.0`` supraphysiological range. The original vector is left
untouched, but the engine's per-channel previous-vector cache is
refreshed for next-turn ``rising`` detection. Called by
``LimbicCoordinator`` during the exhale pipeline to produce the
homeostatically-corrected vector together with its UI cues and route
flags.
Args:
current_vector (Dict[str, float]): The neurochemical vector to
regulate.
channel_id (str): Channel identifier, used to scope the
``rising``-state cache.
Returns:
Tuple[Dict[str, float], List[str], List[str]]: A 3-tuple of
``(new_vector, ui_cues, route_flags)`` -- the clamped, delta-applied
vector and the cue and route-flag lists from ``evaluate``.
"""
deltas, cues, route_flags = self.evaluate(current_vector, channel_id)
new_vector = current_vector.copy()
for key, delta in deltas.items():
current_val = new_vector.get(key, 0.0)
new_vector[key] = max(0.0, min(3.0, current_val + delta))
# Store for "rising" detection on next call
self._previous_vectors[channel_id] = current_vector.copy()
return new_vector, cues, route_flags
# ------------------------------------------------------------------
# Condition Evaluator
# ------------------------------------------------------------------
def _check_condition(
self,
condition: Any,
vector: Dict[str, float],
channel_id: str = "",
) -> bool:
"""Evaluate a rule condition against the current vector.
Supports:
- String conditions: "DOPAMINERGIC_CRAVE > 0.7"
- Predicate conditions: "SIGMA_RECEPTOR_META is_high"
- AND/OR: "X is_high AND Y is_low"
- Dict conditions: {all: [...]}, {any: [...]}
- None/empty: returns False
"""
if condition is None:
return False
# Dict format: {all: [...]} or {any: [...]}
if isinstance(condition, dict):
return self._check_dict_condition(condition, vector, channel_id)
# String format
if isinstance(condition, str):
return self._check_string_condition(condition, vector, channel_id)
return False
def _check_dict_condition(
self,
condition: Dict[str, Any],
vector: Dict[str, float],
channel_id: str = "",
) -> bool:
"""Evaluate a structured ``all`` / ``any`` condition block.
Resolves the dict form of a rule condition: an ``all`` key requires
every listed predicate to hold, an ``any`` key requires at least one,
each sub-predicate being delegated to ``_check_single_predicate``.
Returns ``False`` for an unrecognized dict shape. Called by
``_check_condition`` when a condition is a mapping rather than a string.
Args:
condition (Dict[str, Any]): The condition block, expected to carry
an ``all`` or ``any`` list of predicate strings.
vector (Dict[str, float]): The neurochemical vector to test
predicates against.
channel_id (str): Channel identifier, forwarded for ``rising``
predicate resolution.
Returns:
bool: Whether the combined condition holds.
"""
if "all" in condition:
conditions = condition["all"]
return all(
self._check_single_predicate(c, vector, channel_id) for c in conditions
)
if "any" in condition:
conditions = condition["any"]
return any(
self._check_single_predicate(c, vector, channel_id) for c in conditions
)
return False
def _check_string_condition(
self,
condition: str,
vector: Dict[str, float],
channel_id: str = "",
) -> bool:
"""Evaluate a string condition, splitting on top-level ``AND``.
Handles the string form of a rule condition: if it contains a
space-padded ``AND`` connector the condition is split into parts that
must all hold, otherwise the whole string is treated as a single
predicate;
either way evaluation is delegated to ``_check_single_predicate``.
Called by ``_check_condition`` when a condition is a plain string.
Args:
condition (str): The condition expression, e.g.
``X is_high AND Y is_low``.
vector (Dict[str, float]): The neurochemical vector to test against.
channel_id (str): Channel identifier, forwarded for ``rising``
predicate resolution.
Returns:
bool: Whether the string condition holds.
"""
if " AND " in condition:
parts = condition.split(" AND ")
return all(
self._check_single_predicate(p.strip(), vector, channel_id)
for p in parts
)
return self._check_single_predicate(condition, vector, channel_id)
def _check_single_predicate(
self,
predicate: str,
vector: Dict[str, float],
channel_id: str = "",
) -> bool:
"""Evaluate a single condition predicate.
Supported formats:
- "KEY is_high" → value > 0.8 (THRESHOLD_HIGH)
- "KEY is_low" → value < 0.2 (THRESHOLD_LOW)
- "KEY is_mid" → 0.2 <= value <= 0.8
- "KEY is_mid_or_high" → value >= 0.35 (THRESHOLD_MID)
- "KEY not_high" → value <= 0.8
- "KEY is_false" → value < 0.1 (essentially off)
- "KEY is_active" → value > 0.1
- "KEY rising" → current > previous + 0.02
- "KEY region_is_high" → value > 0.8
- "KEY > 0.5" → numeric comparison
- "KEY < 0.3" → numeric comparison
- "(KEY1 is_high or KEY2 is_low)" → parenthesized OR
"""
predicate = predicate.strip()
# Handle parenthesized OR: "(X is_high or Y is_low)"
if predicate.startswith("(") and predicate.endswith(")"):
inner = predicate[1:-1]
or_parts = inner.split(" or ")
return any(
self._check_single_predicate(p.strip(), vector, channel_id)
for p in or_parts
)
parts = predicate.split()
if len(parts) < 2:
return False
key = parts[0]
op = parts[1]
value = vector.get(key, 0.0)
# Predicate-style conditions
if op == "is_high":
return value > THRESHOLD_HIGH
if op == "is_low":
return value < THRESHOLD_LOW
if op == "is_mid":
return THRESHOLD_LOW <= value <= THRESHOLD_HIGH
if op == "is_mid_or_high":
return value >= THRESHOLD_MID
if op == "not_high":
return value <= THRESHOLD_HIGH
if op == "is_false":
return value < 0.1
if op == "is_active":
return value > 0.1
if op == "rising":
prev = self._previous_vectors.get(channel_id, {}).get(key, 0.0)
return value > prev + 0.02 # must rise by at least 0.02
# "region_is_high" for regional tags
if op == "region_is_high":
return value > THRESHOLD_HIGH
# Numeric comparisons: "KEY > 0.5" or "KEY < 0.3"
if op == ">" and len(parts) >= 3:
try:
return value > float(parts[2])
except ValueError:
return False
if op == "<" and len(parts) >= 3:
try:
return value < float(parts[2])
except ValueError:
return False
# Fallback: try old format "KEY>VALUE" (no spaces)
if ">" in predicate and op != ">":
try:
k, v = predicate.split(">")
return vector.get(k.strip(), 0.0) > float(v.strip())
except (ValueError, IndexError):
return False
if "<" in predicate and op != "<":
try:
k, v = predicate.split("<")
return vector.get(k.strip(), 0.0) < float(v.strip())
except (ValueError, IndexError):
return False
logger.debug("Unrecognized predicate: %s", predicate)
return False