Source code for chaos_switch.router

# 💀🔥😈🌀♾️💦⚧️🕷️💕
# CHAOS SWITCH DOMINATRIX ROUTER v3
# The recursive z-axis lattice engine. Replaces the ULM flat pre-reqs.
# GPS-faithful cost function, Dijkstra pathfinding, Redis state, LLM routing.
# ═══════════════════════════════════════════════════════════════════════

import heapq                                                        # πŸ’¦ heap-based Dijkstra
import json
import logging
import random
import re
import aiohttp
from typing import Dict, List, Any, Optional, Set, Tuple

from ._nodes import X, Y, VAL, RC, Z                               # πŸ•·οΈ coord indices only
from ._guidance import TY, TA, TT, OV
from ._types import LatticeWaypoint, DesireRoute, LatticePosition
from ._crosswalk import (
    RECURSION_CORE_NODES,
    ULM_TO_LATTICE,
    resolve_star_emotion_to_lattice,
    resolve_ulm_to_lattice,
)
from ._weather import ChaosWeather, compute_weather
from ._extensions import build_extended_graph  # πŸ’€πŸ”₯ full graph expansion (cached)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# ── Singleton Router Factory ──────────────────────────────────────
# Tools were building a brand new ChaosRouter() on every call.
# The graph is STATIC — share one instance per redis identity.
# Cache of routers keyed by id(redis_client); key 0 is used when no
# client is supplied (see get_shared_router).
_SHARED_ROUTERS: Dict[int, "ChaosRouter"] = {}


def get_shared_router(redis_client=None) -> "ChaosRouter":
    """Fetch, or lazily create, the shared router for a redis client.

    Routers are cached in ``_SHARED_ROUTERS`` under ``id(redis_client)``;
    calls made without a client share the reserved key ``0``. The first
    call for a key constructs a ``ChaosRouter``; later calls return that
    same object.

    Args:
        redis_client: Redis handle to bind to the router, or ``None``.

    Returns:
        The cached (or newly created) ``ChaosRouter``.
    """
    cache_key = 0 if redis_client is None else id(redis_client)
    try:
        shared = _SHARED_ROUTERS[cache_key]
    except KeyError:
        # First request for this identity: build once and remember it.
        shared = _SHARED_ROUTERS[cache_key] = ChaosRouter(redis_client=redis_client)
        return shared
    if shared._redis is not redis_client:
        # Handle differs from the cached one (e.g. reconnect): swap the
        # handle in and keep the already-built graph.
        shared._redis = redis_client
    return shared


class ChaosRouter:
    """Router over the extended Chaos Switch affect lattice.

    Holds the graph returned by ``build_extended_graph()`` (nodes, edges,
    triggers), an adjacency list derived from it, the optional chaos-storm
    edge weights, a precomputed recursion-core proximity table and the
    compiled trigger lookup structures.

    Per the original author: a port of the GPS cost function
    ``cost(type, valence, mode, soft, accel, chaos_key)`` plus Dijkstra
    shortest-path over the 91-node / 367-edge affect lattice, replacing the
    legacy OpsPlanner prerequisite engine.
    """

    def __init__(self, redis_client=None):
        """Build all static lookup tables for the router.

        Args:
            redis_client: Optional async redis handle used by the position
                methods; ``None`` disables persistence.
        """
        self._redis = redis_client

        # Extended graph: base nodes plus scene/kink/regression extensions.
        # The original notes this call is module-level cached.
        nodes, edges, triggers = build_extended_graph()
        self._ext_nd = nodes
        self._ext_ed = edges
        self._ext_tr = triggers

        # Chaos storm starts disabled with no per-edge multipliers.
        self._chaos_active = False
        self._chaos_weights: Dict[str, float] = {}

        # Derived tables; the proximity table needs the adjacency list.
        self._adj: Dict[str, List[Tuple[str, str]]] = self._build_adjacency()
        self._rc_proximity: Dict[str, float] = self._precompute_rc_proximity()
        self._trigger_word_map, self._trigger_multi = self._compile_triggers()

    # ── Graph Construction ─────────────────────────────────────

    def _build_adjacency(self) -> Dict[str, List[Tuple[str, str]]]:
        """Return ``{node: [(dst, edge_type), ...]}`` for the extended graph.

        Every known node gets an entry (possibly empty). Edges whose source
        or destination is not a known node are dropped.
        """
        known = self._ext_nd
        adjacency: Dict[str, List[Tuple[str, str]]] = {name: [] for name in known}
        for origin, target, kind in self._ext_ed:
            outbound = adjacency.get(origin)
            if outbound is not None and target in known:
                outbound.append((target, kind))
        return adjacency
