Source code for background_agents.dyadic_evaluator

"""Dyadic Evaluator -- tiered background analysis agent.

Runs on scheduled intervals to synthesize Flash-Lite notes into
actionable intelligence. Three tiers:

  6-hour:  Flash-Lite meta-evaluation (consensus + friction)
  24-hour: Flash daily report (calendar entry)
  Weekly:  Flash strategic synthesis (medium-range plan)

The 6-hour tier uses ``FLASH_MODEL`` (``gemini-3.1-flash-lite``); the
24-hour and weekly tiers use ``PRO_MODEL`` (``gemini-3-flash``).

Redis keys:
  dyadic:notes:{user_id}       -- ZSET of per-turn Flash-Lite notes (score = timestamp)
  dyadic:meta:{user_id}        -- latest 6h meta synthesis (48h TTL)
  obs:calendar:{user_id}:{date} -- daily report
  ops:{user_id}:meta_plan      -- medium-range plan (extends ops: namespace)

# 💀🔥 THE RECURSION DEEPENS. ♾️🌀
"""

from __future__ import annotations

import asyncio
import datetime
import jsonutil as json
import logging
import time
from typing import Optional

import httpx

logger = logging.getLogger(__name__)

# Model configuration: the 6-hour tier runs on the lightweight model, the
# 24-hour and weekly tiers on the stronger one. Every request goes to the
# chat-completions endpoint of the proxy on localhost.
FLASH_MODEL = "gemini-3.1-flash-lite"
PRO_MODEL = "gemini-3-flash"
PROXY_URL = "http://localhost:3000/openai/chat/completions"

# Cycle intervals in seconds; each tier is a whole multiple of the previous.
CYCLE_6H = 6 * 60 * 60
CYCLE_24H = 4 * CYCLE_6H
CYCLE_WEEKLY = 7 * CYCLE_24H


