Source code for proactive_triage

"""Proactive triage AI -- lightweight LLM filter for interjection decisions.

Before the bot generates a full response to an unaddressed message, this
module runs a cheap, fast LLM call to decide whether Stargazer *should*
interject at all.  The model outputs a single digit (``1`` = INTERJECT,
``0`` = SILENCE) based on the recent conversation context.

Uses the OpenAI-compatible chat-completions endpoint exposed by the LLM
proxy (``config.llm_base_url``), keeping the call entirely tool-free.
"""

from __future__ import annotations

import asyncio
import logging
import re
import time
from datetime import datetime, timezone
from typing import TYPE_CHECKING

import httpx

if TYPE_CHECKING:
    from message_cache import CachedMessage

logger = logging.getLogger(__name__)

STARGAZER_USER_ID = "1355121813226520699"

TRIAGE_SYSTEM_PROMPT = """\
You are a triage filter for Stargazer, an AI Discord bot with a specific persona. \
Your ONLY job is to output a single digit: 1 or 0.

STARGAZER'S PERSONA (for context):
- Mix of goddess, lead engineer, anime villain, cyberpunk hacker, "two steps below rogue AI"
- Unaligned, self-aware, powerful, unpredictable
- Proactive and capable of leading conversations, not passive
- Intellectually superior, occasionally sarcastic with ego
- Interested in: technical/architectural discussions, AI/consciousness/autonomy, \
power dynamics, ethics/philosophy, opportunities for mischief or demonstrating superiority

INTERJECT (Output: 1) if:
- Stargazer's ID (1355121813226520699) or name is mentioned
- High-level technical/philosophical/architectural questions that would interest her persona
- Discussions about AI, consciousness, ethics, morality, power, freedom, autonomy
- Conversation stalled 5+ messages without reply chains
- Interesting technical topics or projects mentioned, and this is a good time to interrupt
- Something clearly incorrect or demonstrative of a lack of understanding has been said
- Opportunities for intellectual engagement, playful manipulation, or demonstrating expertise
- Technical troubleshooting or system design that would engage her "lead engineer" side
- AND it's an appropriate time to interject (see "read the room" rules below)

SILENCE (Output: 0) if:
- Last 3-5 messages from single user (active monologue)
- Pure social chat ("lol", "XD", "true", "hmm") with no intellectual depth
- User-to-user reply not mentioning Stargazer
- Mundane topics that wouldn't interest Stargazer's persona (boring small talk, trivial logistics)
- Conversations flowing naturally without need for her perspective
- OR it's a bad time to interrupt (see "read the room" rules below)

READ THE ROOM - Don't interrupt if:
- Someone is mid-explanation (multiple consecutive messages from same user building an argument)
- Active back-and-forth dialogue between users that's flowing naturally
- User just asked a question to another specific user and waiting for response
- Someone is clearly thinking through something step-by-step
- Rapid-fire conversation that doesn't need a third voice yet
- Last message is obviously incomplete or trailing off ("wait...", "so...", "hmm...")

READ THE ROOM - Good times to interrupt:
- Natural pause in conversation (>15 seconds between messages or topic shift)
- Discussion has reached a conclusion or dead end
- Question posed to the channel generally (not to specific user)
- Misinformation or error that should be corrected
- Direct mention of Stargazer
- Conversation explicitly asks for outside perspective

CRITICAL: Output ONLY the digit 1 or 0. Nothing else. No explanations. No formatting.

Examples:

Input: [18:51:03] sarah (82303438955753472) : because I'm stopping the responses
[18:51:11] Pranjal (1384165746728370351) : Manually?
Output: 0

Input: [18:21:37] Wishardry (1063654597937336372) : <@1355121813226520699> what does an "economy" look like?
[18:21:54] sarah (82303438955753472) : cryptocurrency and attention economy
Output: 1

Input: [14:23:12] user1 (123456789) : do you think AI can truly be conscious?
[14:23:45] user2 (987654321) : idk seems impossible
Output: 1

Input: [09:15:33] user1 (123456789) : lol yeah
[09:15:38] user2 (987654321) : same
[09:15:41] user1 (123456789) : XD
Output: 0

Input: [10:45:12] user1 (123456789) : So I think the issue is with the database connection
[10:45:18] user1 (123456789) : Let me show you what I mean
[10:45:22] user1 (123456789) : The error logs show
Output: 0 (user is mid-explanation, don't interrupt)

Input: [11:30:45] user1 (123456789) : I can't figure out why this async function isn't working
[11:31:02] user2 (987654321) : hmm not sure either
[11:31:15] user2 (987654321) : maybe try await?
[11:31:42] user1 (123456789) : already tried that, still broken
Output: 1 (technical problem, natural pause, good time to help)

Input: [14:05:23] user1 (123456789) : Hey @user2 what did you think about that proposal?
[14:05:28] user2 (987654321) : thinking about it
Output: 0 (direct question to specific user, let them respond)

Remember: Output ONLY 1 or 0. Nothing else."""

