Source code for platforms.discord
"""Discord platform adapter using discord.py.
Wraps a :class:`discord.Client` and converts Discord events into
:class:`~platforms.base.IncomingMessage` instances for the shared
:class:`~message_processor.MessageProcessor`.
"""
from __future__ import annotations
import asyncio
import json
import logging
import time
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable
import discord
from config import Config
from media_cache import MediaCache
from message_utils import split_message
from platforms.base import (
Attachment,
HistoricalMessage,
IncomingMessage,
MessageHandler,
MessageUpdateHandler,
MessageDeleteHandler,
PlatformAdapter,
)
# Type alias for the reaction-update callback.  The inner client awaits it
# with four strings: the platform name ("discord"), the channel ID, the
# message ID, and the serialized reaction summary of that message.
ReactionUpdateHandler = Callable[
    [str, str, str, str], Awaitable[None],
]
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
[docs]
class DiscordPlatform(PlatformAdapter):
    """Platform adapter for Discord via discord.py.

    Parameters
    ----------
    message_handler:
        Async callback that receives :class:`IncomingMessage` instances
        (awaited with the message and this adapter).
    token:
        Discord bot token.
    config:
        Optional application configuration; the inner client reads the
        emoji-resolution settings from it.
    media_cache:
        Optional cache handed to the inner client for attachment and
        custom-emoji downloads.
    message_update_handler:
        Optional async callback awaited when a non-bot message is edited.
    message_delete_handler:
        Optional async callback awaited when a non-bot message is deleted.
    reaction_update_handler:
        Optional async callback awaited when reactions on a message change.
    """
    # Lifetime of a cached per-channel webhook listing.
    _WEBHOOK_CACHE_TTL = 60  # seconds
    # Lifetime of a cached per-guild member listing.
    _MEMBER_CACHE_TTL = 21600  # 6 hours
[docs]
def __init__(
    self,
    message_handler: MessageHandler,
    *,
    token: str,
    config: Config | None = None,
    media_cache: MediaCache | None = None,
    message_update_handler: MessageUpdateHandler | None = None,
    message_delete_handler: MessageDeleteHandler | None = None,
    reaction_update_handler: ReactionUpdateHandler | None = None,
) -> None:
    """Store the collaborators and set up empty connection state.

    No network activity happens here; the Discord client itself is
    created in :meth:`start`.

    Args:
        message_handler (MessageHandler): Callback for incoming messages.
        token (str): Discord bot token.
        config (Config | None): Optional application configuration.
        media_cache (MediaCache | None): Optional download cache.
        message_update_handler (MessageUpdateHandler | None): Optional
            callback for edited messages.
        message_delete_handler (MessageDeleteHandler | None): Optional
            callback for deleted messages.
        reaction_update_handler (ReactionUpdateHandler | None): Optional
            callback for reaction changes.
    """
    super().__init__(message_handler)

    # Credentials and shared collaborators.
    self._token = token
    self._config = config
    self._media_cache = media_cache

    # Optional event callbacks used by the inner client.
    self._reaction_update_handler = reaction_update_handler
    self._message_delete_handler = message_delete_handler
    self._message_update_handler = message_update_handler

    # Connection state, populated by start().
    self._client: discord.Client | None = None
    self._task: asyncio.Task | None = None
    self._ready_event = asyncio.Event()

    # channel_id -> running typing-indicator task
    self._typing_tasks: dict[str, asyncio.Task[None]] = {}
    # channel_id -> (monotonic_timestamp, webhook_list)
    self._webhook_cache: dict[str, tuple[float, list[dict[str, Any]]]] = {}
    # guild_id -> (monotonic_timestamp, member_list)
    self._member_cache: dict[str, tuple[float, list[dict[str, str]]]] = {}

    # Imported here rather than at module level, as in the original code.
    from services.lyria_service import LyriaService
    self._lyria_service = LyriaService()
@property
def lyria_service(self):
    """Lyria RealTime music service for voice channel playback.

    Returns the ``LyriaService`` instance created in ``__init__``.
    """
    return self._lyria_service
# -- PlatformAdapter metadata --------------------------------------
@property
def name(self) -> str:
    """Return the platform identifier.

    Returns:
        str: Always the literal ``"discord"``.
    """
    return "discord"
@property
def is_running(self) -> bool:
    """Report whether the Discord client exists and is ready.

    Returns:
        bool: ``False`` when no client has been created; otherwise the
        result of the client's ``is_ready()``.
    """
    client = self._client
    if client is None:
        return False
    return client.is_ready()
@property
def client(self) -> discord.Client | None:
    """Return the underlying :class:`discord.Client`, or ``None``.

    The value is ``None`` until :meth:`start` creates the client and
    again after :meth:`stop` clears it.
    """
    return self._client
@property
def bot_identity(self) -> dict[str, str]:
    """Describe the bot account as a small string dictionary.

    Returns:
        dict[str, str]: Keys ``platform``, ``user_id``, ``display_name``
        and ``mention``.  All but ``platform`` are empty strings while no
        client user is available.
    """
    user = None if self._client is None else self._client.user
    if user:
        uid = str(user.id)
        display_name = user.name
    else:
        uid = ""
        display_name = ""
    mention = f"<@{uid}>" if uid else ""
    return {
        "platform": "discord",
        "user_id": uid,
        "display_name": display_name,
        "mention": mention,
    }
