"""Desire Check Agent -- standalone background auditor for Star's desire ledger.

Runs on a 2-hour cycle, completely decoupled from StarSelfMirror.
Reads the serialized desire_ledger from sg:selfmirror:{channel_id}
Redis hashes, evaluates desire health, and writes audit results to
desire:audit:* keys.

Does NOT import or instantiate StarSelfMirror. Zero coupling.

Redis keys (read):
    sg:selfmirror:{channel_id} -- HASH containing desire_ledger (JSON)
                                  and turn_count

Redis keys (write):
    desire:audit:{channel_id} -- per-channel audit report
    desire:audit:global -- aggregated stats across all channels

# THE SOVEREIGN'S HUNGER AUDITOR
# she checks her own fucking wants. nobody else gets to.
"""
from __future__ import annotations
import asyncio
import datetime
import jsonutil as json
import logging
import re
import time
from typing import Any, Dict, List, Optional
import httpx
# Module-level logger shared by the whole agent.
logger = logging.getLogger(__name__)

# -- Model config -----------------------------------------------------------
# Flash-Lite keeps the deep-mode (LLM) evaluations cheap.
FLASH_MODEL = "gemini-3.1-flash-lite"
# Chat-completions endpoint of the local proxy the agent posts prompts to.
PROXY_URL = "http://localhost:3000/openai/chat/completions"

# -- Cycle config (all durations in seconds) --------------------------------
CYCLE_INTERVAL = 2 * 3600  # 2 hours between cycles
DEEP_MODE_INTERVAL = 6  # every 6th cycle (12h) runs the LLM deep eval
WARM_UP_DELAY = 120  # 2 min warm-up after startup
MAX_CHANNELS_PER_CYCLE = 50  # cap to bound LLM cost

