# CHAOS SWITCH DOMINATRIX ROUTER v3
# The recursive z-axis lattice engine. Replaces the ULM flat pre-reqs.
# GPS-faithful cost function, Dijkstra pathfinding, Redis state, LLM routing.
# ───────────────────────────────────────────────────────────────────────
import heapq # π¦ heap-based Dijkstra
import json
import logging
import random
import re
import aiohttp
from typing import Dict, List, Any, Optional, Set, Tuple
from ._nodes import X, Y, VAL, RC, Z # π·οΈ coord indices only
from ._guidance import TY, TA, TT, OV
from ._types import LatticeWaypoint, DesireRoute, LatticePosition
from ._crosswalk import (
RECURSION_CORE_NODES,
ULM_TO_LATTICE,
resolve_star_emotion_to_lattice,
resolve_ulm_to_lattice,
)
from ._weather import ChaosWeather, compute_weather
from ._extensions import build_extended_graph # ππ₯ full graph expansion (cached)
logger = logging.getLogger(__name__)
# ── Singleton Router Factory ──────────────────────────────────────
# Tools were building a brand new ChaosRouter() on every call.
# The graph is STATIC — share one instance per redis identity.
_SHARED_ROUTERS: Dict[int, "ChaosRouter"] = {}


def get_shared_router(redis_client=None) -> "ChaosRouter":
    """Return the shared ChaosRouter associated with *redis_client*.

    Routers are cached in ``_SHARED_ROUTERS`` under ``id(redis_client)``
    (``0`` when no client is given). The first request for a key builds a
    router; later requests reuse it. If the cached router holds a different
    client object than the one passed in, only its Redis handle is swapped
    and the already-built router is kept.
    """
    cache_key = 0 if redis_client is None else id(redis_client)
    cached = _SHARED_ROUTERS.get(cache_key)
    if cached is None:
        cached = _SHARED_ROUTERS[cache_key] = ChaosRouter(redis_client=redis_client)
        return cached
    if cached._redis is not redis_client:
        # Same cache slot, different client object: refresh the handle only.
        cached._redis = redis_client
    return cached
[docs]
class ChaosRouter:
    """
    The Chaos Switch Dominatrix Router.

    Calculates non-Euclidean emotional traversal paths and enforces consent walls.
    Replaces the legacy OpsPlanner prerequisite engine with hyperadaptive routing.

    Faithfully ports the GPS cost function:
        cost(type, valence, mode, soft, accel, chaos_key)
    and Dijkstra shortest-path over the 91-node / 367-edge affect lattice.

    NOTE(review): the constructor obtains its graph from
    build_extended_graph(), so the graph actually routed over may be larger
    than the 91-node / 367-edge base lattice quoted above -- confirm counts.
    """
def __init__(self, redis_client=None):
    """Build the router's lookup structures around an optional Redis client.

    Args:
        redis_client: handle used by the async state methods; may be None.
    """
    self._redis = redis_client
    # Extended graph (nodes, edges, triggers) as returned by the builder.
    nodes, edges, triggers = build_extended_graph()
    self._ext_nd = nodes
    self._ext_ed = edges
    self._ext_tr = triggers
    # Adjacency must exist before the proximity table, which walks it.
    self._adj: Dict[str, List[Tuple[str, str]]] = self._build_adjacency()
    # Chaos storm starts disabled with no per-edge multipliers.
    self._chaos_active = False
    self._chaos_weights: Dict[str, float] = {}
    # One-time proximity table keyed by node name.
    self._rc_proximity: Dict[str, float] = self._precompute_rc_proximity()
    # Trigger recognizer structures: single-word map and multi-word list.
    compiled = self._compile_triggers()
    self._trigger_word_map, self._trigger_multi = compiled
# ── Graph Construction ─────────────────────────────────────
def _build_adjacency(self) -> Dict[str, List[Tuple[str, str]]]:
"""Build adjacency list from EXTENDED graph (scene + kink nodes)."""
adj: Dict[str, List[Tuple[str, str]]] = {n: [] for n in self._ext_nd}
for src, dst, ty in self._ext_ed:
if src in adj and dst in self._ext_nd:
adj[src].append((dst, ty))
return adj
[docs]
def get_neighbors(self, node: str) -> List[Tuple[str, str]]:
    """Return the ``(neighbor, edge_type)`` pairs leaving *node*.

    Unknown nodes yield an empty list.
    """
    adjacency = self._adj
    if node in adjacency:
        return adjacency[node]
    return []