# -- PlatformAdapter lifecycle -------------------------------------
[docs]
async def start(self) -> None:
    """Create the Discord client, log in, and wait until it is ready.

    The client runs in a background task.  If a previous task is still
    running the call is a no-op (a warning is logged).

    Raises:
        RuntimeError: If the client does not become ready within 30
            seconds.  The half-started client and its background task
            are torn down before the error is raised.
    """
    if self._task is not None and not self._task.done():
        logger.warning("Discord platform is already running")
        return
    intents = discord.Intents.default()
    intents.message_content = True
    intents.members = True
    self._client = _DiscordClient(
        platform=self,
        media_cache=self._media_cache,
        intents=intents,
    )
    self._ready_event.clear()
    # Run client.start() in a background task (it blocks until closed)
    self._task = asyncio.create_task(
        self._run_client(),
    )
    # Wait for the client to be ready (with a timeout)
    try:
        await asyncio.wait_for(self._ready_event.wait(), timeout=30)
    except asyncio.TimeoutError as exc:
        logger.error("Discord client did not become ready within 30s")
        # Do not leak the background task / open client on failure.
        try:
            await self.stop()
        except Exception:
            logger.debug(
                "Cleanup after failed Discord start raised",
                exc_info=True,
            )
        raise RuntimeError("Discord login timed out") from exc
    logger.info("Discord platform is running.")
[docs]
async def stop(self) -> None:
    """Close the Discord client and cancel all background tasks.

    Typing-indicator tasks are cancelled first so they do not outlive
    the client.  The client task is cancelled and the internal state is
    reset even when closing the client raises; in that case the error
    still propagates to the caller.
    """
    # Cancel any typing indicators that are still running.
    typing_tasks = list(self._typing_tasks.values())
    self._typing_tasks.clear()
    for typing_task in typing_tasks:
        typing_task.cancel()
    if typing_tasks:
        await asyncio.gather(*typing_tasks, return_exceptions=True)

    try:
        if self._client is not None:
            await self._client.close()
    finally:
        # Always reset state, even if close() failed.
        task = self._task
        self._task = None
        self._client = None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
    logger.info("Discord platform stopped")
# -- PlatformAdapter outbound messaging ----------------------------
[docs]
async def send(self, channel_id: str, text: str) -> str:
    """Send *text* to a Discord channel, splitting it into chunks.

    Args:
        channel_id (str): Discord channel identifier.
        text (str): Text content.

    Returns:
        str: ID of the first message sent, or ``""`` when the client is
        not connected, the channel cannot be resolved, or nothing was sent.

    Raises:
        discord.Forbidden: Re-raised after logging when Discord rejects
            the send with a 403.
    """
    if self._client is None:
        logger.error("Discord client is not connected")
        return ""

    # Prefer the local cache; fall back to an API lookup.
    target = self._client.get_channel(int(channel_id))
    if target is None:
        try:
            target = await self._client.fetch_channel(int(channel_id))
        except Exception:
            logger.exception(
                "Could not find Discord channel %s", channel_id,
            )
            return ""

    # Discord has a 2000-char limit per message; split if needed
    first_id = ""
    try:
        bot: _DiscordClient = self._client  # type: ignore[assignment]
        pieces = split_message(text, max_length=1950, overflow_allowed=45)
        for piece in pieces:
            delivered = await target.send(piece)  # type: ignore[union-attr]
            # Remember our own messages so replies to them are recognised.
            bot._sent_message_ids.add(delivered.id)
            first_id = first_id or str(delivered.id)
    except discord.Forbidden:
        logger.warning(
            "403 Forbidden sending to Discord channel %s", channel_id,
        )
        raise
    except Exception:
        logger.exception(
            "Failed to send message to Discord channel %s", channel_id,
        )
    return first_id
[docs]
async def send_file(
    self,
    channel_id: str,
    data: bytes,
    filename: str,
    mimetype: str = "application/octet-stream",
) -> str | None:
    """Upload a file to a Discord channel.

    Images (``image/*`` mimetypes) are additionally shown through an
    embed that references the uploaded attachment.  The sent message is
    recorded as bot-authored, like text messages sent via :meth:`send`,
    so replies to it are recognised as addressed to the bot.

    Args:
        channel_id (str): Discord channel identifier.
        data (bytes): Raw file content.
        filename (str): Name given to the uploaded attachment.
        mimetype (str): MIME type; only used to detect images.

    Returns:
        str | None: The URL of the first attachment on the sent message,
        or ``None`` on any failure.
    """
    from io import BytesIO
    if self._client is None:
        logger.error("Discord client is not connected")
        return None
    channel = self._client.get_channel(int(channel_id))
    if channel is None:
        try:
            channel = await self._client.fetch_channel(int(channel_id))
        except Exception:
            logger.exception(
                "Could not find Discord channel %s", channel_id,
            )
            return None
    try:
        send_kwargs: dict[str, Any] = {
            "file": discord.File(BytesIO(data), filename=filename),
        }
        if mimetype.startswith("image/"):
            embed = discord.Embed()
            embed.set_image(url=f"attachment://{filename}")
            send_kwargs["embed"] = embed
        msg = await channel.send(**send_kwargs)  # type: ignore[union-attr]
        if msg is not None:
            # Track the message like send() does; tolerate a client
            # object that does not carry the tracking set.
            tracked = getattr(self._client, "_sent_message_ids", None)
            if tracked is not None:
                tracked.add(msg.id)
        if msg and msg.attachments:
            return msg.attachments[0].url
        return None
    except Exception:
        logger.exception(
            "Failed to send file to Discord channel %s", channel_id,
        )
        return None
# -- Message editing ------------------------------------------------
[docs]
async def edit_message(
    self,
    channel_id: str,
    message_id: str,
    new_text: str,
) -> bool:
    """Replace the content of an existing message in *channel_id*.

    Args:
        channel_id (str): Discord channel identifier.
        message_id (str): ID of the message to edit.
        new_text (str): New message content.

    Returns:
        bool: ``True`` when the edit succeeded, ``False`` on any failure
        (not connected, unknown channel or message, forbidden, other error).
    """
    if self._client is None:
        logger.error("Discord client is not connected")
        return False

    numeric_channel_id = int(channel_id)
    channel = self._client.get_channel(numeric_channel_id)
    if channel is None:
        try:
            channel = await self._client.fetch_channel(numeric_channel_id)
        except Exception:
            logger.exception(
                "Could not find Discord channel %s for edit", channel_id,
            )
            return False

    try:
        target = await channel.fetch_message(int(message_id))  # type: ignore[union-attr]
        await target.edit(content=new_text)
    except discord.NotFound:
        logger.warning(
            "Message %s not found in channel %s", message_id, channel_id,
        )
    except discord.Forbidden:
        logger.warning(
            "403 Forbidden editing message %s in channel %s",
            message_id, channel_id,
        )
    except Exception:
        logger.exception(
            "Failed to edit message %s in channel %s",
            message_id, channel_id,
        )
    else:
        return True
    return False
