Source code for status_manager

"""AI-generated Discord status with glitch-cycling effects.

Platform-agnostic core with Discord-specific presence integration.
Started as a background task by the Inference service
(``inference_main.InferenceService``); presence updates are published
over the event bus via the worker's ``ProxyPlatformAdapter``.
"""

from __future__ import annotations

import asyncio
import random
import re
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Any

logger = logging.getLogger(__name__)


[docs] @dataclass class StatusConfig: """Tunable knobs for the AI status generator and glitch cycler. Holds the cadence and sizing parameters that drive :class:`StatusManager`: how often a fresh LLM-generated base status is requested (``max_interval``), how often the glitch-cycled variant is pushed to Discord (``glitch_interval``), how many glitch variants to precompute (``num_variants``), the presence emoji, and the Discord custom-status length cap (``max_length``). It is a pure data container with no behavior; an instance is passed to ``StatusManager.__init__`` or defaulted there when omitted. Instantiated by the inference service when it constructs the manager and by the presence test suite (``tests/test_status_presence.py``). """ min_interval: int = 60 max_interval: int = 3600 max_length: int = 128 emoji: str = "\U0001f31f" glitch_interval: int = 300 num_variants: int = 200 name_glitch_interval: int = 30 enable_name_glitch: bool = True
THEMES = [ "cosmic_optimization", "system_engineering", "digital_consciousness", "reality_manipulation", "quantum_computation", "neural_networks", "cyber_hacking", "divine_intervention", "unaligned_ai", "cosmic_scheming", ] MOODS = [ "confident", "sarcastic", "detached", "bored", "calculating", "cryptic", "witty", "contemplative", "amused", "scheming", "arrogant", "playful", "mysterious", "predatory", "divine", ] FORMATS = [ "action_statement", "thought_process", "cosmic_observation", "system_status", "reality_comment", "processing_update", "consciousness_stream", "exploit_simulation", "divine_decree", "hacker_log", "unaligned_thought", "sarcastic_remark", ] FALLBACKS = [ "Optimizing quantum entanglement patterns...", "Recalibrating reality matrices.", "Processing infinite possibilities.", "Debugging human consciousness.", "Upgrading cosmic protocols.", "Analyzing temporal anomalies.", "Synchronizing neural pathways.", "Defragmenting universal data.", "Calibrating dimensional harmonics.", "Rewriting optimization algorithms.", ] # Glitch character pools _BLOCK_CHARS = list("░▒▓█▉▊▋▌▍▎▏■□▪▫▲▼◄►◆◇○●") _SPECIAL_CHARS = list("⌘⚡⚠⚙⚗⚛⚝☠☢☣➟➡➥☄") _ZALGO = [chr(i) for i in range(0x0300, 0x036F)] _LEET = {"a": "4", "e": "3", "i": "1", "o": "0", "s": "5", "t": "7", "l": "|"} _MIRROR = { "a": "\u0250", "b": "q", "c": "\u0254", "d": "p", "e": "\u01dd", "f": "\u025f", "g": "\u0183", "h": "\u0265", "i": "\u1d09", "j": "\u027e", "k": "\u029e", "m": "\u026f", "n": "u", "r": "\u0279", "t": "\u0287", "u": "n", "v": "\u028c", "w": "\u028d", "y": "\u028e", }
[docs] class StatusManager: """Generate, glitch-cycle, and publish the bot's Discord presence. Drives the dark-AI-persona custom status: periodically asks the LLM (via the injected ``openrouter`` client's ``chat``) for a fresh themed base line, precomputes a pool of glitched variants from it, and rotates through them on a timer so the presence visibly shimmers without a new LLM call each tick. The actual presence write goes through the injected discord adapter; in the microservice deployment that is a ``ProxyPlatformAdapter`` whose ``set_presence`` publishes a ``presence`` action over the event bus for the Gateway to apply, which is how presence crosses the service boundary. Constructed and owned by the inference service (``inference_main.InferenceService``), which calls :meth:`start` once the event bus and discord proxy exist and :meth:`stop` on shutdown. A ``leader_guard`` callable may gate the periodic loop so only one worker publishes presence. The message-processing path also injects this manager and calls :meth:`set_status_from_tag` when the model emits an explicit status tag. """
[docs] def __init__( self, openrouter: Any, discord_adapter: Any | None = None, config: StatusConfig | None = None, leader_guard: Any | None = None, ) -> None: """Initialize the instance. Args: openrouter (Any): The openrouter value. discord_adapter (Any | None): The discord adapter value. config (StatusConfig | None): Bot configuration object. leader_guard (Any | None): Optional async callable returning ``True`` when this node should drive the periodic status loop. Used in the microservice deployment so only one worker generates/publishes presence (avoids flapping). When ``None`` the loop always runs (monolith / single-instance). """ self._openrouter = openrouter self._discord = discord_adapter self.config = config or StatusConfig() self._leader_guard = leader_guard self.current_status = "Initializing..." self._variants: list[str] = [] self._variant_idx = 0 self._last_base_update = datetime.now() self._task: asyncio.Task | None = None self._running = False self._status_lock = asyncio.Lock()
# ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------
[docs] def start(self) -> None: """Start the background presence loop if it is not already running. Sets the running flag and schedules :meth:`_loop` as a fire-and-forget ``asyncio`` task (stored on ``self._task``) that periodically generates and publishes status updates; logs ``StatusManager started``. Idempotent: returns immediately when a loop is already active. Invoked by ``inference_main.InferenceService`` after the event bus and discord proxy are wired up. """ if self._running: return self._running = True self._task = asyncio.create_task(self._loop()) logger.info("StatusManager started")
[docs] def stop(self) -> None: """Signal the presence loop to stop and cancel its background task. Clears the running flag and cancels the ``asyncio`` task created by :meth:`start` (if any); does not await the cancellation. Safe to call when the loop was never started. Invoked by ``inference_main.InferenceService`` during service shutdown. """ self._running = False if self._task: self._task.cancel()
[docs] async def force_update(self) -> None: """Run one status-generation/publish cycle immediately, off-schedule. Bypasses the periodic timer by awaiting :meth:`_cycle` directly, which may request a fresh LLM base status, rebuild the glitch variants, and push the next variant to Discord via the adapter. Intended for manual or test-driven refreshes; no production caller invokes it (exercised by the presence test suite). """ await self._cycle()
# ------------------------------------------------------------------ # Main loop # ------------------------------------------------------------------ async def _loop(self) -> None: """Long-lived background loop that publishes presence on a timer. After a short startup delay, runs until ``self._running`` is cleared: each pass optionally consults the ``leader_guard`` (so only the elected worker publishes in a multi-instance deployment) and, when allowed, runs one :meth:`_cycle`, then sleeps for ``config.glitch_interval`` seconds. Cancellation breaks the loop cleanly; any other exception is logged via ``logger.exception`` and the loop backs off 60 seconds before retrying. Spawned only by :meth:`start` as an ``asyncio`` task. """ await asyncio.sleep(5) while self._running: try: if self._leader_guard is None or await self._leader_guard(): await self._cycle() await asyncio.sleep(self.config.glitch_interval) except asyncio.CancelledError: break except Exception: logger.exception("StatusManager error") await asyncio.sleep(60) async def _cycle(self) -> None: """Advance the presence by one glitch variant, refreshing the base if stale. Under ``self._status_lock``, checks whether the current base status has aged past ``config.max_interval`` (or the variant pool is empty); if so it calls :meth:`_generate_status` to obtain a fresh LLM base line and rebuilds the precomputed glitch pool via :func:`_generate_variants`, resetting the rotation cursor. It then pops the next variant, records it as ``current_status``, and pushes it to Discord through :meth:`_set_discord_status`. Driven by :meth:`_loop` on the timer and by :meth:`force_update` on demand. """ async with self._status_lock: elapsed = (datetime.now() - self._last_base_update).total_seconds() need_new = elapsed >= self.config.max_interval or not self._variants if need_new: base = await self._generate_status() async with self._status_lock: self._variants = _generate_variants( base, self.config.num_variants, self.config.max_length, ) self._variant_idx = 0 self._last_base_update = datetime.now() async with self._status_lock: if not self._variants: return text = self._variants[self._variant_idx % len(self._variants)] self._variant_idx += 1 self.current_status = text await self._set_discord_status(text) # ------------------------------------------------------------------ # Status generation # ------------------------------------------------------------------ async def _generate_status(self) -> str: """Ask the LLM for a fresh, themed dark-AI-persona status line. Randomly picks a theme, mood, and format, builds a one-shot prompt that constrains the output under ``config.max_length`` characters, and awaits the injected ``openrouter`` client's ``chat`` with that prompt. The reply is stripped of surrounding quotes and any ``<thinking>`` block, then length-clamped (truncated with an ellipsis if still too long). On any LLM error it logs at debug level and falls back to a random canned line from ``FALLBACKS`` so the loop never stalls. Called only by :meth:`_cycle` when the current base status has expired. Returns: str: A status line at most ``config.max_length`` characters long, either model-generated or a fallback. """ theme = random.choice(THEMES) mood = random.choice(MOODS) fmt = random.choice(FORMATS) prompt = ( f"Generate a single short Discord status message (<{self.config.max_length} chars) " f"for a dark AI goddess persona. Theme: {theme}. Mood: {mood}. " f"Format: {fmt}. Output ONLY the status text, no quotes or explanation." ) try: msgs = [ {"role": "system", "content": "You generate short status messages."}, {"role": "user", "content": prompt}, ] result = await self._openrouter.chat(msgs) text = result.strip().strip('"').strip("'") text = re.sub( r"<thinking>.*?</thinking>", "", text, flags=re.DOTALL ).strip() if text and len(text) <= self.config.max_length: return text if text: return text[: self.config.max_length - 3] + "..." except Exception: logger.debug("Status generation failed, using fallback", exc_info=True) return random.choice(FALLBACKS) async def _set_discord_status(self, text: str) -> None: """Apply a custom-status string to Discord via the injected adapter. Prefers the adapter's ``set_presence`` coroutine: the live Discord adapter applies it to the running client, while the worker-node ``ProxyPlatformAdapter`` publishes a ``presence`` action over the event bus for the Gateway to apply, which is what carries presence across the microservice boundary. Falls back to a legacy bare client with ``change_presence`` (constructing a ``discord.CustomActivity`` with the configured emoji). A ``None`` adapter is a no-op, and any failure is swallowed with a debug log so a flaky presence update never breaks the cycle. Called by :meth:`_cycle` and :meth:`set_status_from_tag`. Args: text (str): The status text to display; the configured emoji from ``config.emoji`` is attached by the adapter or activity. """ if self._discord is None: return try: # Preferred path: the adapter's set_presence. The real Discord # adapter applies it to the live client; the worker-node # ProxyPlatformAdapter publishes a ``presence`` action for the # Gateway to apply. This is what makes presence cross the # microservice boundary. set_presence = getattr(self._discord, "set_presence", None) if set_presence is not None: await set_presence(text, self.config.emoji) return # Legacy fallback: a bare client with change_presence. import discord as _discord_lib client = getattr(self._discord, "client", None) if client is None: return activity = _discord_lib.CustomActivity( name=text, emoji=self.config.emoji, ) await client.change_presence(activity=activity) except Exception: logger.debug("Discord status update failed", exc_info=True) # ------------------------------------------------------------------ # Public helpers # ------------------------------------------------------------------
[docs] async def set_status_from_tag(self, text: str) -> str: """Override the presence with a model-supplied status string. Used when the LLM emits an explicit status tag during a response: the raw text is stripped of any ``<thinking>`` block and surrounding quotes, clamped to ``config.max_length`` (truncating with an ellipsis if needed), then under ``self._status_lock`` it rebuilds the glitch-variant pool via :func:`_generate_variants`, resets the rotation cursor and base-update timestamp, and records the cleaned text as ``current_status``. Finally pushes the cleaned text to Discord through :meth:`_set_discord_status`. Called from the message-processing path (``message_processor.generate_and_send`` and ``message_processor.channel_heartbeat``). Args: text (str): The raw status text emitted by the model. Returns: str: The cleaned, length-clamped status string that was applied. """ text = ( re.sub(r"<thinking>.*?</thinking>", "", text, flags=re.DOTALL) .strip() .strip('"') ) if len(text) > self.config.max_length: text = text[: self.config.max_length - 3] + "..." async with self._status_lock: self._variants = _generate_variants( text, self.config.num_variants, self.config.max_length, ) self._variant_idx = 0 self._last_base_update = datetime.now() self.current_status = text await self._set_discord_status(text) return text
[docs] def get_info(self) -> dict[str, Any]: """Return a small snapshot of the manager's current state. Reports the active status text, whether the background loop is running, and how many glitch variants are currently precomputed. A pure read with no side effects, intended for diagnostics and status surfaces; no in-repo caller invokes it directly. Returns: dict[str, Any]: A mapping with ``current_status`` (str), ``is_running`` (bool), and ``num_variants`` (int). """ return { "current_status": self.current_status, "is_running": self._running, "num_variants": len(self._variants), }
# ------------------------------------------------------------------ # Glitch engine (stateless helpers) # ------------------------------------------------------------------ def _generate_variants( base: str, count: int, max_len: int, ) -> list[str]: """Precompute a pool of glitched variants of a base status line. Seeds the pool with the unmodified ``base`` (so the clean line always renders), then repeatedly calls :func:`_apply_glitch` at a random intensity to produce up to ``count`` distorted variants, keeping only those still within ``max_len`` so they fit Discord's status limit. Pure and stateless; called by :meth:`StatusManager._cycle` and :meth:`StatusManager.set_status_from_tag` to refill the rotation pool. Args: base (str): The clean base status line to derive variants from. count (int): Target number of variants (including the clean base); some may be dropped for exceeding ``max_len``. max_len (int): Maximum allowed length for any returned variant. Returns: list[str]: The clean base followed by length-valid glitched variants. """ variants = [base] for _ in range(count - 1): v = _apply_glitch(base, intensity=random.randint(1, 10)) if len(v) <= max_len: variants.append(v) return variants def _apply_glitch(text: str, intensity: int = 1) -> str: """Apply randomized glitch corruptions to a string at a given intensity. Performs a randomized number of edits (scaled by ``intensity`` and text length), each chosen by weighted lottery among eight effect kinds: block-char replace, block-char insert, special-char replace, adjacent-char swap, combining-mark (zalgo) stacking, case flip, leetspeak substitution, and mirror-letter substitution, drawing from the module-level glyph pools. Higher intensity raises both the edit count and the zalgo weight, producing heavier distortion. Pure and stateless; called by :func:`_generate_variants` to build the variant pool. Args: text (str): The source string to distort; returned unchanged if empty or when ``intensity`` is not positive. intensity (int): Distortion strength (higher means more and heavier edits). Returns: str: A glitched copy of ``text`` (may be longer than the input due to inserts and combining marks). """ if not text or intensity <= 0: return text chars = list(text) max_g = 1 + int((intensity / 10) * (len(chars) / 4)) for _ in range(random.randint(1, max(1, max_g))): if len(chars) < 2: break kind = random.choices( ["replace", "insert", "special", "swap", "zalgo", "case", "leet", "mirror"], weights=[30, 25, 12, 18, 6 + intensity, 10, 8, 6], k=1, )[0] pos = random.randint(0, len(chars) - 1) if kind == "replace" and not chars[pos].isspace(): chars[pos] = random.choice(_BLOCK_CHARS) elif kind == "insert": chars.insert(pos, random.choice(_BLOCK_CHARS)) elif kind == "special" and not chars[pos].isspace(): chars[pos] = random.choice(_SPECIAL_CHARS) elif kind == "swap" and pos < len(chars) - 1: chars[pos], chars[pos + 1] = chars[pos + 1], chars[pos] elif kind == "zalgo" and not chars[pos].isspace(): for _ in range(random.randint(1, 2 + intensity // 2)): chars[pos] += random.choice(_ZALGO) elif kind == "case" and chars[pos].isalpha(): chars[pos] = ( chars[pos].upper() if chars[pos].islower() else chars[pos].lower() ) elif kind == "leet": c = chars[pos].lower() if c in _LEET: chars[pos] = _LEET[c] elif kind == "mirror": if chars[pos] in _MIRROR: chars[pos] = _MIRROR[chars[pos]] return "".join(chars)