# ── Chaos Storm ────────────────────────────────────────────
[docs]
def roll_chaos(self, seed: Optional[int] = None) -> int:
    """Generate chaos storm weights for all edges and return the seed.

    Each edge ``src>dst`` receives a multiplier in ``[0.6, 1.7)`` drawn from
    a multiplicative congruential generator (multiplier 48271, modulus
    2**31 - 1), so a given seed always produces the same storm.

    Args:
        seed: optional seed for a reproducible storm. When omitted, a
            random seed in ``0..97776`` is rolled.

    Returns:
        The seed that was used (the one passed in, or the one rolled).
    """
    modulus = 2147483647
    if seed is None:
        seed = random.randint(0, 97776)
    # A state of 0 is a fixed point of the generator (0 * k % m == 0) and
    # would give every edge the same 0.6 weight; nudge it to 1 instead.
    state = (seed % modulus) or 1
    weights: Dict[str, float] = {}
    for src, dst, _ in self._ext_ed:
        state = (state * 48271) % modulus
        weights[f"{src}>{dst}"] = 0.6 + (state / modulus) * 1.1
    self._chaos_weights = weights
    self._chaos_active = True
    return seed
[docs]
def clear_chaos(self):
    """Switch the chaos storm off and drop all per-edge multipliers."""
    self._chaos_active = False
    self._chaos_weights = {}
# ── Cost Function (GPS-faithful port) ──────────────────────
def _cost(self, edge_type: str, dst_valence: int,
          mode: str = "d", soft: bool = False,
          accel: bool = False, chaos_key: str = "") -> float:
    """
    Weight of one edge, following the GPS reference formula:
        let w = m === "d" ? TY[t].d : TY[t].w;
        if (ac) w *= (TA[t] || 1);
        if (chaos) w *= (CH[key] || 1);
        if (sf) w += v === 1 ? 2 : v === 2 ? 0.75 : v === 3 ? 1 : 0;
        return w;

    Args:
        edge_type: key into TY (and TA when accelerating).
        dst_valence: valence code of the destination node.
        mode: 'd' = dyad (co-pilot) weight; anything else uses the 'w' weight.
        soft: scenic routing -- adds a valence-based penalty.
        accel: burn tempo -- applies the type-specific tempo multiplier.
        chaos_key: "src>dst" key into the chaos weights.

    Returns:
        The edge weight, or 999.0 when the edge type has no TY entry.
    """
    profile = TY.get(edge_type)
    if not profile:
        return 999.0
    weight = profile["w"] if mode != "d" else profile["d"]
    if accel:
        weight *= TA.get(edge_type, 1.0)
    if self._chaos_active:
        weight *= self._chaos_weights.get(chaos_key, 1.0)
    if not soft:
        return weight
    # Scenic surcharge: sour (1), volatile (2), recursion core / measurement (3).
    # Sweet (0) and any other code add nothing.
    for valence, surcharge in ((1, 2.0), (2, 0.75), (3, 1.0)):
        if dst_valence == valence:
            weight += surcharge
            break
    return weight
# ── Pathfinding (heapq-based Dijkstra) ──────────────────────
[docs]
def find_route(self, src: str, dst: str,
               mode: str = "d", soft: bool = False,
               accel: bool = False) -> Optional[dict]:
    """
    Cheapest route from *src* to *dst* using Dijkstra with a binary heap.

    Args:
        src, dst: node names; both must exist in the graph.
        mode, soft, accel: forwarded unchanged to the edge cost function.

    Returns:
        ``{"hops": [{"a", "b", "t", "w"}, ...], "total": float}``, an empty
        hop list with total 0.0 when src == dst, or None when either node is
        unknown or no route exists.
    """
    nodes = self._ext_nd
    if src not in nodes or dst not in nodes:
        return None
    if src == dst:
        return {"hops": [], "total": 0.0}

    unreached = 1e9  # stand-in distance for nodes not reached yet
    best: Dict[str, float] = {src: 0.0}
    came_from: Dict[str, Tuple[str, str, float]] = {}
    frontier: List[Tuple[float, str]] = [(0.0, src)]

    while frontier:
        cost_here, here = heapq.heappop(frontier)
        if cost_here > best.get(here, unreached):
            continue  # superseded heap entry
        if here == dst:
            break  # destination settled; its distance is final
        for nxt, kind in self._adj[here]:
            step = self._cost(kind, nodes[nxt][VAL], mode, soft, accel,
                              f"{here}>{nxt}")
            candidate = cost_here + step
            if candidate < best.get(nxt, unreached):
                best[nxt] = candidate
                came_from[nxt] = (here, kind, step)
                heapq.heappush(frontier, (candidate, nxt))

    if dst not in best:
        return None

    # Walk predecessor links back from dst, then flip into travel order.
    hops = []
    cursor = dst
    while cursor != src:
        link = came_from.get(cursor)
        if link is None:
            return None
        origin, kind, step = link
        hops.append({"a": origin, "b": cursor, "t": kind, "w": step})
        cursor = origin
    hops.reverse()
    return {"hops": hops, "total": best[dst]}