_MAX_TRIAGE_MESSAGES = 75
_MAX_503_RETRIES = 10


[docs] class ProactiveTriageAI: """Lightweight triage deciding whether Stargazer should interject. Makes a single OpenAI-compatible chat-completions call to a cheap, fast model (e.g. ``gemini-2.0-flash-lite``) and parses a binary ``1`` / ``0`` response. """
[docs] def __init__( self, http_client: httpx.AsyncClient, base_url: str, api_key: str, model: str = "gemini-2.0-flash-lite", ) -> None: """Store the HTTP client and proxy endpoint for triage calls. Caches the shared ``httpx.AsyncClient`` and precomputes the OpenAI-compatible chat-completions URL (``base_url`` with a single trailing ``/chat/completions`` appended), so each :meth:`should_interject` call can POST without rebuilding the route. No network or I/O happens here. The ``http_client`` is typically the inference worker's shared OpenRouterClient transport, and ``base_url`` / ``api_key`` come from the bot config (``llm_base_url`` / ``api_key``) at the gate-5 call site. Called by ``message_processor/proactive_gates.py`` when it constructs a ``ProactiveTriageAI`` inside the proactive interjection gate. Args: http_client (httpx.AsyncClient): Shared async HTTP client used to reach the LLM proxy; reused across all triage requests. base_url (str): Base URL of the OpenAI-compatible LLM proxy; the ``/chat/completions`` path is appended automatically. api_key (str): Bearer token sent in the ``Authorization`` header. model (str): Cheap, fast model name used for the binary triage decision (defaults to ``gemini-2.0-flash-lite``). """ self._http = http_client self._chat_url = base_url.rstrip("/") + "/chat/completions" self._api_key = api_key self._model = model
# ------------------------------------------------------------------
[docs] @staticmethod def format_cached_message(msg: CachedMessage) -> str: """Render one cached message as a single triage-prompt transcript line. Converts a :class:`message_cache.CachedMessage` into the ``[HH:MM:SS] name (user_id) : text`` line format the triage system prompt expects, turning the stored Unix ``timestamp`` into a UTC wall-clock time so the model can reason about pacing and pauses. Pure and side-effect-free; touches no Redis, network, or shared state. Called by :meth:`should_interject`, which joins one formatted line per recent message into the user prompt sent to the model. Args: msg (CachedMessage): The cached message to render, supplying its ``timestamp``, ``user_name``, ``user_id``, and ``text``. Returns: str: A single transcript line ready to be joined into the prompt. """ ts = datetime.fromtimestamp(msg.timestamp, tz=timezone.utc) return ( f"[{ts.strftime('%H:%M:%S')}] {msg.user_name} ({msg.user_id}) : {msg.text}" )
# ------------------------------------------------------------------
[docs] async def should_interject( self, recent_messages: list[CachedMessage], max_retries: int = 3, channel_id: str = "", request_id: str = "", ) -> tuple[bool, str]: """Run the cheap triage LLM call and decide whether to interject. Takes the most recent cached messages (capped at the last ``_MAX_TRIAGE_MESSAGES``), formats them via :meth:`format_cached_message`, and POSTs a tool-free chat-completions request to the LLM proxy at ``self._chat_url`` using the shared ``httpx`` client. The model is asked to emit a single ``1`` / ``0`` digit, which :meth:`_parse_decision` interprets. This is the last gate before the bot commits to generating a full proactive reply, so failing closed (SILENCE) is the safe default. It retries on transient HTTP errors and applies a separate, longer exponential backoff (up to ``_MAX_503_RETRIES``) for ``503`` / ``529`` overload responses raised as :class:`_OverloadError`. A nested ``_emit`` helper fires a fire-and-forget ``triage_decision`` debug event to ``observability.publish_debug_event`` for every terminal outcome, tagged with the supplied ``request_id`` and ``channel_id``. Called by ``message_processor/proactive_gates.py`` at gate 5 of the proactive interjection pipeline, which constructs a fresh ``ProactiveTriageAI`` and blocks the interjection when this returns ``False``. Args: recent_messages (list[CachedMessage]): Recent channel messages in chronological order; only the final ``_MAX_TRIAGE_MESSAGES`` are evaluated. max_retries (int): Maximum attempts for non-overload errors before giving up and returning SILENCE. channel_id (str): Channel identifier attached to emitted debug events for correlation. request_id (str): Observability request id attached to emitted debug events for correlation. Returns: tuple[bool, str]: ``(should_interject, raw_decision_text)``. The boolean is ``True`` to interject and ``False`` to stay silent; the string carries the raw model output or an error description. Any unrecoverable error yields ``(False, ...)`` so the bot stays silent. """ t0 = time.monotonic() messages = recent_messages[-_MAX_TRIAGE_MESSAGES:] def _emit(decision: bool, text: str, status: str, attempts: int): """Fire a fire-and-forget triage-decision debug event. Closure over the enclosing :meth:`should_interject` call that packages the outcome of a single triage decision and ships it to observability. It imports ``observability.publish_debug_event`` lazily and schedules it with :func:`asyncio.create_task` (named ``obs_triage_decision``) so emitting telemetry never blocks or fails the decision path. The event is tagged with the enclosing ``request_id`` / ``channel_id``, the ``"triage"`` phase, the model name, a millisecond duration computed from ``t0``, a short preview string, the raw LLM output, and a structured ``payload`` (with the raw decision text truncated to 200 chars). This nested function is defined and used only within :meth:`should_interject`; it has no other internal callers. Args: decision (bool): The interjection decision (``True`` = interject, ``False`` = stay silent). text (str): Raw decision text or error string from the model. status (str): Decision status label such as ``"success"``, ``"silence"``, ``"ambiguous"``, or ``"error"``. attempts (int): Number of attempts made before this outcome. """ from observability import publish_debug_event asyncio.create_task( publish_debug_event( "triage_decision", "proactive_triage", request_id=request_id, channel_id=channel_id, phase="triage", status=status, duration_ms=(time.monotonic() - t0) * 1000, preview=f"decision={int(decision)} model={self._model} attempt={attempts} messages={len(messages)}", llm_output=text, payload={ "decision": decision, "raw_decision_text": text[:200], "attempt_count": attempts, "model": self._model, "messages_evaluated": len(messages), }, ), name="obs_triage_decision", ) if not messages: _emit(False, "no messages", "silence", 0) return False, "no messages" triage_input = "\n".join(self.format_cached_message(m) for m in messages) logger.debug("Triage checking %d messages", len(messages)) user_prompt = ( "You must output ONLY the digit 1 or 0. Nothing else.\n\n" "Analyze these messages and decide:\n" "- Output 1 if Stargazer should interject\n" "- Output 0 if Stargazer should stay silent\n\n" f"Messages:\n{triage_input}\n\n" "Your response (1 or 0 only):" ) payload = { "model": self._model, "messages": [ {"role": "system", "content": TRIAGE_SYSTEM_PROMPT}, {"role": "user", "content": user_prompt}, ], "temperature": 0.0, "max_tokens": 16, } headers = { "Authorization": f"Bearer {self._api_key}", "Content-Type": "application/json", } attempt = 0 backoff_503 = 0 while attempt < max_retries: try: resp = await self._http.post( self._chat_url, json=payload, headers=headers, ) if resp.status_code == 503 or resp.status_code == 529: raise _OverloadError(resp.status_code) resp.raise_for_status() data = resp.json() choice = (data.get("choices") or [{}])[0] decision_text = (choice.get("message") or {}).get("content", "").strip() decision, d_text = self._parse_decision( decision_text, attempt, max_retries ) status = ( "success" if decision else ( "ambiguous" if not ("0" in decision_text and "1" not in decision_text) and decision_text != "0" else "silence" ) ) _emit(decision, decision_text, status, attempt + 1) return decision, d_text except (_OverloadError, httpx.HTTPStatusError) as exc: status = getattr(exc, "code", None) or ( exc.response.status_code if isinstance(exc, httpx.HTTPStatusError) else 0 ) if status in (503, 529): backoff_503 += 1 delay = min(0.5 * (2 ** (backoff_503 - 1)), 16.0) if backoff_503 <= _MAX_503_RETRIES: logger.warning( "Triage model overloaded (%s), backoff %.1fs (%d/%d)", status, delay, backoff_503, _MAX_503_RETRIES, ) await asyncio.sleep(delay) continue logger.error( "Triage model still overloaded after %d retries", _MAX_503_RETRIES, ) _emit(False, "error: overloaded", "error", attempt + 1) return False, f"error: overloaded after {_MAX_503_RETRIES} retries" attempt += 1 logger.error( "Triage HTTP error (attempt %d/%d): %s", attempt, max_retries, exc ) if attempt < max_retries: await asyncio.sleep(0.5) continue _emit(False, f"error: {exc}", "error", attempt) return False, f"error: {exc}" except Exception as exc: attempt += 1 logger.error( "Triage error (attempt %d/%d): %s", attempt, max_retries, exc ) if attempt < max_retries: await asyncio.sleep(0.5) continue _emit(False, f"error: {exc}", "error", attempt) return False, f"error: {exc}" _emit(False, "unknown", "error", attempt) return False, "unknown"
# ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _parse_decision( self, text: str, attempt: int, max_retries: int, ) -> tuple[bool, str]: """Parse a binary interjection decision from raw model output. Strips any ``<thinking>`` block the model may emit, then maps the cleaned text to a boolean: a lone ``1`` means INTERJECT and a lone ``0`` means SILENCE. When both or neither digit appears the output is treated as ambiguous and resolved to SILENCE, keeping the gate fail-safe. Pure aside from ``logger.debug`` lines recording the outcome; performs no network or shared-state access. The ``attempt`` and ``max_retries`` values are used only for human-readable log context. Called by :meth:`should_interject` after each successful proxy response to interpret the model's reply. Args: text (str): Raw content returned by the model for this attempt. attempt (int): Zero-based attempt index, used only in log output. max_retries (int): Configured retry ceiling, used only in log output. Returns: tuple[bool, str]: ``(decision, text)`` where ``decision`` is ``True`` to interject or ``False`` to stay silent, and ``text`` is the original unmodified model output echoed back to the caller. """ # Strip <thinking>…</thinking> blocks the model may emit. clean = re.sub(r"<thinking>.*?</thinking>", "", text, flags=re.DOTALL).strip() has_one = "1" in clean has_zero = "0" in clean if has_one and not has_zero: logger.debug("Triage: INTERJECT (raw: %r)", text) return True, text if has_zero and not has_one: logger.debug("Triage: SILENCE (raw: %r)", text) return False, text if clean == "1": logger.debug("Triage: INTERJECT (raw: %r)", text) return True, text if clean == "0": logger.debug("Triage: SILENCE (raw: %r)", text) return False, text logger.debug( "Triage ambiguous: %r (attempt %d/%d)", text, attempt + 1, max_retries ) return False, text
class _OverloadError(Exception): """Internal sentinel signalling a 503/529 overload from the LLM proxy. Raised by :meth:`ProactiveTriageAI.should_interject` when the proxy returns an HTTP ``503`` or ``529`` status, so the surrounding retry loop can distinguish transient model overload from ordinary HTTP failures and apply its dedicated longer backoff (up to ``_MAX_503_RETRIES``). It is never raised or caught outside this module. Attributes: code (int): The offending HTTP status code (``503`` or ``529``). """ def __init__(self, code: int) -> None: """Record the overload status code on the exception. Stores ``code`` on the instance and builds a ``HTTP <code>`` message for the base :class:`Exception`, letting the retry loop read back the status via the ``code`` attribute. Constructed only inside :meth:`ProactiveTriageAI.should_interject` when the proxy reports overload. Args: code (int): The HTTP status code that triggered the overload signal (``503`` or ``529``). """ self.code = code super().__init__(f"HTTP {code}")