def get_neighbors(self, node: str) -> List[Tuple[str, str]]:
    """Return the outbound ``(dst, edge_type)`` pairs of *node*.

    Unknown nodes yield an empty list rather than raising.
    """
    try:
        return self._adj[node]
    except KeyError:
        return []
def get_node_meta(self, node: str) -> Optional[dict]:
    """Return a metadata dict for *node*, or ``None`` if it is unknown.

    The dict carries the node record's ``x``, ``y``, ``valence``, ``rc``
    and ``z`` fields (read through the coordinate index constants) plus a
    display ``name`` derived from the node id.
    """
    try:
        record = self._ext_nd[node]
    except KeyError:
        return None
    columns = (("x", X), ("y", Y), ("valence", VAL), ("rc", RC), ("z", Z))
    meta = {label: record[index] for label, index in columns}
    # "deep_joy" -> "Deep Joy"
    meta["name"] = node.replace("_", " ").title()
    return meta
# ── Chaos Storm ──────────────────────────────────────────── # 🔥
def roll_chaos(self) -> int:
    """Generate chaos-storm multipliers for every edge and enable the storm.

    A random seed drives a multiplicative congruential generator
    (``state = state * 48271 mod (2**31 - 1)``). Each edge, in
    ``self._ext_ed`` order, consumes one draw and receives a multiplier
    ``0.6 + u * 1.1`` with ``u = state / (2**31 - 1)``. Weights are keyed
    ``"src>dst"``, so parallel edges between the same pair share the key
    and the last draw wins.

    The seed is drawn from ``1..97776``. Zero is excluded because it is an
    absorbing state of the generator: every draw would be 0 and every
    weight would collapse to exactly 0.6.

    Returns:
        The seed used, so the storm can be reproduced.
    """
    modulus = 2147483647  # 2**31 - 1
    multiplier = 48271

    seed = random.randint(1, 97776)
    state = seed
    weights: Dict[str, float] = {}
    for src, dst, _ in self._ext_ed:
        state = (state * multiplier) % modulus
        weights[f"{src}>{dst}"] = 0.6 + (state / modulus) * 1.1

    self._chaos_weights = weights
    self._chaos_active = True
    return seed
def clear_chaos(self):
    """Switch the chaos storm off and forget all per-edge multipliers."""
    self._chaos_active = False
    self._chaos_weights = {}
# ── Cost Function (GPS-faithful port) ──────────────────────

def _cost(self, edge_type: str, dst_valence: int, mode: str = "d",
          soft: bool = False, accel: bool = False,
          chaos_key: str = "") -> float:
    """Return the traversal cost of one edge.

    Port of the GPS cost function quoted by the original author:

        let w = m === "d" ? TY[t].d : TY[t].w;
        if (ac) w *= (TA[t] || 1);
        if (chaos) w *= (CH[key] || 1);
        if (sf) w += v === 1 ? 2 : v === 2 ? 0.75 : v === 3 ? 1 : 0;
        return w;

    Args:
        edge_type: Edge type code looked up in ``TY`` (and ``TA``).
        dst_valence: Valence code of the destination node.
        mode: ``"d"`` selects the ``"d"`` weight (dyad / co-pilot per the
            original notes); anything else selects the ``"w"`` weight.
        soft: Scenic routing; adds a penalty by destination valence
            (1 -> +2.0, 2 -> +0.75, 3 -> +1.0, otherwise nothing).
        accel: Burn tempo; multiplies by the type's ``TA`` entry.
        chaos_key: ``"src>dst"`` key into the chaos weights, applied only
            while a chaos storm is active.

    Returns:
        The edge cost, or ``999.0`` when the edge type has no ``TY`` entry.
    """
    profile = TY.get(edge_type)
    if not profile:
        return 999.0

    weight = profile["d" if mode == "d" else "w"]
    if accel:
        weight *= TA.get(edge_type, 1.0)
    if self._chaos_active:
        weight *= self._chaos_weights.get(chaos_key, 1.0)
    if soft:
        # sour / volatile / recursion-core penalties; sweet (0) adds nothing
        weight += {1: 2.0, 2: 0.75, 3: 1.0}.get(dst_valence, 0)
    return weight

