"""Resolve custom emojis from Discord and Matrix into image Attachments.
Custom emojis are platform-specific rich content that arrives as text
tokens (Discord ``<:name:id>``) or HTML ``<img>`` tags (Matrix). This
module extracts them from message text, downloads the images, and
returns :class:`~platforms.base.Attachment` objects so the LLM can
*see* the emojis as inline images (PNG by default; GIF for animated
Discord emojis).
Both Discord and Matrix adapters call into these shared utilities from
their ``on_message`` handlers.
"""
from __future__ import annotations
import asyncio
import logging
import re
from dataclasses import dataclass
from typing import Any
import aiohttp
from platforms.base import Attachment
logger = logging.getLogger(__name__)
# ------------------------------------------------------------------
# Discord emoji extraction
# ------------------------------------------------------------------
# Matches <:name:id> and <a:name:id> (animated).
# Capture groups:
#   1 - "a" when the emoji is animated, empty string otherwise
#   2 - the emoji name (2-32 word characters)
#   3 - the numeric emoji ID (17-20 digits)
DISCORD_EMOJI_RE = re.compile(
r"<(a?):(\w{2,32}):(\d{17,20})>",
)
# Base URL for emoji images; the download helper appends "/<emoji_id>.<ext>".
_DISCORD_CDN = "https://cdn.discordapp.com/emojis"
[docs]
@dataclass(frozen=True)
class DiscordEmojiMatch:
    """One custom emoji token located in the text of a Discord message.

    Instances are immutable (frozen dataclass), so they compare by value
    and can be hashed.
    """

    # Emoji name; used for the ``[emoji: name]`` placeholder and for the
    # filename of the downloaded image.
    name: str
    # Emoji ID kept as a string; it is interpolated into the CDN URL.
    emoji_id: str
    # True for animated emojis, which are downloaded as GIF instead of PNG.
    animated: bool
    # The original ``<:name:id>`` token exactly as it appeared in the text.
    full_match: str
[docs]
def rewrite_discord_emoji_text(
    text: str,
    matches: list[DiscordEmojiMatch],
) -> str:
    """Swap each matched ``<:name:id>`` token in *text* for ``[emoji: name]``.

    Only tokens listed in *matches* are rewritten; any other text, including
    emoji tokens that are not in *matches*, is left as it is.

    Args:
        text: The raw message text.
        matches: Emoji matches whose ``full_match`` token should be replaced
            by a placeholder built from their ``name``.

    Returns:
        The text with the placeholders substituted.
    """
    rewritten = text
    for match in matches:
        placeholder = f"[emoji: {match.name}]"
        # str.replace substitutes every occurrence, so a token that appears
        # several times in the message is fully rewritten in one pass.
        rewritten = rewritten.replace(match.full_match, placeholder)
    return rewritten
async def _download_one_discord_emoji(
    em: DiscordEmojiMatch,
    media_cache: Any | None = None,
) -> Attachment | None:
    """Fetch the image for one Discord custom emoji from the CDN.

    Animated emojis are requested as ``.gif``, all others as ``.png``.
    When *media_cache* is given, the fetch is routed through its
    ``get_or_download(url, downloader)`` coroutine, keyed by the CDN URL;
    otherwise the image is requested directly.

    Args:
        em: The emoji to download.
        media_cache: Optional cache object exposing ``get_or_download``,
            which must yield a ``(data, mimetype, filename)`` triple.

    Returns:
        The image wrapped in an :class:`Attachment`, or ``None`` when the
        download (or building the attachment) fails. Failures are logged
        as warnings with the traceback attached.
    """
    ext = "gif" if em.animated else "png"
    url = f"{_DISCORD_CDN}/{em.emoji_id}.{ext}"
    default_mimetype = f"image/{ext}"
    default_filename = f"{em.name}_{em.emoji_id}.{ext}"

    async def _fetch() -> tuple[bytes, str, str]:
        # A fresh HTTP session is opened for every emoji that is fetched.
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                # Turn 4xx/5xx responses into exceptions.
                response.raise_for_status()
                payload = await response.read()
        return payload, default_mimetype, default_filename

    try:
        if media_cache is None:
            data, mimetype, filename = await _fetch()
        else:
            # The cache is handed the fetch coroutine function and returns
            # the triple that ends up in the attachment.
            data, mimetype, filename = await media_cache.get_or_download(
                url, _fetch,
            )
        return Attachment(
            data=data,
            mimetype=mimetype,
            filename=filename,
            source_url=url,
        )
    except Exception:
        # Best effort: one broken emoji must not fail the whole message.
        logger.warning(
            "Failed to download Discord emoji %s (%s)",
            em.name, url, exc_info=True,
        )
        return None