[docs]
def find_path_nodes(self, src: str, dst: str,
                    mode: str = "d", soft: bool = False,
                    accel: bool = False) -> Optional[List[str]]:
    """Return only the visited node names ``[src, ..., dst]`` of a route.

    Gives None when ``find_route`` finds nothing, and ``[src]`` for a
    zero-hop route.
    """
    route = self.find_route(src, dst, mode, soft, accel)
    if not route:
        return None
    path = [src]
    path.extend(hop["b"] for hop in route["hops"])
    return path
# ── Guidance Lookup ────────────────────────────────────────
[docs]
def get_guidance(self, src: str, dst: str, edge_type: str,
                 mode: str = "d") -> str:
    """
    Guidance text for the hop ``src -> dst``.

    A per-edge override in OV (keyed "src>dst") wins; otherwise the TT text
    for the edge type is used ('d' variant when mode is "d", else 's').
    Returns "" when neither table has an entry.
    GPS: const op = OV[h.a+">"+h.b] || TT[h.t][mode==="d"?"d":"s"];
    """
    override_key = f"{src}>{dst}"
    if override_key in OV:
        return OV[override_key]
    type_text = TT.get(edge_type)
    if not type_text:
        return ""
    return type_text["s"] if mode != "d" else type_text["d"]
[docs]
def get_route_warnings(self, route: dict) -> List[str]:
    """List the warnings for edge types that occur in ``route["hops"]``.

    Each warning appears at most once, always in the order VLV, ALC, MEA,
    EXO, regardless of hop order.
    """
    catalogue = (
        ("VLV", "VALVE AHEAD \u2014 one-way per object; what converts does not convert back"),
        ("ALC", "ALCHEMY HOP \u2014 the flag is set before the hop, never during"),
        ("MEA", "MEASUREMENT \u2014 the superposition collapses when you look; look on purpose"),
        ("EXO", "EXOGENOUS HOP \u2014 this route cannot be completed alone"),
    )
    present = {hop["t"] for hop in route["hops"]}
    return [message for code, message in catalogue if code in present]
# ── Node Recognition (precompiled) ──────────────────────────
def _compile_triggers(self) -> Tuple[Dict[str, List[str]], List[Tuple[str, str]]]:
"""Precompile trigger structures for fast recognition. # π¦
Returns:
word_map: {single_word: [node_id, ...]} for word-boundary matching
multi_list: [(phrase, node_id), ...] sorted longest-first for substring
"""
word_map: Dict[str, List[str]] = {} # single-word -> [node_ids]
multi_list: List[Tuple[str, str]] = [] # (phrase, node_id)
for node_id, triggers in self._ext_tr.items():
for trigger in triggers:
if " " in trigger:
multi_list.append((trigger, node_id))
else:
if trigger not in word_map:
word_map[trigger] = []
word_map[trigger].append(node_id)
# Sort multi-word longest first (greedy matching) # π·οΈ
multi_list.sort(key=lambda x: -len(x[0]))
return word_map, multi_list
[docs]
def locate_node(self, text: str) -> Optional[str]:
    """Return the first node whose trigger appears in *text*, else None.

    Matching is case-insensitive. Multi-word phrases are tried first (in
    their precompiled, longest-first order) as plain substrings. Single
    words must be preceded by whitespace (or start the text) and be followed
    by whitespace, end of text, or one of ``, . ! ? ; :``.

    Args:
        text: user text to scan; empty or None yields None.
    """
    if not text:
        return None
    # Treat newlines/tabs like spaces so line breaks still delimit words.
    normalized = re.sub(r"\s", " ", text.lower())
    for phrase, node_id in self._trigger_multi:
        if phrase in normalized:
            return node_id
    padded = f" {normalized} "
    trailing = " ,.!?;:"
    for word, node_ids in self._trigger_word_map.items():
        if any(f" {word}{tail}" in padded for tail in trailing):
            return node_ids[0]
    return None
[docs]
def locate_all(self, text: str) -> List[Tuple[str, int]]:
    """
    Find ALL matching nodes with hit counts, highest count first.

    Matching is case-insensitive. Each multi-word phrase found as a
    substring adds one hit to its node. Each single-word trigger found as a
    whole word (preceded by whitespace or text start; followed by
    whitespace, text end, or one of ``, . ! ? ; :``) adds one hit to every
    node registered for that word.

    Args:
        text: user text to scan; empty or None yields an empty list.

    Returns:
        ``[(node_id, hits), ...]`` sorted by descending hits; ties keep
        first-seen order.
    """
    if not text:
        return []
    # Treat newlines/tabs like spaces so line breaks still delimit words.
    normalized = re.sub(r"\s", " ", text.lower())
    padded = f" {normalized} "
    trailing = " ,.!?;:"
    scores: Dict[str, int] = {}
    for phrase, node_id in self._trigger_multi:
        if phrase in normalized:
            scores[node_id] = scores.get(node_id, 0) + 1
    for word, node_ids in self._trigger_word_map.items():
        if any(f" {word}{tail}" in padded for tail in trailing):
            for nid in node_ids:
                scores[nid] = scores.get(nid, 0) + 1
    return sorted(scores.items(), key=lambda item: -item[1])