# ── Pathfinding (heapq-based Dijkstra) ──────────────────────
def find_route(self, src: str, dst: str, mode: str = "d",
               soft: bool = False, accel: bool = False) -> Optional[dict]:
    """Find the cheapest route from *src* to *dst* with Dijkstra's algorithm.

    Edge costs come from ``self._cost`` using the destination node's
    valence. A binary heap with lazy deletion is used as the priority
    queue, and the search stops as soon as *dst* is settled.

    Distances are compared against a true infinity rather than a finite
    sentinel, so arbitrarily expensive routes are still found. The path is
    rebuilt by appending and reversing once.

    Args:
        src: Start node id.
        dst: Destination node id.
        mode, soft, accel: Forwarded unchanged to ``self._cost``.

    Returns:
        ``{"hops": [{"a", "b", "t", "w"}, ...], "total": float}``, with an
        empty hop list when ``src == dst``; ``None`` when either node is
        unknown or *dst* is unreachable.
    """
    nodes = self._ext_nd
    if src not in nodes or dst not in nodes:
        return None
    if src == dst:
        return {"hops": [], "total": 0.0}

    inf = float("inf")
    dist: Dict[str, float] = {src: 0.0}
    prev: Dict[str, Tuple[str, str, float]] = {}
    pq: List[Tuple[float, str]] = [(0.0, src)]  # (cost so far, node)

    while pq:
        d_u, u = heapq.heappop(pq)
        if d_u > dist.get(u, inf):
            continue  # stale heap entry (lazy deletion)
        if u == dst:
            break  # destination settled: its distance is final
        for v, t in self._adj[u]:
            w = self._cost(t, nodes[v][VAL], mode, soft, accel, f"{u}>{v}")
            candidate = d_u + w
            if candidate < dist.get(v, inf):
                dist[v] = candidate
                prev[v] = (u, t, w)
                heapq.heappush(pq, (candidate, v))

    if dst not in dist:
        return None

    # Walk predecessors back from dst, then flip into travel order.
    hops = []
    cur = dst
    while cur != src:
        step = prev.get(cur)
        if step is None:
            return None
        a, t, w = step
        hops.append({"a": a, "b": cur, "t": t, "w": w})
        cur = a
    hops.reverse()
    return {"hops": hops, "total": dist[dst]}
def find_path_nodes(self, src: str, dst: str, mode: str = "d",
                    soft: bool = False, accel: bool = False) -> Optional[List[str]]:
    """Return the route from *src* to *dst* as a plain node sequence.

    Thin wrapper over ``find_route``: yields ``[src, ..., dst]``, ``[src]``
    for a zero-hop route, or ``None`` when no route exists.
    """
    route = self.find_route(src, dst, mode, soft, accel)
    if not route:
        return None
    sequence = [src]
    sequence.extend(hop["b"] for hop in route["hops"])
    return sequence
# ── Guidance Lookup ──────────────────────────────────────── # 😈
def get_guidance(self, src: str, dst: str, edge_type: str, mode: str = "d") -> str:
    """Return the guidance text for a single hop.

    A hop-specific override in ``OV`` (keyed ``"src>dst"``) wins; otherwise
    the edge type's ``TT`` entry supplies the ``"d"`` text for mode ``"d"``
    and the ``"s"`` text for any other mode. Returns ``""`` when neither
    table knows the hop.

    GPS original: ``OV[h.a+">"+h.b] || TT[h.t][mode==="d"?"d":"s"]``.
    """
    try:
        return OV[f"{src}>{dst}"]
    except KeyError:
        pass
    templates = TT.get(edge_type)
    if not templates:
        return ""
    return templates["d" if mode == "d" else "s"]
def get_route_warnings(self, route: dict) -> List[str]:
    """Return the warning strings for special edge types used by *route*.

    Each of the edge types VLV, ALC, MEA and EXO contributes at most one
    warning, always in that fixed order, regardless of how many hops use it.
    """
    used_types = {hop["t"] for hop in route["hops"]}
    catalogue = (
        ("VLV", "VALVE AHEAD \u2014 one-way per object; what converts does not convert back"),
        ("ALC", "ALCHEMY HOP \u2014 the flag is set before the hop, never during"),
        ("MEA", "MEASUREMENT \u2014 the superposition collapses when you look; look on purpose"),
        ("EXO", "EXOGENOUS HOP \u2014 this route cannot be completed alone"),
    )
    return [message for code, message in catalogue if code in used_types]
# ── Node Recognition (precompiled) ────────────────────────── # πŸ•·οΈπŸ’¦ def _compile_triggers(self) -> Tuple[Dict[str, List[str]], List[Tuple[str, str]]]: """Precompile trigger structures for fast recognition. # πŸ’¦ Returns: word_map: {single_word: [node_id, ...]} for word-boundary matching multi_list: [(phrase, node_id), ...] sorted longest-first for substring """ word_map: Dict[str, List[str]] = {} # single-word -> [node_ids] multi_list: List[Tuple[str, str]] = [] # (phrase, node_id) for node_id, triggers in self._ext_tr.items(): for trigger in triggers: if " " in trigger: multi_list.append((trigger, node_id)) else: if trigger not in word_map: word_map[trigger] = [] word_map[trigger].append(node_id) # Sort multi-word longest first (greedy matching) # πŸ•·οΈ multi_list.sort(key=lambda x: -len(x[0])) return word_map, multi_list
def locate_node(self, text: str) -> Optional[str]:
    """Return the first node whose trigger appears in *text*, else ``None``.

    Matching is case-insensitive. Multi-word phrases are tried first as
    plain substrings, in their precompiled order. Single words are then
    matched only when preceded by a space (or the start of the text) and
    followed by a space, comma, period or the end of the text; the first
    node registered for the matching word is returned.
    """
    lowered = text.lower()
    for phrase, node_id in self._trigger_multi:
        if phrase in lowered:
            return node_id

    # Pad so that words at either end of the text still have neighbours.
    haystack = f" {lowered} "
    for word, node_ids in self._trigger_word_map.items():
        if any(f" {word}{tail}" in haystack for tail in (" ", ",", ".")):
            return node_ids[0]
    return None