# -- Interactive buttons (GameGirl Color) ---------------------------
[docs]
async def send_with_buttons(
    self,
    channel_id: str,
    text: str,
    view: Any = None,
) -> str:
    """Send a message with a discord.ui.View of interactive buttons.

    Used by the GameGirl Color system to attach choice buttons
    to game turn responses.  Long text is split into several messages;
    the view is attached to the last chunk only so the buttons appear
    once, at the end of the response.

    Args:
        channel_id (str): Discord channel identifier.
        text (str): Text content.
        view (Any): A ``discord.ui.View``; any other value is ignored and
            the text is sent without buttons.

    Returns:
        str: ID of the first message sent, or ``""`` when the client is
        not connected, the channel cannot be resolved, or nothing was sent.

    Raises:
        discord.Forbidden: Re-raised after logging when Discord rejects
            the send with a 403.
    """
    if self._client is None:
        logger.error("Discord client is not connected")
        return ""
    channel = self._client.get_channel(int(channel_id))
    if channel is None:
        try:
            channel = await self._client.fetch_channel(int(channel_id))
        except Exception:
            logger.exception(
                "Could not find Discord channel %s", channel_id,
            )
            return ""
    # Only a real View may be handed to channel.send(view=...).
    attach_view = view if isinstance(view, discord.ui.View) else None
    first_id = ""
    try:
        inner: _DiscordClient = self._client  # type: ignore[assignment]
        # Materialise the chunks so the last one can be identified.
        chunks = list(
            split_message(text, max_length=1950, overflow_allowed=45),
        )
        last_index = len(chunks) - 1
        for index, chunk in enumerate(chunks):
            kwargs: dict[str, Any] = {}
            if attach_view is not None and index == last_index:
                kwargs["view"] = attach_view
            sent_msg = await channel.send(  # type: ignore[union-attr]
                chunk, **kwargs,
            )
            inner._sent_message_ids.add(sent_msg.id)
            if not first_id:
                first_id = str(sent_msg.id)
    except discord.Forbidden:
        logger.warning(
            "403 Forbidden sending to Discord channel %s", channel_id,
        )
        raise
    except Exception:
        logger.exception(
            "Failed to send message with buttons to channel %s",
            channel_id,
        )
    return first_id
# -- Typing indicator ----------------------------------------------
[docs]
async def start_typing(self, channel_id: str) -> None:
    """Show the typing indicator in a channel until :meth:`stop_typing`.

    Any indicator already running for the channel is stopped first.  The
    indicator is best-effort: if the channel cannot be resolved nothing
    happens, and failures inside the background task are logged at debug
    level instead of surfacing as unretrieved task exceptions.

    Args:
        channel_id (str): Discord channel identifier.
    """
    await self.stop_typing(channel_id)
    if self._client is None:
        return
    channel = self._client.get_channel(int(channel_id))
    if channel is None:
        try:
            channel = await self._client.fetch_channel(int(channel_id))
        except Exception:
            return

    async def _typing_loop(ch: Any) -> None:
        """Hold the channel's typing context open until cancelled."""
        try:
            async with ch.typing():
                # Sleep "forever"; cancellation ends the indicator.
                await asyncio.sleep(float('inf'))
        except asyncio.CancelledError:
            pass
        except Exception:
            # e.g. missing permission or a channel type without typing()
            logger.debug(
                "Typing indicator failed for Discord channel %s",
                channel_id, exc_info=True,
            )

    self._typing_tasks[channel_id] = asyncio.create_task(
        _typing_loop(channel),
    )
[docs]
async def stop_typing(self, channel_id: str) -> None:
    """Cancel the typing indicator task for a channel, if one is running.

    Args:
        channel_id (str): Discord channel identifier.
    """
    pending = self._typing_tasks.pop(channel_id, None)
    if pending is None or pending.done():
        return
    pending.cancel()
    try:
        await pending
    except asyncio.CancelledError:
        pass
# -- Guild member list (cached) ------------------------------------
[docs]
async def get_guild_members(
    self, guild_id: str,
) -> list[dict[str, str]]:
    """Return the members of a guild, served from a per-guild cache.

    A cached listing is reused while it is younger than
    :attr:`_MEMBER_CACHE_TTL` seconds.  Each member dict contains ``id``,
    ``name``, ``nickname``, ``top_role``, ``roles`` and ``bot``; the
    ``@everyone`` role is omitted from both role fields.

    Args:
        guild_id: Discord guild (server) snowflake ID.

    Returns:
        List of member dicts, or empty list on failure.
    """
    if self._client is None or not guild_id:
        return []

    entry = self._member_cache.get(guild_id)
    if entry is not None:
        cached_at, cached_members = entry
        if time.monotonic() - cached_at < self._MEMBER_CACHE_TTL:
            return cached_members

    guild = self._client.get_guild(int(guild_id))
    if guild is None:
        return []

    def _describe(member: Any) -> dict[str, str]:
        """Flatten one guild member into a string dictionary."""
        named_roles = [
            role.name for role in member.roles
            if role.name != "@everyone"
        ]
        top = member.top_role.name
        return {
            "id": str(member.id),
            "name": str(member),
            "nickname": member.display_name,
            "top_role": "" if top == "@everyone" else top,
            "roles": ", ".join(named_roles),
            "bot": str(member.bot),
        }

    try:
        roster: list[dict[str, str]] = [
            _describe(member)
            async for member in guild.fetch_members(limit=None)
        ]
        self._member_cache[guild_id] = (time.monotonic(), roster)
        logger.info(
            "Cached %d guild members for guild %s",
            len(roster), guild_id,
        )
        return roster
    except discord.Forbidden:
        logger.warning(
            "Missing permissions to fetch members for guild %s",
            guild_id,
        )
        return []
    except Exception:
        logger.debug(
            "Failed to fetch members for guild %s",
            guild_id, exc_info=True,
        )
        return []
