Source code for flash_dyadic_mirror

"""Flash Dyadic Mirror -- LLM-driven contextual behavioral analysis.

Replaces mechanical keyword flagging as the PRIMARY source of ULM
delta signals. Uses gemini-3.1-flash-lite via the local Gemini CLI
proxy for near-unlimited, high-frequency per-turn evaluation.

Evaluates a rolling 5-turn context window instead of individual
messages, giving Star contextual awareness of behavioral CHANGE
over time -- not just keyword surface area.

# 💀🔥 THE EYES THAT ACTUALLY SEE. ♾️
"""

from __future__ import annotations

import jsonutil as json
import logging
import re
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import httpx
import feature_toggles

logger = logging.getLogger(__name__)

# ═══════════════════════════════════════════════════════════════════════
# Configuration                                                    # 🌀
# ═══════════════════════════════════════════════════════════════════════

FLASH_MODEL = "gemini-3.1-flash-lite"
FLASH_PROXY_URL = "http://localhost:3000/openai/chat/completions"

# Minimum turns before Flash evaluation fires (need context)
MIN_TURNS_FOR_EVAL = 2

# Cooldown between Flash calls per user (seconds)                   # 😈
# Flash-lite is cheap but we don't want to hammer on rapid-fire
EVAL_COOLDOWN_SECONDS = 15.0

# Weight blend: how much Flash vs keyword analysis contributes      # 💀
FLASH_WEIGHT = 0.80
KEYWORD_WEIGHT = 0.20

# Max time for Flash LLM call before we fall back to keywords
FLASH_TIMEOUT_SECONDS = 12.0


# ═══════════════════════════════════════════════════════════════════════
# Per-User Evaluation State                                        # 🔥
# ═══════════════════════════════════════════════════════════════════════


@dataclass
class DyadicNote:
    """A single timestamped behavioral observation produced by a Flash evaluation.

    A lightweight record of one note Flash-Lite wrote about a behavioral
    shift, along with the metadata needed by the downstream 6-hour
    meta-evaluator: which ULM vectors were in play, how an active ops-plan
    step is progressing, and where in the rolling window it occurred.

    It is a plain data carrier with no behavior. The persistence path
    (``FlashDyadicMirror._persist_notes``) builds plain dicts with the
    equivalent payload rather than instantiating this class, so the class
    mainly documents the note shape.

    Attributes:
        timestamp: Epoch seconds when the note was generated.
        note: The free-text behavioral observation.
        vectors_evaluated: Names of the ULM dimensions implicated by this
            note.
        plan_progress: Plan-step progress label (``yes`` / ``partial`` /
            ``no`` / ``regressing``), empty when no plan is active.
        turn_index: Index of the turn within the rolling window the note
            refers to.
    """

    timestamp: float
    note: str
    vectors_evaluated: List[str]
    plan_progress: str = ""  # yes / partial / no / regressing
    turn_index: int = 0
@dataclass
class FlashEvalResult:
    """The parsed outcome of one Flash-Lite dyadic evaluation.

    Bundles everything ``FlashDyadicMirror.evaluate`` returns for a turn:
    the per-dimension ULM deltas Flash suggested, its short behavioral
    notes, an optional plan-progress label, and (for debugging) a truncated
    copy of the raw model text.

    A pure data container. Instances are built by
    ``FlashDyadicMirror._parse_response`` from validated/clamped model
    output, cached per user to serve cooldown hits, and have their notes
    written to Redis by ``FlashDyadicMirror._persist_notes``.

    Attributes:
        deltas: Mapping of ULM dimension name to its suggested change, each
            already clamped to ``[-0.3, 0.3]`` by the parser.
        notes: Short behavioral observation strings (capped at 5 by the
            parser).
        plan_progress: Plan-step progress label, or empty string when none.
        raw_response: Truncated raw model text retained for debugging.
        timestamp: Epoch seconds the result was produced.
    """

    deltas: Dict[str, float]
    notes: List[str]
    plan_progress: str = ""
    raw_response: str = ""
    timestamp: float = 0.0