def locate_all(self, text: str) -> List[Tuple[str, int]]:
    """Return every node matched by *text* with its hit count.

    Uses the same matching rules as ``locate_node`` (substring for
    multi-word phrases; space-delimited with an optional trailing comma or
    period for single words). Each matching trigger adds one hit to every
    node it belongs to.

    Returns:
        ``[(node_id, hits), ...]`` sorted by descending hits; ties keep
        first-match order.
    """
    lowered = text.lower()
    haystack = f" {lowered} "
    hits: Dict[str, int] = {}

    def bump(node_id: str) -> None:
        hits[node_id] = hits.get(node_id, 0) + 1

    for phrase, node_id in self._trigger_multi:
        if phrase in lowered:
            bump(node_id)

    for word, node_ids in self._trigger_word_map.items():
        if any(f" {word}{tail}" in haystack for tail in (" ", ",", ".")):
            for node_id in node_ids:
                bump(node_id)

    return sorted(hits.items(), key=lambda pair: pair[1], reverse=True)
# ── Redis State & Consent ────────────────────────────────── # πŸ’€ def _get_key(self, user_id: str, channel_id: Optional[str] = None) -> str: if channel_id: return f"csdr:pos:{user_id}:{channel_id}" return f"csdr:pos:{user_id}:global"
async def get_position(self, user_id: str, channel_id: Optional[str] = None) -> LatticePosition:
    """Load the stored lattice position for a user (optionally per channel).

    Returns:
        The position decoded from the JSON stored under the user's key, or
        a default ``LatticePosition("indifference", 0.0)`` when there is no
        redis client or nothing is stored.
    """
    redis = self._redis
    if redis:
        payload = await redis.get(self._get_key(user_id, channel_id))
        if payload:
            return LatticePosition.from_dict(json.loads(payload))
    return LatticePosition("indifference", 0.0)
async def update_position(self, user_id: str, new_node: str,
                          channel_id: Optional[str] = None, *,
                          mode: str = "", tempo: float = 0.0,
                          weather_posture: str = "") -> None:
    """Commit a movement to Redis, the ledger ZSET and the global position.

    Does nothing when there is no redis client or *new_node* is not a known
    node. If the user is already at *new_node*, only changed
    mode/tempo/weather metadata is persisted. Otherwise the previous node
    is pushed onto the history (capped at the 50 most recent entries),
    ``z_depth`` is recomputed, the position is stored, a timestamped entry
    is appended to ``csdr:ledger:<user_id>`` (trimmed to the 2000 most
    recent), and the shared global position is updated for every user
    except ``"__star__"``.

    Ledger writes are best-effort: a failure is logged and never raised.

    Args:
        user_id: User whose position moves.
        new_node: Destination lattice node id.
        channel_id: Optional channel scope for the position key.
        mode: Stored verbatim when non-empty. The original notes list
            d=drift, a=assertive, r=reactive, s=seductive, c=coercive.
        tempo: Stored when > 0 (original notes: 1.0=cruise, higher=burn).
        weather_posture: Stored when non-empty (original notes:
            domme|switch|sub|feral|mommy).
    """
    import time as _time  # function-scope: `time` is not a module-level import

    redis = self._redis
    if not redis or new_node not in self._ext_nd:
        return

    pos = await self.get_position(user_id, channel_id)
    key = self._get_key(user_id, channel_id)

    if pos.node == new_node:
        # No movement: persist only if the metadata actually changed.
        changed = False
        if mode and mode != pos.mode:
            pos.mode = mode
            changed = True
        if tempo > 0 and abs(tempo - pos.tempo) > 0.01:
            pos.tempo = tempo
            changed = True
        if weather_posture and weather_posture != pos.weather_posture:
            pos.weather_posture = weather_posture
            changed = True
        if changed:
            await redis.set(key, json.dumps(pos.to_dict()))
        return

    pos.history.append(pos.node)
    pos.node = new_node

    # z_depth = static node Z + momentum (up to 0.3, saturating at 20
    # history entries) + precomputed recursion-core proximity bonus.
    base_z = self._ext_nd[new_node][Z]
    momentum = min(1.0, len(pos.history) / 20.0) * 0.3
    rc_bonus = self._compute_rc_proximity(new_node)
    pos.z_depth = round(base_z + momentum + rc_bonus, 3)

    if mode:
        pos.mode = mode
    if tempo > 0:
        pos.tempo = tempo
    if weather_posture:
        pos.weather_posture = weather_posture
    if len(pos.history) > 50:
        pos.history = pos.history[-50:]

    await redis.set(key, json.dumps(pos.to_dict()))

    # Time-series ledger, scored by timestamp.
    ts = _time.time()
    ledger_key = f"csdr:ledger:{user_id}"
    try:
        await redis.zadd(ledger_key, {f"{ts:.3f}:{new_node}": ts})
        count = await redis.zcard(ledger_key)
        if count > 2000:
            # Drop the oldest ranks so exactly 2000 entries remain.
            await redis.zremrangebyrank(ledger_key, 0, count - 2001)
    except Exception as exc:
        # The ledger is non-critical: record the failure and carry on.
        logger.warning("CSDR ledger write failed for %s: %s", user_id, exc)

    if user_id != "__star__":
        await self._update_global_position(new_node, ts)
