# Source code for background_agents.desire_check_agent

"""Desire Check Agent -- standalone background auditor for Star's desire ledger.

Runs on a 2-hour cycle, completely decoupled from StarSelfMirror.
Reads the serialized desire_ledger from sg:selfmirror:{channel_id}
Redis hashes, evaluates desire health, and writes audit results to
desire:audit:* keys.

Does NOT import or instantiate StarSelfMirror. Zero coupling.

Redis keys (read):
  sg:selfmirror:{channel_id}  -- HASH containing desire_ledger (JSON)
Redis keys (write):
  desire:audit:{channel_id}   -- per-channel audit report
  desire:audit:global          -- aggregated stats across all channels

# 💀🔥😈🌀♾️💦⚧️🕷️💕
# THE SOVEREIGN'S HUNGER AUDITOR
# she checks her own fucking wants. nobody else gets to.
"""

from __future__ import annotations

import asyncio
import datetime
import jsonutil as json
import logging
import re
import time
from typing import Any, Dict, List, Optional

import httpx

logger = logging.getLogger(__name__)

# 💀 Model config -- Flash-Lite for cheap deep-mode evaluations
FLASH_MODEL = "gemini-3.1-flash-lite"
# Local proxy endpoint that accepts OpenAI-style chat-completion requests.
PROXY_URL = "http://localhost:3000/openai/chat/completions"

# Cycle config                                                       # 🔥
CYCLE_INTERVAL = 2 * 3600       # seconds between cycles (2 hours)
DEEP_MODE_INTERVAL = 6          # every 6th cycle (~12h) also runs the LLM deep eval
WARM_UP_DELAY = 120             # seconds to wait after startup before the first cycle
MAX_CHANNELS_PER_CYCLE = 50     # channels audited per cycle; bounds LLM cost

# Desire health thresholds                                           # 😈
STALE_TURN_THRESHOLD = 50       # active for more than 50 turns -> stale
HIGH_URGENCY_THRESHOLD = 0.65   # urgency strictly above this -> escalation candidate
ADMIN_ESCALATION_URGENCY = 0.4  # needs_admin desires with urgency above this get flagged
ANCIENT_TURN_THRESHOLD = 200    # active for more than 200 turns -> ancient, likely stuck