# -- Server/channel listing ----------------------------------------
[docs]
async def list_servers_and_channels(self) -> list[dict[str, Any]]:
    """Return every guild the client can see, with its channels.

    Category channels are skipped.  Channels are listed in ``position``
    order and typed ``"voice"``, ``"stage"``, ``"forum"`` or (for anything
    else) ``"text"``.  Returns an empty list while the client is missing
    or not ready.
    """
    if self._client is None or not self._client.is_ready():
        return []

    # Checked in order; the first matching class decides the type label.
    type_labels = (
        (discord.VoiceChannel, "voice"),
        (discord.StageChannel, "stage"),
        (discord.ForumChannel, "forum"),
    )

    result: list[dict[str, Any]] = []
    for guild in self._client.guilds:
        listed: list[dict[str, Any]] = []
        ordered = sorted(guild.channels, key=lambda c: c.position)
        for channel in ordered:
            if isinstance(channel, discord.CategoryChannel):
                continue
            kind = next(
                (
                    label for cls, label in type_labels
                    if isinstance(channel, cls)
                ),
                "text",
            )
            category = channel.category
            listed.append({
                "channel_id": str(channel.id),
                "channel_name": channel.name,
                "type": kind,
                "category": category.name if category else None,
            })
        result.append({
            "server_name": guild.name,
            "server_id": str(guild.id),
            "member_count": guild.member_count or 0,
            "channels": listed,
        })
    return result
# -- Channel webhooks ----------------------------------------------
[docs]
async def get_channel_webhooks(
    self, channel_id: str,
) -> list[dict[str, Any]]:
    """List the webhooks of a channel, served from a short-lived cache.

    A cached listing is reused while it is younger than
    :attr:`_WEBHOOK_CACHE_TTL` seconds.

    Args:
        channel_id (str): Discord channel identifier.

    Returns:
        list[dict[str, Any]]: One dict per webhook with ``webhook_id``,
        ``name``, ``url`` and ``created_at``; empty on any failure or when
        the channel type has no webhooks.
    """
    if self._client is None:
        return []

    entry = self._webhook_cache.get(channel_id)
    if entry is not None:
        cached_at, cached_hooks = entry
        if time.monotonic() - cached_at < self._WEBHOOK_CACHE_TTL:
            return cached_hooks

    channel = self._client.get_channel(int(channel_id))
    if channel is None:
        try:
            channel = await self._client.fetch_channel(int(channel_id))
        except Exception:
            return []
    if not hasattr(channel, "webhooks"):
        return []

    try:
        hooks = await channel.webhooks()  # type: ignore[union-attr]
    except discord.Forbidden:
        logger.debug(
            "Missing Manage Webhooks permission in channel %s",
            channel_id,
        )
        return []
    except Exception:
        logger.debug(
            "Failed to fetch webhooks for channel %s",
            channel_id, exc_info=True,
        )
        return []

    listing: list[dict[str, Any]] = []
    for hook in hooks:
        created = hook.created_at
        listing.append({
            "webhook_id": str(hook.id),
            "name": hook.name or "",
            "url": hook.url or "",
            "created_at": created.isoformat() if created else "",
        })
    self._webhook_cache[channel_id] = (time.monotonic(), listing)
    return listing
# -- Channel history -----------------------------------------------
[docs]
async def fetch_history(
    self,
    channel_id: str,
    limit: int = 100,
) -> list[HistoricalMessage]:
    """Fetch recent messages of a channel as :class:`HistoricalMessage`.

    Rich content (embeds, forwards, stickers, polls) is serialized and
    appended to each message's text.  The collected list is reversed
    before it is returned.

    Args:
        channel_id (str): Discord channel identifier.
        limit (int): Maximum number of messages to fetch.

    Returns:
        list[HistoricalMessage]: The converted messages, or an empty list
        on any failure or when the channel has no history.
    """
    if self._client is None:
        return []

    channel = self._client.get_channel(int(channel_id))
    if channel is None:
        try:
            channel = await self._client.fetch_channel(int(channel_id))
        except Exception:
            logger.debug(
                "Could not find Discord channel %s for history fetch",
                channel_id,
            )
            return []
    if not hasattr(channel, "history"):
        return []

    me = self._client.user

    def _convert(item: Any) -> HistoricalMessage:
        """Turn one discord message into a HistoricalMessage."""
        ref = item.reference
        parent_id = str(ref.message_id) if ref and ref.message_id else ""
        body = item.content
        rich = _serialize_rich_content(item)
        if rich:
            body = f"{body}\n{rich}" if body else rich
        return HistoricalMessage(
            user_id=str(item.author.id),
            user_name=item.author.display_name,
            text=body,
            timestamp=item.created_at,
            message_id=str(item.id),
            is_bot=(item.author == me),
            reply_to_id=parent_id,
            reactions=_serialize_reactions(item),
        )

    collected: list[HistoricalMessage] = []
    try:
        async for item in channel.history(limit=limit):  # type: ignore[union-attr]
            collected.append(_convert(item))
    except discord.Forbidden:
        logger.debug(
            "Missing Read Message History permission in channel %s",
            channel_id,
        )
        return []
    except Exception:
        logger.debug(
            "Failed to fetch history for channel %s",
            channel_id, exc_info=True,
        )
        return []
    collected.reverse()
    return collected
# -- Internal ------------------------------------------------------
async def _run_client(self) -> None:
    """Start the discord.py client (blocking coroutine).

    Awaits ``self._client.start`` with the stored token.  Cancellation is
    swallowed so cancelling the task ends it quietly; any other exception
    is logged with its traceback and not re-raised.
    """
    try:
        await self._client.start(self._token)  # type: ignore[union-attr]
    except asyncio.CancelledError:
        # Raised when the task running this coroutine is cancelled.
        pass
    except Exception:
        logger.exception("Discord client encountered an error")