[docs]
async def download_discord_emojis(
    matches: list[DiscordEmojiMatch],
    *,
    max_emojis: int = 5,
    media_cache: Any | None = None,
) -> list[Attachment]:
    """Download up to *max_emojis* custom emojis in parallel.

    Only the first *max_emojis* entries of *matches* are fetched. A zero
    or negative limit downloads nothing.

    Args:
        matches: Emoji matches to download, in message order.
        max_emojis: Upper bound on the number of emojis fetched.
        media_cache: Optional cache object passed through to the per-emoji
            download helper.

    Returns:
        The successfully downloaded :class:`Attachment` objects, in the
        same order as their matches. Failed downloads are left out of the
        result; the per-emoji helper logs them as warnings.
    """
    # Clamp the limit: a negative value would otherwise slice from the end
    # of the list ("all but the last N") instead of selecting nothing.
    limit = max(max_emojis, 0)
    bounded = matches[:limit]
    if not bounded:
        return []

    # return_exceptions=True keeps one unexpected failure from discarding
    # the other downloads; such results are filtered out below.
    results = await asyncio.gather(
        *(_download_one_discord_emoji(em, media_cache) for em in bounded),
        return_exceptions=True,
    )
    return [r for r in results if isinstance(r, Attachment)]
# ------------------------------------------------------------------
# Matrix emoji extraction
# ------------------------------------------------------------------
# Matches <img ... src="mxc://..." ...> in Matrix formatted_body HTML.
# Matrix custom emojis use data-mx-emoticon attribute and mxc:// src.
# Group 1 captures the mxc:// URL; the whole match is the complete tag.
# NOTE(review): the pattern only requires a double-quoted mxc:// src. It
# does not check for data-mx-emoticon, so any inline <img> with an mxc://
# source matches - confirm callers filter further if that matters.
_MATRIX_EMOJI_IMG_RE = re.compile(
r'<img\b[^>]*?\bsrc="(mxc://[^"]+)"[^>]*?>',
re.IGNORECASE,
)
# Extract alt text from an <img> tag.
# Group 1 is the double-quoted alt value, which may be empty.
_MATRIX_IMG_ALT_RE = re.compile(
r'\balt="([^"]*)"',
re.IGNORECASE,
)
[docs]
@dataclass(frozen=True)
class MatrixEmojiMatch:
    """One custom emoji ``<img>`` tag located in Matrix formatted_body HTML.

    Instances are immutable (frozen dataclass), so they compare by value
    and can be hashed.
    """

    # Alt text of the tag, often of the form ":emoji_name:"; used for the
    # ``[emoji: ...]`` placeholder and the download filename.
    alt_text: str
    # The ``mxc://`` URL of the emoji image; passed to the Matrix client
    # when downloading.
    mxc_url: str
    # The complete ``<img ...>`` tag as it appeared in the HTML.
    full_tag: str
