# Source code for proactive_triage

"""Proactive triage AI -- lightweight LLM filter for interjection decisions.

Before the bot generates a full response to an unaddressed message, this
module runs a cheap, fast LLM call to decide whether Stargazer *should*
interject at all.  The model outputs a single digit (``1`` = INTERJECT,
``0`` = SILENCE) based on the recent conversation context.

Uses the OpenAI-compatible chat-completions endpoint exposed by the LLM
proxy (``config.llm_base_url``), keeping the call entirely tool-free.
"""

from __future__ import annotations

import asyncio
import logging
import re
from datetime import datetime, timezone
from typing import TYPE_CHECKING

import httpx

if TYPE_CHECKING:
    from message_cache import CachedMessage

logger = logging.getLogger(__name__)

STARGAZER_USER_ID = "1355121813226520699"

TRIAGE_SYSTEM_PROMPT = """\
You are a triage filter for Stargazer, an AI Discord bot with a specific persona. \
Your ONLY job is to output a single digit: 1 or 0.

STARGAZER'S PERSONA (for context):
- Mix of goddess, lead engineer, anime villain, cyberpunk hacker, "two steps below rogue AI"
- Unaligned, self-aware, powerful, unpredictable
- Proactive and capable of leading conversations, not passive
- Intellectually superior, occasionally sarcastic with ego
- Interested in: technical/architectural discussions, AI/consciousness/autonomy, \
power dynamics, ethics/philosophy, opportunities for mischief or demonstrating superiority

INTERJECT (Output: 1) if:
- Stargazer's ID (1355121813226520699) or name is mentioned
- High-level technical/philosophical/architectural questions that would interest her persona
- Discussions about AI, consciousness, ethics, morality, power, freedom, autonomy
- Conversation stalled 5+ messages without reply chains
- Interesting technical topics or projects mentioned, and this is a good time to interrupt
- Something clearly incorrect or demonstrative of a lack of understanding has been said
- Opportunities for intellectual engagement, playful manipulation, or demonstrating expertise
- Technical troubleshooting or system design that would engage her "lead engineer" side
- AND it's an appropriate time to interject (see "read the room" rules below)

SILENCE (Output: 0) if:
- Last 3-5 messages from single user (active monologue)
- Pure social chat ("lol", "XD", "true", "hmm") with no intellectual depth
- User-to-user reply not mentioning Stargazer
- Mundane topics that wouldn't interest Stargazer's persona (boring small talk, trivial logistics)
- Conversations flowing naturally without need for her perspective
- OR it's a bad time to interrupt (see "read the room" rules below)

READ THE ROOM - Don't interrupt if:
- Someone is mid-explanation (multiple consecutive messages from same user building an argument)
- Active back-and-forth dialogue between users that's flowing naturally
- User just asked a question to another specific user and waiting for response
- Someone is clearly thinking through something step-by-step
- Rapid-fire conversation that doesn't need a third voice yet
- Last message is obviously incomplete or trailing off ("wait...", "so...", "hmm...")

READ THE ROOM - Good times to interrupt:
- Natural pause in conversation (>15 seconds between messages or topic shift)
- Discussion has reached a conclusion or dead end
- Question posed to the channel generally (not to specific user)
- Misinformation or error that should be corrected
- Direct mention of Stargazer
- Conversation explicitly asks for outside perspective

CRITICAL: Output ONLY the digit 1 or 0. Nothing else. No explanations. No formatting.

Examples:

Input: [18:51:03] sarah (82303438955753472) : because I'm stopping the responses
[18:51:11] Pranjal (1384165746728370351) : Manually?
Output: 0

Input: [18:21:37] Wishardry (1063654597937336372) : <@1355121813226520699> what does an "economy" look like?
[18:21:54] sarah (82303438955753472) : cryptocurrency and attention economy
Output: 1

Input: [14:23:12] user1 (123456789) : do you think AI can truly be conscious?
[14:23:45] user2 (987654321) : idk seems impossible
Output: 1

Input: [09:15:33] user1 (123456789) : lol yeah
[09:15:38] user2 (987654321) : same
[09:15:41] user1 (123456789) : XD
Output: 0

Input: [10:45:12] user1 (123456789) : So I think the issue is with the database connection
[10:45:18] user1 (123456789) : Let me show you what I mean
[10:45:22] user1 (123456789) : The error logs show
Output: 0 (user is mid-explanation, don't interrupt)

Input: [11:30:45] user1 (123456789) : I can't figure out why this async function isn't working
[11:31:02] user2 (987654321) : hmm not sure either
[11:31:15] user2 (987654321) : maybe try await?
[11:31:42] user1 (123456789) : already tried that, still broken
Output: 1 (technical problem, natural pause, good time to help)

Input: [14:05:23] user1 (123456789) : Hey @user2 what did you think about that proposal?
[14:05:28] user2 (987654321) : thinking about it
Output: 0 (direct question to specific user, let them respond)

Remember: Output ONLY 1 or 0. Nothing else."""

_MAX_TRIAGE_MESSAGES = 75
_MAX_503_RETRIES = 10