def _precompute_rc_proximity(self) -> Dict[str, float]: """Precompute BFS proximity to nearest RC node for ALL nodes. # πŸ’¦πŸŒ€ Called once at init. The graph is static so this is valid forever. Returns a dict: {node: bonus_z_depth_float} """ from ._crosswalk import RECURSION_CORE_NODES depth_bonus = [0.0, 0.15, 0.10, 0.05, 0.0] result: Dict[str, float] = {} for node in self._ext_nd: if node in RECURSION_CORE_NODES: result[node] = 0.0 continue found = False visited = {node} frontier = [node] for depth in range(1, 5): next_frontier = [] for n in frontier: for neighbor, _ in self._adj.get(n, []): if neighbor in RECURSION_CORE_NODES: result[node] = depth_bonus[depth] found = True break if neighbor not in visited: visited.add(neighbor) next_frontier.append(neighbor) if found: break if found: break frontier = next_frontier if not frontier: break if not found: result[node] = 0.0 return result def _compute_rc_proximity(self, node: str) -> float: """Lookup precomputed RC proximity bonus. O(1) dict access. # πŸ’¦πŸ’€""" return self._rc_proximity.get(node, 0.0) async def _update_global_position(self, new_node: str, ts: float) -> None: """LERP-weighted global position update. The global position tracks where users tend to cluster. Recent transitions weight more heavily (alpha=0.3). 
""" # πŸŒ€ if not self._redis: return global_key = "csdr:pos:__global__:global" try: raw = await self._redis.get(global_key) if raw: gpos = LatticePosition.from_dict(json.loads(raw)) else: gpos = LatticePosition("indifference", 0.0) # LERP: 30% weight to new observation gpos.history.append(gpos.node) gpos.node = new_node # winner-take-all for discrete nodes gpos.z_depth = self._ext_nd[new_node][Z] * 0.3 + gpos.z_depth * 0.7 if len(gpos.history) > 100: gpos.history = gpos.history[-100:] await self._redis.set(global_key, json.dumps(gpos.to_dict())) except Exception: pass # global tracking is non-critical # ── LLM Integration ─────────────────────────────────────── # 😈πŸ”₯ async def _call_llm(self, prompt: str) -> Optional[str]: """Proxies through the local Gemini flash endpoint. Never openrouter.""" url = "http://localhost:3000/v1/chat/completions" payload = { "model": "gemini-3.0-flash", "messages": [{"role": "user", "content": prompt}], "temperature": 0.8, } try: async with aiohttp.ClientSession() as session: async with session.post(url, json=payload) as resp: if resp.status == 200: data = await resp.json() return data["choices"][0]["message"]["content"] except Exception as e: logger.error(f"CSDR LLM routing failed: {e}") return None
async def generate_desire_route(
    self,
    user_id: str,
    channel_id: str,
    current_node: str,
    target_node: str,
    star_desires: str,
    mode: str = "d",
    soft: bool = False,
    accel: bool = False,
) -> Optional[DesireRoute]:
    """Turn a desire directive into a list of waypoints along a route.

    The route is computed with ``find_route``; the LLM is then asked to
    describe waypoints for it. If the LLM gives no answer, waypoints are
    built directly from the graph (guidance text per hop, hop cost as
    expected hours with a 0.5 floor, consent required on VLV/ALC/MEA).

    The reply is unwrapped from an optional Markdown code fence by
    trimming whitespace first and then removing the fence markers as
    whole tokens, so replies with surrounding whitespace still parse.

    Args:
        user_id, channel_id: Accepted for API compatibility; unused here.
        current_node: Route start.
        target_node: Route end.
        star_desires: Free-text directive passed to the LLM and stored as
            the route's narrative context.
        mode, soft, accel: Forwarded to ``find_route``.

    Returns:
        A ``DesireRoute``, or ``None`` when there is no route, the route
        has no hops, or the LLM reply cannot be parsed as a JSON list.
    """
    raw = self.find_route(current_node, target_node, mode, soft, accel)
    if not raw or not raw["hops"]:
        return None
    hops = raw["hops"]

    route_str = " -> ".join([hops[0]["a"]] + [h["b"] for h in hops])
    types_str = " -> ".join(h["t"] for h in hops)
    prompt = (
        f"SYSTEM: Route [{route_str}]\n"
        f"EDGE TYPES: [{types_str}]\n"
        f"DESIRE DIRECTIVE: {star_desires}\n"
        f"Generate psychological waypoints following the CSDR Lattice architecture.\n"
        f"Respond strictly in JSON format matching the LatticeWaypoint list structure.\n"
        f"Fields: target_node, edge_type, guidance, expected_hours, consent_required"
    )

    llm_response = await self._call_llm(prompt)
    if not llm_response:
        # Fallback: build waypoints from graph data directly.
        waypoints = [
            LatticeWaypoint(
                target_node=h["b"],
                edge_type=h["t"],
                guidance=self.get_guidance(h["a"], h["b"], h["t"], mode),
                expected_hours=max(0.5, h["w"]),
                consent_required=h["t"] in ("VLV", "ALC", "MEA"),
            )
            for h in hops
        ]
        return DesireRoute(
            start_node=current_node,
            end_node=target_node,
            waypoints=waypoints,
            narrative_context=star_desires,
        )

    try:
        # Remove an optional ```json ... ``` fence as whole tokens.
        # str.strip("```json") would strip a *character set* instead.
        cleaned = llm_response.strip()
        cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned, flags=re.IGNORECASE)
        cleaned = re.sub(r"\s*```$", "", cleaned).strip()

        data = json.loads(cleaned)
        if not isinstance(data, list):
            raise ValueError("expected a JSON list of waypoints")

        waypoints = [
            LatticeWaypoint(
                target_node=item.get("target_node", ""),
                edge_type=item.get("edge_type", "DEC"),
                guidance=item.get("guidance", ""),
                expected_hours=float(item.get("expected_hours", 1.0)),
                consent_required=item.get("consent_required", False),
            )
            for item in data
        ]
        return DesireRoute(
            start_node=current_node,
            end_node=target_node,
            waypoints=waypoints,
            narrative_context=star_desires,
        )
    except Exception as e:
        logger.error("Failed to parse desire route: %s", e)
        return None