[docs]
def rewrite_matrix_emoji_text(
    body: str,
    matches: list[MatrixEmojiMatch],
) -> str:
    """Turn Matrix emoji shortcodes in *body* into ``[emoji: name]`` markers.

    Matrix clients typically leave the shortcode (e.g. ``:wave:``) in the
    plain-text body. For every match, the colons around its alt text are
    trimmed and the first remaining ``:name:`` occurrence in *body* is
    replaced by ``[emoji: name]``.

    Args:
        body: The plain-text message body.
        matches: Emoji matches whose alt text identifies the shortcodes.

    Returns:
        The body with at most one shortcode rewritten per match.
    """
    for match in matches:
        # Alt text usually looks like ":emoji_name:"; fall back to a generic
        # label when nothing is left once the colons are trimmed.
        label = match.alt_text.strip(":") or "emoji"
        # count=1 rewrites one shortcode per match, so an emoji used N times
        # (and matched N times) is rewritten N times. str.replace leaves the
        # body unchanged when the shortcode does not occur.
        body = body.replace(f":{label}:", f"[emoji: {label}]", 1)
    return body
[docs]
async def download_matrix_emojis(
    matches: list[MatrixEmojiMatch],
    matrix_client: Any,
    *,
    max_emojis: int = 5,
    media_cache: Any | None = None,
) -> list[Attachment]:
    """Download up to *max_emojis* Matrix custom emojis in parallel.

    Uses the matrix-nio ``AsyncClient.download()`` method to fetch
    media from Matrix homeserver MXC URLs. Only the first *max_emojis*
    entries of *matches* are fetched. A zero or negative limit downloads
    nothing.

    Args:
        matches: Emoji matches to download, in message order.
        matrix_client: Client object exposing ``download(mxc=...)``.
        max_emojis: Upper bound on the number of emojis fetched.
        media_cache: Optional cache object exposing
            ``get_or_download(key, downloader)``; it is keyed by the MXC
            URL and must yield a ``(data, mimetype, filename)`` triple.

    Returns:
        The successfully downloaded :class:`Attachment` objects, in the
        same order as their matches. Failed downloads are logged as
        warnings and left out of the result.
    """
    # Clamp the limit: a negative value would otherwise slice from the end
    # of the list ("all but the last N") instead of selecting nothing.
    bounded = matches[:max(max_emojis, 0)]
    if not bounded:
        return []

    # Imported lazily, and only once there is something to download, so a
    # call with no emojis does not need matrix-nio at all.
    from nio.responses import DownloadError

    async def _download_one(em: MatrixEmojiMatch) -> Attachment | None:
        mxc_url = em.mxc_url
        clean_name = em.alt_text.strip(":") or "emoji"
        # NOTE(review): alt_text comes from remote HTML and is used here
        # unsanitised, and the extension is always ".png" even when the
        # server reports another content type - confirm consumers do not
        # treat the filename as a path or rely on its extension.
        filename = f"{clean_name}.png"

        async def _fetch() -> tuple[bytes, str, str]:
            # Shared by the cached and the direct path.
            resp = await matrix_client.download(mxc=mxc_url)
            if isinstance(resp, DownloadError):
                # nio reports failures as a response object; raise so the
                # handler below logs it and skips this emoji.
                raise RuntimeError(
                    f"Matrix download failed for {mxc_url}: {resp.message}",
                )
            return resp.body, resp.content_type or "image/png", filename

        try:
            if media_cache is None:
                data, mimetype, fname = await _fetch()
            else:
                data, mimetype, fname = await media_cache.get_or_download(
                    mxc_url, _fetch,
                )
            return Attachment(
                data=data,
                mimetype=mimetype,
                filename=fname,
                source_url=mxc_url,
            )
        except Exception:
            # Best effort: one broken emoji must not fail the whole message.
            logger.warning(
                "Failed to download Matrix emoji %s (%s)",
                em.alt_text, mxc_url, exc_info=True,
            )
            return None

    # return_exceptions=True keeps one unexpected failure from discarding
    # the other downloads; such results are filtered out below.
    results = await asyncio.gather(
        *(_download_one(em) for em in bounded),
        return_exceptions=True,
    )
    return [r for r in results if isinstance(r, Attachment)]