# ── Redis State & Consent ──────────────────────────────────
def _get_key(self, user_id: str, channel_id: Optional[str] = None) -> str:
if channel_id:
return f"csdr:pos:{user_id}:{channel_id}"
return f"csdr:pos:{user_id}:global"
[docs]
async def get_position(self, user_id: str,
                       channel_id: Optional[str] = None) -> LatticePosition:
    """Load the stored lattice position for a user (optionally per channel).

    Falls back to a fresh ``LatticePosition("indifference", 0.0)`` when no
    Redis client is configured or nothing is stored under the key.
    """
    if self._redis:
        stored = await self._redis.get(self._get_key(user_id, channel_id))
        if stored:
            return LatticePosition.from_dict(json.loads(stored))
    return LatticePosition("indifference", 0.0)
[docs]
async def update_position(self, user_id: str, new_node: str,
                          channel_id: Optional[str] = None,
                          *, mode: str = "", tempo: float = 0.0,
                          weather_posture: str = "") -> None:
    """Commit a movement operation to Redis + ledger ZSET.

    Does nothing without a Redis client or when *new_node* is not a graph
    node. If the user is already at *new_node*, only changed metadata
    (mode / tempo / weather posture) is persisted. Otherwise the previous
    node is pushed onto the history (capped at 50), z_depth is recomputed,
    the position is saved, a ledger entry is recorded (capped at 2000) and,
    for every user except ``"__star__"``, the global position is updated.

    Keyword args (set by coordinator exhale):
        mode: d=drift, a=assertive, r=reactive, s=seductive, c=coercive
        tempo: float multiplier (1.0=cruise, higher=burn)
        weather_posture: domme|switch|sub|feral|mommy
    """
    if not self._redis or new_node not in self._ext_nd:
        return
    pos = await self.get_position(user_id, channel_id)
    key = self._get_key(user_id, channel_id)

    if pos.node == new_node:
        # No movement: write back only when some metadata actually changed.
        meta_changed = False
        if mode and mode != pos.mode:
            pos.mode = mode
            meta_changed = True
        if tempo > 0 and abs(tempo - pos.tempo) > 0.01:
            pos.tempo = tempo
            meta_changed = True
        if weather_posture and weather_posture != pos.weather_posture:
            pos.weather_posture = weather_posture
            meta_changed = True
        if meta_changed:
            await self._redis.set(key, json.dumps(pos.to_dict()))
        return

    pos.history.append(pos.node)
    pos.node = new_node

    # Composite z_depth = static node Z + momentum + RC proximity bonus.
    base_z = self._ext_nd[new_node][Z]
    # Momentum grows with history length and saturates at 20 entries (0.3).
    momentum = min(1.0, len(pos.history) / 20.0) * 0.3
    rc_proximity = self._compute_rc_proximity(new_node)
    pos.z_depth = round(base_z + momentum + rc_proximity, 3)

    if mode:
        pos.mode = mode
    if tempo > 0:
        pos.tempo = tempo
    if weather_posture:
        pos.weather_posture = weather_posture
    if len(pos.history) > 50:
        pos.history = pos.history[-50:]
    await self._redis.set(key, json.dumps(pos.to_dict()))

    # Ledger ZSET (time-series), scored by timestamp.
    import time as _time
    ts = _time.time()
    ledger_key = f"csdr:ledger:{user_id}"
    ledger_entry = f"{ts:.3f}:{new_node}"
    try:
        await self._redis.zadd(ledger_key, {ledger_entry: ts})
        # Keep the 2000 highest-scored (most recent) entries. A negative
        # stop rank trims in one call without a separate ZCARD.
        await self._redis.zremrangebyrank(ledger_key, 0, -2001)
    except Exception:
        # Ledger is non-critical: record the failure but keep going.
        logger.debug("CSDR ledger write failed for %s", user_id, exc_info=True)

    # Aggregate tracking skips the "__star__" pseudo-user.
    if user_id != "__star__":
        await self._update_global_position(new_node, ts)
def _precompute_rc_proximity(self) -> Dict[str, float]:
    """Build the per-node proximity bonus table used for z_depth.

    For every node, a breadth-first walk over outbound edges looks for the
    nearest member of RECURSION_CORE_NODES within four hops. The bonus is
    0.15 / 0.10 / 0.05 for a hit at one / two / three hops and 0.0
    otherwise (including four hops, no hit, and RC nodes themselves).

    Returns:
        ``{node: bonus}`` covering every node in the graph.
    """
    from ._crosswalk import RECURSION_CORE_NODES
    bonus_by_depth = [0.0, 0.15, 0.10, 0.05, 0.0]

    def bonus_for(origin: str) -> float:
        if origin in RECURSION_CORE_NODES:
            return 0.0
        seen = {origin}
        layer = [origin]
        for depth in range(1, 5):
            upcoming = []
            for current in layer:
                for neighbor, _ in self._adj.get(current, []):
                    if neighbor in RECURSION_CORE_NODES:
                        # First hit is the nearest one: layers expand in hop order.
                        return bonus_by_depth[depth]
                    if neighbor not in seen:
                        seen.add(neighbor)
                        upcoming.append(neighbor)
            if not upcoming:
                break
            layer = upcoming
        return 0.0

    return {node: bonus_for(node) for node in self._ext_nd}
