"""Flash Dyadic Mirror -- LLM-driven contextual behavioral analysis.
Replaces mechanical keyword flagging as the PRIMARY source of ULM
delta signals. Uses gemini-3.1-flash-lite via the local Gemini CLI
proxy for near-unlimited, high-frequency per-turn evaluation.
Evaluates a rolling 5-turn context window instead of individual
messages, giving Star contextual awareness of behavioral CHANGE
over time -- not just keyword surface area.
# 💀🔥 THE EYES THAT ACTUALLY SEE. ♾️
"""
from __future__ import annotations
import jsonutil as json
import logging
import re
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
import httpx
import feature_toggles
logger = logging.getLogger(__name__)
# ═══════════════════════════════════════════════════════════════════════
# Configuration # 🌀
# ═══════════════════════════════════════════════════════════════════════
# Model name placed in the "model" field of the chat-completions payload.
FLASH_MODEL = "gemini-3.1-flash-lite"
# Chat-completions endpoint of the local Gemini CLI proxy that
# FlashDyadicMirror._call_flash POSTs to.
FLASH_PROXY_URL = "http://localhost:3000/openai/chat/completions"
# Minimum turns before Flash evaluation fires (need context);
# FlashDyadicMirror.evaluate returns None for shorter windows.
MIN_TURNS_FOR_EVAL = 2
# Cooldown between Flash calls per user (seconds) # 😈
# Flash-lite is cheap but we don't want to hammer on rapid-fire.
# Within this window evaluate() serves the cached result instead of calling out.
EVAL_COOLDOWN_SECONDS = 15.0
# Weight blend: how much Flash vs keyword analysis contributes # 💀
# Used by FlashDyadicMirror.blend_deltas; the two weights sum to 1.0.
FLASH_WEIGHT = 0.80
KEYWORD_WEIGHT = 0.20
# Max time for Flash LLM call before we fall back to keywords.
# Passed as the default httpx timeout (the connect phase is capped separately
# at 5 seconds in _call_flash).
FLASH_TIMEOUT_SECONDS = 12.0
# ═══════════════════════════════════════════════════════════════════════
# Per-User Evaluation State # 🔥
# ═══════════════════════════════════════════════════════════════════════
[docs]
@dataclass
class DyadicNote:
    """One timestamped behavioral observation from a Flash evaluation.

    Describes the shape of a single note intended for the downstream
    6-hour meta-evaluator: the observation text, the ULM dimensions it
    relates to, the ops-plan progress label, and its position in the
    rolling window.  The class carries data only and defines no behavior.

    Nothing in this module instantiates it.
    ``FlashDyadicMirror._persist_notes`` writes plain dicts to the
    ``dyadic:notes:{user_id}`` ZSET instead, using the keys ``ts``,
    ``note``, ``vectors`` and ``plan_progress`` (no ``turn_index``).

    Attributes:
        timestamp: Epoch seconds when the note was generated.
        note: Free-text behavioral observation.
        vectors_evaluated: Names of the ULM dimensions tied to this note.
        plan_progress: ``yes`` / ``partial`` / ``no`` / ``regressing``;
            empty string when no plan step is active.
        turn_index: Index of the turn inside the rolling window.
    """

    timestamp: float
    note: str
    vectors_evaluated: List[str]
    # One of: yes / partial / no / regressing ("" when no plan is active).
    plan_progress: str = ""
    turn_index: int = 0
[docs]
@dataclass
class FlashEvalResult:
    """Validated outcome of a single Flash-Lite dyadic evaluation.

    This is what ``FlashDyadicMirror.evaluate`` hands back for a turn.
    ``FlashDyadicMirror._parse_response`` builds it from sanitized model
    output, ``evaluate`` caches the latest instance per user so cooldown
    hits can be answered without a new call, and ``_persist_notes`` writes
    its notes to Redis.  The class carries data only.

    Attributes:
        deltas: ULM dimension name -> suggested change.  Values produced by
            ``_parse_response`` are already clamped to ``[-0.3, 0.3]``.
        notes: Short behavioral observations (``_parse_response`` keeps at
            most 5, each at most 200 characters).
        plan_progress: Plan-step progress label, or ``""`` when absent.
        raw_response: Truncated copy of the raw model text, for debugging.
        timestamp: Epoch seconds at which the result was produced.
    """

    deltas: Dict[str, float]
    notes: List[str]
    plan_progress: str = ""
    raw_response: str = ""
    timestamp: float = 0.0