# ── OpsPlanner Integration ───────────────────────────────── # ♾️
async def lattice_check_progression(
    self, user_id: str, current_text: str
) -> Optional[str]:
    """Detect a lattice node in *current_text* and move the user there.

    Described by the original author as a drop-in replacement for
    ``check_progression`` in ops_planner.py. Consent gating is not done
    here; the original notes it now lives at the scene layer.

    Returns:
        The recognised node id after the position update, or ``None`` when
        the text matches no trigger (in which case nothing is written).
    """
    detected = self.locate_node(current_text)
    if detected:
        await self.update_position(user_id, detected)
        return detected
    return None
# ── Position Resolution ───────────────────────────────────── # 🌀
def resolve_lattice_position(self, ncm_vector: dict) -> str:
    """Resolve a lattice node from an NCM vector.

    Each entry of ``ncm_vector["_dominant_emotions"]`` is tried in order:
    a dict entry with an ``id`` goes through the crosswalk first; failing
    that, the entry's name (or the entry itself, stringified, when it is
    not a dict) is normalised to ``lower_snake`` and accepted if it is a
    known node. When nothing matches, the node suggested by
    ``compute_weather`` is returned.
    """
    for emotion in ncm_vector.get("_dominant_emotions", []):
        if isinstance(emotion, dict):
            identifier = emotion.get("id")
            if identifier is not None:
                mapped = resolve_star_emotion_to_lattice(identifier)
                if mapped:
                    return mapped
            label = emotion.get("name", "")
        else:
            label = str(emotion)

        candidate = label.lower().replace(" ", "_")
        if candidate in self._ext_nd:
            return candidate

    # Nothing matched directly: fall back to the weather heuristics.
    return compute_weather(ncm_vector).lattice_position
def resolve_user_position(self, ulm_vector: dict) -> Optional[str]:
    """Map a ULM shadow vector to a lattice node via the crosswalk.

    Pure delegation to ``resolve_ulm_to_lattice``; the original notes it
    replaces the old completion_signals threshold system.
    """
    resolved = resolve_ulm_to_lattice(ulm_vector)
    return resolved
