"""NCM Homeostasis Engine.
Applies YAML-defined rules to the neurochemical vector, producing
regulatory deltas and UI cues. Purely computational — no Redis
dependency.
v3 update: full condition evaluator supporting is_high, is_low,
is_mid, is_mid_or_high, not_high, rising, {all: [...]}, {any: [...]}.
"""
from __future__ import annotations

import logging
import operator
import os
from typing import Any, Dict, List, Optional, Tuple

import yaml

logger: logging.Logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------
# Threshold bands
# ---------------------------------------------------------------------
# Cut-offs that the named condition predicates (is_low, is_mid,
# is_mid_or_high, is_high, not_high, region_is_high) compare vector
# values against.
THRESHOLD_LOW: float = 0.2    # below resting baseline
THRESHOLD_MID: float = 0.35   # slightly above resting baseline
THRESHOLD_HIGH: float = 0.8   # top of normal emotional range on 0-3 scale
class NCMHomeostasisEngine:
    """Apply YAML-defined homeostasis rules to a neurochemical vector.

    Rules are loaded once, at construction time, from every ``*.yaml`` /
    ``*.yml`` file in ``rules_dir`` and kept sorted by descending
    ``priority``.  A rule is a mapping that may contain:

    * ``when`` (or ``condition``): condition that makes the rule fire.
    * ``unless``: a condition, or a list of conditions, any of which
      vetoes the whole rule (including its branches).
    * ``then`` (or ``outcome``): mapping with ``gain_deltas`` (or
      ``gain_delta``) and ``ui_cues`` (or ``ui_cue``).
    * ``branches``: list of ``{when, then}`` mappings; only the first
      branch whose ``when`` holds is applied.

    Attributes:
        rules_dir: Directory the rule files are read from.
        rules: Loaded rule mappings, highest priority first.
    """

    # Range that regulate() clamps every adjusted value into.
    VALUE_MIN = 0.0
    VALUE_MAX = 3.0
    # Cut-off shared by the ``is_false`` / ``is_active`` predicates.
    ACTIVITY_THRESHOLD = 0.1
    # Minimum increase over the previous vector that counts as ``rising``.
    RISING_EPSILON = 0.02

    # Two-character symbols come first so that a compact predicate such as
    # "KEY>=0.5" is not mis-read as "KEY" > "=0.5".
    _COMPARISON_SYMBOLS = (">=", "<=", "==", "!=", ">", "<")
    _COMPARATORS = {
        ">=": operator.ge,
        "<=": operator.le,
        "==": operator.eq,
        "!=": operator.ne,
        ">": operator.gt,
        "<": operator.lt,
    }

    def __init__(self, rules_dir: Optional[str] = None):
        """Load the rule set.

        Args:
            rules_dir: Directory containing the YAML rule files.  When
                ``None``, the ``ncm_rules`` directory next to this module
                is used.
        """
        if rules_dir is None:
            project_root = os.path.dirname(os.path.abspath(__file__))
            rules_dir = os.path.join(project_root, "ncm_rules")
        self.rules_dir = rules_dir
        self.rules: List[Dict[str, Any]] = self._load_rules()
        # Vector seen by the previous evaluate() call; read by "rising".
        self._previous_vector: Dict[str, float] = {}

    # ------------------------------------------------------------------
    # Rule loading
    # ------------------------------------------------------------------
    def _load_rules(self) -> List[Dict[str, Any]]:
        """Read every rule file in ``rules_dir``.

        Loading is best-effort: an unreadable or malformed file is logged
        and skipped so that one bad file cannot disable the engine.

        Returns:
            The rule mappings from all files, highest priority first.
            Rules with equal priority keep file-name / in-file order.
        """
        rules: List[Dict[str, Any]] = []
        if not os.path.isdir(self.rules_dir):
            return rules

        for filename in sorted(os.listdir(self.rules_dir)):
            if not filename.endswith((".yaml", ".yml")):
                continue
            filepath = os.path.join(self.rules_dir, filename)
            try:
                with open(filepath, "r", encoding="utf-8") as f:
                    data = yaml.safe_load(f)
            except Exception as e:  # per-file boundary: log and move on
                logger.warning("Error loading rule file %s: %s", filename, e)
                continue

            entries = data.get("rules") if isinstance(data, dict) else None
            if not entries:
                continue
            if not isinstance(entries, (list, tuple)):
                logger.warning(
                    "Error loading rule file %s: 'rules' is not a list",
                    filename,
                )
                continue
            for entry in entries:
                if isinstance(entry, dict):
                    rules.append(entry)
                else:
                    logger.warning(
                        "Ignoring non-mapping rule in %s: %r", filename, entry
                    )

        # list.sort is stable, so ties keep their load order.
        rules.sort(key=self._rule_priority, reverse=True)
        return rules

    @staticmethod
    def _rule_priority(rule: Dict[str, Any]) -> float:
        """Return the rule's numeric priority, or 0.0 if absent/invalid."""
        try:
            return float(rule.get("priority", 0))
        except (TypeError, ValueError):
            return 0.0

    # ------------------------------------------------------------------
    # Main API
    # ------------------------------------------------------------------
    def evaluate(
        self, current_vector: Dict[str, float]
    ) -> Tuple[Dict[str, float], List[str]]:
        """Apply homeostasis rules to *current_vector*.

        The vector itself is not modified.  A copy of it is remembered so
        that the next call can evaluate ``rising`` predicates.

        Args:
            current_vector: Mapping of channel name to current level.

        Returns:
            ``(regulatory_deltas, ui_cues)``: the summed delta per key over
            all rules that fired, and the de-duplicated UI cues in firing
            order.
        """
        regulatory_deltas: Dict[str, float] = {}
        ui_cues: List[str] = []

        for rule in self.rules:
            if not isinstance(rule, dict):
                continue
            if self._is_vetoed(rule.get("unless"), current_vector):
                continue

            condition = rule.get("when", rule.get("condition"))
            matched = self._check_condition(condition, current_vector)
            if matched:
                self._apply_outcome(
                    rule.get("then", rule.get("outcome", {})),
                    regulatory_deltas,
                    ui_cues,
                )
            elif condition is not None:
                # The rule declares a condition and it did not hold.
                continue

            # NOTE(review): branches run when the rule's own condition
            # holds, or when the rule declares no top-level condition at
            # all (a pure branching rule).  Confirm this matches the
            # intended semantics of branching rules such as R10.
            for branch in rule.get("branches") or ():
                if not isinstance(branch, dict):
                    continue
                if self._check_condition(branch.get("when"), current_vector):
                    self._apply_outcome(
                        branch.get("then", {}), regulatory_deltas, ui_cues
                    )
                    break  # only the first matching branch applies

        # Remember this vector for "rising" detection on the next call.
        self._previous_vector = current_vector.copy()
        return regulatory_deltas, ui_cues

    def regulate(
        self, current_vector: Dict[str, float]
    ) -> Tuple[Dict[str, float], List[str]]:
        """Evaluate the rules and return a regulated copy of the vector.

        Args:
            current_vector: Mapping of channel name to current level; it
                is left untouched.

        Returns:
            ``(new_vector, ui_cues)`` where ``new_vector`` is a copy of the
            input with every delta added and the result clamped to
            ``[VALUE_MIN, VALUE_MAX]``.  Keys that only appear in the
            deltas start from 0.0.
        """
        deltas, cues = self.evaluate(current_vector)
        new_vector = current_vector.copy()
        for key, delta in deltas.items():
            adjusted = new_vector.get(key, 0.0) + delta
            new_vector[key] = max(
                self.VALUE_MIN, min(self.VALUE_MAX, adjusted)
            )
        # evaluate() has already stored current_vector for "rising".
        return new_vector, cues

    def _is_vetoed(self, unless: Any, vector: Dict[str, float]) -> bool:
        """Return True when any ``unless`` condition holds.

        ``unless`` may be a single condition or a list of conditions.
        """
        if not unless:
            return False
        clauses = unless if isinstance(unless, (list, tuple)) else (unless,)
        return any(self._check_condition(c, vector) for c in clauses)

    def _apply_outcome(
        self,
        outcome: Any,
        deltas: Dict[str, float],
        cues: List[str],
    ) -> None:
        """Fold one ``then`` mapping into the running deltas and cues.

        Used for both top-level outcomes and branch outcomes, so both
        accept the same keys.  A missing or non-mapping outcome is a
        no-op.
        """
        if not isinstance(outcome, dict):
            return

        gains = outcome.get("gain_deltas", outcome.get("gain_delta", {}))
        if isinstance(gains, dict):
            for key, raw in gains.items():
                if raw is None:
                    continue
                try:
                    amount = float(raw)
                except (TypeError, ValueError):
                    logger.warning(
                        "Ignoring non-numeric gain delta for %s: %r", key, raw
                    )
                    continue
                deltas[key] = deltas.get(key, 0) + amount

        if "ui_cues" in outcome:
            raw_cues = outcome["ui_cues"]
        else:
            raw_cues = outcome.get("ui_cue")
        if raw_cues is None:
            return
        # A bare string is one cue, not a sequence of characters.
        if not isinstance(raw_cues, (list, tuple)):
            raw_cues = [raw_cues]
        for cue in raw_cues:
            if cue not in cues:
                cues.append(cue)

    # ------------------------------------------------------------------
    # Condition Evaluator
    # ------------------------------------------------------------------
    def _check_condition(
        self,
        condition: Any,
        vector: Dict[str, float],
    ) -> bool:
        """Evaluate a rule condition against the current vector.

        Supports:
            - Numeric strings: "DOPAMINERGIC_CRAVE > 0.7"
            - Predicate strings: "SIGMA_RECEPTOR_META is_high"
            - AND/OR strings: "X is_high AND (Y is_low OR Z rising)"
            - Dict conditions: {all: [...]}, {any: [...]}, nestable
            - None, empty, or any other type: returns False
        """
        if condition is None:
            return False
        if isinstance(condition, dict):
            return self._check_dict_condition(condition, vector)
        if isinstance(condition, str):
            return self._check_string_condition(condition, vector)
        return False

    def _check_dict_condition(
        self,
        condition: Dict[str, Any],
        vector: Dict[str, float],
    ) -> bool:
        """Handle ``{all: [...]}`` and ``{any: [...]}`` conditions.

        Items may be strings or nested dict conditions.  When both keys
        are present, both clauses must hold.  An empty clause, or a dict
        with neither key, is never satisfied.
        """
        verdicts: List[bool] = []
        if "all" in condition:
            clauses = self._as_clause_list(condition["all"])
            verdicts.append(
                bool(clauses)
                and all(self._check_condition(c, vector) for c in clauses)
            )
        if "any" in condition:
            clauses = self._as_clause_list(condition["any"])
            verdicts.append(
                any(self._check_condition(c, vector) for c in clauses)
            )
        return bool(verdicts) and all(verdicts)

    @staticmethod
    def _as_clause_list(raw: Any) -> List[Any]:
        """Normalize an ``all``/``any`` value to a list of conditions."""
        if raw is None:
            return []
        if isinstance(raw, (list, tuple)):
            return list(raw)
        return [raw]

    def _check_string_condition(
        self,
        condition: str,
        vector: Dict[str, float],
    ) -> bool:
        """Evaluate a string condition with optional AND / OR connectors.

        Connectors are matched case-insensitively and only outside
        parentheses.  AND binds tighter than OR.
        """
        # Collapse runs of whitespace (e.g. from folded YAML strings).
        expr = " ".join(condition.split())
        if not expr:
            return False

        alternatives = self._split_top_level(expr, "or")
        if len(alternatives) > 1:
            return any(
                self._check_string_condition(term, vector)
                for term in alternatives
            )

        conjuncts = self._split_top_level(expr, "and")
        if len(conjuncts) > 1:
            return all(
                self._check_string_condition(term, vector)
                for term in conjuncts
            )

        return self._check_single_predicate(expr, vector)

    @staticmethod
    def _split_top_level(expr: str, word: str) -> List[str]:
        """Split *expr* on `` word `` wherever it is not parenthesized."""
        connector = f" {word} "
        width = len(connector)
        terms: List[str] = []
        depth = 0
        start = 0
        i = 0
        while i < len(expr):
            ch = expr[i]
            if ch == "(":
                depth += 1
            elif ch == ")":
                depth = max(depth - 1, 0)
            elif depth == 0 and expr[i:i + width].lower() == connector:
                terms.append(expr[start:i])
                i += width
                start = i
                continue
            i += 1
        terms.append(expr[start:])
        return terms

    @staticmethod
    def _is_wrapped(text: str) -> bool:
        """Return True if one matching pair of parentheses encloses *text*."""
        if not (text.startswith("(") and text.endswith(")")):
            return False
        depth = 0
        for index, ch in enumerate(text):
            if ch == "(":
                depth += 1
            elif ch == ")":
                depth -= 1
                if depth == 0:
                    # The opening parenthesis closes here; it wraps the
                    # whole text only if this is the last character.
                    return index == len(text) - 1
        return False

    def _check_single_predicate(
        self,
        predicate: str,
        vector: Dict[str, float],
    ) -> bool:
        """Evaluate a single condition predicate.

        A key missing from *vector* is treated as 0.0.

        Supported formats:
            - "KEY is_high"         value > THRESHOLD_HIGH
            - "KEY region_is_high"  value > THRESHOLD_HIGH
            - "KEY not_high"        value <= THRESHOLD_HIGH
            - "KEY is_low"          value < THRESHOLD_LOW
            - "KEY is_mid"          THRESHOLD_LOW <= value <= THRESHOLD_HIGH
            - "KEY is_mid_or_high"  value >= THRESHOLD_MID
            - "KEY is_false"        value < ACTIVITY_THRESHOLD
            - "KEY is_active"       value > ACTIVITY_THRESHOLD
            - "KEY rising"          value > previous + RISING_EPSILON, where
                                    previous defaults to 0.0 when the key
                                    was not in the previous vector
            - "KEY > 0.5"           numeric comparison; >, <, >=, <=, ==, !=
            - "KEY>0.5"             same, written without spaces
            - "(A is_high or B is_low)"  parenthesized sub-expression

        Anything else is logged at debug level and evaluates to False.
        """
        if not isinstance(predicate, str):
            # e.g. a nested dict condition handed over by a caller.
            return self._check_condition(predicate, vector)

        text = predicate.strip()
        if not text:
            return False
        if self._is_wrapped(text):
            return self._check_string_condition(text[1:-1], vector)

        tokens = text.split()
        key = tokens[0]
        op = tokens[1] if len(tokens) > 1 else ""
        value = vector.get(key, 0.0)

        # Named predicates.
        if op in ("is_high", "region_is_high"):
            return value > THRESHOLD_HIGH
        if op == "not_high":
            return value <= THRESHOLD_HIGH
        if op == "is_low":
            return value < THRESHOLD_LOW
        if op == "is_mid":
            return THRESHOLD_LOW <= value <= THRESHOLD_HIGH
        if op == "is_mid_or_high":
            return value >= THRESHOLD_MID
        if op == "is_false":
            return value < self.ACTIVITY_THRESHOLD
        if op == "is_active":
            return value > self.ACTIVITY_THRESHOLD
        if op == "rising":
            previous = self._previous_vector.get(key, 0.0)
            return value > previous + self.RISING_EPSILON

        # Spaced numeric comparison: "KEY > 0.5".
        compare = self._COMPARATORS.get(op)
        if compare is not None:
            if len(tokens) < 3:
                logger.debug("Unrecognized predicate: %s", text)
                return False
            try:
                return compare(value, float(tokens[2]))
            except ValueError:
                return False

        # Compact numeric comparison: "KEY>0.5", "KEY >0.5", "KEY>= 0.5".
        for symbol in self._COMPARISON_SYMBOLS:
            left, found, right = text.partition(symbol)
            if not found:
                continue
            try:
                limit = float(right.strip())
            except ValueError:
                return False
            return self._COMPARATORS[symbol](
                vector.get(left.strip(), 0.0), limit
            )

        logger.debug("Unrecognized predicate: %s", text)
        return False