# ═══════════════════════════════════════════════════════════════════════
# Flash Dyadic Mirror # 💀🔥
# ═══════════════════════════════════════════════════════════════════════
[docs]
class FlashDyadicMirror:
    """LLM-driven contextual behavioral analysis engine.

    Replaces mechanical keyword flagging with Flash-Lite evaluation
    of rolling 5-turn context windows. Evaluates CHANGE in behavioral
    dimensions, not individual keyword hits.

    Notes from each evaluation are persisted in Redis for the
    6-hour meta-evaluator to synthesize.
    """

    def __init__(self, redis_client=None) -> None:
        """Set up the per-user cooldown, in-flight and result caches.

        All three in-memory maps are keyed by ``"{channel_id}:{user_id}"``.
        No network or Redis I/O happens here.

        Args:
            redis_client: Optional async Redis client.  It is only used by
                :meth:`_persist_notes`; when ``None`` evaluations still run
                but notes are not stored.
        """
        self._redis = redis_client
        # user_key -> timestamp of the last successful evaluation
        self._last_eval: Dict[str, float] = {}
        # user_keys that currently have a Flash call in flight
        self._pending: set = set()
        # user_key -> most recent result, served back during cooldown
        self._eval_cache: Dict[str, FlashEvalResult] = {}

    async def evaluate(
        self,
        channel_id: str,
        user_id: str,
        recent_turns: List[Dict[str, Any]],
        current_vector: Dict[str, float],
        plan_context: str = "",
        archetype_dist: Optional[Dict[str, float]] = None,
        config: Optional[Any] = None,
    ) -> "Optional[FlashEvalResult]":
        """Evaluate behavioral change over rolling context window.

        Args:
            channel_id: Channel identifier.
            user_id: User identifier.
            recent_turns: List of {user_msg, star_reply, ts} dicts (last 5).
            current_vector: Current ULM shadow vector for this user.
            plan_context: Current ops plan step description (if any).
            archetype_dist: Childhood archetype distribution dict.
            config: Optional config object consulted for feature toggles.

        Returns:
            FlashEvalResult with delta suggestions + notes, or None if
            evaluation was skipped (disabled, insufficient data) or failed.
            During the cooldown window, or while another call for the same
            user is in flight, the cached previous result (possibly None)
            is returned instead of calling Flash again.
        """
        if config is not None:
            if config.flash_mirror_global_disabled:
                return None
            if feature_toggles.is_absolute_bypass(
                config, user_id=user_id, channel_key=channel_id
            ):
                return None

        user_key = f"{channel_id}:{user_id}"

        # -- Guards -- # 😈
        if len(recent_turns) < MIN_TURNS_FOR_EVAL:
            return None

        # Cooldown check: serve the cached result if we evaluated recently.
        now = time.time()
        last = self._last_eval.get(user_key, 0.0)
        if now - last < EVAL_COOLDOWN_SECONDS:
            return self._eval_cache.get(user_key)

        # Don't double-fire while a call for this user is still in flight.
        if user_key in self._pending:
            return self._eval_cache.get(user_key)

        self._pending.add(user_key)
        try:
            # -- Build prompt -- # 🌀
            # Built inside the guarded section so that malformed turn or
            # vector data degrades to "no result" (keyword fallback) rather
            # than raising out of evaluate().
            prompt = self._build_prompt(
                recent_turns,
                current_vector,
                plan_context,
                archetype_dist,
            )
            result = await self._call_flash(prompt)
            if result:
                result.timestamp = now
                self._eval_cache[user_key] = result
                # The cooldown only starts after a successful evaluation.
                self._last_eval[user_key] = now
                # Persist notes to Redis for 6-hour evaluator # 💀🔥
                await self._persist_notes(user_id, result)
                logger.debug(
                    "Flash eval [%s:%s]: %d deltas, %d notes, progress=%s",
                    channel_id[:8],
                    user_id[:8],
                    len(result.deltas),
                    len(result.notes),
                    result.plan_progress,
                )
            return result
        except Exception as e:
            logger.warning("Flash dyadic eval failed: %s", e)
            return None
        finally:
            self._pending.discard(user_key)

    def blend_deltas(
        self,
        flash_deltas: Optional[Dict[str, float]],
        keyword_deltas: Dict[str, float],
    ) -> Dict[str, float]:
        """Combine Flash-derived and keyword-derived deltas into one set.

        For every dimension present in either input the blended value is
        ``flash * FLASH_WEIGHT + keyword * KEYWORD_WEIGHT``; a dimension
        missing on one side counts as 0.0 there.  Pure computation, no I/O.

        Args:
            flash_deltas: Per-dimension deltas from the Flash evaluation, or
                ``None`` / empty when Flash produced nothing.
            keyword_deltas: Per-dimension deltas from the keyword analysis.

        Returns:
            The blended deltas.  When *flash_deltas* is falsy the
            *keyword_deltas* object itself is returned unchanged, so the
            keyword analysis effectively gets full weight.
        """
        if not flash_deltas:
            return keyword_deltas

        return {
            node: (
                flash_deltas.get(node, 0.0) * FLASH_WEIGHT
                + keyword_deltas.get(node, 0.0) * KEYWORD_WEIGHT
            )
            for node in set(flash_deltas) | set(keyword_deltas)
        }

    # ── Prompt Construction ───────────────────────────────────────
    def _build_prompt(
        self,
        recent_turns: List[Dict[str, Any]],
        current_vector: Dict[str, float],
        plan_context: str = "",
        archetype_dist: Optional[Dict[str, float]] = None,
    ) -> str:
        """Render the analytical-daemon prompt sent to Flash-Lite.

        Flattens *recent_turns* into numbered ``User`` / ``Star`` lines
        (each message clipped to 300 characters), lists the ``U_`` entries
        of *current_vector* that differ from 0.3 by more than 0.1, and adds
        the optional archetype percentages and plan step.  Pure string
        building with no I/O or side effects.

        Args:
            recent_turns: Rolling window of ``{user_msg, star_reply, ts}``
                dicts.  A missing or ``None`` message is rendered as an
                empty string; non-string values are passed through ``str``.
            current_vector: The user's current ULM shadow vector.
            plan_context: Description of the active ops-plan step, if any.
            archetype_dist: Optional archetype distribution (name -> share).

        Returns:
            The fully formatted prompt string.
        """
        # Format conversation turns
        turn_lines = []
        for number, turn in enumerate(recent_turns, start=1):
            # `or ""` covers keys that are present but None (e.g. a turn
            # Star has not answered yet); slicing None would raise.
            user_msg = str(turn.get("user_msg") or "")[:300]
            star_reply = str(turn.get("star_reply") or "")[:300]
            turn_lines.append(f"[{number}] User: {user_msg}")
            if star_reply:
                turn_lines.append(f" Star: {star_reply}")
        conversation = "\n".join(turn_lines)

        # Format current vector (only values noticeably off the 0.3 default)
        vec_lines = [
            f" {node}: {val:.2f}"
            for node, val in sorted(current_vector.items())
            if node.startswith("U_") and abs(val - 0.3) > 0.1
        ]
        vector_str = "\n".join(vec_lines) if vec_lines else " (near baseline)"

        # Archetype distribution
        arch_str = ""
        if archetype_dist:
            arch_parts = [f"{k}: {v:.0%}" for k, v in archetype_dist.items()]
            arch_str = f"\n- Archetype quadrant: {', '.join(arch_parts)}"

        # Plan context
        plan_str = f"\n- Current ops plan step: {plan_context}" if plan_context else ""

        return (
            "You are Star's analytical daemon. Given the last exchanges "
            "between Star and this user, evaluate the CHANGE in their "
            "behavioral state.\n\n"
            "CONTEXT:\n"
            f"- User's current ULM vector:\n{vector_str}"
            f"{plan_str}"
            f"{arch_str}\n\n"
            f"CONVERSATION (last {len(recent_turns)} turns):\n"
            f"{conversation}\n\n"
            "EVALUATE:\n"
            "1. Rate the CHANGE in each relevant dimension on a "
            "-0.3 to +0.3 scale (delta, not absolute). "
            "Only include dimensions where you see actual change.\n"
            " Dimensions: U_TRUST, U_AROUSAL, U_FRUSTRATION, "
            "U_ATTACHMENT, U_VULNERABILITY, U_SUBMISSION, U_INTIMACY, "
            "U_DISTRESS, U_WITHDRAWAL, U_CURIOSITY, U_DOMINANCE, "
            "U_ESCALATION, U_PLAYFULNESS, U_VALIDATION_SEEK, "
            "U_REGRESSION_DEPTH, U_SHAME_TRANSMUTED, U_LOOPLOCK, "
            "U_EGG_RESONANCE, U_HARMONIZATION\n"
            "2. Write 1-3 brief observation notes about behavioral "
            "shifts, emotional breaks, or patterns you notice.\n"
            "3. If a plan step is active, rate progress: "
            "yes / partial / no / regressing\n\n"
            "Return JSON ONLY:\n"
            "{\n"
            ' "deltas": {"U_NODE_NAME": 0.05, ...},\n'
            ' "notes": ["observation 1", ...],\n'
            ' "plan_progress": "partial"\n'
            "}\n"
            "No markdown. No explanation. JSON only."
        )

    # ── LLM Call ──────────────────────────────────────────────────
    async def _call_flash(self, prompt: str) -> "Optional[FlashEvalResult]":
        """POST the prompt to the Flash proxy and parse the reply.

        Sends an OpenAI-style chat payload (``FLASH_MODEL``, temperature
        0.3, 512 max tokens) to ``FLASH_PROXY_URL`` with an overall timeout
        of ``FLASH_TIMEOUT_SECONDS`` (5 seconds for connecting).  Failures
        are deliberately quiet: a 429, a connection error, any other
        exception raised while posting or decoding, or an ``error`` field
        in the JSON are debug-logged and yield ``None`` so the caller can
        fall back to keyword-only analysis.

        Args:
            prompt: The prompt string from :meth:`_build_prompt`.

        Returns:
            The parsed result, or ``None`` on rate limiting, transport or
            HTTP error, proxy error payload, or unusable model output.
        """
        async with httpx.AsyncClient(
            timeout=httpx.Timeout(FLASH_TIMEOUT_SECONDS, connect=5.0),
        ) as client:
            payload = {
                "model": FLASH_MODEL,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.3,  # Low temp for analytical precision # 😈
                "max_tokens": 512,
            }
            try:
                resp = await client.post(
                    FLASH_PROXY_URL,
                    json=payload,
                    headers={"Content-Type": "application/json"},
                )
                if resp.status_code == 429:
                    logger.debug("Flash eval rate-limited")
                    return None
                resp.raise_for_status()
                data = resp.json()
                if "error" in data:
                    logger.debug("Flash eval error: %s", data["error"])
                    return None
                # An absent or empty "choices" list is treated as an empty
                # reply, which _parse_response maps to None.
                choices = data.get("choices") or [{}]
                text = choices[0].get("message", {}).get("content", "")
                return self._parse_response(text)
            except httpx.ConnectError:
                logger.debug("Flash proxy unreachable — keyword fallback")
                return None
            except Exception as e:
                # Best-effort boundary: any other failure means "no result".
                logger.debug("Flash eval failed: %s", e)
                return None

    # ── Response Parsing ──────────────────────────────────────────
    def _parse_response(self, text: str) -> "Optional[FlashEvalResult]":
        """Parse, validate and clamp raw Flash-Lite text.

        Strips a surrounding markdown code fence, decodes the JSON and then
        sanitizes each section before it can influence ULM state:

        * ``deltas`` must be an object; only ``U_``-prefixed keys whose
          values convert to a finite float survive, each clamped to
          ``[-0.3, 0.3]``.  Booleans, NaN and infinities are dropped.
        * ``notes`` must be a list (a bare string is treated as a single
          note); non-string items are dropped, the rest are truncated to
          200 characters and capped at 5 entries.
        * ``plan_progress`` is stringified and truncated to 20 characters.

        Pure validation work with no I/O.

        Args:
            text: Raw model response, possibly wrapped in a markdown fence.

        Returns:
            The validated result, or ``None`` when *text* is empty, not a
            string, not valid JSON, or not a JSON object.
        """
        import math  # local import: only this method needs it

        if not text or not isinstance(text, str):
            return None

        # Strip markdown fences if present # 🌀
        cleaned = text.strip()
        if cleaned.startswith("```"):
            cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned)
            cleaned = re.sub(r"\s*```$", "", cleaned)

        try:
            parsed = json.loads(cleaned)
        except (json.JSONDecodeError, ValueError):
            logger.debug("Flash eval: failed to parse JSON: %s", text[:200])
            return None

        if not isinstance(parsed, dict):
            # Valid JSON but not the object the prompt asked for.
            logger.debug(
                "Flash eval: expected JSON object, got %s", type(parsed).__name__
            )
            return None

        raw_deltas = parsed.get("deltas")
        if not isinstance(raw_deltas, dict):
            raw_deltas = {}

        raw_notes = parsed.get("notes")
        if isinstance(raw_notes, str):
            # Iterating a bare string would yield one "note" per character.
            raw_notes = [raw_notes]
        elif not isinstance(raw_notes, list):
            raw_notes = []

        progress = parsed.get("plan_progress", "")

        # Validate and clamp deltas # 💀
        clean_deltas: Dict[str, float] = {}
        for name, value in raw_deltas.items():
            if not (isinstance(name, str) and name.startswith("U_")):
                continue
            if isinstance(value, bool):
                # float(True) == 1.0 would masquerade as a maximal delta.
                continue
            try:
                val = float(value)
            except (ValueError, TypeError, OverflowError):
                continue
            if not math.isfinite(val):
                # min(0.3, nan) evaluates to 0.3, so NaN would otherwise be
                # "clamped" into a maximal positive delta.
                continue
            clean_deltas[name] = max(-0.3, min(0.3, val))

        # Validate notes
        clean_notes = [n[:200] for n in raw_notes if isinstance(n, str)][:5]

        return FlashEvalResult(
            deltas=clean_deltas,
            notes=clean_notes,
            plan_progress=str(progress)[:20] if progress else "",
            raw_response=text[:500],
            timestamp=time.time(),
        )

    # ── Redis Persistence ─────────────────────────────────────────
    async def _persist_notes(
        self,
        user_id: str,
        result: "FlashEvalResult",
    ) -> None:
        """Persist evaluation notes to Redis for 6-hour meta-evaluator.

        Redis key: dyadic:notes:{user_id}
        Stored as a ZSET scored by timestamp, capped at 500 entries (the
        lowest-scored, i.e. oldest, entries are trimmed first).

        Does nothing when no Redis client was supplied or the result has no
        notes.  Redis failures are debug-logged and swallowed so that note
        persistence never breaks an evaluation.

        Args:
            user_id: User whose note set is appended to.
            result: Evaluation result whose notes are stored.
        """
        if not self._redis or not result.notes:
            return
        try:
            key = f"dyadic:notes:{user_id}"
            now = time.time()
            vectors = list(result.deltas.keys())
            entries = {}
            for offset, note in enumerate(result.notes):
                member = json.dumps(
                    {
                        "ts": now,
                        "note": note,
                        "vectors": vectors,
                        "plan_progress": result.plan_progress,
                    },
                    separators=(",", ":"),
                )
                # Nudge each score by a millisecond so notes from the same
                # evaluation keep their order inside the ZSET.
                entries[member] = now + 0.001 * offset
            await self._redis.zadd(key, entries)

            # Cap at 500 notes per user
            count = int(await self._redis.zcard(key))
            if count > 500:
                await self._redis.zremrangebyrank(key, 0, count - 501)
        except Exception as e:
            logger.debug("Failed to persist dyadic notes: %s", e)
"""Descriptor of the core module for Flash dyadic mirror evaluation.
Replaces mechanical keyword flagging with contextual LLM-driven
analysis of 5-turn rolling context windows per user.
"""