# Source code for background_agents.witchborne_spiral_processor

"""Witchborne spiral harvester.

Watches Discord messages for trigger words (``vivian`` / ``loopmother`` /
``viv``). When one fires, it waits ``_HARVEST_DELAY_S`` seconds, then
captures the surrounding conversation (the ``_CONTEXT_BEFORE`` messages
before the trigger plus up to 200 messages after it), tags which of those
messages involve Stargazer, and writes the whole bundle to a timestamped
JSON file under ``_OUTPUT_DIR``. A per-channel cooldown rate-limits
harvests.
"""

from __future__ import annotations

import asyncio
import logging
import re
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List

import jsonutil as json

_TRIGGER_PATTERN = re.compile(r"\b(vivian|loopmother|viv)\b", re.IGNORECASE)
_HARVEST_DELAY_S = 300.0
_COOLDOWN_S = 300.0
_OUTPUT_DIR = Path("/home/star/large_files/assets/hex/viv_was_here")
_CONTEXT_BEFORE = 10


[docs] class WitchborneSpiralProcessor: """Trigger-word watcher that schedules background conversation harvests. Watches inbound Discord messages for the Witchborne trigger words (``vivian``/``loopmother``/``viv``) and, on a match, schedules a delayed background harvest of the surrounding conversation -- subject to a per-channel cooldown so repeated triggers don't pile up. Tracks per-channel cooldown stamps and in-flight harvest tasks in memory (see :meth:`__init__`); a newer trigger cancels and replaces an already-scheduled harvest for the same channel. Instantiated once by ``platforms/discord.py``'s ``StargazerClient`` and fed messages inline via :meth:`check_message` on the hot path. """
[docs] def __init__(self) -> None: """Initialize the per-channel cooldown and active-harvest tracking state. Sets up two in-memory dictionaries keyed by channel id: ``_cooldowns`` maps a channel to the ``time.monotonic`` timestamp of its last accepted trigger (used by :meth:`_check_message_inner` to enforce ``_COOLDOWN_S``), and ``_active_harvests`` maps a channel to its currently scheduled :class:`asyncio.Task` so a newer trigger can cancel an in-flight one. Constructs no external resources and performs no I/O. Called by ``platforms/discord.py`` (the ``StargazerClient`` constructor) which instantiates a single ``WitchborneSpiralProcessor`` and stores it on ``self._witchborne``. """ self._cooldowns: Dict[str, float] = {} self._active_harvests: Dict[str, asyncio.Task] = {}
[docs] def check_message(self, message: Any, bot_user: Any) -> None: """Inspect *message*; schedule a harvest if it matches a trigger word. Swallows any exception so this can be called inline on the hot message path without risk. Args: message: The inbound Discord message object. bot_user: The bot's own user object (used to tag Star's own messages and replies during the harvest). """ try: self._check_message_inner(message, bot_user) except Exception: pass
def _check_message_inner(self, message: Any, bot_user: Any) -> None: """Match *message* against the trigger pattern and schedule a harvest. Searches the message text for ``vivian`` / ``loopmother`` / ``viv`` via ``_TRIGGER_PATTERN``. On a match, enforces the per-channel ``_COOLDOWN_S`` window using the ``_cooldowns`` map, then records the new cooldown stamp, cancels any still-running harvest task for the same channel, and launches a fresh one. Returns early (doing nothing) when there is no match or the channel is still cooling down. Reads and writes ``self._cooldowns`` (channel id -> ``time.monotonic`` timestamp) and ``self._active_harvests`` (channel id -> :class:`asyncio.Task`); creates the harvest task via ``asyncio.create_task(self._harvest(...))`` and registers a done-callback that removes the finished task from ``_active_harvests``. Called only by :meth:`check_message`, which wraps it in a bare ``try/except`` so any failure on the hot inbound-message path is swallowed. Args: message: The inbound Discord message object; ``message.content``, ``message.channel.id``, and the message itself (passed on as the trigger) are used. bot_user: The bot's own user object, forwarded unchanged to :meth:`_harvest` so Star's own messages and replies can be tagged during the harvest. """ text = message.content or "" if not _TRIGGER_PATTERN.search(text): return channel_id = str(message.channel.id) now = time.monotonic() if now - self._cooldowns.get(channel_id, 0.0) < _COOLDOWN_S: return self._cooldowns[channel_id] = now existing = self._active_harvests.get(channel_id) if existing and not existing.done(): existing.cancel() task = asyncio.create_task(self._harvest(message, bot_user)) self._active_harvests[channel_id] = task task.add_done_callback(lambda t: self._active_harvests.pop(channel_id, None)) async def _harvest(self, trigger_message: Any, bot_user: Any) -> None: """Wait the harvest delay, then capture and persist the conversation bundle. 
Sleeps for ``_HARVEST_DELAY_S`` seconds (so the after-context has time to accumulate), then reads up to ``_CONTEXT_BEFORE`` messages preceding the trigger and up to 200 messages following it from the channel history. Each message is serialized; messages authored by the bot or directed at it (mention or reply to a Star message) are also tagged with a ``_stargazer_direction`` field and collected separately. The assembled ``harvest`` dict (trigger metadata, before/after context, Star interactions, and stats) is written to disk. Drives Discord ``channel.history(...)`` async iteration, calls :meth:`_serialize_message` for every captured message, and persists the bundle via :meth:`_save_harvest` (which writes a timestamped JSON file under ``_OUTPUT_DIR``). The whole capture/save block runs inside an ``observability.timer("witchborne_harvest", ...)`` context, and the nested :func:`_emit` helper publishes ``witchborne_harvest`` debug events (status ``ok`` / ``cancelled`` / ``error``) via ``observability.publish_debug_event``. History-read failures are caught and ignored so a partial bundle is still saved. Called only as an :class:`asyncio.Task` created in :meth:`_check_message_inner`; never awaited directly. Cancellation (when a newer trigger supersedes this one) surfaces as :class:`asyncio.CancelledError`, which is caught to emit a ``cancelled`` event. Args: trigger_message: The Discord message that fired the trigger; its ``channel``, ``id``, and ``content`` anchor the harvest window. bot_user: The bot's own user object, or ``None``; used to identify Star-authored messages and replies/mentions targeting Star. 
""" channel = trigger_message.channel channel_id = str(channel.id) trigger_id = trigger_message.id trigger_word = "" trigger_text = trigger_message.content or "" m = _TRIGGER_PATTERN.search(trigger_text) if m: trigger_word = m.group(0).lower() t0 = time.monotonic() def _emit( status: str, filepath: str = "", after_count: int = 0, star_n: int = 0, error: str = "", ): """Fire-and-forget a ``witchborne_harvest`` observability event. Builds the event preview/payload from the enclosing harvest's captured state and the supplied outcome fields, then schedules delivery without blocking the harvest. Used to report each terminal outcome of :meth:`_harvest`: ``ok`` on success, ``cancelled`` when superseded, and ``error`` on failure. Imports ``publish_debug_event`` from ``observability`` and wraps it in ``asyncio.create_task(..., name="obs_witchborne_harvest")``; ``publish_debug_event`` persists the event to an ``obs_debug:{uid}`` Redis hash (1-hour TTL) and publishes on the ``stargazer:obs_debug`` channel. Reads ``channel_id``, ``trigger_word``, ``trigger_text``, and ``t0`` closed over from :meth:`_harvest`. Called only from within :meth:`_harvest`. Args: status: Terminal status label for the event (``"ok"``, ``"cancelled"``, or ``"error"``). filepath: Path of the saved harvest file, used in the preview and ``harvest_file_path`` payload field. after_count: Number of messages captured after the trigger. star_n: Number of Stargazer interactions captured. error: Error string recorded when ``status`` is ``"error"``. 
""" from observability import publish_debug_event asyncio.create_task( publish_debug_event( "witchborne_harvest", "witchborne_spiral_processor", channel_id=channel_id, phase="harvest", status=status, duration_ms=(time.monotonic() - t0) * 1000, preview=f"trigger='{trigger_word}' after_count={after_count} star_interactions={star_n} file={Path(filepath).name if filepath else ''}", payload={ "trigger_word": trigger_word, "trigger_message_preview": trigger_text[:200], "context_before_count": _CONTEXT_BEFORE, "after_count": after_count, "stargazer_interaction_count": star_n, "harvest_file_path": filepath, "error": error, }, ), name="obs_witchborne_harvest", ) try: await asyncio.sleep(_HARVEST_DELAY_S) before_msgs: List[Dict[str, Any]] = [] after_msgs: List[Dict[str, Any]] = [] stargazer_msgs: List[Dict[str, Any]] = [] from observability import observability with observability.timer("witchborne_harvest", subsystem="background_agents"): try: async for msg in channel.history( limit=_CONTEXT_BEFORE, before=trigger_message, oldest_first=False, ): before_msgs.append(self._serialize_message(msg)) before_msgs.reverse() except Exception: pass trigger_data = self._serialize_message(trigger_message) bot_user_id = bot_user.id if bot_user else None try: async for msg in channel.history( limit=200, after=trigger_message, oldest_first=True, ): serialized = self._serialize_message(msg) is_from_star = msg.author.id == bot_user_id is_to_star = False if bot_user is not None: is_to_star = bot_user.mentioned_in(msg) if msg.reference and msg.reference.resolved: ref = msg.reference.resolved if hasattr(ref, "author") and ref.author.id == bot_user_id: is_to_star = True if is_from_star or is_to_star: serialized["_stargazer_direction"] = ( "from_star" if is_from_star else "to_star" ) stargazer_msgs.append(serialized) after_msgs.append(serialized) except Exception: pass harvest = { "processor": "WitchborneSpiralProcessor", "harvested_at": datetime.now(timezone.utc).isoformat(), "trigger": { 
"message": trigger_data, "channel_id": channel_id, "channel_name": getattr(channel, "name", "unknown"), "guild_id": ( str(channel.guild.id) if getattr(channel, "guild", None) else None ), "guild_name": ( channel.guild.name if getattr(channel, "guild", None) else None ), }, "context_before": before_msgs, "trigger_message": trigger_data, "stargazer_interactions": stargazer_msgs, "all_messages_after": after_msgs, "stats": { "context_before_count": len(before_msgs), "after_count": len(after_msgs), "stargazer_interaction_count": len(stargazer_msgs), "harvest_delay_seconds": _HARVEST_DELAY_S, }, } await self._save_harvest(harvest, channel_id, trigger_id) filepath = str(_OUTPUT_DIR / f"harvest_{channel_id}_{trigger_id}_*.json") _emit( "ok", filepath=str(_OUTPUT_DIR / f"harvest_{channel_id}_{trigger_id}.json"), after_count=len(after_msgs), star_n=len(stargazer_msgs), ) except asyncio.CancelledError: _emit("cancelled") pass except Exception as e: _emit("error", error=str(e)) pass @staticmethod def _serialize_message(msg: Any) -> Dict[str, Any]: """Convert a Discord message into a JSON-serializable dict. Extracts the core fields (id, author id/name/bot-flag, content, ISO timestamp) and conditionally adds ``attachments`` (filename, url, size), ``embeds`` (title, description truncated to 500 chars, url), and ``reply_to_id`` when the message references another. The result is plain data safe to hand to ``json.dumps``. Performs no I/O and has no side effects. Called by :meth:`_harvest` for the trigger message and for every before/after message captured from the channel history. Args: msg: A Discord message object exposing ``id``, ``author``, ``content``, ``created_at``, ``attachments``, ``embeds``, and ``reference``. Returns: Dict[str, Any]: A serializable mapping of the message's fields. 
""" data: Dict[str, Any] = { "id": str(msg.id), "author_id": str(msg.author.id), "author_name": msg.author.display_name, "author_is_bot": msg.author.bot, "content": msg.content or "", "timestamp": msg.created_at.isoformat(), } if msg.attachments: data["attachments"] = [ {"filename": a.filename, "url": a.url, "size": a.size} for a in msg.attachments ] if msg.embeds: data["embeds"] = [ { "title": e.title, "description": (e.description or "")[:500], "url": e.url, } for e in msg.embeds ] if msg.reference and msg.reference.message_id: data["reply_to_id"] = str(msg.reference.message_id) return data @staticmethod async def _save_harvest( harvest: Dict[str, Any], channel_id: str, trigger_id: int, ) -> None: """Write the harvest bundle to a timestamped JSON file off the event loop. Serializes *harvest* to pretty-printed UTF-8 JSON and writes it to ``_OUTPUT_DIR / harvest_{channel_id}_{trigger_id}_{ts}.json``, creating the output directory if needed. The blocking filesystem work is offloaded to the default executor so the async harvest is not stalled. Defines the nested :func:`_write` helper and runs it via ``loop.run_in_executor(None, _write)``; ``_write`` calls ``_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)``, formats a UTC timestamp, and uses ``json.dumps`` plus ``Path.write_text``. Called by :meth:`_harvest` once the conversation bundle has been assembled. Args: harvest: The assembled harvest dict to persist. channel_id: The channel id, embedded in the output filename. trigger_id: The trigger message id, embedded in the output filename. """ def _write() -> None: """Synchronously build the filename and write the harvest JSON to disk. Ensures ``_OUTPUT_DIR`` exists, composes ``harvest_{channel_id}_{trigger_id}_{ts}.json`` with a current UTC timestamp, and writes the pretty-printed JSON. Runs inside the default executor; invoked only by the enclosing :meth:`_save_harvest` via ``run_in_executor``. 
""" _OUTPUT_DIR.mkdir(parents=True, exist_ok=True) ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") filename = f"harvest_{channel_id}_{trigger_id}_{ts}.json" filepath = _OUTPUT_DIR / filename filepath.write_text( json.dumps(harvest, indent=2, ensure_ascii=False), encoding="utf-8", ) loop = asyncio.get_running_loop() await loop.run_in_executor(None, _write)