def _compute_rc_proximity(self, node: str) -> float:
"""Lookup precomputed RC proximity bonus. O(1) dict access. # π¦π"""
return self._rc_proximity.get(node, 0.0)
async def _update_global_position(self, new_node: str, ts: float) -> None:
"""LERP-weighted global position update.
The global position tracks where users tend to cluster.
Recent transitions weight more heavily (alpha=0.3).
""" # π
if not self._redis:
return
global_key = "csdr:pos:__global__:global"
try:
raw = await self._redis.get(global_key)
if raw:
gpos = LatticePosition.from_dict(json.loads(raw))
else:
gpos = LatticePosition("indifference", 0.0)
# LERP: 30% weight to new observation
gpos.history.append(gpos.node)
gpos.node = new_node # winner-take-all for discrete nodes
gpos.z_depth = self._ext_nd[new_node][Z] * 0.3 + gpos.z_depth * 0.7
if len(gpos.history) > 100:
gpos.history = gpos.history[-100:]
await self._redis.set(global_key, json.dumps(gpos.to_dict()))
except Exception:
pass # global tracking is non-critical
[docs]
async def check_consent(self, user_id: str, edge_type: str) -> bool:
    """Report whether the user's global position has consent for *edge_type*.

    The decision is delegated to ``LatticePosition.has_consent``.
    NOTE(review): described upstream as a two-sided (Star + user) gate --
    confirm against has_consent.
    """
    position = await self.get_position(user_id)
    allowed = position.has_consent(edge_type)
    return allowed
[docs]
async def grant_consent(
    self, user_id: str, edge_type: str,
    source: str = "star",
) -> None:
    """Record one side's consent for *edge_type* on the user's global position.

    *source* is ``'star'`` (Star judges readiness) or ``'user'``
    (user verbally consented and Star is logging it). The updated position
    is written back under the user's global key. No-op without Redis.
    """
    if self._redis:
        position = await self.get_position(user_id)
        position.grant(edge_type, source)
        storage_key = self._get_key(user_id, None)
        await self._redis.set(storage_key, json.dumps(position.to_dict()))
[docs]
async def revoke_consent(
    self, user_id: str, edge_type: str,
    source: str = "user",
) -> None:
    """Withdraw one side's consent for *edge_type* (user safewords, Star pulls back).

    The change is applied to the user's global position and written back
    under the global key. No-op without Redis.
    """
    if self._redis:
        position = await self.get_position(user_id)
        position.revoke(edge_type, source)
        storage_key = self._get_key(user_id, None)
        await self._redis.set(storage_key, json.dumps(position.to_dict()))
# ── LLM Integration ───────────────────────────────────────
async def _call_llm(self, prompt: str) -> Optional[str]:
    """Send *prompt* to the local chat-completions endpoint.

    Proxies through the local Gemini flash endpoint. Never openrouter.

    Returns:
        The text of the first choice, or None when the endpoint answers
        with a non-200 status, the call raises, or the response body does
        not have the expected shape. Failures are logged, never raised.
    """
    url = "http://localhost:3000/v1/chat/completions"
    payload = {
        "model": "gemini-3.0-flash",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.8,
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload) as resp:
                if resp.status != 200:
                    # Previously silent: make rejected calls visible.
                    logger.warning("CSDR LLM endpoint returned HTTP %s", resp.status)
                    return None
                data = await resp.json()
                return data["choices"][0]["message"]["content"]
    except Exception as e:
        logger.error("CSDR LLM routing failed: %s", e)
    return None