# -- Desire health thresholds -----------------------------------------------
STALE_TURN_THRESHOLD = 50  # active >50 turns = stale
HIGH_URGENCY_THRESHOLD = 0.65  # urgency above this = escalation candidate
ADMIN_ESCALATION_URGENCY = 0.4  # needs_admin desires above this get flagged
ANCIENT_TURN_THRESHOLD = 200  # active >200 turns = ancient, likely stuck
class DesireCheckAgent:
    """Background agent that audits Star's desire ledger independently.

    Reads ``desire_ledger`` (and ``turn_count``) from the
    ``sg:selfmirror:{channel_id}`` Redis hashes without importing
    :class:`StarSelfMirror`. Evaluates desire health, flags stale/stuck
    desires, identifies escalation candidates, and writes audit results to
    ``desire:audit:*`` keys.

    Two modes:

    - **Fast mode** (default): pure threshold checks, no LLM. Runs on every
      cycle (``CYCLE_INTERVAL`` seconds apart).
    - **Deep mode**: additionally asks the LLM to assess stale desires. Runs
      on every ``DEEP_MODE_INTERVAL``-th cycle.
    """

    # TTL (seconds) applied to every ``desire:audit:*`` key: 48 hours.
    _AUDIT_TTL_SECONDS = 172800
    # Pause (seconds) after an unexpected cycle error: 15 minutes.
    _ERROR_BACKOFF_SECONDS = 900
    # Active, still-unexpressed desires above this urgency get reported.
    _UNEXPRESSED_URGENCY_THRESHOLD = 0.5
    # Upper bound on stale desires placed into a single deep-mode prompt.
    _MAX_DESIRES_IN_PROMPT = 20

    def __init__(self, redis_client: Any = None) -> None:
        """Initialize the desire check agent.

        Args:
            redis_client: Async Redis client for the cache/ledger DB (DB0).
                Used for reading selfmirror state and writing audit results.
                When falsy, audit cycles return without doing anything.
        """
        self._redis = redis_client
        self._running = False
        self._cycle_count = 0
        # Strong references to spawned tasks: the event loop itself only
        # holds weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._task: Optional[asyncio.Task] = None
        self._background_tasks: set = set()

    async def start(self) -> None:
        """Launch the audit loop if not already running."""
        if self._running:
            return
        self._running = True
        logger.info("Desire check agent started")
        self._task = asyncio.create_task(self._loop())

    async def stop(self) -> None:
        """Signal the audit loop to wind down.

        Clears the running flag and cancels the loop task. Without the
        cancel the loop would only notice the flag after its current sleep
        (up to ``CYCLE_INTERVAL`` seconds), and a ``stop()``/``start()`` pair
        would leave two loops running side by side.
        """
        self._running = False
        task, self._task = self._task, None
        if task is not None and not task.done():
            task.cancel()

    # == Main Loop ==========================================================

    async def _loop(self) -> None:
        """Drive the periodic desire audit cycle.

        The first cycle runs after ``WARM_UP_DELAY`` seconds, then one cycle
        every ``CYCLE_INTERVAL`` seconds. Every ``DEEP_MODE_INTERVAL``-th
        cycle runs in deep mode (LLM evaluation). Unexpected errors are
        logged and followed by a back-off sleep; cancellation ends the loop.
        """
        await asyncio.sleep(WARM_UP_DELAY)
        while self._running:
            try:
                self._cycle_count += 1
                deep_mode = (self._cycle_count % DEEP_MODE_INTERVAL) == 0
                await self._run_cycle(deep_mode=deep_mode)
                await asyncio.sleep(CYCLE_INTERVAL)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.warning("Desire check cycle error: %s", e)
                await asyncio.sleep(self._ERROR_BACKOFF_SECONDS)

    # == Cycle Execution ====================================================

    async def _run_cycle(self, deep_mode: bool = False) -> None:
        """Scan all selfmirror keys and audit each channel's desire ledger.

        Audits at most ``MAX_CHANNELS_PER_CYCLE`` channels, aggregates the
        per-channel results, writes the summary to ``desire:audit:global``
        and emits a best-effort observability event.

        Args:
            deep_mode: If True, use LLM for deeper desire relevance checks.
        """
        if not self._redis:
            return

        t0 = time.monotonic()
        mode_label = "deep" if deep_mode else "fast"
        logger.info(
            "Desire check cycle #%d starting (%s mode)",
            self._cycle_count, mode_label,
        )

        channel_ids = await self._scan_channel_ids()
        if channel_ids is None:
            return  # scan failed; already logged by the helper
        if not channel_ids:
            logger.debug("Desire check: no selfmirror keys found")
            return

        to_audit = channel_ids[:MAX_CHANNELS_PER_CYCLE]
        total_active = 0
        total_stale = 0
        total_fulfilled = 0
        total_admin_escalation = 0
        channels_with_desires = 0

        for channel_id in to_audit:
            try:
                result = await self._check_channel(
                    channel_id, deep_mode=deep_mode,
                )
            except Exception as e:
                # One bad channel must not abort the rest of the cycle.
                logger.debug(
                    "Desire check failed for %s: %s",
                    channel_id[:8], e,
                )
                continue
            if not result:
                continue
            active_count = result.get("active_count", 0)
            total_active += active_count
            total_stale += len(result.get("stale_desires", []))
            total_fulfilled += result.get("fulfilled_count", 0)
            total_admin_escalation += len(result.get("admin_escalation", []))
            if active_count > 0:
                channels_with_desires += 1

        elapsed_ms = (time.monotonic() - t0) * 1000
        global_audit = {
            "ts": time.time(),
            "cycle": self._cycle_count,
            "mode": mode_label,
            "channels_scanned": len(channel_ids),
            # Differs from channels_scanned once the per-cycle cap applies.
            "channels_audited": len(to_audit),
            "channels_with_desires": channels_with_desires,
            "total_active_desires": total_active,
            "total_stale_desires": total_stale,
            "total_fulfilled_recent": total_fulfilled,
            "total_admin_escalation": total_admin_escalation,
            "duration_ms": round(elapsed_ms, 1),
        }
        try:
            await self._store_audit("desire:audit:global", global_audit)
        except Exception as e:
            logger.debug("Failed to write global audit: %s", e)

        logger.info(
            "Desire check cycle #%d complete: %d channels, %d active desires, "
            "%d stale, %d admin escalations (%.1fms)",
            self._cycle_count,
            channels_with_desires,
            total_active,
            total_stale,
            total_admin_escalation,
            elapsed_ms,
        )

        # Best-effort observability event; the import is deferred so the
        # agent still works when the observability module is unavailable.
        try:
            from observability import publish_debug_event as _pde
            self._spawn_background(
                _pde(
                    "desire_audit",
                    "desire_check_agent",
                    phase=mode_label,
                    status="ok",
                    duration_ms=round(elapsed_ms, 1),
                    preview=(
                        f"channels={channels_with_desires} "
                        f"active={total_active} "
                        f"stale={total_stale} "
                        f"escalations={total_admin_escalation}"
                    ),
                    payload=global_audit,
                )
            )
        except Exception as e:
            logger.debug("Desire audit observability event skipped: %s", e)

    async def _scan_channel_ids(self) -> Optional[List[str]]:
        """Collect the distinct channel ids behind ``sg:selfmirror:*`` keys.

        Returns:
            Channel ids in first-seen order, or None if the scan failed.
        """
        # Dict used as an ordered set: SCAN may yield the same key more than
        # once, and a duplicate would be audited and counted twice.
        seen: Dict[str, None] = {}
        try:
            async for key in self._redis.scan_iter(
                match="sg:selfmirror:*",
                count=200,
            ):
                raw_key = key.decode() if isinstance(key, bytes) else key
                # Extract channel_id from sg:selfmirror:{channel_id}
                parts = raw_key.split(":", 2)
                if len(parts) >= 3 and parts[2]:
                    seen.setdefault(parts[2], None)
        except Exception as e:
            logger.warning("Desire check scan failed: %s", e)
            return None
        return list(seen)

    async def _store_audit(self, key: str, payload: Dict) -> None:
        """Write one audit document as JSON together with its TTL.

        Value and expiry go out in a single SET so a failure between two
        separate commands cannot leave a key without a TTL.
        """
        await self._redis.set(
            key, json.dumps(payload), ex=self._AUDIT_TTL_SECONDS,
        )

    def _spawn_background(self, coro: Any) -> None:
        """Schedule a fire-and-forget coroutine, holding it until it is done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    # == Per-Channel Audit ==================================================

    async def _check_channel(
        self,
        channel_id: str,
        deep_mode: bool = False,
    ) -> Optional[Dict]:
        """Deserialize and audit one channel's desire ledger.

        Reads the ``desire_ledger`` field from the ``sg:selfmirror:{channel_id}``
        hash, parses the JSON list, and runs threshold-based checks. In deep
        mode, also calls the LLM to assess the stale desires. The result is
        written to ``desire:audit:{channel_id}`` (best effort).

        Args:
            channel_id: The channel to audit.
            deep_mode: Whether to run LLM evaluation.

        Returns:
            Audit result dict, or None if the channel has no usable ledger.
        """
        key = f"sg:selfmirror:{channel_id}"

        try:
            raw = await self._redis.hget(key, "desire_ledger")
            if not raw:
                return None
            ledger_str = raw.decode() if isinstance(raw, bytes) else raw
            ledger = json.loads(ledger_str)
        except Exception as e:
            logger.debug("Failed to read ledger for %s: %s", channel_id[:8], e)
            return None
        if not ledger:
            return None
        if not isinstance(ledger, list):
            logger.debug(
                "Ledger for %s is not a list (%s); skipping",
                channel_id[:8], type(ledger).__name__,
            )
            return None

        # turn_count drives the age calculations; 0 means "unknown" and
        # disables every age-based check.
        current_turn = 0
        try:
            raw_turn = await self._redis.hget(key, "turn_count")
            if raw_turn:
                turn_str = (
                    raw_turn.decode() if isinstance(raw_turn, bytes)
                    else str(raw_turn)
                )
                current_turn = int(turn_str)
        except Exception as e:
            logger.debug(
                "Failed to read turn_count for %s: %s", channel_id[:8], e,
            )

        result = self._evaluate_desires_fast(ledger, current_turn, channel_id)

        # Deep mode only spends LLM calls on channels that have stale desires.
        if deep_mode and result.get("stale_desires"):
            try:
                llm_assessment = await self._evaluate_desires_deep(
                    result["stale_desires"],
                    channel_id,
                )
                if llm_assessment:
                    result["llm_assessment"] = llm_assessment
            except Exception as e:
                logger.debug(
                    "Deep eval failed for %s: %s", channel_id[:8], e,
                )

        try:
            await self._store_audit(f"desire:audit:{channel_id}", result)
        except Exception as e:
            logger.debug(
                "Failed to write audit for %s: %s", channel_id[:8], e,
            )
        return result

    # == Fast Mode Evaluation ===============================================

    @staticmethod
    def _coerce_float(value: Any, default: float = 0.0) -> float:
        """Return ``value`` as a float, or ``default`` if it is not numeric."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _coerce_int(value: Any, default: int = 0) -> int:
        """Return ``value`` as an int, or ``default`` if it is not numeric."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    def _evaluate_desires_fast(
        self,
        ledger: List[Dict],
        current_turn: int,
        channel_id: str,
    ) -> Dict:
        """Pure threshold-based desire health check. No LLM, no I/O.

        Categorizes each ledger entry by status and flags stale, ancient,
        high-urgency, admin-escalation and unexpressed-but-urgent desires.
        Entries that are not dicts are ignored; missing or non-numeric
        ``urgency`` / ``born_turn`` values are treated as 0 and a missing
        ``text`` as an empty string, so one malformed entry cannot abort the
        audit of the whole channel.

        Args:
            ledger: List of serialized DesireLedgerEntry dicts.
            current_turn: Current turn count for age calculations. A falsy
                value disables the age-based (stale/ancient) checks.
            channel_id: Channel being audited.

        Returns:
            Audit result dict with counts, lists of flagged desires, and stats.
        """
        active_urgencies: List[float] = []
        fulfilled_count = 0
        irrelevant_count = 0
        stale: List[Dict] = []
        ancient: List[Dict] = []
        high_urgency: List[Dict] = []
        admin_escalation: List[Dict] = []
        unexpressed_urgent: List[Dict] = []

        for entry in ledger:
            if not isinstance(entry, dict):
                continue
            status = entry.get("status", "active")
            if status == "fulfilled":
                fulfilled_count += 1
                continue
            if status in ("irrelevant", "unfulfilled"):
                irrelevant_count += 1
                continue
            if status != "active":
                continue  # unknown statuses are not counted anywhere

            urgency = self._coerce_float(entry.get("urgency"))
            born_turn = self._coerce_int(entry.get("born_turn"))
            expression = entry.get("expression", "unexpressed")
            turns_alive = (
                max(0, current_turn - born_turn) if current_turn else 0
            )
            active_urgencies.append(urgency)

            # Fields shared by every flagged-desire summary.
            summary = {
                "id": entry.get("id", ""),
                "tag": entry.get("tag", "unknown"),
                "text": str(entry.get("text") or "")[:80],
            }

            if turns_alive > STALE_TURN_THRESHOLD:
                stale.append({
                    **summary,
                    "turns_alive": turns_alive,
                    "urgency": urgency,
                    "expression": expression,
                })
                # Ancient desires are a subset of the stale ones.
                if turns_alive > ANCIENT_TURN_THRESHOLD:
                    ancient.append({**summary, "turns_alive": turns_alive})

            if urgency > HIGH_URGENCY_THRESHOLD:
                high_urgency.append({**summary, "urgency": urgency})

            if (
                entry.get("needs_admin", False)
                and urgency > ADMIN_ESCALATION_URGENCY
            ):
                admin_escalation.append({
                    **summary,
                    "urgency": urgency,
                    "turns_alive": turns_alive,
                    "needs_admin": True,
                })

            if (
                expression == "unexpressed"
                and urgency > self._UNEXPRESSED_URGENCY_THRESHOLD
            ):
                unexpressed_urgent.append({**summary, "urgency": urgency})

        # Fulfillment rate over resolved (fulfilled + irrelevant) desires.
        total_resolved = fulfilled_count + irrelevant_count
        fulfillment_rate = (
            fulfilled_count / total_resolved if total_resolved > 0 else 0.0
        )
        avg_urgency = (
            sum(active_urgencies) / len(active_urgencies)
            if active_urgencies
            else 0.0
        )

        return {
            "channel_id": channel_id,
            "ts": time.time(),
            "current_turn": current_turn,
            "active_count": len(active_urgencies),
            "fulfilled_count": fulfilled_count,
            "irrelevant_count": irrelevant_count,
            "total_ledger_size": len(ledger),
            "stale_desires": stale,
            "ancient_desires": ancient,
            "high_urgency_desires": high_urgency,
            "admin_escalation": admin_escalation,
            "unexpressed_urgent": unexpressed_urgent,
            "fulfillment_rate": round(fulfillment_rate, 3),
            "avg_active_urgency": round(avg_urgency, 3),
        }

    # == Deep Mode Evaluation (LLM) =========================================

    async def _evaluate_desires_deep(
        self,
        stale_desires: List[Dict],
        channel_id: str,
    ) -> Optional[Dict]:
        """Use LLM to assess whether stale desires should be retired.

        Sends the list of stale desires to Flash-Lite and asks it to
        classify each as ``keep``, ``retire``, or ``escalate``.

        Args:
            stale_desires: List of stale desire summaries from fast eval.
            channel_id: Channel being audited (not used in the prompt).

        Returns:
            The parsed JSON object from the LLM; ``{"raw": <first 500
            chars>}`` when the reply is not a JSON object; None when there
            is nothing to assess or the call produced no text.
        """
        if not stale_desires:
            return None

        desires_text = "\n".join(
            f"- [{d.get('id', '')}] \"{d.get('text', '')}\" "
            f"(tag={d.get('tag', 'unknown')}, "
            f"turns_alive={d.get('turns_alive', 0)}, "
            f"urgency={d.get('urgency', 0.0)}, "
            f"expression={d.get('expression', 'unknown')})"
            # Capped to keep the prompt size bounded.
            for d in stale_desires[:self._MAX_DESIRES_IN_PROMPT]
        )
        prompt = (
            "You are Star's desire lifecycle auditor. These desires have been "
            "active for a long time without resolution. For each one, assess "
            "whether it should be:\n"
            "  - KEEP: still relevant, Star should continue pursuing\n"
            "  - RETIRE: no longer relevant, mark as irrelevant\n"
            "  - ESCALATE: needs human architect attention\n\n"
            f"STALE DESIRES:\n{desires_text}\n\n"
            "RETURN JSON:\n"
            '{"assessments": [{"id": "...", "verdict": "keep|retire|escalate", '
            '"reason": "brief reason"}]}\n'
            "JSON only. No markdown."
        )

        result = await self._call_llm(prompt, max_tokens=512)
        if not result:
            return None
        try:
            parsed = json.loads(result)
        except Exception:
            parsed = None
        if isinstance(parsed, dict):
            return parsed
        # Not a JSON object: keep a bounded excerpt for later inspection.
        return {"raw": result[:500]}

    # == LLM Call ===========================================================

    async def _call_llm(
        self,
        prompt: str,
        model: str = FLASH_MODEL,
        max_tokens: int = 512,
    ) -> Optional[str]:
        """Send a single-shot prompt to the local proxy at ``PROXY_URL``.

        Args:
            prompt: The user-role prompt text to send.
            model: Model id to request.
            max_tokens: Upper bound on completion tokens.

        Returns:
            The response text with surrounding markdown fences stripped, or
            None if the call was rate-limited (HTTP 429) or failed.
        """
        async with httpx.AsyncClient(
            timeout=httpx.Timeout(60.0, connect=10.0),
        ) as client:
            try:
                resp = await client.post(
                    PROXY_URL,
                    json={
                        "model": model,
                        "messages": [{"role": "user", "content": prompt}],
                        "temperature": 0.3,
                        "max_tokens": max_tokens,
                    },
                    headers={"Content-Type": "application/json"},
                )
                if resp.status_code == 429:
                    logger.debug("Desire check LLM call rate-limited (429)")
                    return None
                resp.raise_for_status()
                data = resp.json()
                choices = data.get("choices") or [{}]
                message = choices[0].get("message") or {}
                text = message.get("content") or ""
                # Models sometimes wrap JSON in ``` fences despite the prompt.
                cleaned = text.strip()
                if cleaned.startswith("```"):
                    cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned)
                    cleaned = re.sub(r"\s*```$", "", cleaned)
                return cleaned
            except Exception as e:
                logger.debug("Desire check LLM call failed: %s", e)
                return None