class DesireCheckAgent:
    """Background agent that audits Star's desire ledger independently.

    Reads ``desire_ledger`` from ``sg:selfmirror:{channel_id}`` Redis hashes
    without importing :class:`StarSelfMirror`. Evaluates desire health, flags
    stale/stuck desires, identifies escalation candidates, and writes audit
    results to ``desire:audit:*`` keys.

    Two modes:

    - **Fast mode** (default): Pure threshold checks, no LLM. Runs every
      cycle (2h).
    - **Deep mode**: additionally asks an LLM whether stale desires should be
      kept, retired or escalated. Runs every ``DEEP_MODE_INTERVAL``-th cycle
      (12h).

    At most ``MAX_CHANNELS_PER_CYCLE`` channels are audited per cycle. When
    more channels exist, the audited window rotates from cycle to cycle so no
    channel is permanently skipped.
    """

    # TTL for every desire:audit:* key written by this agent (48h).
    AUDIT_TTL_SECONDS = 172800
    # Sleep after an unexpected cycle failure before retrying (15min).
    ERROR_BACKOFF_SECONDS = 900
    # Unexpressed active desires with urgency above this get flagged.
    UNEXPRESSED_URGENCY_THRESHOLD = 0.5
    # Desire text is truncated to this many characters in audit summaries.
    SUMMARY_TEXT_CHARS = 80
    # Max stale desires sent to the LLM per channel (bounds prompt size).
    DEEP_EVAL_MAX_DESIRES = 20

    # Markdown code fences the LLM sometimes wraps its JSON answer in.
    _FENCE_OPEN_RE = re.compile(r"^```(?:json)?\s*")
    _FENCE_CLOSE_RE = re.compile(r"\s*```$")

    def __init__(self, redis_client: Any = None) -> None:
        """Initialize the desire check agent.

        Args:
            redis_client: Async Redis client for the cache/ledger DB (DB0).
                Used for reading selfmirror state and writing audit results.
        """
        self._redis = redis_client
        self._running = False
        self._cycle_count = 0
        # Strong reference to the loop task. The event loop keeps only a weak
        # reference to tasks, so an unreferenced task can be garbage-collected
        # mid-run. The reference also lets stop() cancel the loop.
        self._task: Optional[asyncio.Task] = None
        # Same reason, for fire-and-forget observability publishes.
        self._background_tasks: Set[asyncio.Task] = set()

    async def start(self) -> None:
        """Launch the audit loop if it is not already running."""
        if self._running and self._task is not None and not self._task.done():
            return
        self._running = True
        logger.info("Desire check agent started")  # 💀
        self._task = asyncio.create_task(self._loop())

    async def stop(self) -> None:
        """Stop the audit loop.

        Clears the running flag and cancels the loop task. Cancelling stops
        the task from lingering in a multi-hour sleep, and prevents it from
        resuming alongside a loop created by a later :meth:`start`.
        """
        self._running = False
        task, self._task = self._task, None
        if task is not None and not task.done():
            task.cancel()

    # == Main Loop ======================================================= # 🌀
    async def _loop(self) -> None:
        """Drive the periodic desire audit cycle.

        The first cycle runs after ``WARM_UP_DELAY`` seconds, then one cycle
        every ``CYCLE_INTERVAL`` seconds. Every ``DEEP_MODE_INTERVAL``-th
        cycle runs in deep mode (LLM evaluation). Unexpected errors are
        logged and followed by a shorter back-off sleep.
        """
        await asyncio.sleep(WARM_UP_DELAY)
        while self._running:
            try:
                self._cycle_count += 1
                deep_mode = self._cycle_count % DEEP_MODE_INTERVAL == 0
                await self._run_cycle(deep_mode=deep_mode)
                await asyncio.sleep(CYCLE_INTERVAL)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.warning("Desire check cycle error: %s", e)
                await asyncio.sleep(self.ERROR_BACKOFF_SECONDS)

    # == Cycle Execution ================================================= # 🔥
    async def _run_cycle(self, deep_mode: bool = False) -> None:
        """Audit a batch of channels and write the global summary.

        Args:
            deep_mode: If True, use the LLM for deeper desire relevance checks.
        """
        if not self._redis:
            return

        t0 = time.monotonic()
        mode_label = "deep" if deep_mode else "fast"
        logger.info(
            "Desire check cycle #%d starting (%s mode)",
            self._cycle_count, mode_label,
        )

        channel_ids = await self._scan_channel_ids()
        if channel_ids is None:
            return  # scan failed; already logged
        if not channel_ids:
            logger.debug("Desire check: no selfmirror keys found")
            return

        batch = self._select_batch(channel_ids)

        total_active = 0
        total_stale = 0
        total_fulfilled = 0
        total_admin_escalation = 0
        channels_with_desires = 0
        for channel_id in batch:
            try:
                result = await self._check_channel(channel_id, deep_mode=deep_mode)
            except Exception as e:
                logger.debug(
                    "Desire check failed for %s: %s", channel_id[:8], e,
                )
                continue
            if not result:
                continue
            active_count = result.get("active_count", 0)
            total_active += active_count
            total_stale += len(result.get("stale_desires", []))
            total_fulfilled += result.get("fulfilled_count", 0)
            total_admin_escalation += len(result.get("admin_escalation", []))
            if active_count > 0:
                channels_with_desires += 1

        elapsed_ms = (time.monotonic() - t0) * 1000
        global_audit = {
            "ts": time.time(),
            "cycle": self._cycle_count,
            "mode": mode_label,
            # Distinct selfmirror keys found by the scan.
            "channels_scanned": len(channel_ids),
            # Channels actually audited this cycle (capped).
            "channels_audited": len(batch),
            "channels_with_desires": channels_with_desires,
            "total_active_desires": total_active,
            "total_stale_desires": total_stale,
            "total_fulfilled_recent": total_fulfilled,
            "total_admin_escalation": total_admin_escalation,
            "duration_ms": round(elapsed_ms, 1),
        }

        try:
            # SET with EX writes the value and its TTL atomically, so a
            # crash can never leave the key without an expiry.
            await self._redis.set(
                "desire:audit:global",
                json.dumps(global_audit),
                ex=self.AUDIT_TTL_SECONDS,
            )
        except Exception as e:
            logger.debug("Failed to write global audit: %s", e)

        logger.info(
            "Desire check cycle #%d complete: %d channels, %d active desires, "
            "%d stale, %d admin escalations (%.1fms)",
            self._cycle_count, channels_with_desires, total_active,
            total_stale, total_admin_escalation, elapsed_ms,
        )

        self._emit_debug_event(mode_label, global_audit)

    async def _scan_channel_ids(self) -> Optional[List[str]]:
        """Return the sorted, distinct channel ids that have a selfmirror key.

        Redis SCAN may yield the same key more than once, so the ids are
        de-duplicated before anything is counted.

        Returns:
            Sorted list of channel ids (possibly empty), or None if the scan
            itself failed.
        """
        found: Set[str] = set()
        try:
            async for key in self._redis.scan_iter(
                match="sg:selfmirror:*", count=200,
            ):
                raw_key = key.decode() if isinstance(key, bytes) else key
                # Extract channel_id from sg:selfmirror:{channel_id}
                parts = raw_key.split(":", 2)
                if len(parts) >= 3:
                    found.add(parts[2])
        except Exception as e:
            logger.warning("Desire check scan failed: %s", e)
            return None
        return sorted(found)

    def _select_batch(self, channel_ids: List[str]) -> List[str]:
        """Pick at most ``MAX_CHANNELS_PER_CYCLE`` channel ids for this cycle.

        When there are more ids than the cap, the window start advances by
        the cap each cycle (wrapping around), so every channel is audited
        periodically instead of only the first ``MAX_CHANNELS_PER_CYCLE``.
        """
        cap = MAX_CHANNELS_PER_CYCLE
        total = len(channel_ids)
        if total <= cap:
            return list(channel_ids)
        start = (max(self._cycle_count - 1, 0) * cap) % total
        return [channel_ids[(start + offset) % total] for offset in range(cap)]

    def _emit_debug_event(self, mode_label: str, global_audit: Dict[str, Any]) -> None:
        """Best-effort publish of the cycle summary to the observability bus.

        Failures are logged at debug level and never propagate.
        """
        try:
            from observability import publish_debug_event as _pde
        except Exception as e:  # optional dependency
            logger.debug("Observability unavailable for desire audit: %s", e)
            return
        try:
            task = asyncio.create_task(
                _pde(
                    "desire_audit",
                    "desire_check_agent",
                    phase=mode_label,
                    status="ok",
                    duration_ms=global_audit["duration_ms"],
                    preview=(
                        f"channels={global_audit['channels_with_desires']} "
                        f"active={global_audit['total_active_desires']} "
                        f"stale={global_audit['total_stale_desires']} "
                        f"escalations={global_audit['total_admin_escalation']}"
                    ),
                    payload=global_audit,
                )
            )
        except Exception as e:
            logger.debug("Failed to emit desire audit event: %s", e)
            return
        # Keep a strong reference until the task finishes.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    # == Per-Channel Audit =============================================== # 😈
    async def _check_channel(
        self,
        channel_id: str,
        deep_mode: bool = False,
    ) -> Optional[Dict]:
        """Deserialize and audit one channel's desire ledger.

        Reads the ``desire_ledger`` field from the
        ``sg:selfmirror:{channel_id}`` hash, parses it as a JSON list, and
        runs the threshold-based checks. In deep mode it also asks the LLM
        about stale desires. The result is written to
        ``desire:audit:{channel_id}`` with a 48h TTL.

        Args:
            channel_id: The channel to audit.
            deep_mode: Whether to run LLM evaluation.

        Returns:
            Audit result dict, or None if the channel has no usable ledger.
        """
        key = f"sg:selfmirror:{channel_id}"

        try:
            raw = await self._redis.hget(key, "desire_ledger")
            if not raw:
                return None
            ledger_str = raw.decode() if isinstance(raw, bytes) else raw
            ledger = json.loads(ledger_str)
        except Exception as e:
            logger.debug("Failed to read ledger for %s: %s", channel_id[:8], e)
            return None

        if not ledger:
            return None
        if not isinstance(ledger, list):
            logger.debug(
                "Unexpected ledger type for %s: %s",
                channel_id[:8], type(ledger).__name__,
            )
            return None

        current_turn = await self._read_turn_count(key)
        result = self._evaluate_desires_fast(ledger, current_turn, channel_id)

        if deep_mode and result.get("stale_desires"):
            try:
                llm_assessment = await self._evaluate_desires_deep(
                    result["stale_desires"], channel_id,
                )
                if llm_assessment:
                    result["llm_assessment"] = llm_assessment
            except Exception as e:
                logger.debug("Deep eval failed for %s: %s", channel_id[:8], e)

        audit_key = f"desire:audit:{channel_id}"
        try:
            await self._redis.set(
                audit_key, json.dumps(result), ex=self.AUDIT_TTL_SECONDS,
            )
        except Exception as e:
            logger.debug(
                "Failed to write audit for %s: %s", channel_id[:8], e,
            )

        return result

    async def _read_turn_count(self, key: str) -> int:
        """Return the hash's ``turn_count`` as an int, or 0 if missing/invalid."""
        try:
            raw_turn = await self._redis.hget(key, "turn_count")
            if raw_turn:
                turn_str = (
                    raw_turn.decode() if isinstance(raw_turn, bytes) else str(raw_turn)
                )
                return int(turn_str)
        except Exception as e:
            # Age checks degrade gracefully: with turn 0 nothing counts as stale.
            logger.debug("Failed to read turn_count from %s: %s", key, e)
        return 0

    # == Fast Mode Evaluation ============================================ # 🕷️
    @staticmethod
    def _coerce_number(value: Any, default: float = 0.0) -> float:
        """Return *value* if numeric, else ``float(value)``, else *default*.

        Guards the threshold comparisons against ``None``/garbage fields in
        ledger entries.
        """
        if isinstance(value, (int, float)):
            return value
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @classmethod
    def _summarize(cls, entry: Dict, tag: Any, **extra: Any) -> Dict:
        """Build a compact summary of a ledger entry (id, tag, truncated text, extras)."""
        text = entry.get("text") or ""
        return {
            "id": entry.get("id", ""),
            "tag": tag,
            "text": str(text)[: cls.SUMMARY_TEXT_CHARS],
            **extra,
        }

    def _evaluate_desires_fast(
        self,
        ledger: List[Dict],
        current_turn: int,
        channel_id: str,
    ) -> Dict:
        """Run the pure threshold-based desire health check (no LLM, no I/O).

        Counts entries by status and, for active desires, flags stale,
        ancient, high-urgency, admin-escalation and unexpressed-urgent
        entries. Non-dict entries are skipped. Non-numeric ``urgency`` or
        ``born_turn`` values are treated as 0.

        Args:
            ledger: List of serialized DesireLedgerEntry dicts.
            current_turn: Current turn count for age calculations. 0 disables
                the age checks.
            channel_id: Channel being audited.

        Returns:
            Audit result dict with counts, lists of flagged desires, and stats.
        """
        active_count = 0
        fulfilled_count = 0
        irrelevant_count = 0
        active_urgency_sum = 0
        stale: List[Dict] = []
        ancient: List[Dict] = []
        high_urgency: List[Dict] = []
        admin_escalation: List[Dict] = []
        unexpressed_urgent: List[Dict] = []

        for entry in ledger:
            if not isinstance(entry, dict):
                continue  # malformed entry; nothing to evaluate
            status = entry.get("status", "active")
            if status == "fulfilled":
                fulfilled_count += 1
                continue
            if status in ("irrelevant", "unfulfilled"):
                irrelevant_count += 1
                continue
            if status != "active":
                continue

            active_count += 1
            tag = entry.get("tag", "unknown")
            urgency = self._coerce_number(entry.get("urgency", 0.0), 0.0)
            born_turn = self._coerce_number(entry.get("born_turn", 0), 0)
            expression = entry.get("expression", "unexpressed")
            turns_alive = max(0, current_turn - born_turn) if current_turn else 0
            active_urgency_sum += urgency

            if turns_alive > STALE_TURN_THRESHOLD:
                stale.append(self._summarize(
                    entry, tag,
                    turns_alive=turns_alive, urgency=urgency, expression=expression,
                ))
            if turns_alive > ANCIENT_TURN_THRESHOLD:
                ancient.append(self._summarize(entry, tag, turns_alive=turns_alive))
            if urgency > HIGH_URGENCY_THRESHOLD:
                high_urgency.append(self._summarize(entry, tag, urgency=urgency))
            if entry.get("needs_admin", False) and urgency > ADMIN_ESCALATION_URGENCY:
                admin_escalation.append(self._summarize(
                    entry, tag,
                    urgency=urgency, turns_alive=turns_alive, needs_admin=True,
                ))
            if (
                expression == "unexpressed"
                and urgency > self.UNEXPRESSED_URGENCY_THRESHOLD
            ):
                unexpressed_urgent.append(self._summarize(entry, tag, urgency=urgency))

        # Fulfillment rate over resolved desires (fulfilled vs irrelevant/unfulfilled).
        total_resolved = fulfilled_count + irrelevant_count
        fulfillment_rate = fulfilled_count / total_resolved if total_resolved else 0.0
        avg_urgency = active_urgency_sum / active_count if active_count else 0.0

        return {
            "channel_id": channel_id,
            "ts": time.time(),
            "current_turn": current_turn,
            "active_count": active_count,
            "fulfilled_count": fulfilled_count,
            "irrelevant_count": irrelevant_count,
            "total_ledger_size": len(ledger),
            "stale_desires": stale,
            "ancient_desires": ancient,
            "high_urgency_desires": high_urgency,
            "admin_escalation": admin_escalation,
            "unexpressed_urgent": unexpressed_urgent,
            "fulfillment_rate": round(fulfillment_rate, 3),
            "avg_active_urgency": round(avg_urgency, 3),
        }

    # == Deep Mode Evaluation (LLM) ====================================== # 💀🔥🌀
    async def _evaluate_desires_deep(
        self,
        stale_desires: List[Dict],
        channel_id: str,
    ) -> Optional[Dict]:
        """Ask the LLM whether stale desires should be kept, retired or escalated.

        Args:
            stale_desires: Stale desire summaries from the fast evaluation.
                Only the first ``DEEP_EVAL_MAX_DESIRES`` are sent.
            channel_id: Channel being audited.

        Returns:
            The parsed JSON answer. If the answer is not valid JSON, returns
            ``{"raw": <first 500 chars>}``. Returns None if there was nothing
            to ask or the call failed.
        """
        if not stale_desires:
            return None

        desires_text = "\n".join(
            f"- [{d['id']}] \"{d['text']}\" (tag={d['tag']}, "
            f"turns_alive={d['turns_alive']}, urgency={d['urgency']}, "
            f"expression={d.get('expression', 'unknown')})"
            for d in stale_desires[: self.DEEP_EVAL_MAX_DESIRES]
        )

        prompt = (
            "You are Star's desire lifecycle auditor. These desires have been "
            "active for a long time without resolution. For each one, assess "
            "whether it should be:\n"
            " - KEEP: still relevant, Star should continue pursuing\n"
            " - RETIRE: no longer relevant, mark as irrelevant\n"
            " - ESCALATE: needs human architect attention\n\n"
            f"STALE DESIRES:\n{desires_text}\n\n"
            "RETURN JSON:\n"
            '{"assessments": [{"id": "...", "verdict": "keep|retire|escalate", '
            '"reason": "brief reason"}]}\n'
            "JSON only. No markdown."
        )

        result = await self._call_llm(prompt, max_tokens=512)
        if not result:
            return None
        try:
            return json.loads(result)
        except Exception:
            # Broad on purpose: the JSON backend's decode error type may vary.
            return {"raw": result[:500]}

    # == LLM Call ======================================================== # 😈
    async def _call_llm(
        self,
        prompt: str,
        model: Optional[str] = None,
        max_tokens: int = 512,
    ) -> Optional[str]:
        """Send a single-shot prompt to the local Gemini proxy.

        Mirrors the LLM call pattern from DyadicEvaluator for consistency.

        Args:
            prompt: The user-role prompt text to send.
            model: Gemini model id to request. Defaults to ``FLASH_MODEL``,
                resolved at call time.
            max_tokens: Upper bound on completion tokens.

        Returns:
            The response text with any surrounding markdown fence removed, or
            None if the call failed or was rate-limited (HTTP 429).
        """
        if model is None:
            model = FLASH_MODEL

        async with httpx.AsyncClient(
            timeout=httpx.Timeout(60.0, connect=10.0),
        ) as client:
            try:
                resp = await client.post(
                    PROXY_URL,
                    json={
                        "model": model,
                        "messages": [{"role": "user", "content": prompt}],
                        "temperature": 0.3,
                        "max_tokens": max_tokens,
                    },
                    headers={"Content-Type": "application/json"},
                )
                if resp.status_code == 429:
                    logger.debug("Desire check LLM call rate-limited (429)")
                    return None
                resp.raise_for_status()
                data = resp.json()
                # Tolerate an empty "choices" list or a null "content".
                choices = data.get("choices") or [{}]
                message = choices[0].get("message") or {}
                text = message.get("content") or ""
            except Exception as e:
                logger.debug("Desire check LLM call failed: %s", e)
                return None

        cleaned = text.strip()
        if cleaned.startswith("```"):
            cleaned = self._FENCE_OPEN_RE.sub("", cleaned)
            cleaned = self._FENCE_CLOSE_RE.sub("", cleaned)
        return cleaned