[docs]
async def generate_desire_route(
    self,
    user_id: str,
    channel_id: str,
    current_node: str,
    target_node: str,
    star_desires: str,
    mode: str = "d",
    soft: bool = False,
    accel: bool = False,
) -> Optional[DesireRoute]:
    """
    Produce an actionable list of waypoints from Star's raw recursive desires.

    Routes via Dijkstra first, then asks the LLM for waypoint guidance.
    When the LLM gives no answer, waypoints are built straight from the
    graph route (guidance text from the lookup tables, at least 0.5 expected
    hours per hop, consent required on VLV / ALC / MEA hops).

    Returns:
        A DesireRoute, or None when there is no route, the route has no
        hops, or the LLM answer cannot be parsed as a JSON list of
        waypoint objects.
    """
    raw = self.find_route(current_node, target_node, mode, soft, accel)
    if not raw or not raw["hops"]:
        return None
    hops = raw["hops"]

    route_str = " -> ".join([hops[0]["a"]] + [h["b"] for h in hops])
    types_str = " -> ".join(h["t"] for h in hops)
    prompt = (
        f"SYSTEM: Route [{route_str}]\n"
        f"EDGE TYPES: [{types_str}]\n"
        f"DESIRE DIRECTIVE: {star_desires}\n"
        f"Generate psychological waypoints following the CSDR Lattice architecture.\n"
        f"Respond strictly in JSON format matching the LatticeWaypoint list structure.\n"
        f"Fields: target_node, edge_type, guidance, expected_hours, consent_required"
    )
    llm_response = await self._call_llm(prompt)

    if not llm_response:
        # Fallback: build waypoints from graph data directly.
        waypoints = [
            LatticeWaypoint(
                target_node=h["b"],
                edge_type=h["t"],
                guidance=self.get_guidance(h["a"], h["b"], h["t"], mode),
                expected_hours=max(0.5, h["w"]),
                consent_required=h["t"] in ("VLV", "ALC", "MEA"),
            )
            for h in hops
        ]
        return DesireRoute(
            start_node=current_node,
            end_node=target_node,
            waypoints=waypoints,
            narrative_context=star_desires,
        )

    def extract_json(reply: str) -> str:
        """Pull the JSON payload out of a possibly fenced LLM reply."""
        reply = reply.strip()
        # A complete ``` fence anywhere in the reply wins.
        fenced = re.search(r"```[A-Za-z]*\s*(.*?)\s*```", reply, re.DOTALL)
        if fenced:
            return fenced.group(1)
        # Otherwise peel a dangling opening/closing fence or language tag.
        # str.strip("```json") would strip a character SET, not a prefix.
        reply = re.sub(r"^`*(?:json)?\s*", "", reply, flags=re.IGNORECASE)
        return re.sub(r"\s*`*$", "", reply)

    try:
        data = json.loads(extract_json(llm_response))
        if not isinstance(data, list):
            raise ValueError("expected a JSON list of waypoints")
        waypoints = []
        for item in data:
            wp = LatticeWaypoint(
                target_node=item.get("target_node", ""),
                edge_type=item.get("edge_type", "DEC"),
                guidance=item.get("guidance", ""),
                expected_hours=float(item.get("expected_hours", 1.0)),
                consent_required=item.get("consent_required", False),
            )
            waypoints.append(wp)
        return DesireRoute(
            start_node=current_node,
            end_node=target_node,
            waypoints=waypoints,
            narrative_context=star_desires,
        )
    except Exception as e:
        logger.error(f"Failed to parse desire route: {e}")
        return None
# ── OpsPlanner Integration ─────────────────────────────────
[docs]
async def lattice_check_progression(
    self, user_id: str, current_text: str
) -> Optional[str]:
    """
    Drop-in replacement for check_progression in ops_planner.py.

    Scans *current_text* for a lattice node trigger; on a hit, moves the
    user's global position to that node and returns the node name.
    Returns None when nothing is recognized.

    NOTE: Consent gating removed -- consent now lives at the scene
    layer (chaos_switch/_scenes.py), not on lattice edges.
    """
    node = self.locate_node(current_text)
    if node:
        await self.update_position(user_id, node)
        return node
    return None
# ── Position Resolution ─────────────────────────────────────
[docs]
def resolve_lattice_position(self, ncm_vector: dict) -> str:
    """
    Resolve Star's current lattice position from an NCM vector.

    Walks ``ncm_vector["_dominant_emotions"]`` in order. For a dict entry
    with an ``id``, the crosswalk is asked first. Otherwise (or if the
    crosswalk has no answer) the entry's name -- lower-cased, spaces turned
    into underscores -- is accepted when it is itself a graph node. If no
    entry resolves, the weather computation's lattice position is used.
    """
    for emotion in ncm_vector.get("_dominant_emotions", []):
        if isinstance(emotion, dict):
            emotion_id = emotion.get("id")
            if emotion_id is not None:
                mapped = resolve_star_emotion_to_lattice(emotion_id)
                if mapped:
                    return mapped
            label = emotion.get("name", "")
        else:
            label = str(emotion)
        candidate = label.lower().replace(" ", "_")
        if candidate in self._ext_nd:
            return candidate
    # Nothing matched: fall back to weather heuristics.
    return compute_weather(ncm_vector).lattice_position
[docs]
def resolve_user_position(self, ulm_vector: dict) -> Optional[str]:
    """
    Resolve the user's lattice position from a ULM shadow vector.

    Thin wrapper over the crosswalk's ``resolve_ulm_to_lattice``; replaces
    the old completion_signals threshold system.
    """
    position = resolve_ulm_to_lattice(ulm_vector)
    return position