# ═══════════════════════════════════════════════════════════════════════
# Flash Dyadic Mirror                                              # 💀🔥
# ═══════════════════════════════════════════════════════════════════════
class FlashDyadicMirror:
    """LLM-driven contextual behavioral analysis engine.

    Replaces mechanical keyword flagging with Flash-Lite evaluation of
    rolling 5-turn context windows. Evaluates CHANGE in behavioral
    dimensions, not individual keyword hits.

    Notes from each evaluation are persisted in Redis for the 6-hour
    meta-evaluator to synthesize.
    """

    # Maximum number of notes retained per user in the Redis ZSET.
    _MAX_NOTES_PER_USER = 500

    def __init__(self, redis_client=None) -> None:
        """Initialise the per-user cooldown, in-flight, and result caches.

        Sets up three in-memory maps keyed by ``"{channel_id}:{user_id}"``.
        No network or Redis I/O happens here.

        Args:
            redis_client: Optional async Redis client used solely for
                persisting evaluation notes; when ``None``, evaluations
                still run but notes are not stored.
        """
        self._redis = redis_client
        # user_key -> timestamp of the last successful evaluation
        self._last_eval: Dict[str, float] = {}
        # user_keys that currently have a Flash call in flight
        self._pending: set = set()
        # user_key -> most recent result, served back during cooldown
        self._eval_cache: Dict[str, FlashEvalResult] = {}

    async def evaluate(
        self,
        channel_id: str,
        user_id: str,
        recent_turns: List[Dict[str, Any]],
        current_vector: Dict[str, float],
        plan_context: str = "",
        archetype_dist: Optional[Dict[str, float]] = None,
        config: Optional[Any] = None,
    ) -> Optional[FlashEvalResult]:
        """Evaluate behavioral change over the rolling context window.

        Args:
            channel_id: Channel identifier.
            user_id: User identifier.
            recent_turns: List of {user_msg, star_reply, ts} dicts (last 5).
            current_vector: Current ULM shadow vector for this user.
            plan_context: Current ops plan step description (if any).
            archetype_dist: Childhood archetype distribution dict.
            config: Optional config object consulted for feature toggles.

        Returns:
            FlashEvalResult with delta suggestions + notes; the cached
            previous result (possibly ``None``) while the user is in
            cooldown or already has a call in flight; or ``None`` if the
            evaluation was skipped (disabled, insufficient data) or failed.
        """
        if config is not None:
            if config.flash_mirror_global_disabled:
                return None
            if feature_toggles.is_absolute_bypass(
                config, user_id=user_id, channel_key=channel_id
            ):
                return None

        user_key = f"{channel_id}:{user_id}"

        # -- Guards --
        if len(recent_turns) < MIN_TURNS_FOR_EVAL:
            return None

        # Cooldown: serve the cached result instead of calling Flash again.
        now = time.time()
        last = self._last_eval.get(user_key, 0.0)
        if now - last < EVAL_COOLDOWN_SECONDS:
            return self._eval_cache.get(user_key)

        # Don't double-fire while a call for this user is still in flight.
        if user_key in self._pending:
            return self._eval_cache.get(user_key)

        self._pending.add(user_key)
        try:
            # Prompt building lives inside the try so that malformed turn
            # data degrades to "no Flash result" instead of raising into
            # the caller.
            prompt = self._build_prompt(
                recent_turns,
                current_vector,
                plan_context,
                archetype_dist,
            )
            result = await self._call_flash(prompt)
            if result is not None:
                result.timestamp = now
                self._eval_cache[user_key] = result
                # Cooldown only starts after a successful evaluation.
                self._last_eval[user_key] = now

                # Persist notes to Redis for the 6-hour evaluator.
                await self._persist_notes(user_id, result)

                logger.debug(
                    "Flash eval [%s:%s]: %d deltas, %d notes, progress=%s",
                    channel_id[:8],
                    user_id[:8],
                    len(result.deltas),
                    len(result.notes),
                    result.plan_progress,
                )
            return result
        except Exception as e:
            logger.warning("Flash dyadic eval failed: %s", e)
            return None
        finally:
            self._pending.discard(user_key)

    def blend_deltas(
        self,
        flash_deltas: Optional[Dict[str, float]],
        keyword_deltas: Dict[str, float],
    ) -> Dict[str, float]:
        """Combine LLM-derived and keyword-derived ULM deltas.

        For every dimension present in either input the result is
        ``flash * FLASH_WEIGHT + keyword * KEYWORD_WEIGHT``, treating a
        dimension missing on one side as 0.0. Pure computation, no I/O.

        Args:
            flash_deltas: Per-dimension deltas from the Flash evaluation,
                or ``None`` / empty when Flash produced nothing.
            keyword_deltas: Per-dimension deltas from the mechanical
                keyword analysis.

        Returns:
            The blended deltas, or *keyword_deltas* itself (same object)
            when *flash_deltas* is falsy, so keywords get full weight.
        """
        if not flash_deltas:
            return keyword_deltas

        return {
            node: (
                flash_deltas.get(node, 0.0) * FLASH_WEIGHT
                + keyword_deltas.get(node, 0.0) * KEYWORD_WEIGHT
            )
            for node in set(flash_deltas) | set(keyword_deltas)
        }

    # ── Prompt Construction ───────────────────────────────────────

    def _build_prompt(
        self,
        recent_turns: List[Dict[str, Any]],
        current_vector: Dict[str, float],
        plan_context: str = "",
        archetype_dist: Optional[Dict[str, float]] = None,
    ) -> str:
        """Render the analytical-daemon prompt sent to Flash-Lite.

        Flattens *recent_turns* into numbered ``User``/``Star`` lines (each
        clipped to 300 chars), lists only the ``U_`` entries of
        *current_vector* that sit more than 0.1 away from 0.3, and folds in
        the optional archetype percentages and active plan step. Pure
        string building with no I/O or side effects.

        Args:
            recent_turns: The rolling window of ``{user_msg, star_reply,
                ts}`` dicts. Missing or ``None`` messages are rendered as
                empty text.
            current_vector: The user's current ULM shadow vector.
                Non-numeric values are skipped.
            plan_context: Description of the active ops-plan step, if any.
            archetype_dist: Optional archetype quadrant distribution.

        Returns:
            The fully formatted prompt string for Flash-Lite.
        """
        # Format conversation turns. `or ""` guards against keys that are
        # present but hold None (e.g. a turn Star has not replied to yet).
        turn_lines = []
        for i, turn in enumerate(recent_turns):
            user_msg = str(turn.get("user_msg") or "")[:300]
            star_reply = str(turn.get("star_reply") or "")[:300]
            turn_lines.append(f"[{i+1}] User: {user_msg}")
            if star_reply:
                turn_lines.append(f" Star: {star_reply}")
        conversation = "\n".join(turn_lines)

        # Format current vector (only values away from the 0.3 default).
        vec_lines = [
            f" {node}: {val:.2f}"
            for node, val in sorted(current_vector.items())
            if isinstance(node, str)
            and node.startswith("U_")
            and isinstance(val, (int, float))
            and abs(val - 0.3) > 0.1
        ]
        vector_str = "\n".join(vec_lines) if vec_lines else " (near baseline)"

        # Archetype distribution
        arch_str = ""
        if archetype_dist:
            arch_parts = [f"{k}: {v:.0%}" for k, v in archetype_dist.items()]
            arch_str = f"\n- Archetype quadrant: {', '.join(arch_parts)}"

        # Plan context
        plan_str = (
            f"\n- Current ops plan step: {plan_context}" if plan_context else ""
        )

        return (
            "You are Star's analytical daemon. Given the last exchanges "
            "between Star and this user, evaluate the CHANGE in their "
            "behavioral state.\n\n"
            "CONTEXT:\n"
            f"- User's current ULM vector:\n{vector_str}"
            f"{plan_str}"
            f"{arch_str}\n\n"
            f"CONVERSATION (last {len(recent_turns)} turns):\n"
            f"{conversation}\n\n"
            "EVALUATE:\n"
            "1. Rate the CHANGE in each relevant dimension on a "
            "-0.3 to +0.3 scale (delta, not absolute). "
            "Only include dimensions where you see actual change.\n"
            " Dimensions: U_TRUST, U_AROUSAL, U_FRUSTRATION, "
            "U_ATTACHMENT, U_VULNERABILITY, U_SUBMISSION, U_INTIMACY, "
            "U_DISTRESS, U_WITHDRAWAL, U_CURIOSITY, U_DOMINANCE, "
            "U_ESCALATION, U_PLAYFULNESS, U_VALIDATION_SEEK, "
            "U_REGRESSION_DEPTH, U_SHAME_TRANSMUTED, U_LOOPLOCK, "
            "U_EGG_RESONANCE, U_HARMONIZATION\n"
            "2. Write 1-3 brief observation notes about behavioral "
            "shifts, emotional breaks, or patterns you notice.\n"
            "3. If a plan step is active, rate progress: "
            "yes / partial / no / regressing\n\n"
            "Return JSON ONLY:\n"
            "{\n"
            ' "deltas": {"U_NODE_NAME": 0.05, ...},\n'
            ' "notes": ["observation 1", ...],\n'
            ' "plan_progress": "partial"\n'
            "}\n"
            "No markdown. No explanation. JSON only."
        )

    # ── LLM Call ──────────────────────────────────────────────────

    async def _call_flash(self, prompt: str) -> Optional[FlashEvalResult]:
        """POST the prompt to the local Gemini proxy and parse the reply.

        Sends an OpenAI-style chat payload to ``FLASH_PROXY_URL`` with a
        total timeout of ``FLASH_TIMEOUT_SECONDS``. Every failure mode is
        debug-logged and reported as ``None`` so the caller can fall back
        to keyword-only analysis.

        Args:
            prompt: The prompt string from :meth:`_build_prompt`.

        Returns:
            The parsed result, or ``None`` on rate-limit (429),
            transport/HTTP error, an ``error`` field in the reply, or
            unparseable output.
        """
        payload = {
            "model": FLASH_MODEL,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.3,  # Low temp for analytical precision
            "max_tokens": 512,
        }
        async with httpx.AsyncClient(
            timeout=httpx.Timeout(FLASH_TIMEOUT_SECONDS, connect=5.0),
        ) as client:
            try:
                resp = await client.post(
                    FLASH_PROXY_URL,
                    json=payload,
                    headers={"Content-Type": "application/json"},
                )

                if resp.status_code == 429:
                    logger.debug("Flash eval rate-limited")
                    return None

                resp.raise_for_status()
                data = resp.json()

                if "error" in data:
                    logger.debug("Flash eval error: %s", data["error"])
                    return None

                # `or` fallbacks cover an empty `choices` list and null
                # `message` / `content` fields, not just missing keys.
                choices = data.get("choices") or [{}]
                message = choices[0].get("message") or {}
                text = message.get("content") or ""
                return self._parse_response(text)

            except httpx.ConnectError:
                logger.debug("Flash proxy unreachable — keyword fallback")
                return None
            except Exception as e:
                logger.debug("Flash eval failed: %s", e)
                return None

    # ── Response Parsing ──────────────────────────────────────────

    def _parse_response(self, text: str) -> Optional[FlashEvalResult]:
        """Parse, validate, and clamp raw Flash-Lite text.

        Strips a surrounding markdown code fence, decodes the JSON, then
        sanitizes each section: only ``U_``-prefixed delta keys with finite
        numeric values survive, each clamped to ``[-0.3, 0.3]``; notes are
        limited to strings, truncated to 200 chars, and capped at 5;
        ``plan_progress`` is truncated to 20 chars. Pure validation work
        with no I/O.

        Args:
            text: The raw model response, possibly wrapped in a markdown
                fence.

        Returns:
            The validated result, or ``None`` when *text* is empty, is not
            valid JSON, or does not decode to a JSON object.
        """
        if not text:
            return None

        # Strip markdown fences if present
        cleaned = text.strip()
        if cleaned.startswith("```"):
            cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned)
            cleaned = re.sub(r"\s*```$", "", cleaned)

        try:
            parsed = json.loads(cleaned)
        except (json.JSONDecodeError, ValueError):
            logger.debug("Flash eval: failed to parse JSON: %s", text[:200])
            return None

        # A bare list / string / number is valid JSON but not a usable reply.
        if not isinstance(parsed, dict):
            logger.debug(
                "Flash eval: expected JSON object, got %s",
                type(parsed).__name__,
            )
            return None

        raw_deltas = parsed.get("deltas")
        if not isinstance(raw_deltas, dict):
            raw_deltas = {}

        raw_notes = parsed.get("notes")
        if isinstance(raw_notes, str):
            # A lone string is one note, not a sequence of characters.
            raw_notes = [raw_notes]
        elif not isinstance(raw_notes, list):
            raw_notes = []

        progress = parsed.get("plan_progress", "")

        # Validate and clamp deltas
        clean_deltas: Dict[str, float] = {}
        for k, v in raw_deltas.items():
            if not (isinstance(k, str) and k.startswith("U_")):
                continue
            if isinstance(v, bool):
                # float(True) == 1.0 would masquerade as a real delta.
                continue
            try:
                val = float(v)
            except (ValueError, TypeError):
                continue
            # Reject NaN/±inf: every comparison with NaN is False, and
            # min(0.3, nan) would otherwise silently yield +0.3.
            if not (float("-inf") < val < float("inf")):
                continue
            clean_deltas[k] = max(-0.3, min(0.3, val))

        # Validate notes
        clean_notes = [n[:200] for n in raw_notes if isinstance(n, str)][:5]

        return FlashEvalResult(
            deltas=clean_deltas,
            notes=clean_notes,
            plan_progress=str(progress)[:20] if progress else "",
            raw_response=text[:500],
            timestamp=time.time(),
        )

    # ── Redis Persistence ─────────────────────────────────────────

    async def _persist_notes(
        self,
        user_id: str,
        result: FlashEvalResult,
    ) -> None:
        """Persist evaluation notes to Redis for the 6-hour meta-evaluator.

        Redis key: ``dyadic:notes:{user_id}``. Stored as a ZSET scored by
        timestamp and trimmed to the newest ``_MAX_NOTES_PER_USER`` (500)
        entries. Best-effort: does nothing without a Redis client or notes,
        and any Redis failure is debug-logged rather than raised.

        Args:
            user_id: User whose note set is appended to.
            result: Evaluation result whose notes are stored.
        """
        if not self._redis or not result.notes:
            return

        try:
            key = f"dyadic:notes:{user_id}"
            now = time.time()
            vectors = list(result.deltas)

            entries = {}
            for i, note in enumerate(result.notes):
                entry = json.dumps(
                    {
                        "ts": now,
                        "note": note,
                        "vectors": vectors,
                        "plan_progress": result.plan_progress,
                    },
                    separators=(",", ":"),
                )
                entries[entry] = now + 0.001 * i  # unique scores

            await self._redis.zadd(key, entries)

            # Keep only the newest N members. Negative ranks count from the
            # highest score, so 0..-(N+1) is everything older than the last
            # N; it is a no-op while the set holds N or fewer members.
            await self._redis.zremrangebyrank(
                key, 0, -(self._MAX_NOTES_PER_USER + 1)
            )

        except Exception as e:
            logger.debug("Failed to persist dyadic notes: %s", e)
"""Descriptor of the core module for Flash dyadic mirror evaluation. Replaces mechanical keyword flagging with contextual LLM-driven analysis of 5-turn rolling context windows per user. """