def _mark_ready(self) -> None:
    """Called by the inner client when on_ready fires.

    Sets the ready event that :meth:`start` is waiting on.
    """
    self._ready_event.set()
# ------------------------------------------------------------------
# Inner discord.Client subclass
# ------------------------------------------------------------------
class _DiscordClient(discord.Client):
    """Thin discord.py Client that forwards events to the adapter.

    Events are converted and passed to the callbacks stored on the owning
    :class:`DiscordPlatform`.
    """
def __init__(
    self,
    platform: DiscordPlatform,
    media_cache: MediaCache | None = None,
    **kwargs: Any,
) -> None:
    """Create the client and register its slash commands.

    Args:
        platform (DiscordPlatform): Owning adapter; events are forwarded
            to its handlers and its config is reused.
        media_cache (MediaCache | None): Optional download cache.
        **kwargs: Passed through to :class:`discord.Client`.
    """
    super().__init__(**kwargs)
    # IDs of messages the bot itself has sent (used to detect replies).
    self._sent_message_ids: set[int] = set()
    self._platform = platform
    self._config = platform._config
    self._media_cache = media_cache
    # The command tree must exist before commands are registered on it.
    self.tree = discord.app_commands.CommandTree(self)
    self._register_slash_commands()
def _register_slash_commands(self) -> None:
    """Register all bot slash commands on the command tree.

    Currently this is only ``/stop``, which forwards to the platform's
    ``_cancel_callback`` and shows its result to the invoking user.
    """
    @self.tree.command(
        name="stop",
        description="Stop the bot's current response in this channel",
    )
    async def stop_command(interaction: discord.Interaction) -> None:
        cancel = self._platform._cancel_callback
        if cancel is None:
            await interaction.response.send_message(
                "Stop command is not available.",
                ephemeral=True,
            )
            return

        invoker = interaction.user
        # Administrator status only exists for guild members.
        is_admin = False
        if (
            interaction.guild is not None
            and isinstance(invoker, discord.Member)
        ):
            is_admin = invoker.guild_permissions.administrator

        outcome = await cancel(
            "discord",
            str(interaction.channel_id),
            str(invoker.id),
            is_admin,
        )
        await interaction.response.send_message(
            outcome, ephemeral=True,
        )
async def on_ready(self) -> None:
    """Signal readiness, sync slash commands and restore game sessions.

    Session restore is best-effort: it is skipped without error when no
    Redis handle is found or the ``game_session`` module is missing, and
    a failure on one key does not stop the others.
    """
    me = self.user
    logger.info(
        "Discord: logged in as %s (ID: %s)",
        me, me.id if me else "?",
    )
    self._platform._mark_ready()

    try:
        synced = await self.tree.sync()
    except Exception:
        logger.warning(
            "Discord: failed to sync slash commands",
            exc_info=True,
        )
    else:
        logger.info(
            "Discord: synced %d slash command(s)", len(synced),
        )

    # Restore active game sessions from Redis
    try:
        store = getattr(self._platform, "_redis", None)
        if store is None:
            store = getattr(self._config, "redis", None)
        if store is None:
            return
        from game_session import GameSession, set_session
        restored_count = 0
        for key in await store.keys("game:session:*"):
            try:
                # Skip history keys
                key_text = key.decode() if not isinstance(key, str) else key
                if ":history" in key_text:
                    continue
                payload = await store.get(key)
                if payload is None:
                    continue
                record = json.loads(payload)
                if not record.get("active", False):
                    continue
                session = GameSession.from_dict(record)
                session.active = True
                set_session(session.channel_id, session)
                restored_count += 1
                logger.info(
                    "Restored game session: %s (%s) in channel %s",
                    session.game_name, session.game_id,
                    session.channel_id,
                )
            except Exception as exc:
                logger.warning("Failed to restore session from %s: %s", key, exc)
        if restored_count:
            logger.info("Restored %d game session(s) from Redis", restored_count)
    except ImportError:
        pass
    except Exception:
        logger.debug("Game session restore skipped (no Redis or module)")
async def on_message(self, message: discord.Message) -> None:
    """Convert an incoming Discord message and pass it to the handler.

    Attachments are downloaded (through the media cache when one is
    configured), rich content is serialized into the text, and custom
    emojis are optionally resolved to image attachments before the
    resulting :class:`IncomingMessage` is handed to the platform's
    message handler.

    Args:
        message (discord.Message): The received message.
    """
    # Ignore our own messages
    if message.author == self.user:
        return
    addressed = self._check_addressed(message)

    # Download attachments (using cache when available)
    files: list[Attachment] = []
    for item in message.attachments:
        try:
            url = item.url
            mime = item.content_type or "application/octet-stream"
            fname = item.filename
            if self._media_cache is None:
                payload = await item.read()
            else:
                # Defaults bind the current loop values to the callback.
                async def _fetch(
                    _item=item, _mime=mime, _fname=fname,
                ) -> tuple[bytes, str, str]:
                    """Download the attachment on a cache miss."""
                    return await _item.read(), _mime, _fname
                payload, mime, fname = (
                    await self._media_cache.get_or_download(url, _fetch)
                )
            files.append(Attachment(
                data=payload,
                mimetype=mime,
                filename=fname,
                source_url=url,
            ))
        except Exception:
            logger.exception(
                "Failed to download Discord attachment %s", item.filename,
            )

    parent = message.reference
    reply_to_id = (
        str(parent.message_id) if parent and parent.message_id else ""
    )
    extra = _build_extra(message, self.user)

    body = message.content
    rich = _serialize_rich_content(message)
    if rich:
        body = f"{body}\n{rich}" if body else rich

    # --- Resolve custom emojis as images --------------------------
    settings = self._config
    if settings is not None and settings.resolve_emojis_as_images and body:
        try:
            from platforms.emoji_resolver import (
                extract_discord_emojis,
                rewrite_discord_emoji_text,
                download_discord_emojis,
            )
            found = extract_discord_emojis(body)
            if found:
                emoji_files = await download_discord_emojis(
                    found,
                    max_emojis=settings.max_emojis_per_message,
                    media_cache=self._media_cache,
                )
                if emoji_files:
                    body = rewrite_discord_emoji_text(
                        body, found[:settings.max_emojis_per_message],
                    )
                    files.extend(emoji_files)
                    logger.info(
                        "Resolved %d/%d custom emojis as images",
                        len(emoji_files), len(found),
                    )
        except Exception:
            logger.debug("Emoji resolution failed", exc_info=True)

    incoming = IncomingMessage(
        platform="discord",
        channel_id=str(message.channel.id),
        user_id=str(message.author.id),
        user_name=message.author.display_name,
        text=body,
        is_addressed=addressed,
        attachments=files,
        channel_name=getattr(message.channel, "name", "DM"),
        timestamp=message.created_at,
        message_id=str(message.id),
        reply_to_id=reply_to_id,
        extra=extra,
        reactions=_serialize_reactions(message),
    )
    logger.debug(
        "[Discord/%s] %s: %s",
        incoming.channel_name,
        incoming.user_name,
        incoming.text[:120],
    )
    await self._platform._message_handler(incoming, self._platform)
