"""Dyadic Evaluator -- tiered background analysis agent.
Runs on scheduled intervals to synthesize Flash-Lite notes into
actionable intelligence. Three tiers:
6-hour: Flash-Lite meta-evaluation (consensus + friction)
24-hour: Flash daily report (calendar entry)
Weekly: Flash strategic synthesis (medium-range plan)
The 6-hour tier uses ``FLASH_MODEL`` (``gemini-3.1-flash-lite``); the
24-hour and weekly tiers use ``PRO_MODEL`` (``gemini-3-flash``).
Redis keys:
dyadic:notes:{user_id} -- ZSET of per-turn Flash notes
dyadic:meta:{user_id} -- latest 6h meta synthesis
obs:calendar:{user_id}:{date} -- daily report
ops:{user_id}:meta_plan -- medium-range plan (extends ops: namespace)
"""
from __future__ import annotations
import asyncio
import datetime
import jsonutil as json
import logging
import time
from typing import Optional
import httpx
logger = logging.getLogger(__name__)

# Model identifiers sent to the proxy. The 6-hour tier uses FLASH_MODEL;
# the 24-hour and weekly tiers use PRO_MODEL.
FLASH_MODEL = "gemini-3.1-flash-lite"
PRO_MODEL = "gemini-3-flash"

# OpenAI-style chat-completions endpoint that every LLM call is POSTed to.
PROXY_URL = "http://localhost:3000/openai/chat/completions"