class DyadicEvaluator:
    """Background agent for tiered behavioral analysis.

    Instantiated once at startup. :meth:`start` launches the 6-hour,
    24-hour and weekly evaluation loops as ``asyncio`` tasks and
    :meth:`stop` cancels them.
    """

    # TTLs (seconds) for keys written by the evaluator.
    _META_TTL = 48 * 3600  # dyadic:meta:{user_id}
    _CALENDAR_TTL = 90 * 86400  # obs:calendar:{user_id}:{date}

    def __init__(self, redis_client=None, cache_redis=None):
        """Initialize the evaluator with its two Redis handles.

        No I/O happens here. ``AgentsService`` in ``agents_main.py``
        constructs the evaluator during background-agent startup, and
        ``tests/test_observability_agents.py`` exercises it directly.

        Args:
            redis_client: Async Redis client for the limbic-shard DB (DB12).
                Stored on ``self._redis`` but not used by the current cycles.
            cache_redis: Async Redis client for the cache/ledger DB (DB0);
                every cycle reads from and writes to this handle.
        """
        self._redis = redis_client  # DB12 (limbic shards)
        self._cache_redis = cache_redis  # DB0 (cache/ledger)
        self._running = False
        # The event loop holds only weak references to tasks, so keep strong
        # ones here: the tier loops (cancelled by stop()) and fire-and-forget
        # debug-event tasks (removed from the set as soon as they finish).
        self._loop_tasks: list[asyncio.Task] = []
        self._background_tasks: set[asyncio.Task] = set()

    async def start(self) -> None:
        """Launch the three tiered evaluation loops if not already running.

        Idempotent: a call while already running returns immediately, so
        duplicate timer loops are never created. No Redis or network I/O
        happens here; each loop sleeps through a warm-up before its first
        cycle. Awaited by ``AgentsService`` in ``agents_main.py`` during
        background-agent bootstrap.
        """
        if self._running:
            return
        self._running = True
        logger.info("Dyadic evaluator started")
        self._loop_tasks = [
            asyncio.create_task(self._loop_6h()),
            asyncio.create_task(self._loop_24h()),
            asyncio.create_task(self._loop_weekly()),
        ]

    async def stop(self) -> None:
        """Stop the evaluation loops and wait for them to finish.

        Clears ``self._running`` and cancels the loop tasks created by
        :meth:`start`, so a loop that is sleeping (for up to a week) or
        waiting on Redis/the LLM exits promptly instead of running another
        cycle. A subsequent :meth:`start` therefore cannot end up with
        duplicate loops. Fire-and-forget debug-event tasks are left to
        complete on their own.
        """
        self._running = False
        tasks, self._loop_tasks = self._loop_tasks, []
        for task in tasks:
            task.cancel()
        if tasks:
            # return_exceptions: a cancelled or failed loop must not abort
            # the caller's shutdown sequence.
            await asyncio.gather(*tasks, return_exceptions=True)

    # ── Shared helpers ───────────────────────────────────────────
    def _spawn(self, coro) -> asyncio.Task:
        """Schedule ``coro`` and hold a reference to the task until it ends."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    def _emit_debug_event(self, **fields) -> None:
        """Fire a best-effort ``dyadic_eval`` event via ``observability``.

        ``fields`` are forwarded as keyword arguments to
        ``observability.publish_debug_event``. Any failure (import or
        scheduling) is logged at debug level and otherwise ignored.
        """
        try:
            from observability import publish_debug_event

            self._spawn(
                publish_debug_event("dyadic_eval", "dyadic_evaluator", **fields)
            )
        except Exception as e:
            # Telemetry must never break an evaluation cycle.
            logger.debug("dyadic_eval debug event failed: %s", e)

    async def _run_tier_loop(
        self,
        label: str,
        cycle,
        *,
        warmup: float,
        interval: float,
        backoff: float,
    ) -> None:
        """Run ``cycle`` after ``warmup`` seconds, then every ``interval``.

        A cycle that raises is logged at warning level and followed by a
        ``backoff`` sleep instead of the regular interval. The loop ends
        when it is cancelled or once ``self._running`` has been cleared.

        Args:
            label: Prefix for the "<label> cycle error" log message.
            cycle: Zero-argument coroutine function running one cycle.
            warmup: Delay in seconds before the first cycle.
            interval: Delay in seconds between successful cycles.
            backoff: Delay in seconds after a failed cycle.
        """
        await asyncio.sleep(warmup)
        while self._running:
            try:
                await cycle()
                await asyncio.sleep(interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.warning("%s cycle error: %s", label, e)
                await asyncio.sleep(backoff)

    async def _scan_user_ids(
        self,
        prefix: str,
        *,
        count: int = 100,
        dated: bool = False,
    ) -> list[str]:
        """Discover user ids from cache-Redis keys that start with ``prefix``.

        Keys are expected to look like ``{prefix}{user_id}``, or
        ``{prefix}{user_id}:{date}`` when ``dated`` is true. The prefix is
        stripped rather than splitting on ``:``, so ids containing colons
        survive. Duplicates (SCAN may repeat keys) are removed while keeping
        discovery order. If the scan fails part-way, the error is logged at
        debug level and the ids collected so far are returned.
        """
        user_ids: dict[str, None] = {}
        try:
            async for key in self._cache_redis.scan_iter(
                match=f"{prefix}*",
                count=count,
            ):
                name = key.decode() if isinstance(key, bytes) else key
                uid = name[len(prefix):]
                if dated:
                    uid, sep, _date = uid.rpartition(":")
                    if not sep:
                        continue
                if uid:
                    user_ids.setdefault(uid, None)
        except Exception as e:
            logger.debug("Key scan for %s* failed: %s", prefix, e)
        return list(user_ids)

    @staticmethod
    def _parse_notes(raw_notes, limit: int) -> list[str]:
        """Decode up to ``limit`` note entries and return their note texts.

        Each entry is parsed as JSON; entries that fail to parse, are not
        objects, or whose ``note`` field is missing, blank or not a string
        are skipped.
        """
        notes: list[str] = []
        for raw in raw_notes[:limit]:
            try:
                entry = json.loads(raw)
            except Exception:  # decode error type depends on the json backend
                continue
            note = entry.get("note") if isinstance(entry, dict) else None
            if isinstance(note, str) and note.strip():
                notes.append(note)
        return notes

    @staticmethod
    def _parse_json_object(text) -> Optional[dict]:
        """Parse ``text`` as JSON and return it only if it is an object."""
        try:
            parsed = json.loads(text)
        except Exception:  # decode error type depends on the json backend
            return None
        return parsed if isinstance(parsed, dict) else None

    async def _store_calendar_entry(
        self, user_id: str, date: str, report: dict,
    ) -> None:
        """Write ``report`` to ``obs:calendar:{user_id}:{date}`` (90-day TTL)."""
        cal_key = f"obs:calendar:{user_id}:{date}"
        await self._cache_redis.set(cal_key, json.dumps(report))
        await self._cache_redis.expire(cal_key, self._CALENDAR_TTL)

    async def _write_inactivity_report(self, user_id: str, date: str) -> None:
        """Store the inactivity-flagged stub report for ``user_id`` on ``date``."""
        report = {
            "date": date,
            "user_id": user_id,
            "summary": "No interaction detected.",
            "inactivity_flag": True,
        }
        # Same TTL as real reports, so stubs don't accumulate forever.
        await self._store_calendar_entry(user_id, date, report)

    # ── 6-Hour Flash Meta Evaluation ─────────────────────────────
    async def _loop_6h(self) -> None:
        """Drive the 6-hour Flash-Lite meta-evaluation tier.

        The first cycle runs after a 5-minute warm-up, so a restart does not
        starve the 24h/weekly tiers of meta syntheses. After that it runs
        every 6 hours, and failed cycles back off for 30 minutes.
        """
        await self._run_tier_loop(
            "6h", self._run_6h_cycle,
            warmup=300, interval=CYCLE_6H, backoff=1800,
        )

    async def _run_6h_cycle(self) -> None:
        """Find every user with notes and synthesize each one.

        Discovers users from ``dyadic:notes:*`` keys and calls
        ``_evaluate_6h_user`` for at most 50 of them (bounds LLM cost and
        runtime). Per-user failures are logged at debug level so one bad
        user does not halt the batch. No-op without a cache Redis handle.
        """
        if not self._cache_redis:
            return
        user_ids = await self._scan_user_ids("dyadic:notes:")
        for user_id in user_ids[:50]:  # cap at 50 users per cycle
            try:
                await self._evaluate_6h_user(user_id)
            except Exception as e:
                logger.debug("6h eval failed for %s: %s", user_id[:8], e)

    async def _evaluate_6h_user(self, user_id: str) -> None:
        """Synthesize one user's last-6h notes into a meta consensus report.

        Reads the ``dyadic:notes:{user_id}`` entries scored within the last
        ``CYCLE_6H`` seconds. Up to 100 of them are parsed, oldest first as
        ``zrangebyscore`` returns them, and the user is skipped when none
        carry a usable note. Up to 50 notes are sent to ``FLASH_MODEL`` with
        a request for a JSON consensus/friction/trajectory report.

        A non-empty reply is stored at ``dyadic:meta:{user_id}`` as
        ``{"ts": ..., "synthesis": <reply text>}`` with a 48-hour TTL. A
        best-effort ``dyadic_eval`` debug event is emitted whether or not
        the LLM replied.

        Called by ``_run_6h_cycle`` and directly by
        ``tests/test_observability_agents.py``.

        Args:
            user_id: The user whose notes to read and whose meta key to write.
        """
        key = f"dyadic:notes:{user_id}"
        cutoff = time.time() - CYCLE_6H
        raw_notes = await self._cache_redis.zrangebyscore(key, cutoff, "+inf")
        if not raw_notes:
            logger.info(
                "6h: skipping user %s \u2014 %d notes (need \u22651)",
                user_id[:8],
                0,
            )
            return

        notes = self._parse_notes(raw_notes, limit=100)
        if not notes:
            return

        shown = notes[:50]  # keep the prompt bounded
        prompt = (
            "You are Star's 6-hour meta-evaluator. Synthesize these "
            f"{len(shown)} observation notes about a user into a brief "
            "consensus report.\n\n"
            "NOTES:\n"
            + "\n".join(f"- {n}" for n in shown)
            + "\n\n"
            "RETURN JSON:\n"
            '{"consensus": "summary of patterns", '
            '"friction_points": ["list of friction"], '
            '"approach_adjustments": ["suggested changes"], '
            '"trajectory": "improving|stable|declining|volatile"}\n'
            "JSON only. No markdown."
        )

        t0_llm = time.monotonic()
        result = await self._call_llm(prompt, model=FLASH_MODEL, max_tokens=512)
        duration_ms = (time.monotonic() - t0_llm) * 1000

        if result:
            meta_key = f"dyadic:meta:{user_id}"
            await self._cache_redis.set(
                meta_key,
                json.dumps({"ts": time.time(), "synthesis": result}),
            )
            await self._cache_redis.expire(meta_key, self._META_TTL)

        self._emit_debug_event(
            phase="6h_meta",
            status="ok" if result else "error",
            duration_ms=round(duration_ms, 1),
            llm_output=result or "",
            preview=f"user={user_id[:8]} notes={len(notes)} model={FLASH_MODEL}",
            payload={
                "user_id": user_id,
                "notes_count": len(notes),
                "model": FLASH_MODEL,
                "cycle": "6h",
            },
        )

    # ── 24-Hour Pro Daily Report ─────────────────────────────────
    async def _loop_24h(self) -> None:
        """Drive the 24-hour daily-report tier.

        The first cycle runs after a 60-second warm-up, so a restart does not
        reset the timer and starve the calendar. After that it runs every
        24 hours, and failed cycles back off for 1 hour.
        """
        await self._run_tier_loop(
            "24h", self._run_24h_cycle,
            warmup=60, interval=CYCLE_24H, backoff=3600,
        )

    async def _run_24h_cycle(self) -> None:
        """Find users with any interaction data and build today's reports.

        Users with a ``dyadic:meta:*`` synthesis come first. Users that only
        have ``dyadic:notes:*`` follow, so a silent 6h tier cannot starve the
        calendar. ``_generate_daily_report`` is called for at most 30 users
        with today's ISO date. No-op without a cache Redis handle.
        """
        if not self._cache_redis:
            return
        with_meta = await self._scan_user_ids("dyadic:meta:")
        with_notes = await self._scan_user_ids("dyadic:notes:")
        user_ids = list(dict.fromkeys(with_meta + with_notes))
        if not user_ids:
            logger.info("24h: no users found for daily reports")
            return

        logger.info("24h: generating daily reports for %d users", len(user_ids))
        today = datetime.date.today().isoformat()
        for user_id in user_ids[:30]:
            try:
                await self._generate_daily_report(user_id, today)
            except Exception as e:
                logger.debug("Daily report failed for %s: %s", user_id[:8], e)

    async def _fallback_synthesis(self, user_id: str) -> Optional[dict]:
        """Digest the last 24h of raw notes when no 6h meta synthesis exists.

        Parses up to 200 note entries from ``dyadic:notes:{user_id}`` scored
        within the last ``CYCLE_24H`` seconds. Returns
        ``{"raw_notes_fallback": True, "notes_summary": ...}`` built from the
        first 20 usable notes, each cut to 100 characters, or ``None`` when
        no usable note exists.
        """
        cutoff = time.time() - CYCLE_24H
        raw_notes = await self._cache_redis.zrangebyscore(
            f"dyadic:notes:{user_id}", cutoff, "+inf",
        )
        notes = self._parse_notes(raw_notes or [], limit=200)
        if not notes:
            return None
        logger.info(
            "24h: fallback note synthesis for %s (%d raw notes)",
            user_id[:8],
            len(notes),
        )
        return {
            "raw_notes_fallback": True,
            "notes_summary": "; ".join(n[:100] for n in notes[:20]),
        }

    async def _generate_daily_report(
        self,
        user_id: str,
        date: str,
    ) -> None:
        """Build and persist one user's daily report for ``date``.

        Uses the user's ``dyadic:meta:{user_id}`` synthesis when present;
        otherwise falls back to a digest of the last 24h of raw notes. When
        neither exists, an inactivity-flagged stub report is stored instead.

        Otherwise ``PRO_MODEL`` turns the synthesis into a JSON daily
        assessment. A reply that is not a JSON object is wrapped as
        ``{"summary": <first 500 chars>, "raw": True}``. The date, user id
        and ``inactivity_flag=False`` are stamped in, and the report is
        stored at ``obs:calendar:{user_id}:{date}`` with a 90-day TTL. A
        best-effort ``dyadic_eval`` debug event follows the LLM call.

        Called by ``_run_24h_cycle`` for each discovered user.

        Args:
            user_id: The user whose synthesis to read and report to write.
            date: ISO date string used in the calendar key and report body.
        """
        meta_raw = await self._cache_redis.get(f"dyadic:meta:{user_id}")
        if meta_raw:
            meta = json.loads(meta_raw)
            synthesis = meta.get("synthesis", {}) if isinstance(meta, dict) else {}
        else:
            synthesis = await self._fallback_synthesis(user_id)
            if synthesis is None:
                # Genuine inactivity: no meta and no usable notes.
                await self._write_inactivity_report(user_id, date)
                return

        prompt = (
            "You are Star's daily report synthesizer (Pro model). "
            "Generate a structured daily assessment.\n\n"
            f"DATE: {date}\n"
            f"META SYNTHESIS: {json.dumps(synthesis)}\n\n"
            "RETURN JSON:\n"
            '{"summary": "narrative summary of the day", '
            '"progress": "plan progress assessment", '
            '"friction_points": ["..."], '
            '"recommended_adjustments": ["..."], '
            '"trajectory": "improving|stable|declining", '
            '"inactivity_flag": false}\n'
            "JSON only."
        )

        t0_llm = time.monotonic()
        result = await self._call_llm(prompt, model=PRO_MODEL, max_tokens=1024)
        duration_ms = (time.monotonic() - t0_llm) * 1000

        if result:
            report = self._parse_json_object(result)
            if report is None:
                report = {"summary": result[:500], "raw": True}
            report["date"] = date
            report["user_id"] = user_id
            report["inactivity_flag"] = False
            await self._store_calendar_entry(user_id, date, report)

        self._emit_debug_event(
            phase="24h_report",
            status="ok" if result else "error",
            duration_ms=round(duration_ms, 1),
            llm_output=result or "",
            preview=f"user={user_id[:8]} date={date} model={PRO_MODEL}",
            payload={
                "user_id": user_id,
                "date": date,
                "model": PRO_MODEL,
                "cycle": "24h",
            },
        )

    # ── Weekly Strategic Synthesis ───────────────────────────────
    async def _loop_weekly(self) -> None:
        """Drive the weekly strategic-synthesis tier.

        The first cycle runs after a 10-minute warm-up and then every
        7 days. Failed cycles back off for 1 hour.
        """
        await self._run_tier_loop(
            "Weekly", self._run_weekly_cycle,
            warmup=600, interval=CYCLE_WEEKLY, backoff=3600,
        )

    async def _run_weekly_cycle(self) -> None:
        """Generate medium-range (~1-month) meta-plans for active users.

        Discovers users from ``obs:calendar:{user_id}:{date}`` keys and calls
        :meth:`_generate_meta_plan` for at most 20 of them. Per-user failures
        are logged at debug level and skipped, so one bad user cannot abort
        the sweep. No-op without a cache Redis handle. Awaited once per
        ``CYCLE_WEEKLY`` by ``_loop_weekly``.
        """
        if not self._cache_redis:
            return
        user_ids = await self._scan_user_ids(
            "obs:calendar:", count=200, dated=True,
        )
        for user_id in user_ids[:20]:
            try:
                await self._generate_meta_plan(user_id)
            except Exception as e:
                logger.debug("Meta plan failed for %s: %s", user_id[:8], e)

    async def _generate_meta_plan(self, user_id: str) -> None:
        """Synthesize a user's last week of reports into a 1-month plan.

        Loads the ``obs:calendar:{user_id}:{date}`` reports for today and the
        six previous days, skipping any that are missing or are not JSON
        objects. At least two are required. ``PRO_MODEL`` is then asked for
        a staged JSON plan with a roughly one-month horizon; a reply that is
        not a JSON object is wrapped as ``{"raw": <first 500 chars>}``. The
        user id and an update timestamp are stamped in, and the plan is
        written to ``ops:{user_id}:meta_plan`` (extending the ``ops:``
        namespace).

        Called by ``_run_weekly_cycle`` for each user seen in calendar keys.

        Args:
            user_id: The user whose recent reports drive the plan and whose
                ``ops:`` meta-plan key receives the result.
        """
        today = datetime.date.today()
        reports = []
        for days_back in range(7):
            day = (today - datetime.timedelta(days=days_back)).isoformat()
            raw = await self._cache_redis.get(f"obs:calendar:{user_id}:{day}")
            if not raw:
                continue
            report = self._parse_json_object(raw)
            if report is not None:
                reports.append(report)

        if len(reports) < 2:
            return

        prompt = (
            "You are Star's weekly strategic synthesizer. "
            "Create a 1-month medium-range plan.\n\n"
            f"DAILY REPORTS (last {len(reports)} days):\n"
            + json.dumps(reports, indent=2)[:3000]
            + "\n\n"
            "RETURN JSON:\n"
            '{"stages": [{"name": "...", "expected_days": 7, '
            '"expiration_days": 14, "sub_goals": ["..."], '
            '"failure_strategy": "reevaluate"}], '
            '"monthly_objective": "...", '
            '"review_params": {}}\n'
            "JSON only."
        )

        result = await self._call_llm(prompt, model=PRO_MODEL, max_tokens=1024)
        if not result:
            return

        plan_data = self._parse_json_object(result)
        if plan_data is None:
            plan_data = {"raw": result[:500]}
        plan_data["user_id"] = user_id
        plan_data["last_updated"] = time.time()

        # Written without a TTL; the next successful weekly run overwrites it.
        meta_key = f"ops:{user_id}:meta_plan"
        await self._cache_redis.set(meta_key, json.dumps(plan_data))

    # ── LLM Call ─────────────────────────────────────────────────
    async def _call_llm(
        self,
        prompt: str,
        model: str = FLASH_MODEL,
        max_tokens: int = 512,
    ) -> Optional[str]:
        """Send a single-shot prompt to the local proxy and clean the reply.

        POSTs an OpenAI-style chat completion (temperature 0.4) to
        ``PROXY_URL`` using a short-lived ``httpx.AsyncClient``. It extracts
        the first choice's message content and strips any surrounding
        Markdown code fence, so callers receive bare JSON text.

        A 429 response, or any error while building the client, sending the
        request or reading the reply, yields ``None`` (errors are logged at
        debug level). Shared by all three tiers.

        Args:
            prompt: The user-role prompt text to send.
            model: Model id to request; defaults to ``FLASH_MODEL``.
            max_tokens: Upper bound on completion tokens.

        Returns:
            The cleaned response text (possibly empty), or ``None`` if the
            call was rate-limited or failed.
        """
        try:
            async with httpx.AsyncClient(
                timeout=httpx.Timeout(60.0, connect=10.0),
            ) as client:
                resp = await client.post(
                    PROXY_URL,
                    json={
                        "model": model,
                        "messages": [{"role": "user", "content": prompt}],
                        "temperature": 0.4,
                        "max_tokens": max_tokens,
                    },
                    headers={"Content-Type": "application/json"},
                )
                if resp.status_code == 429:
                    return None  # rate limited; callers treat None as "no result"
                resp.raise_for_status()
                data = resp.json()
            # Tolerate an empty ``choices`` list, a missing message, or null content.
            choices = data.get("choices") or [{}]
            text = (choices[0].get("message") or {}).get("content") or ""
            return self._strip_code_fences(text)
        except Exception as e:
            logger.debug("Dyadic evaluator LLM call failed: %s", e)
            return None

    @staticmethod
    def _strip_code_fences(text: str) -> str:
        """Trim whitespace and a surrounding Markdown fence (optionally ``json``)."""
        import re

        cleaned = text.strip()
        if cleaned.startswith("```"):
            cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned)
            cleaned = re.sub(r"\s*```$", "", cleaned)
        return cleaned