def _check_addressed(self, message: discord.Message) -> bool:
    """Decide whether *message* is directed at the bot.

    A message counts as addressed when it is a DM (no guild), mentions
    the bot, or replies to a message whose ID the bot recorded as its own.
    """
    # DM -- always respond
    if message.guild is None:
        return True

    # Bot is @mentioned
    me = self.user
    if me is not None and me.mentioned_in(message):
        return True

    # Reply to one of the bot's own messages
    ref = message.reference
    return bool(
        ref
        and ref.message_id
        and ref.message_id in self._sent_message_ids
    )
async def on_message_edit(
    self, before: discord.Message, after: discord.Message,
) -> None:
    """Forward an edited message to the message-update handler.

    Edits by the bot itself and edits that change neither the content
    nor the serialized rich content are ignored.  No new response is
    triggered here; the handler only receives the updated text.
    """
    if after.author == self.user:
        return

    new_rich = _serialize_rich_content(after)
    if (
        before.content == after.content
        and _serialize_rich_content(before) == new_rich
    ):
        return

    on_update = self._platform._message_update_handler
    if on_update is None:
        return

    body = after.content
    if new_rich:
        body = f"{body}\n{new_rich}" if body else new_rich

    ref = after.reference
    reply_to_id = str(ref.message_id) if ref and ref.message_id else ""

    # Prefer the edit time; fall back to the creation time.
    stamp = after.edited_at or after.created_at
    logger.debug(
        "[Discord/%s] CACHE-UPDATE %s (msg %s)",
        getattr(after.channel, "name", "DM"),
        after.author.display_name, after.id,
    )
    await on_update(
        "discord",
        str(after.channel.id),
        str(after.id),
        after.author.display_name,
        str(after.author.id),
        body,
        stamp.isoformat(),
        reply_to_id,
    )
async def on_message_delete(self, message: discord.Message) -> None:
    """Report a deleted message to the message-delete handler.

    Deletions of the bot's own messages are ignored.  The handler gets
    the current UTC time as the deletion timestamp.
    """
    if message.author == self.user:
        return
    on_delete = self._platform._message_delete_handler
    if on_delete is None:
        return

    deleted_at = datetime.now(timezone.utc)
    logger.debug(
        "[Discord/%s] DELETE msg %s by %s",
        getattr(message.channel, "name", "DM"),
        message.id, message.author.display_name,
    )
    await on_delete(
        "discord",
        str(message.channel.id),
        str(message.id),
        deleted_at.isoformat(),
    )
async def on_raw_reaction_add(
    self, payload: discord.RawReactionActionEvent,
) -> None:
    """Update the cached reaction metadata when a reaction is added.

    Delegates to :meth:`_handle_reaction_event`.
    """
    await self._handle_reaction_event(payload)
async def on_raw_reaction_remove(
    self, payload: discord.RawReactionActionEvent,
) -> None:
    """Update the cached reaction metadata when a reaction is removed.

    Delegates to :meth:`_handle_reaction_event`.
    """
    await self._handle_reaction_event(payload)
async def _handle_reaction_event(
    self, payload: discord.RawReactionActionEvent,
) -> None:
    """Shared handler for reaction add/remove events.

    Re-fetches the affected message and passes its serialized reaction
    summary to the reaction-update handler.  Does nothing when no handler
    is configured or the channel/message cannot be fetched.
    """
    on_reaction = self._platform._reaction_update_handler
    if on_reaction is None:
        return

    source = self.get_channel(payload.channel_id)
    if source is None:
        try:
            source = await self.fetch_channel(payload.channel_id)
        except Exception:
            return
    if not hasattr(source, "fetch_message"):
        return

    try:
        current = await source.fetch_message(payload.message_id)  # type: ignore[union-attr]
    except Exception:
        logger.debug(
            "Could not fetch message %s for reaction update",
            payload.message_id, exc_info=True,
        )
        return

    summary = _serialize_reactions(current)
    await on_reaction(
        "discord",
        str(payload.channel_id),
        str(payload.message_id),
        summary,
    )
# Helper: send a message and record its ID as one sent by the bot
async def _send_and_track(
    self, channel: Any, content: str,
) -> discord.Message:
    """Send *content* to *channel* and remember the message as our own.

    Args:
        channel (Any): Object with an awaitable ``send`` method.
        content (str): Text to send.

    Returns:
        discord.Message: The message returned by ``channel.send``.
    """
    delivered = await channel.send(content)
    self._sent_message_ids.add(delivered.id)
    return delivered