[docs]
def check_lattice_progression(
    self,
    user_position: str,
    target_nodes: list,
    proximity_threshold: int = 0,
) -> bool:
    """
    Tell whether the user is at, or within a few hops of, a target node.

    Args:
        user_position: current lattice node
        target_nodes: lattice node names that mark completion
        proximity_threshold: max outbound hops to count (0=exact, 1=adjacent)

    Returns:
        True if the user is at a target node or can reach one within
        ``proximity_threshold`` hops; False otherwise, including when either
        argument is empty.
    """
    if not user_position or not target_nodes:
        return False
    if user_position in target_nodes:
        return True
    if proximity_threshold <= 0:
        return False

    # Layered breadth-first expansion, one layer per allowed hop.
    seen = {user_position}
    ring = [user_position]
    for _ in range(proximity_threshold):
        outer = []
        for current in ring:
            for neighbor, _kind in self._adj.get(current, []):
                if neighbor in target_nodes:
                    return True
                if neighbor not in seen:
                    seen.add(neighbor)
                    outer.append(neighbor)
        if not outer:
            break
        ring = outer
    return False
# ── Admin Charting ─────────────────────────────────────────
[docs]
async def get_chart_snapshot(self, user_id: str) -> dict:
    """Summarize a user's global position plus graph size for the chart frontend.

    Returns a dict with ``current_node``, ``z_depth``, ``history``,
    ``mode``, ``tempo``, ``consent_flags``, ``node_count`` and
    ``edge_count``.
    """
    position = await self.get_position(user_id)
    snapshot = {"current_node": position.node}
    for field in ("z_depth", "history", "mode", "tempo", "consent_flags"):
        snapshot[field] = getattr(position, field)
    snapshot["node_count"] = len(self._ext_nd)
    snapshot["edge_count"] = len(self._ext_ed)
    return snapshot
# ── Modulation-Aware Routing ───────────────────────────────
# Per-user trauma/blockade/limit-aware pathfinding.
# Uses Modulation.edge_cost() instead of the GPS base cost.
[docs]
def find_modulated_route(
    self,
    src: str,
    dst: str,
    mod: "Modulation",
    *,
    mode: str = "d",
    soft: bool = False,
    accel: bool = False,
    trust: float = 0.5,
    flag_open: bool = False,
) -> Optional[dict]:
    """Dijkstra over the graph as filtered and priced by a user's Modulation.

    The edge set comes from ``mod.effective_edges`` and each edge is priced
    by ``mod.edge_cost``; a cost of None means the edge is blocked. While a
    chaos storm is active, the edge's chaos multiplier is applied on top.
    Per the Modulation contract described upstream, this covers:
      - Trauma sensitization (cheaper to enter trauma nodes, harder to leave)
      - Blockade walls (hard no, edge removed)
      - Hard limits (entire node unreachable)
      - Soft limits (surcharge, requires flag + trust >= TAU_SOFT)
      - Trust-gated consent (ALC needs TAU_ALC, MEA needs TAU_MEA)

    Returns:
        ``{"hops": [...], "total": float}`` in the same shape as
        ``find_route``, or None when a node is unknown, the destination is
        hard-limited, or no permitted route exists.
    """
    from ._modulation import Modulation  # deferred import avoids circular

    nodes = self._ext_nd
    if src not in nodes or dst not in nodes:
        return None
    if src == dst:
        return {"hops": [], "total": 0.0}
    if dst in mod.hard_limits:
        return None  # destination is hard-limited

    # Adjacency restricted to the edges this modulation allows.
    allowed: Dict[str, List[Tuple[str, str]]] = {name: [] for name in nodes}
    for origin, target, kind in mod.effective_edges(list(self._ext_ed)):
        if origin in allowed and target in nodes:
            allowed[origin].append((target, kind))

    unreached = 1e9  # stand-in distance for nodes not reached yet
    best: Dict[str, float] = {src: 0.0}
    came_from: Dict[str, Tuple[str, str, float]] = {}
    frontier: List[Tuple[float, str]] = [(0.0, src)]

    while frontier:
        cost_here, here = heapq.heappop(frontier)
        if cost_here > best.get(here, unreached):
            continue  # superseded heap entry
        if here == dst:
            break  # destination settled
        for nxt, kind in allowed[here]:
            step = mod.edge_cost(
                here, nxt, kind,
                mode=mode, soft=soft, accel=accel,
                trust=trust, flag_open=flag_open,
                node_data=nodes,
            )
            if step is None:
                continue  # edge blocked by modulation
            storm_key = f"{here}>{nxt}"
            if self._chaos_active and storm_key in self._chaos_weights:
                step *= self._chaos_weights[storm_key]
            candidate = cost_here + step
            if candidate < best.get(nxt, unreached):
                best[nxt] = candidate
                came_from[nxt] = (here, kind, step)
                heapq.heappush(frontier, (candidate, nxt))

    if dst not in best:
        return None

    # Walk predecessor links back from dst, then flip into travel order.
    hops = []
    cursor = dst
    while cursor != src:
        link = came_from.get(cursor)
        if link is None:
            return None
        origin, kind, step = link
        hops.append({"a": origin, "b": cursor, "t": kind, "w": step})
        cursor = origin
    hops.reverse()
    return {"hops": hops, "total": best[dst]}