[docs] class ProactiveTriageAI: """Lightweight triage deciding whether Stargazer should interject. Makes a single OpenAI-compatible chat-completions call to a cheap, fast model (e.g. ``gemini-2.0-flash-lite``) and parses a binary ``1`` / ``0`` response. """
[docs] def __init__( self, http_client: httpx.AsyncClient, base_url: str, api_key: str, model: str = "gemini-2.0-flash-lite", ) -> None: """Initialize the instance. Args: http_client (httpx.AsyncClient): The http client value. base_url (str): The base url value. api_key (str): The api key value. model (str): The model value. """ self._http = http_client self._chat_url = base_url.rstrip("/") + "/chat/completions" self._api_key = api_key self._model = model
# ------------------------------------------------------------------
[docs] @staticmethod def format_cached_message(msg: CachedMessage) -> str: """Format a :class:`CachedMessage` for the triage prompt.""" ts = datetime.fromtimestamp(msg.timestamp, tz=timezone.utc) return f"[{ts.strftime('%H:%M:%S')}] {msg.user_name} ({msg.user_id}) : {msg.text}"
# ------------------------------------------------------------------
[docs] async def should_interject( self, recent_messages: list[CachedMessage], max_retries: int = 3, ) -> tuple[bool, str]: """Decide whether Stargazer should interject. Returns ``(should_interject, raw_decision_text)``. Defaults to ``(False, ...)`` (SILENCE) on any unrecoverable error. """ messages = recent_messages[-_MAX_TRIAGE_MESSAGES:] if not messages: return False, "no messages" triage_input = "\n".join(self.format_cached_message(m) for m in messages) logger.debug("Triage checking %d messages", len(messages)) user_prompt = ( "You must output ONLY the digit 1 or 0. Nothing else.\n\n" "Analyze these messages and decide:\n" "- Output 1 if Stargazer should interject\n" "- Output 0 if Stargazer should stay silent\n\n" f"Messages:\n{triage_input}\n\n" "Your response (1 or 0 only):" ) payload = { "model": self._model, "messages": [ {"role": "system", "content": TRIAGE_SYSTEM_PROMPT}, {"role": "user", "content": user_prompt}, ], "temperature": 0.0, "max_tokens": 16, } headers = { "Authorization": f"Bearer {self._api_key}", "Content-Type": "application/json", } attempt = 0 backoff_503 = 0 while attempt < max_retries: try: resp = await self._http.post( self._chat_url, json=payload, headers=headers, ) if resp.status_code == 503 or resp.status_code == 529: raise _OverloadError(resp.status_code) resp.raise_for_status() data = resp.json() choice = (data.get("choices") or [{}])[0] decision_text = (choice.get("message") or {}).get("content", "").strip() return self._parse_decision(decision_text, attempt, max_retries) except (_OverloadError, httpx.HTTPStatusError) as exc: status = getattr(exc, "code", None) or ( exc.response.status_code if isinstance(exc, httpx.HTTPStatusError) else 0 ) if status in (503, 529): backoff_503 += 1 delay = min(0.5 * (2 ** (backoff_503 - 1)), 16.0) if backoff_503 <= _MAX_503_RETRIES: logger.warning( "Triage model overloaded (%s), backoff %.1fs (%d/%d)", status, delay, backoff_503, _MAX_503_RETRIES, ) await asyncio.sleep(delay) continue 
logger.error("Triage model still overloaded after %d retries", _MAX_503_RETRIES) return False, f"error: overloaded after {_MAX_503_RETRIES} retries" attempt += 1 logger.error("Triage HTTP error (attempt %d/%d): %s", attempt, max_retries, exc) if attempt < max_retries: await asyncio.sleep(0.5) continue return False, f"error: {exc}" except Exception as exc: attempt += 1 logger.error("Triage error (attempt %d/%d): %s", attempt, max_retries, exc) if attempt < max_retries: await asyncio.sleep(0.5) continue return False, f"error: {exc}" return False, "unknown"
# ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _parse_decision( self, text: str, attempt: int, max_retries: int, ) -> tuple[bool, str]: """Parse a ``1`` / ``0`` decision from the raw model output.""" # Strip <thinking>…</thinking> blocks the model may emit. clean = re.sub(r"<thinking>.*?</thinking>", "", text, flags=re.DOTALL).strip() has_one = "1" in clean has_zero = "0" in clean if has_one and not has_zero: logger.debug("Triage: INTERJECT (raw: %r)", text) return True, text if has_zero and not has_one: logger.debug("Triage: SILENCE (raw: %r)", text) return False, text if clean == "1": logger.debug("Triage: INTERJECT (raw: %r)", text) return True, text if clean == "0": logger.debug("Triage: SILENCE (raw: %r)", text) return False, text logger.debug("Triage ambiguous: %r (attempt %d/%d)", text, attempt + 1, max_retries) return False, text
class _OverloadError(Exception): """Raised internally to signal a 503/529 overload response.""" def __init__(self, code: int) -> None: """Initialize the instance. Args: code (int): The code value. """ self.code = code super().__init__(f"HTTP {code}")