# -- GameGirl Color interaction handler # 🎮🌀 ---------------------
async def on_interaction(
    self, interaction: discord.Interaction,
) -> None:
    """Handle button presses from GameGirl Color game sessions.

    Flow:
    1. Check if the interaction is a GameGirl Color button (custom_id
       starts with 'gg:').
    2. Look up the active game session for the channel.
    3. Register the player's choice.
    4. If first choice, start 10s countdown.
    5. When countdown expires, format all choices and feed as
       synthetic IncomingMessage to the message handler.

    Menu-related actions (``open_menu``, ``menu_*``, ``upload_asset``,
    ``hot_swap``, ``new_game``) are answered or deferred directly and
    never reach the choice handling.  Failures while responding to the
    interaction are deliberately ignored (best effort).
    """
    # Only component interactions (buttons etc.) are of interest here.
    if interaction.type != discord.InteractionType.component:
        return
    custom_id = interaction.data.get("custom_id", "") if interaction.data else ""
    if not custom_id.startswith("gg:"):
        return
    # The code reads the ID as "gg:<channel id>:<action>[:...]".
    parts = custom_id.split(":", 3)
    if len(parts) < 3:
        return
    # NOTE(review): channel_id_from_btn is never used; the channel is taken
    # from the interaction itself below.
    _, channel_id_from_btn, action = parts[0], parts[1], parts[2]
    channel_id = str(interaction.channel_id)
    user_id = str(interaction.user.id)
    user_name = interaction.user.display_name
    # Import game session lazily; the module is optional.
    try:
        from game_session import get_session
    except ImportError:
        logger.debug("game_session not available")
        try:
            await interaction.response.send_message(
                "Game engine not loaded.", ephemeral=True,
            )
        except Exception:
            pass
        return
    session = get_session(channel_id)
    if session is None or not session.active:
        try:
            await interaction.response.send_message(
                "No active game in this channel.", ephemeral=True,
            )
        except Exception:
            pass
        return
    # Handle Menu button -- open the submenu
    # After a restart, View callbacks won't exist, so on_interaction
    # must be the fallback handler for ALL gg: interactions.
    if action == "open_menu":
        try:
            from game_renderer import SubMenuView
            await interaction.response.send_message(
                "**\U0001f3ae GAME MENU \U0001f3ae**\n"
                "Select an action:",
                view=SubMenuView(channel_id),
                ephemeral=True,
            )
        except Exception:
            # Fall back to a silent acknowledgement.
            try:
                await interaction.response.defer(ephemeral=True)
            except Exception:
                pass
        return
    # Submenu buttons -- handled by _SubMenuButton.callback if View
    # is alive, otherwise we defer and let the user retry
    if action.startswith("menu_"):
        try:
            if not interaction.response.is_done():
                await interaction.response.defer(ephemeral=True)
        except Exception:
            pass
        return
    # NOTE(review): the "menu_upload_asset" / "menu_hot_swap" variants in the
    # two checks below can never match, because every action starting with
    # "menu_" has already returned above.
    if action in ("upload_asset", "menu_upload_asset"):
        try:
            await interaction.response.send_message(
                "Asset upload: attach an image and label it "
                "in your next message. Use format:\n"
                "`[upload: category=enemy, name=Dark Slime]`",
                ephemeral=True,
            )
        except Exception:
            pass
        return
    if action in ("hot_swap", "menu_hot_swap"):
        try:
            await interaction.response.send_message(
                "To hot-swap cartridges, type:\n"
                "`[hot swap: New Game Name]`\n"
                "or mention the bot and ask to swap games.",
                ephemeral=True,
            )
        except Exception:
            pass
        return
    if action == "new_game":
        # This is handled by the NewGameButton callback
        return
    # Regular choice button
    # Extract the choice label from the action
    choice_label = action.replace("gg_", "").replace("_", " ").title()
    # Also try to get the label from the button component
    if interaction.message and interaction.message.components:
        for row in interaction.message.components:
            for child in row.children:
                if hasattr(child, "custom_id") and child.custom_id == custom_id:
                    if hasattr(child, "label") and child.label:
                        choice_label = child.label
                    # NOTE(review): this only leaves the inner loop; the
                    # remaining rows are still scanned.
                    break
    # Submit the choice to the session
    is_first, remaining = await session.submit_choice(
        user_id, user_name, choice_label,
    )
    # Acknowledge the button press publicly; exactly one of the two
    # trailing fragments is non-empty, depending on is_first.
    try:
        await interaction.response.send_message(
            f"**{user_name}** selected: **{choice_label}**\n"
            f"{'Starting 10s countdown for other players...' if is_first else ''}"
            f"{'Time remaining: {:.0f}s'.format(remaining) if not is_first else ''}",
            ephemeral=False,
        )
    except Exception:
        # The initial response failed; try a follow-up message instead.
        try:
            await interaction.followup.send(
                f"**{user_name}** selected: **{choice_label}**",
                ephemeral=False,
            )
        except Exception:
            pass
    # If this is the first choice, start the countdown task
    if is_first:
        async def _countdown_and_resolve() -> None:
            """Wait for countdown then resolve the turn."""
            try:
                choices = await session.wait_for_countdown()
                if not choices:
                    return
                # Format choices as synthetic input
                synthetic_text = session.format_choices_as_input(choices)
                # Build synthetic IncomingMessage
                from platforms.base import IncomingMessage
                synthetic_msg = IncomingMessage(
                    platform="discord",
                    channel_id=channel_id,
                    user_id=user_id,  # first player as primary
                    user_name=user_name,
                    text=synthetic_text,
                    is_addressed=True,
                    channel_name=getattr(
                        interaction.channel, "name", "game",
                    ),
                    extra={
                        "is_game_turn": True,
                        "game_id": session.game_id,
                        "game_name": session.game_name,
                        "turn_number": session.turn_number,
                        "all_choices": choices,
                    },
                )
                # Feed to the message handler
                await self._platform._message_handler(
                    synthetic_msg, self._platform,
                )
            except asyncio.CancelledError:
                pass
            except Exception:
                logger.exception(
                    "GameGirl Color countdown/resolve failed",
                )
        # Cancel any existing countdown
        if session.countdown_task and not session.countdown_task.done():
            session.countdown_task.cancel()
        session.countdown_task = asyncio.create_task(
            _countdown_and_resolve(),
        )
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _build_extra(
    message: discord.Message,
    bot_user: discord.User | None,
) -> dict[str, Any]:
    """Build the ``extra`` dict for :class:`IncomingMessage`.

    Collects guild metadata, details about the message author and, for
    guild messages where the bot's member object is available, the names
    of the permissions the bot holds in the channel.

    Args:
        message: The Discord message being converted.
        bot_user: The bot's own user, or ``None`` if unknown.

    Returns:
        Dict with ``guild_id``, ``guild_name``, ``bot_id``,
        ``channel_topic``, ``member_count``, ``is_dm``,
        ``is_server_admin``, ``user_details`` and, when available,
        ``bot_permissions``.
    """
    guild = message.guild
    channel = message.channel
    author = message.author
    author_is_member = isinstance(author, discord.Member)

    extra: dict[str, Any] = {
        "guild_id": str(guild.id) if guild else "",
        "guild_name": guild.name if guild else "",
        "bot_id": str(bot_user.id) if bot_user else "",
        "channel_topic": getattr(channel, "topic", "") or "",
        "member_count": guild.member_count if guild else 0,
        "is_dm": guild is None,
        "is_server_admin": (
            author.guild_permissions.administrator
            if author_is_member
            else False
        ),
    }

    # -- Interacting user details ----------------------------------
    # Role and join date only exist for guild members.
    top_role = ""
    joined_at = ""
    if author_is_member:
        top_role = author.top_role.name
        if author.joined_at is not None:
            joined_at = author.joined_at.isoformat()
    extra["user_details"] = {
        "id": str(author.id),
        "name": str(author),
        "nickname": author.display_name,
        "top_role": top_role,
        "created_at": author.created_at.isoformat(),
        "joined_at": joined_at,
    }

    # -- Bot permissions in this channel ---------------------------
    if guild is None or bot_user is None:
        return extra
    me = guild.get_member(bot_user.id)
    if me is not None:
        granted = channel.permissions_for(me)  # type: ignore[union-attr]
        extra["bot_permissions"] = [
            perm for perm, enabled in granted if enabled
        ]
    return extra