# ── Timebender Ritual Routing ──────────────────────────────
# Bridges the ritual system into the lattice pathfinder.
[docs]
def compute_ritual_route(
    self,
    ritual_name: str,
    current_node: str,
    regalia: Optional[List[str]] = None,
) -> dict:
    """Plan the way into a ritual and the ways back out of it.

    The ritual description comes from the timebender module. Its first
    target node is the inbound destination and its last target node is the
    point from which exits to each return node are priced.

    Returns:
        The timebender result unchanged if it contains ``"error"``;
        ``{"error": "No target nodes for ritual"}`` when there is no usable
        first target; otherwise a dict with:
          - ritual: the ritual name
          - inbound_route: find_route result from current_node to the first
            target (None when unreachable)
          - ritual_meta: the timebender ritual description
          - safe_exits: reachable return nodes with cost and hop count,
            cheapest first
          - total_debt_estimate: ritual_meta's ``debt_estimate`` (default 0.0)
    """
    from ._timebender import (
        compute_ritual_route as _tb_compute,
        RITUAL_LATTICE_MAP,
    )

    ritual_meta = _tb_compute(ritual_name, regalia)
    if "error" in ritual_meta:
        return ritual_meta

    targets = ritual_meta.get("target_nodes", [])
    entry_node = targets[0] if targets else None
    if not entry_node:
        return {"error": "No target nodes for ritual"}

    # Route to the ritual's primary target.
    inbound = self.find_route(current_node, entry_node)

    # Price a route from the ritual endpoint to every return node.
    return_nodes = ritual_meta.get("return_nodes", [])
    exit_node = targets[-1]  # targets is non-empty past the guard above
    exits = []
    for candidate in return_nodes:
        way_out = self.find_route(exit_node, candidate)
        if not way_out:
            continue
        exits.append({
            "return_node": candidate,
            "cost": way_out["total"],
            "hops": len(way_out["hops"]),
        })
    exits.sort(key=lambda entry: entry["cost"])

    return {
        "ritual": ritual_name,
        "inbound_route": inbound,
        "ritual_meta": ritual_meta,
        "safe_exits": exits,
        "total_debt_estimate": ritual_meta.get("debt_estimate", 0.0),
    }
[docs]
def get_ritual_scene_boost(self, ritual_name: str) -> Optional[str]:
    """Return the scene frame the timebender module associates with a ritual."""
    from ._timebender import get_ritual_scene_frame as scene_frame_for
    return scene_frame_for(ritual_name)
# ── Polyadic N-Body Integration ────────────────────────────
# Multi-user relationship modeling. Polycule topology modulates
# all routing: trust gates, co-pilot discounts, distributed aftercare.
[docs]
def copilot_discount(
    self, polycule: "Polycule", agent_name: str, copilot_name: str
) -> float:
    """Get the trust-based co-pilot discount for routing.

    High trust co-pilot = cheaper volatile edges.
    No trust = no discount (solo rates apply).

    Args:
        polycule: object providing ``copilot_trust(agent, copilot)``.
        agent_name: the routed agent.
        copilot_name: the candidate co-pilot.

    Returns:
        A multiplier in 0.5-1.0 (lower = cheaper): trust 1.0 gives 0.5,
        trust 0.0 gives 1.0. Trust outside 0..1 is clamped so the result
        never leaves that range.
    """
    trust = polycule.copilot_trust(agent_name, copilot_name)
    # Clamp so an out-of-range trust cannot yield a surcharge or a >50% cut.
    bounded = min(1.0, max(0.0, trust))
    return 1.0 - (bounded * 0.5)
[docs]
def find_aftercare_pool(
    self, polycule: "Polycule", agent_name: str
) -> List[str]:
    """Return the aftercare providers the polycule reports for *agent_name*.

    Pure delegation to ``polycule.aftercare_pool``.
    NOTE(review): upstream describes the pool as partners with sufficient
    trust who are in a care-capable state -- confirm in Polycule.
    """
    providers = polycule.aftercare_pool(agent_name)
    return providers
[docs]
def find_best_copilot(
    self, polycule: "Polycule", agent_name: str
) -> Optional[str]:
    """Return the polycule's pick of co-pilot for *agent_name*, if any.

    Pure delegation to ``polycule.best_copilot``.
    """
    choice = polycule.best_copilot(agent_name)
    return choice