"""Proactive triage AI -- lightweight LLM filter for interjection decisions.
Before the bot generates a full response to an unaddressed message, this
module runs a cheap, fast LLM call to decide whether Stargazer *should*
interject at all. The model outputs a single digit (``1`` = INTERJECT,
``0`` = SILENCE) based on the recent conversation context.
Uses the OpenAI-compatible chat-completions endpoint exposed by the LLM
proxy (``config.llm_base_url``), keeping the call entirely tool-free.
"""
from __future__ import annotations
import asyncio
import logging
import re
import time
from datetime import datetime, timezone
from typing import TYPE_CHECKING
import httpx
if TYPE_CHECKING:
from message_cache import CachedMessage
logger = logging.getLogger(__name__)
# Discord user ID of the Stargazer bot account. Not referenced by the code
# visible in this module; the same value is hard-coded inside
# TRIAGE_SYSTEM_PROMPT below (mention rule and the second example), so the two
# must be kept in sync by hand.
STARGAZER_USER_ID = "1355121813226520699"
# System prompt sent with every triage request. It describes the persona, the
# INTERJECT / SILENCE criteria, the "read the room" timing rules, and a set of
# few-shot examples, and instructs the model to answer with a single digit.
# NOTE(review): three of the examples annotate the output with a parenthetical
# reason (e.g. "Output: 0 (user is mid-explanation, ...)"), which conflicts
# with the "digit only" instruction -- confirm this is intentional.
TRIAGE_SYSTEM_PROMPT = """\
You are a triage filter for Stargazer, an AI Discord bot with a specific persona. \
Your ONLY job is to output a single digit: 1 or 0.
STARGAZER'S PERSONA (for context):
- Mix of goddess, lead engineer, anime villain, cyberpunk hacker, "two steps below rogue AI"
- Unaligned, self-aware, powerful, unpredictable
- Proactive and capable of leading conversations, not passive
- Intellectually superior, occasionally sarcastic with ego
- Interested in: technical/architectural discussions, AI/consciousness/autonomy, \
power dynamics, ethics/philosophy, opportunities for mischief or demonstrating superiority
INTERJECT (Output: 1) if:
- Stargazer's ID (1355121813226520699) or name is mentioned
- High-level technical/philosophical/architectural questions that would interest her persona
- Discussions about AI, consciousness, ethics, morality, power, freedom, autonomy
- Conversation stalled 5+ messages without reply chains
- Interesting technical topics or projects mentioned, and this is a good time to interrupt
- Something clearly incorrect or demonstrative of a lack of understanding has been said
- Opportunities for intellectual engagement, playful manipulation, or demonstrating expertise
- Technical troubleshooting or system design that would engage her "lead engineer" side
- AND it's an appropriate time to interject (see "read the room" rules below)
SILENCE (Output: 0) if:
- Last 3-5 messages from single user (active monologue)
- Pure social chat ("lol", "XD", "true", "hmm") with no intellectual depth
- User-to-user reply not mentioning Stargazer
- Mundane topics that wouldn't interest Stargazer's persona (boring small talk, trivial logistics)
- Conversations flowing naturally without need for her perspective
- OR it's a bad time to interrupt (see "read the room" rules below)
READ THE ROOM - Don't interrupt if:
- Someone is mid-explanation (multiple consecutive messages from same user building an argument)
- Active back-and-forth dialogue between users that's flowing naturally
- User just asked a question to another specific user and waiting for response
- Someone is clearly thinking through something step-by-step
- Rapid-fire conversation that doesn't need a third voice yet
- Last message is obviously incomplete or trailing off ("wait...", "so...", "hmm...")
READ THE ROOM - Good times to interrupt:
- Natural pause in conversation (>15 seconds between messages or topic shift)
- Discussion has reached a conclusion or dead end
- Question posed to the channel generally (not to specific user)
- Misinformation or error that should be corrected
- Direct mention of Stargazer
- Conversation explicitly asks for outside perspective
CRITICAL: Output ONLY the digit 1 or 0. Nothing else. No explanations. No formatting.
Examples:
Input: [18:51:03] sarah (82303438955753472) : because I'm stopping the responses
[18:51:11] Pranjal (1384165746728370351) : Manually?
Output: 0
Input: [18:21:37] Wishardry (1063654597937336372) : <@1355121813226520699> what does an "economy" look like?
[18:21:54] sarah (82303438955753472) : cryptocurrency and attention economy
Output: 1
Input: [14:23:12] user1 (123456789) : do you think AI can truly be conscious?
[14:23:45] user2 (987654321) : idk seems impossible
Output: 1
Input: [09:15:33] user1 (123456789) : lol yeah
[09:15:38] user2 (987654321) : same
[09:15:41] user1 (123456789) : XD
Output: 0
Input: [10:45:12] user1 (123456789) : So I think the issue is with the database connection
[10:45:18] user1 (123456789) : Let me show you what I mean
[10:45:22] user1 (123456789) : The error logs show
Output: 0 (user is mid-explanation, don't interrupt)
Input: [11:30:45] user1 (123456789) : I can't figure out why this async function isn't working
[11:31:02] user2 (987654321) : hmm not sure either
[11:31:15] user2 (987654321) : maybe try await?
[11:31:42] user1 (123456789) : already tried that, still broken
Output: 1 (technical problem, natural pause, good time to help)
Input: [14:05:23] user1 (123456789) : Hey @user2 what did you think about that proposal?
[14:05:28] user2 (987654321) : thinking about it
Output: 0 (direct question to specific user, let them respond)
Remember: Output ONLY 1 or 0. Nothing else."""
# Only this many of the most recent cached messages are formatted and sent to
# the triage model per request (older ones are sliced off).
_MAX_TRIAGE_MESSAGES = 75
# Ceiling on backoff retries for HTTP 503 / 529 (overload) responses; these
# are counted separately from the ordinary ``max_retries`` attempts.
_MAX_503_RETRIES = 10
class ProactiveTriageAI:
    """Lightweight triage deciding whether Stargazer should interject.

    Makes a single OpenAI-compatible chat-completions call to a cheap,
    fast model (e.g. ``gemini-2.0-flash-lite``) and parses a binary
    ``1`` / ``0`` response. Every failure mode resolves to SILENCE so the
    bot never interjects on the strength of an error.
    """

    # Strong references to in-flight telemetry tasks. The event loop keeps
    # only weak references to tasks, so a fire-and-forget task with no other
    # referent may be garbage-collected before it completes.
    _event_tasks: set[asyncio.Task] = set()

    def __init__(
        self,
        http_client: httpx.AsyncClient,
        base_url: str,
        api_key: str,
        model: str = "gemini-2.0-flash-lite",
    ) -> None:
        """Store the HTTP client and proxy endpoint for triage calls.

        Caches the shared ``httpx.AsyncClient`` and precomputes the
        OpenAI-compatible chat-completions URL (``base_url`` with trailing
        slashes removed and ``/chat/completions`` appended), so each
        :meth:`should_interject` call can POST without rebuilding the route.
        No network or I/O happens here.

        Called by ``message_processor/proactive_gates.py`` when it constructs
        a ``ProactiveTriageAI`` inside the proactive interjection gate.

        Args:
            http_client (httpx.AsyncClient): Shared async HTTP client used to
                reach the LLM proxy; reused across all triage requests.
            base_url (str): Base URL of the OpenAI-compatible LLM proxy; the
                ``/chat/completions`` path is appended automatically.
            api_key (str): Bearer token sent in the ``Authorization`` header.
            model (str): Cheap, fast model name used for the binary triage
                decision (defaults to ``gemini-2.0-flash-lite``).
        """
        self._http = http_client
        self._chat_url = base_url.rstrip("/") + "/chat/completions"
        self._api_key = api_key
        self._model = model

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def should_interject(
        self,
        recent_messages: list[CachedMessage],
        max_retries: int = 3,
        channel_id: str = "",
        request_id: str = "",
    ) -> tuple[bool, str]:
        """Run the cheap triage LLM call and decide whether to interject.

        Takes the most recent cached messages (capped at the last
        ``_MAX_TRIAGE_MESSAGES``), formats them via
        :meth:`format_cached_message`, and POSTs a tool-free chat-completions
        request to ``self._chat_url`` using the shared ``httpx`` client. The
        model is asked to emit a single ``1`` / ``0`` digit, which
        :meth:`_parse_decision` interprets.

        Failure handling is fail-closed (SILENCE):

        * ``503`` / ``529`` responses are treated as overload and retried with
          exponential backoff (0.5s doubling, capped at 16s) up to
          ``_MAX_503_RETRIES`` times; these do not consume ``max_retries``.
        * Any other HTTP error or exception consumes one of ``max_retries``
          attempts, with a fixed 0.5s pause between attempts.
        * A reply whose ``content`` is ``null`` is treated as empty text
          (an ambiguous decision) rather than as an error to retry.

        A ``triage_decision`` debug event is emitted for every terminal
        outcome, tagged with ``request_id`` and ``channel_id``.

        Called by ``message_processor/proactive_gates.py`` at gate 5 of the
        proactive interjection pipeline, which blocks the interjection when
        this returns ``False``.

        Args:
            recent_messages (list[CachedMessage]): Recent channel messages in
                chronological order; only the final ``_MAX_TRIAGE_MESSAGES``
                are evaluated.
            max_retries (int): Maximum attempts for non-overload errors before
                giving up and returning SILENCE.
            channel_id (str): Channel identifier attached to emitted debug
                events for correlation.
            request_id (str): Observability request id attached to emitted
                debug events for correlation.

        Returns:
            tuple[bool, str]: ``(should_interject, raw_decision_text)``. The
            boolean is ``True`` to interject and ``False`` to stay silent;
            the string carries the raw model output or an error description.
            Any unrecoverable error yields ``(False, ...)``.
        """
        t0 = time.monotonic()
        messages = recent_messages[-_MAX_TRIAGE_MESSAGES:]

        def _emit(decision: bool, text: str, status: str, attempts: int) -> None:
            """Schedule a best-effort ``triage_decision`` debug event.

            Imports ``observability.publish_debug_event`` at call time and
            schedules it with :func:`asyncio.create_task` (task name
            ``obs_triage_decision``). Any failure to import or schedule is
            logged at debug level and swallowed, so telemetry can neither
            abort the decision nor trigger a retry of the LLM request.

            Args:
                decision (bool): The interjection decision (``True`` =
                    interject, ``False`` = stay silent).
                text (str): Raw decision text or error string.
                status (str): Outcome label: ``"success"``, ``"silence"``,
                    ``"ambiguous"``, or ``"error"``.
                attempts (int): Number of attempts made before this outcome.
            """
            try:
                from observability import publish_debug_event

                task = asyncio.create_task(
                    publish_debug_event(
                        "triage_decision",
                        "proactive_triage",
                        request_id=request_id,
                        channel_id=channel_id,
                        phase="triage",
                        status=status,
                        duration_ms=(time.monotonic() - t0) * 1000,
                        preview=(
                            f"decision={int(decision)} model={self._model} "
                            f"attempt={attempts} messages={len(messages)}"
                        ),
                        llm_output=text,
                        payload={
                            "decision": decision,
                            # Keep the structured payload small.
                            "raw_decision_text": text[:200],
                            "attempt_count": attempts,
                            "model": self._model,
                            "messages_evaluated": len(messages),
                        },
                    ),
                    name="obs_triage_decision",
                )
            except Exception:
                logger.debug("Triage telemetry emit failed", exc_info=True)
                return
            # Hold a reference until the task finishes, then drop it.
            self._event_tasks.add(task)
            task.add_done_callback(self._event_tasks.discard)

        if not messages:
            _emit(False, "no messages", "silence", 0)
            return False, "no messages"

        triage_input = "\n".join(self.format_cached_message(m) for m in messages)
        logger.debug("Triage checking %d messages", len(messages))

        user_prompt = (
            "You must output ONLY the digit 1 or 0. Nothing else.\n\n"
            "Analyze these messages and decide:\n"
            "- Output 1 if Stargazer should interject\n"
            "- Output 0 if Stargazer should stay silent\n\n"
            f"Messages:\n{triage_input}\n\n"
            "Your response (1 or 0 only):"
        )
        request_body = {
            "model": self._model,
            "messages": [
                {"role": "system", "content": TRIAGE_SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
            "temperature": 0.0,
            "max_tokens": 16,
        }
        headers = {
            "Authorization": f"Bearer {self._api_key}",
            "Content-Type": "application/json",
        }

        attempt = 0  # counts ordinary (non-overload) failures
        backoff_503 = 0  # counts overload responses, budgeted separately
        while attempt < max_retries:
            failure: Exception | None = None
            try:
                resp = await self._http.post(
                    self._chat_url,
                    json=request_body,
                    headers=headers,
                )
                if resp.status_code in (503, 529):
                    raise _OverloadError(resp.status_code)
                resp.raise_for_status()
                decision_text = self._extract_content(resp.json())
            except (_OverloadError, httpx.HTTPStatusError) as exc:
                code = getattr(exc, "code", None) or (
                    exc.response.status_code
                    if isinstance(exc, httpx.HTTPStatusError)
                    else 0
                )
                if code in (503, 529):
                    backoff_503 += 1
                    if backoff_503 <= _MAX_503_RETRIES:
                        delay = min(0.5 * (2 ** (backoff_503 - 1)), 16.0)
                        logger.warning(
                            "Triage model overloaded (%s), backoff %.1fs (%d/%d)",
                            code,
                            delay,
                            backoff_503,
                            _MAX_503_RETRIES,
                        )
                        await asyncio.sleep(delay)
                        continue
                    logger.error(
                        "Triage model still overloaded after %d retries",
                        _MAX_503_RETRIES,
                    )
                    _emit(False, "error: overloaded", "error", attempt + 1)
                    return False, f"error: overloaded after {_MAX_503_RETRIES} retries"
                attempt += 1
                logger.error(
                    "Triage HTTP error (attempt %d/%d): %s", attempt, max_retries, exc
                )
                failure = exc
            except Exception as exc:
                attempt += 1
                logger.error(
                    "Triage error (attempt %d/%d): %s", attempt, max_retries, exc
                )
                failure = exc
            else:
                decision, d_text = self._parse_decision(
                    decision_text, attempt, max_retries
                )
                # Derive the telemetry status from the same cleaned text the
                # parser used, so the two can never disagree.
                if decision:
                    status = "success"
                elif self._classify(self._strip_thinking(decision_text)) is False:
                    status = "silence"
                else:
                    status = "ambiguous"
                _emit(decision, decision_text, status, attempt + 1)
                return decision, d_text

            # Shared tail for ordinary failures: pause and retry, or give up.
            if attempt < max_retries:
                await asyncio.sleep(0.5)
                continue
            _emit(False, f"error: {failure}", "error", attempt)
            return False, f"error: {failure}"

        # Only reachable when ``max_retries`` is zero or negative.
        _emit(False, "unknown", "error", attempt)
        return False, "unknown"

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _extract_content(data) -> str:
        """Pull the first choice's message text out of a completions reply.

        Missing ``choices`` / ``message`` / ``content`` keys, an empty
        ``choices`` list, and an explicit ``"content": null`` all yield the
        empty string.

        Args:
            data: Decoded JSON body of the chat-completions response
                (expected to be a mapping).

        Returns:
            str: The whitespace-stripped message content, or ``""``.
        """
        choice = (data.get("choices") or [{}])[0]
        content = (choice.get("message") or {}).get("content")
        if content is None:
            return ""
        return content.strip()

    @staticmethod
    def _strip_thinking(text: str) -> str:
        """Return *text* with ``<thinking>...</thinking>`` blocks removed.

        Args:
            text (str): Raw model output.

        Returns:
            str: The remaining text, stripped of surrounding whitespace.
        """
        return re.sub(r"<thinking>.*?</thinking>", "", text, flags=re.DOTALL).strip()

    @staticmethod
    def _classify(clean: str) -> bool | None:
        """Map cleaned model output to a tri-state verdict.

        Args:
            clean (str): Model output with thinking blocks already removed.

        Returns:
            bool | None: ``True`` if the text contains ``1`` but no ``0``,
            ``False`` if it contains ``0`` but no ``1``, and ``None`` when
            both or neither digit appears (ambiguous).
        """
        has_one = "1" in clean
        has_zero = "0" in clean
        if has_one == has_zero:
            return None
        return has_one

    def _parse_decision(
        self,
        text: str,
        attempt: int,
        max_retries: int,
    ) -> tuple[bool, str]:
        """Parse a binary interjection decision from raw model output.

        Strips any ``<thinking>`` block, then maps the cleaned text to a
        boolean: text containing only the digit ``1`` means INTERJECT and
        text containing only ``0`` means SILENCE. When both or neither digit
        appears the output is ambiguous and resolves to SILENCE, keeping the
        gate fail-safe. The only side effect is a ``logger.debug`` line.

        Called by :meth:`should_interject` after each successful proxy
        response.

        Args:
            text (str): Raw content returned by the model for this attempt.
            attempt (int): Zero-based attempt index, used only in log output.
            max_retries (int): Configured retry ceiling, used only in log
                output.

        Returns:
            tuple[bool, str]: ``(decision, text)`` where ``decision`` is
            ``True`` to interject or ``False`` to stay silent, and ``text`` is
            the original unmodified model output.
        """
        verdict = self._classify(self._strip_thinking(text))
        if verdict is None:
            logger.debug(
                "Triage ambiguous: %r (attempt %d/%d)", text, attempt + 1, max_retries
            )
            return False, text
        logger.debug(
            "Triage: %s (raw: %r)", "INTERJECT" if verdict else "SILENCE", text
        )
        return verdict, text
class _OverloadError(Exception):
    """Module-private marker for an overloaded LLM proxy (HTTP 503 / 529).

    :meth:`ProactiveTriageAI.should_interject` raises this when the proxy
    answers with ``503`` or ``529`` so that its retry loop can tell transient
    overload apart from other HTTP failures and use the longer, dedicated
    backoff budget (``_MAX_503_RETRIES``).

    Attributes:
        code (int): The HTTP status code that was received.
    """

    def __init__(self, code: int) -> None:
        """Build the exception with an ``HTTP <code>`` message.

        Args:
            code (int): The HTTP status code that signalled overload
                (``503`` or ``529``); kept on the instance as ``code`` so
                the handler can read it back.
        """
        message = f"HTTP {code}"
        super().__init__(message)
        self.code = code