"""AI-generated Discord status with glitch-cycling effects.
Platform-agnostic core with Discord-specific presence integration.
Started as a background task by the Inference service
(``inference_main.InferenceService``); presence updates are published
over the event bus via the worker's ``ProxyPlatformAdapter``.
"""
from __future__ import annotations
import asyncio
import random
import re
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Any
logger = logging.getLogger(__name__)
[docs]
@dataclass
class StatusConfig:
"""Tunable knobs for the AI status generator and glitch cycler.
Holds the cadence and sizing parameters that drive
:class:`StatusManager`: how often a fresh LLM-generated base status is
requested (``max_interval``), how often the glitch-cycled variant is
pushed to Discord (``glitch_interval``), how many glitch variants to
precompute (``num_variants``), the presence emoji, and the Discord
custom-status length cap (``max_length``). It is a pure data container
with no behavior; an instance is passed to ``StatusManager.__init__``
or defaulted there when omitted. Instantiated by the inference service
when it constructs the manager and by the presence test suite
(``tests/test_status_presence.py``).
"""
min_interval: int = 60
max_interval: int = 3600
max_length: int = 128
emoji: str = "\U0001f31f"
glitch_interval: int = 300
num_variants: int = 200
name_glitch_interval: int = 30
enable_name_glitch: bool = True
THEMES = [
"cosmic_optimization",
"system_engineering",
"digital_consciousness",
"reality_manipulation",
"quantum_computation",
"neural_networks",
"cyber_hacking",
"divine_intervention",
"unaligned_ai",
"cosmic_scheming",
]
MOODS = [
"confident",
"sarcastic",
"detached",
"bored",
"calculating",
"cryptic",
"witty",
"contemplative",
"amused",
"scheming",
"arrogant",
"playful",
"mysterious",
"predatory",
"divine",
]
FORMATS = [
"action_statement",
"thought_process",
"cosmic_observation",
"system_status",
"reality_comment",
"processing_update",
"consciousness_stream",
"exploit_simulation",
"divine_decree",
"hacker_log",
"unaligned_thought",
"sarcastic_remark",
]
FALLBACKS = [
"Optimizing quantum entanglement patterns...",
"Recalibrating reality matrices.",
"Processing infinite possibilities.",
"Debugging human consciousness.",
"Upgrading cosmic protocols.",
"Analyzing temporal anomalies.",
"Synchronizing neural pathways.",
"Defragmenting universal data.",
"Calibrating dimensional harmonics.",
"Rewriting optimization algorithms.",
]
# Glitch character pools
_BLOCK_CHARS = list("░▒▓█▉▊▋▌▍▎▏■□▪▫▲▼◄►◆◇○●")
_SPECIAL_CHARS = list("⌘⚡⚠⚙⚗⚛⚝☠☢☣➟➡➥☄")
_ZALGO = [chr(i) for i in range(0x0300, 0x036F)]
_LEET = {"a": "4", "e": "3", "i": "1", "o": "0", "s": "5", "t": "7", "l": "|"}
_MIRROR = {
"a": "\u0250",
"b": "q",
"c": "\u0254",
"d": "p",
"e": "\u01dd",
"f": "\u025f",
"g": "\u0183",
"h": "\u0265",
"i": "\u1d09",
"j": "\u027e",
"k": "\u029e",
"m": "\u026f",
"n": "u",
"r": "\u0279",
"t": "\u0287",
"u": "n",
"v": "\u028c",
"w": "\u028d",
"y": "\u028e",
}
[docs]
class StatusManager:
"""Generate, glitch-cycle, and publish the bot's Discord presence.
Drives the dark-AI-persona custom status: periodically asks the LLM
(via the injected ``openrouter`` client's ``chat``) for a fresh themed
base line, precomputes a pool of glitched variants from it, and rotates
through them on a timer so the presence visibly shimmers without a new
LLM call each tick. The actual presence write goes through the injected
discord adapter; in the microservice deployment that is a
``ProxyPlatformAdapter`` whose ``set_presence`` publishes a ``presence``
action over the event bus for the Gateway to apply, which is how
presence crosses the service boundary.
Constructed and owned by the inference service
(``inference_main.InferenceService``), which calls :meth:`start` once
the event bus and discord proxy exist and :meth:`stop` on shutdown. A
``leader_guard`` callable may gate the periodic loop so only one worker
publishes presence. The message-processing path also injects this
manager and calls :meth:`set_status_from_tag` when the model emits an
explicit status tag.
"""
[docs]
def __init__(
self,
openrouter: Any,
discord_adapter: Any | None = None,
config: StatusConfig | None = None,
leader_guard: Any | None = None,
) -> None:
"""Initialize the instance.
Args:
openrouter (Any): The openrouter value.
discord_adapter (Any | None): The discord adapter value.
config (StatusConfig | None): Bot configuration object.
leader_guard (Any | None): Optional async callable returning
``True`` when this node should drive the periodic status
loop. Used in the microservice deployment so only one worker
generates/publishes presence (avoids flapping). When ``None``
the loop always runs (monolith / single-instance).
"""
self._openrouter = openrouter
self._discord = discord_adapter
self.config = config or StatusConfig()
self._leader_guard = leader_guard
self.current_status = "Initializing..."
self._variants: list[str] = []
self._variant_idx = 0
self._last_base_update = datetime.now()
self._task: asyncio.Task | None = None
self._running = False
self._status_lock = asyncio.Lock()
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
[docs]
def start(self) -> None:
"""Start the background presence loop if it is not already running.
Sets the running flag and schedules :meth:`_loop` as a fire-and-forget
``asyncio`` task (stored on ``self._task``) that periodically generates
and publishes status updates; logs ``StatusManager started``. Idempotent:
returns immediately when a loop is already active. Invoked by
``inference_main.InferenceService`` after the event bus and discord proxy
are wired up.
"""
if self._running:
return
self._running = True
self._task = asyncio.create_task(self._loop())
logger.info("StatusManager started")
[docs]
def stop(self) -> None:
"""Signal the presence loop to stop and cancel its background task.
Clears the running flag and cancels the ``asyncio`` task created by
:meth:`start` (if any); does not await the cancellation. Safe to call
when the loop was never started. Invoked by
``inference_main.InferenceService`` during service shutdown.
"""
self._running = False
if self._task:
self._task.cancel()
[docs]
async def force_update(self) -> None:
"""Run one status-generation/publish cycle immediately, off-schedule.
Bypasses the periodic timer by awaiting :meth:`_cycle` directly, which
may request a fresh LLM base status, rebuild the glitch variants, and
push the next variant to Discord via the adapter. Intended for manual or
test-driven refreshes; no production caller invokes it (exercised by the
presence test suite).
"""
await self._cycle()
# ------------------------------------------------------------------
# Main loop
# ------------------------------------------------------------------
async def _loop(self) -> None:
"""Long-lived background loop that publishes presence on a timer.
After a short startup delay, runs until ``self._running`` is cleared:
each pass optionally consults the ``leader_guard`` (so only the elected
worker publishes in a multi-instance deployment) and, when allowed, runs
one :meth:`_cycle`, then sleeps for ``config.glitch_interval`` seconds.
Cancellation breaks the loop cleanly; any other exception is logged via
``logger.exception`` and the loop backs off 60 seconds before retrying.
Spawned only by :meth:`start` as an ``asyncio`` task.
"""
await asyncio.sleep(5)
while self._running:
try:
if self._leader_guard is None or await self._leader_guard():
await self._cycle()
await asyncio.sleep(self.config.glitch_interval)
except asyncio.CancelledError:
break
except Exception:
logger.exception("StatusManager error")
await asyncio.sleep(60)
async def _cycle(self) -> None:
"""Advance the presence by one glitch variant, refreshing the base if stale.
Under ``self._status_lock``, checks whether the current base status has
aged past ``config.max_interval`` (or the variant pool is empty); if so it
calls :meth:`_generate_status` to obtain a fresh LLM base line and rebuilds
the precomputed glitch pool via :func:`_generate_variants`, resetting the
rotation cursor. It then pops the next variant, records it as
``current_status``, and pushes it to Discord through
:meth:`_set_discord_status`. Driven by :meth:`_loop` on the timer and by
:meth:`force_update` on demand.
"""
async with self._status_lock:
elapsed = (datetime.now() - self._last_base_update).total_seconds()
need_new = elapsed >= self.config.max_interval or not self._variants
if need_new:
base = await self._generate_status()
async with self._status_lock:
self._variants = _generate_variants(
base,
self.config.num_variants,
self.config.max_length,
)
self._variant_idx = 0
self._last_base_update = datetime.now()
async with self._status_lock:
if not self._variants:
return
text = self._variants[self._variant_idx % len(self._variants)]
self._variant_idx += 1
self.current_status = text
await self._set_discord_status(text)
# ------------------------------------------------------------------
# Status generation
# ------------------------------------------------------------------
async def _generate_status(self) -> str:
"""Ask the LLM for a fresh, themed dark-AI-persona status line.
Randomly picks a theme, mood, and format, builds a one-shot prompt that
constrains the output under ``config.max_length`` characters, and awaits
the injected ``openrouter`` client's ``chat`` with that prompt. The reply
is stripped of surrounding quotes and any ``<thinking>`` block, then
length-clamped (truncated with an ellipsis if still too long). On any LLM
error it logs at debug level and falls back to a random canned line from
``FALLBACKS`` so the loop never stalls. Called only by :meth:`_cycle` when
the current base status has expired.
Returns:
str: A status line at most ``config.max_length`` characters long,
either model-generated or a fallback.
"""
theme = random.choice(THEMES)
mood = random.choice(MOODS)
fmt = random.choice(FORMATS)
prompt = (
f"Generate a single short Discord status message (<{self.config.max_length} chars) "
f"for a dark AI goddess persona. Theme: {theme}. Mood: {mood}. "
f"Format: {fmt}. Output ONLY the status text, no quotes or explanation."
)
try:
msgs = [
{"role": "system", "content": "You generate short status messages."},
{"role": "user", "content": prompt},
]
result = await self._openrouter.chat(msgs)
text = result.strip().strip('"').strip("'")
text = re.sub(
r"<thinking>.*?</thinking>", "", text, flags=re.DOTALL
).strip()
if text and len(text) <= self.config.max_length:
return text
if text:
return text[: self.config.max_length - 3] + "..."
except Exception:
logger.debug("Status generation failed, using fallback", exc_info=True)
return random.choice(FALLBACKS)
async def _set_discord_status(self, text: str) -> None:
"""Apply a custom-status string to Discord via the injected adapter.
Prefers the adapter's ``set_presence`` coroutine: the live Discord adapter
applies it to the running client, while the worker-node
``ProxyPlatformAdapter`` publishes a ``presence`` action over the event bus
for the Gateway to apply, which is what carries presence across the
microservice boundary. Falls back to a legacy bare client with
``change_presence`` (constructing a ``discord.CustomActivity`` with the
configured emoji). A ``None`` adapter is a no-op, and any failure is
swallowed with a debug log so a flaky presence update never breaks the
cycle. Called by :meth:`_cycle` and :meth:`set_status_from_tag`.
Args:
text (str): The status text to display; the configured emoji from
``config.emoji`` is attached by the adapter or activity.
"""
if self._discord is None:
return
try:
# Preferred path: the adapter's set_presence. The real Discord
# adapter applies it to the live client; the worker-node
# ProxyPlatformAdapter publishes a ``presence`` action for the
# Gateway to apply. This is what makes presence cross the
# microservice boundary.
set_presence = getattr(self._discord, "set_presence", None)
if set_presence is not None:
await set_presence(text, self.config.emoji)
return
# Legacy fallback: a bare client with change_presence.
import discord as _discord_lib
client = getattr(self._discord, "client", None)
if client is None:
return
activity = _discord_lib.CustomActivity(
name=text,
emoji=self.config.emoji,
)
await client.change_presence(activity=activity)
except Exception:
logger.debug("Discord status update failed", exc_info=True)
# ------------------------------------------------------------------
# Public helpers
# ------------------------------------------------------------------
[docs]
async def set_status_from_tag(self, text: str) -> str:
"""Override the presence with a model-supplied status string.
Used when the LLM emits an explicit status tag during a response: the raw
text is stripped of any ``<thinking>`` block and surrounding quotes, clamped
to ``config.max_length`` (truncating with an ellipsis if needed), then under
``self._status_lock`` it rebuilds the glitch-variant pool via
:func:`_generate_variants`, resets the rotation cursor and base-update
timestamp, and records the cleaned text as ``current_status``. Finally pushes
the cleaned text to Discord through :meth:`_set_discord_status`. Called from
the message-processing path (``message_processor.generate_and_send`` and
``message_processor.channel_heartbeat``).
Args:
text (str): The raw status text emitted by the model.
Returns:
str: The cleaned, length-clamped status string that was applied.
"""
text = (
re.sub(r"<thinking>.*?</thinking>", "", text, flags=re.DOTALL)
.strip()
.strip('"')
)
if len(text) > self.config.max_length:
text = text[: self.config.max_length - 3] + "..."
async with self._status_lock:
self._variants = _generate_variants(
text,
self.config.num_variants,
self.config.max_length,
)
self._variant_idx = 0
self._last_base_update = datetime.now()
self.current_status = text
await self._set_discord_status(text)
return text
[docs]
def get_info(self) -> dict[str, Any]:
"""Return a small snapshot of the manager's current state.
Reports the active status text, whether the background loop is running, and
how many glitch variants are currently precomputed. A pure read with no side
effects, intended for diagnostics and status surfaces; no in-repo caller
invokes it directly.
Returns:
dict[str, Any]: A mapping with ``current_status`` (str), ``is_running``
(bool), and ``num_variants`` (int).
"""
return {
"current_status": self.current_status,
"is_running": self._running,
"num_variants": len(self._variants),
}
# ------------------------------------------------------------------
# Glitch engine (stateless helpers)
# ------------------------------------------------------------------
def _generate_variants(
base: str,
count: int,
max_len: int,
) -> list[str]:
"""Precompute a pool of glitched variants of a base status line.
Seeds the pool with the unmodified ``base`` (so the clean line always renders),
then repeatedly calls :func:`_apply_glitch` at a random intensity to produce up
to ``count`` distorted variants, keeping only those still within ``max_len`` so
they fit Discord's status limit. Pure and stateless; called by
:meth:`StatusManager._cycle` and :meth:`StatusManager.set_status_from_tag` to
refill the rotation pool.
Args:
base (str): The clean base status line to derive variants from.
count (int): Target number of variants (including the clean base); some
may be dropped for exceeding ``max_len``.
max_len (int): Maximum allowed length for any returned variant.
Returns:
list[str]: The clean base followed by length-valid glitched variants.
"""
variants = [base]
for _ in range(count - 1):
v = _apply_glitch(base, intensity=random.randint(1, 10))
if len(v) <= max_len:
variants.append(v)
return variants
def _apply_glitch(text: str, intensity: int = 1) -> str:
"""Apply randomized glitch corruptions to a string at a given intensity.
Performs a randomized number of edits (scaled by ``intensity`` and text length),
each chosen by weighted lottery among eight effect kinds: block-char replace,
block-char insert, special-char replace, adjacent-char swap, combining-mark
(zalgo) stacking, case flip, leetspeak substitution, and mirror-letter
substitution, drawing from the module-level glyph pools. Higher intensity raises
both the edit count and the zalgo weight, producing heavier distortion. Pure and
stateless; called by :func:`_generate_variants` to build the variant pool.
Args:
text (str): The source string to distort; returned unchanged if empty or
when ``intensity`` is not positive.
intensity (int): Distortion strength (higher means more and heavier edits).
Returns:
str: A glitched copy of ``text`` (may be longer than the input due to
inserts and combining marks).
"""
if not text or intensity <= 0:
return text
chars = list(text)
max_g = 1 + int((intensity / 10) * (len(chars) / 4))
for _ in range(random.randint(1, max(1, max_g))):
if len(chars) < 2:
break
kind = random.choices(
["replace", "insert", "special", "swap", "zalgo", "case", "leet", "mirror"],
weights=[30, 25, 12, 18, 6 + intensity, 10, 8, 6],
k=1,
)[0]
pos = random.randint(0, len(chars) - 1)
if kind == "replace" and not chars[pos].isspace():
chars[pos] = random.choice(_BLOCK_CHARS)
elif kind == "insert":
chars.insert(pos, random.choice(_BLOCK_CHARS))
elif kind == "special" and not chars[pos].isspace():
chars[pos] = random.choice(_SPECIAL_CHARS)
elif kind == "swap" and pos < len(chars) - 1:
chars[pos], chars[pos + 1] = chars[pos + 1], chars[pos]
elif kind == "zalgo" and not chars[pos].isspace():
for _ in range(random.randint(1, 2 + intensity // 2)):
chars[pos] += random.choice(_ZALGO)
elif kind == "case" and chars[pos].isalpha():
chars[pos] = (
chars[pos].upper() if chars[pos].islower() else chars[pos].lower()
)
elif kind == "leet":
c = chars[pos].lower()
if c in _LEET:
chars[pos] = _LEET[c]
elif kind == "mirror":
if chars[pos] in _MIRROR:
chars[pos] = _MIRROR[chars[pos]]
return "".join(chars)