# Cycle intervals, in seconds.
CYCLE_6H = 6 * 60 * 60
CYCLE_24H = 24 * 60 * 60
CYCLE_WEEKLY = 7 * CYCLE_24H
class DyadicEvaluator:
    """Background agent for tiered behavioral analysis.

    Instantiated once at startup. :meth:`start` schedules one ``asyncio``
    task per tier and :meth:`stop` cancels them again.

    Tiers (all of them read from and write to the cache Redis handle):

    * 6-hour  -- ``dyadic:notes:{user_id}`` -> ``dyadic:meta:{user_id}``
    * 24-hour -- meta synthesis (or raw notes as a fallback)
      -> ``obs:calendar:{user_id}:{date}``
    * weekly  -- last seven calendar entries -> ``ops:{user_id}:meta_plan``
    """

    # TTLs, in seconds, for the keys this agent writes.
    _META_TTL = 48 * 3600  # dyadic:meta:{user_id}
    _CALENDAR_TTL = 90 * 86400  # obs:calendar:{user_id}:{date}

    def __init__(self, redis_client=None, cache_redis=None):
        """Store the two Redis handles; performs no I/O.

        Args:
            redis_client: Async Redis client kept on ``self._redis``. The
                original comments describe it as the limbic-shard DB (DB12);
                no method of this class reads it.
            cache_redis: Async Redis client (described in the original
                comments as DB0, cache/ledger) used by every cycle. When it
                is falsy the ``_run_*_cycle`` methods return immediately.
        """
        self._redis = redis_client
        self._cache_redis = cache_redis
        self._running = False
        # Strong references to the tier loops so they can be cancelled by
        # stop() and are not garbage-collected while sleeping.
        self._tasks = []
        # Strong references to fire-and-forget event tasks (same reason).
        self._bg_tasks = set()

    async def start(self) -> None:
        """Launch the three tier loops unless they are already running.

        Idempotent: a second call while running returns immediately. The
        created tasks are retained on ``self._tasks`` so that :meth:`stop`
        can cancel them. No Redis or network I/O happens here; each loop
        sleeps through its warm-up before doing any work.
        """
        if self._running:
            return
        self._running = True
        logger.info("Dyadic evaluator started")
        self._tasks = [
            asyncio.create_task(self._loop_6h()),
            asyncio.create_task(self._loop_24h()),
            asyncio.create_task(self._loop_weekly()),
        ]

    async def stop(self) -> None:
        """Stop the tier loops and wait for them to finish.

        Clears ``self._running`` and cancels the tasks created by
        :meth:`start`. Without cancellation a loop would stay parked in its
        multi-hour sleep, and a later :meth:`start` would add a second set
        of loops next to the still-sleeping first one. Safe to call when
        the evaluator was never started.
        """
        self._running = False
        tasks, self._tasks = self._tasks, []
        for task in tasks:
            task.cancel()
        if tasks:
            # return_exceptions=True absorbs the CancelledError of each loop.
            await asyncio.gather(*tasks, return_exceptions=True)

    # -- Shared helpers -------------------------------------------------

    async def _run_periodic(self, label, run_cycle, *, warmup, interval, backoff):
        """Run ``run_cycle`` repeatedly while the evaluator is running.

        Args:
            label: Tier name used in the warning log (``"6h"``, ...).
            run_cycle: Zero-argument coroutine function executing one cycle.
            warmup: Seconds to sleep before the first cycle.
            interval: Seconds to sleep after a successful cycle.
            backoff: Seconds to sleep after a failed cycle.
        """
        await asyncio.sleep(warmup)
        while self._running:
            try:
                await run_cycle()
                await asyncio.sleep(interval)
            except asyncio.CancelledError:
                # Cancellation (see stop()) must end the loop, not be
                # treated as a cycle failure.
                raise
            except Exception as exc:
                logger.warning("%s cycle error: %s", label, exc)
                await asyncio.sleep(backoff)

    async def _iter_user_ids(self, pattern, *, count, index=-1):
        """Yield the user-id segment of every cache key matching ``pattern``.

        Args:
            pattern: ``SCAN`` match pattern.
            count: ``SCAN`` count hint.
            index: Position of the user id after splitting the key on
                ``":"``. Keys with too few segments are skipped.
        """
        async for key in self._cache_redis.scan_iter(match=pattern, count=count):
            text = key.decode() if isinstance(key, bytes) else key
            parts = text.split(":")
            if index < 0 or len(parts) > index:
                yield parts[index]

    @staticmethod
    def _parse_notes(raw_notes, limit):
        """Decode at most ``limit`` raw ZSET members into note strings.

        Each member is expected to be a JSON object with a ``"note"`` field.
        Members that fail to parse, are not objects, or carry an empty note
        are skipped; non-string notes are converted with ``str``.
        """
        notes = []
        for raw in raw_notes[:limit]:
            try:
                note = json.loads(raw).get("note", "")
            except Exception:
                continue
            if note:
                notes.append(note if isinstance(note, str) else str(note))
        return notes

    def _emit_eval_event(self, *, phase, result, duration_ms, preview, payload):
        """Publish a best-effort ``dyadic_eval`` debug event.

        The event is sent from a detached task. Any failure (missing
        ``observability`` module, no running loop, ...) is logged at debug
        level and otherwise ignored, so event emission can never fail a
        cycle.
        """
        try:
            from observability import publish_debug_event as _pde

            task = asyncio.create_task(
                _pde(
                    "dyadic_eval",
                    "dyadic_evaluator",
                    phase=phase,
                    status="ok" if result else "error",
                    duration_ms=round(duration_ms, 1),
                    llm_output=result or "",
                    preview=preview,
                    payload=payload,
                )
            )
            # Keep the task referenced until it completes.
            self._bg_tasks.add(task)
            task.add_done_callback(self._bg_tasks.discard)
        except Exception as exc:
            logger.debug("dyadic_eval event not emitted: %s", exc)

    async def _store_calendar_entry(self, user_id, date, report):
        """Write ``report`` to ``obs:calendar:{user_id}:{date}`` with a 90-day TTL."""
        await self._cache_redis.set(
            f"obs:calendar:{user_id}:{date}",
            json.dumps(report),
            ex=self._CALENDAR_TTL,
        )

    # -- 6-Hour Flash Meta Evaluation ------------------------------------

    async def _loop_6h(self) -> None:
        """Drive the 6-hour meta-evaluation tier on a timer.

        The first cycle runs 5 minutes after startup, then every 6 hours;
        a failed cycle is retried after 30 minutes. The short warm-up keeps
        restarts from starving the 24h/weekly tiers of meta-synthesis data.
        """
        await self._run_periodic(
            "6h",
            self._run_6h_cycle,
            warmup=300,
            interval=CYCLE_6H,
            backoff=1800,
        )

    async def _run_6h_cycle(self) -> None:
        """Synthesize meta reports for up to 50 users that have notes.

        Scans the cache Redis for ``dyadic:notes:*`` keys and calls
        :meth:`_evaluate_6h_user` for each distinct user id. Returns early
        when no cache handle is configured or when the scan fails (logged at
        debug). Per-user failures are logged and skipped.
        """
        if not self._cache_redis:
            return
        # dict used as an insertion-ordered set: SCAN may return duplicates.
        user_ids = {}
        try:
            async for uid in self._iter_user_ids("dyadic:notes:*", count=100):
                user_ids[uid] = None
        except Exception as exc:
            logger.debug("6h scan failed: %s", exc)
            return
        for user_id in list(user_ids)[:50]:  # cap bounds LLM cost per cycle
            try:
                await self._evaluate_6h_user(user_id)
            except Exception as exc:
                logger.debug("6h eval failed for %s: %s", user_id[:8], exc)

    async def _evaluate_6h_user(self, user_id: str) -> None:
        """Synthesize one user's last-6h notes into a meta consensus report.

        Reads the members of ``dyadic:notes:{user_id}`` scored within the
        last ``CYCLE_6H`` seconds, parses up to 100 of them and sends up to
        50 notes to ``FLASH_MODEL``. At least one usable note is required.
        A non-empty reply is stored at ``dyadic:meta:{user_id}`` with a
        48-hour TTL. A ``dyadic_eval`` debug event is emitted whether or not
        the model produced a reply.

        Args:
            user_id: User whose notes ZSET is read and whose meta key is
                written.
        """
        cutoff = time.time() - CYCLE_6H
        raw_notes = await self._cache_redis.zrangebyscore(
            f"dyadic:notes:{user_id}",
            cutoff,
            "+inf",
        )
        if not raw_notes:
            logger.info(
                "6h: skipping user %s \u2014 %d notes (need \u22651)",
                user_id[:8], 0,
            )
            return
        # NOTE(review): zrangebyscore presumably returns members in
        # ascending score order, so these caps keep the oldest notes in the
        # window -- confirm whether the newest would be preferable.
        notes = self._parse_notes(raw_notes, 100)
        if not notes:
            return
        prompt_notes = notes[:50]
        bullet_list = "\n".join(f"- {n}" for n in prompt_notes)
        prompt = (
            "You are Star's 6-hour meta-evaluator. Synthesize these "
            f"{len(prompt_notes)} observation notes about a user into a brief "
            "consensus report.\n\n"
            f"NOTES:\n{bullet_list}\n\n"
            "RETURN JSON:\n"
            '{"consensus": "summary of patterns", '
            '"friction_points": ["list of friction"], '
            '"approach_adjustments": ["suggested changes"], '
            '"trajectory": "improving|stable|declining|volatile"}\n'
            "JSON only. No markdown."
        )
        started = time.monotonic()
        result = await self._call_llm(prompt, model=FLASH_MODEL, max_tokens=512)
        elapsed_ms = (time.monotonic() - started) * 1000
        if result:
            # Value and TTL are written in one command so the key can never
            # be left behind without an expiry.
            await self._cache_redis.set(
                f"dyadic:meta:{user_id}",
                json.dumps({"ts": time.time(), "synthesis": result}),
                ex=self._META_TTL,
            )
        self._emit_eval_event(
            phase="6h_meta",
            result=result,
            duration_ms=elapsed_ms,
            preview=f"user={user_id[:8]} notes={len(notes)} model={FLASH_MODEL}",
            payload={
                "user_id": user_id,
                "notes_count": len(notes),
                "model": FLASH_MODEL,
                "cycle": "6h",
            },
        )

    # -- 24-Hour Daily Report --------------------------------------------

    async def _loop_24h(self) -> None:
        """Drive the 24-hour daily-report tier on a timer.

        The first cycle runs 60 seconds after startup, then every 24 hours;
        a failed cycle is retried after one hour.
        """
        await self._run_periodic(
            "24h",
            self._run_24h_cycle,
            warmup=60,
            interval=CYCLE_24H,
            backoff=3600,
        )

    async def _run_24h_cycle(self) -> None:
        """Build today's daily report for up to 30 users with any data.

        Collects user ids from ``dyadic:meta:*`` keys and, as a fallback for
        users without a 6h synthesis, from ``dyadic:notes:*`` keys. A failed
        scan is logged at debug and does not discard ids already collected.
        Each user is passed to :meth:`_generate_daily_report` with today's
        local ISO date; per-user failures are logged and skipped.
        """
        if not self._cache_redis:
            return
        user_ids = {}
        for pattern in ("dyadic:meta:*", "dyadic:notes:*"):
            try:
                async for uid in self._iter_user_ids(pattern, count=100):
                    user_ids[uid] = None
            except Exception as exc:
                logger.debug("24h scan of %s failed: %s", pattern, exc)
        if not user_ids:
            logger.info("24h: no users found for daily reports")
            return
        logger.info("24h: generating daily reports for %d users", len(user_ids))
        today = datetime.date.today().isoformat()
        for user_id in list(user_ids)[:30]:
            try:
                await self._generate_daily_report(user_id, today)
            except Exception as exc:
                logger.debug("Daily report failed for %s: %s", user_id[:8], exc)

    async def _fallback_synthesis(self, user_id: str):
        """Build a stand-in synthesis from the user's last 24h of raw notes.

        Returns:
            A dict flagged with ``raw_notes_fallback`` that summarizes up to
            20 notes (100 characters each), or ``None`` when the user has no
            usable notes in the window.
        """
        cutoff = time.time() - CYCLE_24H
        raw_notes = await self._cache_redis.zrangebyscore(
            f"dyadic:notes:{user_id}", cutoff, "+inf",
        )
        notes = self._parse_notes(raw_notes or [], 200)
        if not notes:
            return None
        logger.info(
            "24h: fallback note synthesis for %s (%d raw notes)",
            user_id[:8], len(notes),
        )
        return {
            "raw_notes_fallback": True,
            "notes_summary": "; ".join(n[:100] for n in notes[:20]),
        }

    async def _generate_daily_report(
        self,
        user_id: str,
        date: str,
    ) -> None:
        """Build and persist one user's daily report for ``date``.

        Uses the ``dyadic:meta:{user_id}`` synthesis when present, otherwise
        a summary of the last 24h of raw notes. With neither available an
        inactivity stub is stored and the method returns. Otherwise
        ``PRO_MODEL`` is asked for a structured JSON assessment; a reply that
        is not a JSON object is wrapped as a raw summary. The report is
        stamped with date and user id and stored at
        ``obs:calendar:{user_id}:{date}``. Stub and real reports both get a
        90-day TTL. A ``dyadic_eval`` debug event is emitted after the model
        call.

        Args:
            user_id: User whose synthesis is read and whose report is written.
            date: ISO date string used in the calendar key and report body.
        """
        meta_raw = await self._cache_redis.get(f"dyadic:meta:{user_id}")
        if meta_raw:
            synthesis = json.loads(meta_raw).get("synthesis", {})
        else:
            synthesis = await self._fallback_synthesis(user_id)
            if synthesis is None:
                await self._store_calendar_entry(
                    user_id,
                    date,
                    {
                        "date": date,
                        "user_id": user_id,
                        "summary": "No interaction detected.",
                        "inactivity_flag": True,
                    },
                )
                return
        prompt = (
            "You are Star's daily report synthesizer (Pro model). "
            "Generate a structured daily assessment.\n\n"
            f"DATE: {date}\n"
            f"META SYNTHESIS: {json.dumps(synthesis)}\n\n"
            "RETURN JSON:\n"
            '{"summary": "narrative summary of the day", '
            '"progress": "plan progress assessment", '
            '"friction_points": ["..."], '
            '"recommended_adjustments": ["..."], '
            '"trajectory": "improving|stable|declining", '
            '"inactivity_flag": false}\n'
            "JSON only."
        )
        started = time.monotonic()
        result = await self._call_llm(prompt, model=PRO_MODEL, max_tokens=1024)
        elapsed_ms = (time.monotonic() - started) * 1000
        if result:
            try:
                report = json.loads(result)
            except Exception:
                report = None
            if not isinstance(report, dict):
                # Non-JSON output, or valid JSON that is not an object
                # (list/string), cannot carry the stamped fields below.
                report = {"summary": result[:500], "raw": True}
            report["date"] = date
            report["user_id"] = user_id
            report["inactivity_flag"] = False
            await self._store_calendar_entry(user_id, date, report)
        self._emit_eval_event(
            phase="24h_report",
            result=result,
            duration_ms=elapsed_ms,
            preview=f"user={user_id[:8]} date={date} model={PRO_MODEL}",
            payload={
                "user_id": user_id,
                "date": date,
                "model": PRO_MODEL,
                "cycle": "24h",
            },
        )

    # -- Weekly Strategic Synthesis --------------------------------------

    async def _loop_weekly(self) -> None:
        """Drive the weekly strategic-synthesis tier on a timer.

        The first cycle runs 10 minutes after startup, then every 7 days;
        a failed cycle is retried after one hour.
        """
        await self._run_periodic(
            "Weekly",
            self._run_weekly_cycle,
            warmup=600,
            interval=CYCLE_WEEKLY,
            backoff=3600,
        )

    async def _run_weekly_cycle(self) -> None:
        """Generate medium-range plans for up to 20 users with daily reports.

        Scans the cache Redis for ``obs:calendar:*`` keys, takes the third
        ``":"``-separated segment of each key as the user id and calls
        :meth:`_generate_meta_plan` per user. Returns early when no cache
        handle is configured or the scan fails (logged at debug). Per-user
        failures are logged and skipped.
        """
        if not self._cache_redis:
            return
        user_ids = {}
        try:
            async for uid in self._iter_user_ids(
                "obs:calendar:*", count=200, index=2,
            ):
                user_ids[uid] = None
        except Exception as exc:
            logger.debug("Weekly scan failed: %s", exc)
            return
        for user_id in list(user_ids)[:20]:
            try:
                await self._generate_meta_plan(user_id)
            except Exception as exc:
                logger.debug("Meta plan failed for %s: %s", user_id[:8], exc)

    async def _generate_meta_plan(self, user_id: str) -> None:
        """Synthesize a user's last week of reports into a 1-month plan.

        Loads the ``obs:calendar:{user_id}:{date}`` entries for today and
        the six preceding days (unparseable entries are skipped) and
        requires at least two. ``PRO_MODEL`` is asked for a staged JSON
        plan; a reply that is not a JSON object is wrapped as a raw payload.
        The plan is stamped with the user id and an update timestamp and
        stored at ``ops:{user_id}:meta_plan`` without a TTL.

        Args:
            user_id: User whose reports drive the plan and whose ``ops:``
                meta-plan key receives the result.
        """
        # Resolve "today" once so a midnight rollover cannot shift the window
        # between iterations.
        today = datetime.date.today()
        reports = []
        for offset in range(7):
            day = (today - datetime.timedelta(days=offset)).isoformat()
            raw = await self._cache_redis.get(f"obs:calendar:{user_id}:{day}")
            if not raw:
                continue
            try:
                reports.append(json.loads(raw))
            except Exception as exc:
                logger.debug(
                    "Skipping unreadable report %s for %s: %s",
                    day, user_id[:8], exc,
                )
        if len(reports) < 2:
            return
        prompt = (
            "You are Star's weekly strategic synthesizer. "
            "Create a 1-month medium-range plan.\n\n"
            f"DAILY REPORTS (last {len(reports)} days):\n"
            + json.dumps(reports, indent=2)[:3000]
            + "\n\n"
            "RETURN JSON:\n"
            '{"stages": [{"name": "...", "expected_days": 7, '
            '"expiration_days": 14, "sub_goals": ["..."], '
            '"failure_strategy": "reevaluate"}], '
            '"monthly_objective": "...", '
            '"review_params": {}}\n'
            "JSON only."
        )
        result = await self._call_llm(prompt, model=PRO_MODEL, max_tokens=1024)
        if not result:
            return
        try:
            plan_data = json.loads(result)
        except Exception:
            plan_data = None
        if not isinstance(plan_data, dict):
            plan_data = {"raw": result[:500]}
        plan_data["user_id"] = user_id
        plan_data["last_updated"] = time.time()
        await self._cache_redis.set(
            f"ops:{user_id}:meta_plan", json.dumps(plan_data),
        )

    # -- LLM Call ---------------------------------------------------------

    async def _call_llm(
        self,
        prompt: str,
        model: str = FLASH_MODEL,
        max_tokens: int = 512,
    ) -> Optional[str]:
        """Send a single-shot prompt to the proxy and return the cleaned reply.

        Opens a short-lived ``httpx.AsyncClient`` (60s total, 10s connect
        timeout) and POSTs an OpenAI-style chat completion to ``PROXY_URL``
        at temperature 0.4. The first choice's message content is stripped
        of whitespace and of a surrounding markdown code fence so callers
        receive bare JSON text.

        Args:
            prompt: The user-role prompt text to send.
            model: Model id to request; defaults to ``FLASH_MODEL``.
            max_tokens: Upper bound on completion tokens.

        Returns:
            The cleaned response text (possibly empty), or ``None`` when the
            proxy answered 429 or the request failed in any other way.
            Failures are logged at debug level and never raised.
        """
        import re

        async with httpx.AsyncClient(
            timeout=httpx.Timeout(60.0, connect=10.0),
        ) as client:
            try:
                resp = await client.post(
                    PROXY_URL,
                    json={
                        "model": model,
                        "messages": [{"role": "user", "content": prompt}],
                        "temperature": 0.4,
                        "max_tokens": max_tokens,
                    },
                    headers={"Content-Type": "application/json"},
                )
                if resp.status_code == 429:
                    logger.debug("Dyadic evaluator LLM call rate-limited (429)")
                    return None
                resp.raise_for_status()
                data = resp.json()
                # Tolerate a missing/empty choices list and null content.
                choices = data.get("choices") or [{}]
                text = choices[0].get("message", {}).get("content") or ""
                cleaned = text.strip()
                if cleaned.startswith("```"):
                    cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned)
                    cleaned = re.sub(r"\s*```$", "", cleaned)
                return cleaned
            except Exception as e:
                logger.debug("Dyadic evaluator LLM call failed: %s", e)
                return None