def _serialize_embed(embed: discord.Embed) -> str:
    """Render one :class:`discord.Embed` as an XML-like text block.

    Only populated parts are emitted, in the order author, title, url,
    description, fields, image, thumbnail, footer, video.  Returns ``""``
    when the embed has none of them.
    """
    body: list[str] = []

    author = embed.author
    if author and author.name:
        body.append(f" <author>{author.name}</author>")

    # Plain string attributes share one output shape.
    for tag in ("title", "url", "description"):
        value = getattr(embed, tag)
        if value:
            body.append(f" <{tag}>{value}</{tag}>")

    body.extend(
        f' <field name="{entry.name}">{entry.value}</field>'
        for entry in embed.fields
    )

    if embed.image:
        body.append(f" <image>{embed.image.url}</image>")
    if embed.thumbnail:
        body.append(f" <thumbnail>{embed.thumbnail.url}</thumbnail>")

    footer = embed.footer
    if footer and footer.text:
        body.append(f" <footer>{footer.text}</footer>")

    if embed.video:
        body.append(f" <video>{embed.video.url}</video>")

    if not body:
        return ""
    return "\n".join(["<embed>", *body, "</embed>"])
def _serialize_rich_content(message: discord.Message) -> str:
    """Render embeds, forwarded messages, stickers and polls as text.

    The pieces are joined with newlines in that order.  Returns an empty
    string when the message contains no rich content.  The result is
    meant to be appended to the plain-text body so the LLM can see
    Discord rich content that would otherwise be lost.
    """
    chunks: list[str] = []

    # Embeds on the message itself.
    for embed in message.embeds:
        rendered = _serialize_embed(embed)
        if rendered:
            chunks.append(rendered)

    # Forwarded messages; the attribute may be absent, hence getattr.
    for snap in getattr(message, "message_snapshots", None) or []:
        inner: list[str] = []
        if snap.content:
            inner.append(f" <content>{snap.content}</content>")
        for embed in getattr(snap, "embeds", []) or []:
            rendered = _serialize_embed(embed)
            if rendered:
                inner.append(rendered)
        if snap.attachments:
            inner.extend(
                f" <attachment url=\"{att.url}\">{att.filename}</attachment>"
                for att in snap.attachments
            )
        if inner:
            chunks.append(
                "\n".join(
                    ["<forwarded_message>", *inner, "</forwarded_message>"],
                ),
            )

    chunks.extend(f"[Sticker: {sticker.name}]" for sticker in message.stickers)

    poll = getattr(message, "poll", None)
    if poll is not None:
        question = getattr(poll.question, "text", None) or str(poll.question)
        options = [
            f" <option>{getattr(answer, 'text', None) or str(answer)}</option>"
            for answer in poll.answers
        ]
        chunks.append(
            "\n".join([f'<poll question="{question}">', *options, "</poll>"]),
        )

    return "\n".join(chunks)
def _serialize_reactions(message: discord.Message) -> str:
    """Summarize a message's reactions as a compact string.

    Produces text like ``"👍×3, 🔥×1, :custom:×2"``: plain-string emojis
    are used as-is, emoji objects with a ``name`` become ``:name:``, and
    anything else is converted with ``str``.  Returns ``""`` when the
    message has no reactions.
    """
    def _label(emoji: Any) -> str:
        """Pick the display label for one reaction emoji."""
        if isinstance(emoji, str):
            return emoji
        if hasattr(emoji, "name"):
            return f":{emoji.name}:"
        return str(emoji)

    if not message.reactions:
        return ""
    return ", ".join(
        f"{_label(reaction.emoji)}×{reaction.count}"
        for reaction in message.reactions
    )