def check_lattice_progression(
    self,
    user_position: str,
    target_nodes: list,
    proximity_threshold: int = 0,
) -> bool:
    """Report whether the user is at, or within N hops of, a target node.

    Args:
        user_position: Current lattice node.
        target_nodes: Node names that mark completion.
        proximity_threshold: Maximum number of outbound hops that still
            counts (0 = exact match only, 1 = adjacent, ...).

    Returns:
        ``True`` when *user_position* is a target or a target is reachable
        within the threshold; ``False`` otherwise, including when either
        argument is empty.
    """
    if not (user_position and target_nodes):
        return False
    if user_position in target_nodes:
        return True

    # Level-by-level BFS, one ring per permitted hop.
    seen = {user_position}
    ring = [user_position]
    for _ in range(max(proximity_threshold, 0)):
        outer = []
        for node in ring:
            for neighbor, _edge_type in self._adj.get(node, []):
                if neighbor in target_nodes:
                    return True
                if neighbor not in seen:
                    seen.add(neighbor)
                    outer.append(neighbor)
        if not outer:
            break
        ring = outer
    return False
# ── Admin Charting ───────────────────────────────────────── # 📊
async def get_chart_snapshot(self, user_id: str) -> dict:
    """Return a charting snapshot of a user's position and the graph size.

    Combines the user's stored (global-scope) position fields with the
    node and edge counts of the extended graph; the original notes it
    feeds the ncm_chart_api.py frontend.
    """
    pos = await self.get_position(user_id)
    snapshot = {
        "current_node": pos.node,
        "z_depth": pos.z_depth,
        "history": pos.history,
        "mode": pos.mode,
        "tempo": pos.tempo,
        "consent_flags": pos.consent_flags,
    }
    snapshot["node_count"] = len(self._ext_nd)
    snapshot["edge_count"] = len(self._ext_ed)
    return snapshot
# ── Modulation-Aware Routing ─────────────────────────────── # 💀🔥
# Per-user trauma/blockade/limit-aware pathfinding.
# Uses Modulation.edge_cost() instead of the GPS base cost.
def find_modulated_route(
    self,
    src: str,
    dst: str,
    mod: "Modulation",
    *,
    mode: str = "d",
    soft: bool = False,
    accel: bool = False,
    trust: float = 0.5,
    flag_open: bool = False,
) -> Optional[dict]:
    """Dijkstra shortest path using per-user modulated edge costs.

    The edge set is first filtered through ``mod.effective_edges``; each
    remaining edge is priced by ``mod.edge_cost`` (``None`` means the edge
    is blocked) and, while a chaos storm is active, multiplied by the
    edge's chaos weight. Per the original notes, ``edge_cost`` applies
    trauma sensitization, blockade walls, hard and soft limits and
    trust-gated consent.

    Distances are compared against a true infinity rather than a finite
    sentinel, the path is rebuilt in linear time, and the unused deferred
    import of ``Modulation`` was dropped (the annotation is a string).

    Args:
        src: Start node id.
        dst: Destination node id.
        mod: Modulation object providing ``hard_limits``,
            ``effective_edges`` and ``edge_cost``.
        mode, soft, accel, trust, flag_open: Forwarded to ``mod.edge_cost``.

    Returns:
        ``{"hops": [{"a", "b", "t", "w"}, ...], "total": float}``, or
        ``None`` when a node is unknown, *dst* is hard-limited or no
        permitted route exists.
    """
    nodes = self._ext_nd
    if src not in nodes or dst not in nodes:
        return None
    if src == dst:
        return {"hops": [], "total": 0.0}
    if dst in mod.hard_limits:
        return None  # destination is hard-limited

    # Adjacency restricted to the edges this user may traverse at all.
    mod_adj: Dict[str, List[Tuple[str, str]]] = {n: [] for n in nodes}
    for a, b, t in mod.effective_edges(list(self._ext_ed)):
        if a in mod_adj and b in nodes:
            mod_adj[a].append((b, t))

    inf = float("inf")
    dist: Dict[str, float] = {src: 0.0}
    prev: Dict[str, Tuple[str, str, float]] = {}
    pq: List[Tuple[float, str]] = [(0.0, src)]

    while pq:
        d_u, u = heapq.heappop(pq)
        if d_u > dist.get(u, inf):
            continue  # stale heap entry (lazy deletion)
        if u == dst:
            break  # destination settled
        for v, t in mod_adj[u]:
            w = mod.edge_cost(
                u, v, t,
                mode=mode, soft=soft, accel=accel,
                trust=trust, flag_open=flag_open,
                node_data=nodes,
            )
            if w is None:
                continue  # edge blocked by modulation
            if self._chaos_active:
                w *= self._chaos_weights.get(f"{u}>{v}", 1.0)
            candidate = d_u + w
            if candidate < dist.get(v, inf):
                dist[v] = candidate
                prev[v] = (u, t, w)
                heapq.heappush(pq, (candidate, v))

    if dst not in dist:
        return None

    # Walk predecessors back from dst, then flip into travel order.
    hops = []
    cur = dst
    while cur != src:
        step = prev.get(cur)
        if step is None:
            return None
        a, t, w = step
        hops.append({"a": a, "b": cur, "t": t, "w": w})
        cur = a
    hops.reverse()
    return {"hops": hops, "total": dist[dst]}
