"""Witchborne spiral harvester.
Watches Discord messages for trigger words (``vivian`` / ``loopmother`` /
``viv``). When one fires, it waits ``_HARVEST_DELAY_S`` seconds, then
captures the surrounding conversation (the ``_CONTEXT_BEFORE`` messages
before the trigger plus up to 200 messages after it), tags which of those
messages involve Stargazer, and writes the whole bundle to a timestamped
JSON file under ``_OUTPUT_DIR``. A per-channel cooldown rate-limits
harvests.
"""
from __future__ import annotations

import asyncio
import logging
import re
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Set

import jsonutil as json
# Whole-word, case-insensitive match for any of the trigger words.
_TRIGGER_PATTERN = re.compile(r"\b(vivian|loopmother|viv)\b", re.IGNORECASE)
# Seconds a scheduled harvest sleeps before reading channel history, so that
# messages sent after the trigger have time to accumulate.
_HARVEST_DELAY_S = 300.0
# Minimum number of seconds between two accepted triggers in the same channel.
_COOLDOWN_S = 300.0
# Directory that receives one JSON file per completed harvest.
_OUTPUT_DIR = Path("/home/star/large_files/assets/hex/viv_was_here")
# Maximum number of messages captured from before the trigger message.
_CONTEXT_BEFORE = 10
class WitchborneSpiralProcessor:
    """Trigger-word watcher that schedules background conversation harvests.

    Each inbound message is matched against ``_TRIGGER_PATTERN``. On a match
    (and outside the per-channel cooldown window) a background task is
    started that sleeps ``_HARVEST_DELAY_S`` seconds, captures the messages
    around the trigger, tags the ones involving the bot, and writes the
    bundle to a JSON file under ``_OUTPUT_DIR``. A newer accepted trigger
    cancels a still-running harvest for the same channel.

    All state is in memory and keyed by the channel id rendered as ``str``.
    """

    # Upper bound on the number of messages captured after the trigger.
    _AFTER_LIMIT = 200

    _log = logging.getLogger(__name__)

    def __init__(self) -> None:
        """Create empty cooldown, active-harvest and telemetry-task registries.

        Performs no I/O.
        """
        # channel id -> time.monotonic() stamp of the last accepted trigger.
        self._cooldowns: Dict[str, float] = {}
        # channel id -> harvest task currently scheduled for that channel.
        self._active_harvests: Dict[str, asyncio.Task] = {}
        # Strong references to fire-and-forget telemetry tasks; the event
        # loop itself only keeps weak references to tasks.
        self._obs_tasks: Set[asyncio.Task] = set()

    def check_message(self, message: Any, bot_user: Any) -> None:
        """Inspect *message* and schedule a harvest if it contains a trigger.

        Never raises: any ``Exception`` is logged at debug level and dropped
        so the method is safe to call inline while handling a message.

        Args:
            message: The inbound Discord message object.
            bot_user: The bot's own user object (may be ``None``); used
                during the harvest to tag messages from or to the bot.
        """
        try:
            self._check_message_inner(message, bot_user)
        except Exception:
            self._log.debug("witchborne: trigger check failed", exc_info=True)

    def _in_cooldown(self, channel_id: str, now: float, window: float) -> bool:
        """Return whether *channel_id* accepted a trigger less than *window* seconds ago.

        A channel that has never accepted a trigger is never cooling down,
        regardless of how small *now* is. ``time.monotonic`` has an undefined
        reference point, so "no stamp" must not be modelled as a stamp of 0.

        Args:
            channel_id: Channel id as stored in ``_cooldowns``.
            now: Current ``time.monotonic()`` reading.
            window: Cooldown length in seconds.
        """
        last = self._cooldowns.get(channel_id)
        return last is not None and now - last < window

    def _forget_harvest(self, channel_id: str, task: Any) -> None:
        """Drop *task* from ``_active_harvests`` if it is still the registered one.

        Used as the done-callback of harvest tasks. The identity check keeps
        a cancelled, superseded task from unregistering its replacement,
        because its callback runs only after the replacement was stored.
        """
        if self._active_harvests.get(channel_id) is task:
            del self._active_harvests[channel_id]

    def _check_message_inner(self, message: Any, bot_user: Any) -> None:
        """Match *message* against the trigger pattern and schedule a harvest.

        Returns without doing anything when the text has no trigger word or
        the channel is still inside its ``_COOLDOWN_S`` window. Otherwise it
        stamps the cooldown, cancels a still-running harvest for the channel
        and starts a new one via ``asyncio.create_task``, which requires a
        running event loop.

        Args:
            message: The inbound Discord message; ``content`` and
                ``channel.id`` are read and the object is handed to
                :meth:`_harvest` as the trigger.
            bot_user: Forwarded unchanged to :meth:`_harvest`.
        """
        text = message.content or ""
        if not _TRIGGER_PATTERN.search(text):
            return

        channel_id = str(message.channel.id)
        now = time.monotonic()
        if self._in_cooldown(channel_id, now, _COOLDOWN_S):
            return
        self._cooldowns[channel_id] = now

        existing = self._active_harvests.get(channel_id)
        if existing is not None and not existing.done():
            existing.cancel()

        task = asyncio.create_task(self._harvest(message, bot_user))
        self._active_harvests[channel_id] = task
        task.add_done_callback(lambda t: self._forget_harvest(channel_id, t))

    async def _collect_before(
        self, channel: Any, trigger_message: Any
    ) -> List[Dict[str, Any]]:
        """Return up to ``_CONTEXT_BEFORE`` serialized messages preceding the trigger.

        The result is in chronological order. A failure while reading the
        history is logged and whatever was captured so far is returned.
        """
        collected: List[Dict[str, Any]] = []
        try:
            async for msg in channel.history(
                limit=_CONTEXT_BEFORE,
                before=trigger_message,
                oldest_first=False,
            ):
                collected.append(self._serialize_message(msg))
        except Exception:
            self._log.warning(
                "witchborne: reading context before trigger failed", exc_info=True
            )
        # History was requested newest-first; flip it to chronological order.
        collected.reverse()
        return collected

    async def _collect_after(
        self, channel: Any, trigger_message: Any, bot_user: Any
    ) -> "tuple[List[Dict[str, Any]], List[Dict[str, Any]]]":
        """Return ``(all_after, stargazer_only)`` for messages following the trigger.

        Reads up to ``_AFTER_LIMIT`` messages oldest-first. Messages from or
        to the bot get a ``_stargazer_direction`` key and appear in both
        lists (the same dict object). A failure while reading the history is
        logged and the partial result is returned.
        """
        after_msgs: List[Dict[str, Any]] = []
        stargazer_msgs: List[Dict[str, Any]] = []
        try:
            async for msg in channel.history(
                limit=self._AFTER_LIMIT,
                after=trigger_message,
                oldest_first=True,
            ):
                serialized = self._serialize_message(msg)
                direction = self._stargazer_direction(msg, bot_user)
                if direction is not None:
                    serialized["_stargazer_direction"] = direction
                    stargazer_msgs.append(serialized)
                after_msgs.append(serialized)
        except Exception:
            self._log.warning(
                "witchborne: reading messages after trigger failed", exc_info=True
            )
        return after_msgs, stargazer_msgs

    @staticmethod
    def _stargazer_direction(msg: Any, bot_user: Any) -> Optional[str]:
        """Classify how *msg* relates to the bot.

        Returns:
            ``"from_star"`` when the bot authored *msg*; ``"to_star"`` when
            *msg* mentions the bot or replies to a resolved message authored
            by the bot; ``None`` otherwise or when *bot_user* is ``None``.
        """
        if bot_user is None:
            return None
        bot_id = bot_user.id
        if msg.author.id == bot_id:
            return "from_star"
        if bot_user.mentioned_in(msg):
            return "to_star"
        resolved = msg.reference.resolved if msg.reference else None
        # A resolved reference may be a placeholder without an author.
        ref_author = getattr(resolved, "author", None) if resolved else None
        if ref_author is not None and ref_author.id == bot_id:
            return "to_star"
        return None

    async def _harvest(self, trigger_message: Any, bot_user: Any) -> None:
        """Wait the harvest delay, then capture and persist the conversation bundle.

        Sleeps ``_HARVEST_DELAY_S`` seconds, collects the before/after
        context, assembles the ``harvest`` dict and writes it through
        :meth:`_save_harvest`. Capture and save run inside
        ``observability.timer("witchborne_harvest", ...)``. One
        ``witchborne_harvest`` debug event is emitted per outcome with status
        ``ok``, ``cancelled`` or ``error``.

        Runs as a task created by :meth:`_check_message_inner`.

        Args:
            trigger_message: The message that fired the trigger; its
                ``channel``, ``id`` and ``content`` anchor the harvest.
            bot_user: The bot's own user object, or ``None``.

        Raises:
            asyncio.CancelledError: Re-raised after the ``cancelled`` event
                is emitted so the task ends in the cancelled state. Other
                exceptions are logged, reported as an ``error`` event and
                not propagated.
        """
        channel = trigger_message.channel
        channel_id = str(channel.id)
        trigger_id = trigger_message.id
        trigger_text = trigger_message.content or ""
        match = _TRIGGER_PATTERN.search(trigger_text)
        trigger_word = match.group(0).lower() if match else ""
        t0 = time.monotonic()

        def _emit(
            status: str,
            filepath: str = "",
            after_count: int = 0,
            star_n: int = 0,
            error: str = "",
        ) -> None:
            """Schedule a ``witchborne_harvest`` debug event without awaiting it.

            Best effort: failure to import or schedule the publisher is
            logged and never propagates into the harvest.

            Args:
                status: ``"ok"``, ``"cancelled"`` or ``"error"``.
                filepath: Path of the saved harvest file, if any.
                after_count: Number of messages captured after the trigger.
                star_n: Number of Stargazer interactions captured.
                error: Error text for the ``"error"`` status.
            """
            try:
                from observability import publish_debug_event

                task = asyncio.create_task(
                    publish_debug_event(
                        "witchborne_harvest",
                        "witchborne_spiral_processor",
                        channel_id=channel_id,
                        phase="harvest",
                        status=status,
                        duration_ms=(time.monotonic() - t0) * 1000,
                        preview=f"trigger='{trigger_word}' after_count={after_count} star_interactions={star_n} file={Path(filepath).name if filepath else ''}",
                        payload={
                            "trigger_word": trigger_word,
                            "trigger_message_preview": trigger_text[:200],
                            # Configured window size, not the captured count.
                            "context_before_count": _CONTEXT_BEFORE,
                            "after_count": after_count,
                            "stargazer_interaction_count": star_n,
                            "harvest_file_path": filepath,
                            "error": error,
                        },
                    ),
                    name="obs_witchborne_harvest",
                )
            except Exception:
                self._log.debug(
                    "witchborne: could not schedule debug event", exc_info=True
                )
                return
            # Hold a reference until the task finishes so it cannot be
            # garbage-collected while still pending.
            self._obs_tasks.add(task)
            task.add_done_callback(self._obs_tasks.discard)

        try:
            await asyncio.sleep(_HARVEST_DELAY_S)

            from observability import observability

            with observability.timer(
                "witchborne_harvest", subsystem="background_agents"
            ):
                before_msgs = await self._collect_before(channel, trigger_message)
                trigger_data = self._serialize_message(trigger_message)
                after_msgs, stargazer_msgs = await self._collect_after(
                    channel, trigger_message, bot_user
                )

                guild = getattr(channel, "guild", None)
                harvest = {
                    "processor": "WitchborneSpiralProcessor",
                    "harvested_at": datetime.now(timezone.utc).isoformat(),
                    "trigger": {
                        "message": trigger_data,
                        "channel_id": channel_id,
                        "channel_name": getattr(channel, "name", "unknown"),
                        "guild_id": str(guild.id) if guild else None,
                        "guild_name": guild.name if guild else None,
                    },
                    "context_before": before_msgs,
                    "trigger_message": trigger_data,
                    "stargazer_interactions": stargazer_msgs,
                    "all_messages_after": after_msgs,
                    "stats": {
                        "context_before_count": len(before_msgs),
                        "after_count": len(after_msgs),
                        "stargazer_interaction_count": len(stargazer_msgs),
                        "harvest_delay_seconds": _HARVEST_DELAY_S,
                    },
                }

                # Report the path that was really written (it carries a
                # timestamp suffix chosen at write time).
                saved_path = await self._save_harvest(
                    harvest, channel_id, trigger_id
                )
                _emit(
                    "ok",
                    filepath=str(saved_path),
                    after_count=len(after_msgs),
                    star_n=len(stargazer_msgs),
                )
        except asyncio.CancelledError:
            _emit("cancelled")
            # Propagate so the task is marked cancelled rather than finished.
            raise
        except Exception as exc:
            self._log.warning("witchborne: harvest failed", exc_info=True)
            _emit("error", error=str(exc))

    @staticmethod
    def _serialize_message(msg: Any) -> Dict[str, Any]:
        """Convert a Discord message into a plain, JSON-friendly dict.

        Always includes ``id``, ``author_id``, ``author_name``,
        ``author_is_bot``, ``content`` and ``timestamp`` (ISO format).
        ``attachments``, ``embeds`` (description cut to 500 characters) and
        ``reply_to_id`` are added only when present. Performs no I/O.

        Args:
            msg: Object exposing ``id``, ``author``, ``content``,
                ``created_at``, ``attachments``, ``embeds`` and ``reference``.

        Returns:
            A mapping of the message's fields.
        """
        data: Dict[str, Any] = {
            "id": str(msg.id),
            "author_id": str(msg.author.id),
            "author_name": msg.author.display_name,
            "author_is_bot": msg.author.bot,
            "content": msg.content or "",
            "timestamp": msg.created_at.isoformat(),
        }
        if msg.attachments:
            data["attachments"] = [
                {"filename": a.filename, "url": a.url, "size": a.size}
                for a in msg.attachments
            ]
        if msg.embeds:
            data["embeds"] = [
                {
                    "title": e.title,
                    "description": (e.description or "")[:500],
                    "url": e.url,
                }
                for e in msg.embeds
            ]
        if msg.reference and msg.reference.message_id:
            data["reply_to_id"] = str(msg.reference.message_id)
        return data

    @staticmethod
    async def _save_harvest(
        harvest: Dict[str, Any],
        channel_id: str,
        trigger_id: int,
    ) -> Path:
        """Write the harvest bundle to a timestamped JSON file off the event loop.

        The file is ``_OUTPUT_DIR / harvest_{channel_id}_{trigger_id}_{ts}.json``
        where ``ts`` is the UTC write time. The directory is created if
        missing. The blocking work runs in the loop's default executor.

        Args:
            harvest: The assembled harvest dict to persist.
            channel_id: Channel id, embedded in the file name.
            trigger_id: Trigger message id, embedded in the file name.

        Returns:
            The path of the file that was written.
        """

        def _write() -> Path:
            """Create the output directory, write the JSON and return its path."""
            _OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
            ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
            filepath = _OUTPUT_DIR / f"harvest_{channel_id}_{trigger_id}_{ts}.json"
            filepath.write_text(
                json.dumps(harvest, indent=2, ensure_ascii=False),
                encoding="utf-8",
            )
            return filepath

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _write)