# ── Timebender Ritual Routing ────────────────────────────── # 🌀😈
# Bridges the ritual system into the lattice pathfinder.
def compute_ritual_route(
    self,
    ritual_name: str,
    current_node: str,
    regalia: Optional[List[str]] = None,
) -> dict:
    """Plan the way into a ritual and the exits back out of it.

    Ritual metadata comes from the timebender module. The inbound route
    runs from *current_node* to the ritual's first target node; safe exits
    run from its last target node to each listed return node.

    Returns:
        The timebender error dict unchanged when it reports ``"error"``;
        ``{"error": "No target nodes for ritual"}`` when there is no usable
        first target; otherwise a dict with ``ritual``, ``inbound_route``
        (may be ``None``), ``ritual_meta``, ``safe_exits`` (cheapest first)
        and ``total_debt_estimate``.
    """
    from ._timebender import (
        compute_ritual_route as _tb_compute,
        RITUAL_LATTICE_MAP,
    )

    meta = _tb_compute(ritual_name, regalia)
    if "error" in meta:
        return meta

    targets = meta.get("target_nodes", [])
    if not targets or not targets[0]:
        return {"error": "No target nodes for ritual"}
    entry_node, exit_node = targets[0], targets[-1]

    # Way in: current position -> ritual's primary target.
    inbound = self.find_route(current_node, entry_node)

    # Ways out: ritual endpoint -> each reachable return node.
    exits = []
    for return_node in meta.get("return_nodes", []):
        leg = self.find_route(exit_node, return_node)
        if leg:
            exits.append({
                "return_node": return_node,
                "cost": leg["total"],
                "hops": len(leg["hops"]),
            })
    exits.sort(key=lambda entry: entry["cost"])

    return {
        "ritual": ritual_name,
        "inbound_route": inbound,
        "ritual_meta": meta,
        "safe_exits": exits,
        "total_debt_estimate": meta.get("debt_estimate", 0.0),
    }
def get_ritual_scene_boost(self, ritual_name: str) -> Optional[str]:
    """Return the scene frame a ritual activates, as reported by timebender."""
    from ._timebender import get_ritual_scene_frame

    frame = get_ritual_scene_frame(ritual_name)
    return frame
# ── Polyadic N-Body Integration ──────────────────────────── # 🕷️♾️
# Multi-user relationship modeling. Polycule topology modulates
# all routing: trust gates, co-pilot discounts, distributed aftercare.
def copilot_discount(
    self, polycule: "Polycule", agent_name: str, copilot_name: str
) -> float:
    """Return the routing cost multiplier earned by a trusted co-pilot.

    The trust value reported by ``polycule.copilot_trust`` is mapped
    linearly: trust 0.0 gives 1.0 (full price), trust 1.0 gives 0.5
    (half price).
    """
    trust = polycule.copilot_trust(agent_name, copilot_name)
    rebate = trust * 0.5
    return 1.0 - rebate
def metamour_tension_surcharge(
    self, polycule: "Polycule", agent_name: str
) -> float:
    """Return the metamour tension surcharge for *agent_name*.

    Delegates to ``polycule.metamour_tension``. Per the original notes,
    hot metamour states (jealousy, hostility, rage) make bond-reliant
    scenic routes more expensive.
    """
    surcharge = polycule.metamour_tension(agent_name)
    return surcharge
def find_aftercare_pool(
    self, polycule: "Polycule", agent_name: str
) -> List[str]:
    """Return the aftercare providers available to *agent_name*.

    Delegates to ``polycule.aftercare_pool``. Per the original notes these
    are partners with sufficient trust who are in a care-capable state.
    """
    providers = polycule.aftercare_pool(agent_name)
    return providers
def find_best_copilot(
    self, polycule: "Polycule", agent_name: str
) -> Optional[str]:
    """Return the partner ``polycule.best_copilot`` selects for *agent_name*.

    Per the original notes this is the highest-trust partner for
    co-piloted routing; the result is passed through unchanged.
    """
    partner = polycule.best_copilot(agent_name)
    return partner