Source code for platforms.emoji_resolver

"""Resolve custom emojis from Discord and Matrix into image Attachments.

Custom emojis are platform-specific rich content that arrives as text
tokens (Discord ``<:name:id>``) or HTML ``<img>`` tags (Matrix).  This
module extracts them from message text, downloads the images, and
returns :class:`~platforms.base.Attachment` objects so the LLM can
*see* the emojis as inline images (typically PNG; GIF for animated
Discord emojis).

Both Discord and Matrix adapters call into these shared utilities from
their ``on_message`` handlers.
"""

from __future__ import annotations

import asyncio
import logging
import re
from dataclasses import dataclass
from typing import Any

import aiohttp

from platforms.base import Attachment

logger = logging.getLogger(__name__)

# ------------------------------------------------------------------
# Discord emoji extraction
# ------------------------------------------------------------------

# Matches the custom-emoji tokens ``<:name:id>`` (static) and
# ``<a:name:id>`` (animated).  Capture groups:
#   1 - "a" when the token is animated, otherwise the empty string
#   2 - emoji name: 2-32 word characters
#   3 - emoji ID: 17-20 decimal digits
DISCORD_EMOJI_RE = re.compile(
    r"<(a?):(\w{2,32}):(\d{17,20})>",
)

# Base URL for emoji images; downloads request ``<base>/<emoji_id>.<ext>``.
_DISCORD_CDN = "https://cdn.discordapp.com/emojis"


@dataclass(frozen=True)
class DiscordEmojiMatch:
    """One custom-emoji token located in the text of a Discord message.

    Instances are immutable (the dataclass is frozen).
    """

    # Emoji name as written inside the token.
    name: str
    # Emoji ID (the digits inside the token), kept as a string.
    emoji_id: str
    # True for ``<a:name:id>`` tokens, False for ``<:name:id>`` tokens.
    animated: bool
    # The original token exactly as it appeared in the message text.
    full_match: str
def extract_discord_emojis(text: str) -> list[DiscordEmojiMatch]:
    """Collect the distinct custom emojis referenced in *text*.

    Emojis are de-duplicated by ID: only the first token seen for a given
    ID is kept, and the result follows the order of first appearance.
    """
    # A dict keeps insertion order, so it both de-duplicates by emoji ID and
    # remembers the order in which each ID was first seen.
    first_by_id: dict[str, DiscordEmojiMatch] = {}
    for token in DISCORD_EMOJI_RE.finditer(text):
        animated_flag, name, emoji_id = token.groups()
        if emoji_id not in first_by_id:
            first_by_id[emoji_id] = DiscordEmojiMatch(
                name=name,
                emoji_id=emoji_id,
                animated=bool(animated_flag),
                full_match=token.group(0),
            )
    return list(first_by_id.values())
def rewrite_discord_emoji_text(
    text: str,
    matches: list[DiscordEmojiMatch],
) -> str:
    """Swap each matched emoji token in *text* for ``[emoji: name]``.

    Every occurrence of a match's original token is replaced; tokens that
    are not listed in *matches* are left untouched.
    """
    rewritten = text
    for match in matches:
        placeholder = f"[emoji: {match.name}]"
        rewritten = rewritten.replace(match.full_match, placeholder)
    return rewritten
async def _download_one_discord_emoji(
    em: DiscordEmojiMatch,
    media_cache: Any | None = None,
) -> Attachment | None:
    """Fetch the image for one Discord custom emoji from the CDN.

    Animated emojis are requested as ``.gif``, all others as ``.png``.
    When *media_cache* is given, the fetch is routed through its
    ``get_or_download(url, downloader)`` coroutine, whose
    ``(data, mimetype, filename)`` result is used for the attachment.

    Returns the :class:`Attachment`, or ``None`` if anything raised an
    ``Exception`` along the way (the failure is logged as a warning).
    """
    extension = "gif" if em.animated else "png"
    url = f"{_DISCORD_CDN}/{em.emoji_id}.{extension}"
    default_mimetype = f"image/{extension}"
    default_filename = f"{em.name}_{em.emoji_id}.{extension}"

    async def _fetch() -> tuple[bytes, str, str]:
        # One throwaway session per download; non-2xx responses raise.
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                resp.raise_for_status()
                payload = await resp.read()
        return payload, default_mimetype, default_filename

    try:
        if media_cache is None:
            data, mimetype, filename = await _fetch()
        else:
            data, mimetype, filename = await media_cache.get_or_download(
                url, _fetch,
            )
        return Attachment(
            data=data,
            mimetype=mimetype,
            filename=filename,
            source_url=url,
        )
    except Exception:
        logger.warning(
            "Failed to download Discord emoji %s (%s)",
            em.name, url, exc_info=True,
        )
        return None
async def download_discord_emojis(
    matches: list[DiscordEmojiMatch],
    *,
    max_emojis: int = 5,
    media_cache: Any | None = None,
) -> list[Attachment]:
    """Download up to *max_emojis* custom emojis in parallel.

    Args:
        matches: Emojis to download; only the first *max_emojis* are used.
        max_emojis: Upper bound on the number of downloads.  Zero or a
            negative value means "download nothing".
        media_cache: Optional cache object forwarded to the per-emoji
            downloader.

    Returns:
        The successfully downloaded :class:`Attachment` objects, in the
        order of *matches*.  Failed downloads are logged by the per-emoji
        downloader and left out of the result.
    """
    # Clamp at zero: a negative limit would otherwise turn the slice into
    # "everything except the last N" rather than a cap.
    bounded = matches[:max(max_emojis, 0)]
    if not bounded:
        return []

    results = await asyncio.gather(
        *(_download_one_discord_emoji(em, media_cache) for em in bounded),
        return_exceptions=True,
    )
    # Drops both ``None`` (handled failures) and any exception objects
    # captured by ``return_exceptions=True``.
    return [r for r in results if isinstance(r, Attachment)]
# ------------------------------------------------------------------
# Matrix emoji extraction
# ------------------------------------------------------------------

# Finds ``<img ... src="mxc://..." ...>`` tags in Matrix formatted_body
# HTML; group 1 is the ``mxc://`` URL.  Matrix custom emojis carry a
# ``data-mx-emoticon`` attribute, but this pattern only requires a
# double-quoted ``mxc://`` ``src`` and does not check for that attribute.
_MATRIX_EMOJI_IMG_RE = re.compile(
    r'<img\b[^>]*?\bsrc="(mxc://[^"]+)"[^>]*?>',
    flags=re.IGNORECASE,
)

# Pulls the value of a double-quoted ``alt`` attribute (group 1) out of
# an ``<img>`` tag.
_MATRIX_IMG_ALT_RE = re.compile(r'\balt="([^"]*)"', flags=re.IGNORECASE)
@dataclass(frozen=True)
class MatrixEmojiMatch:
    """One custom-emoji ``<img>`` tag located in Matrix formatted_body HTML.

    Instances are immutable (the dataclass is frozen).
    """

    # Value of the tag's ``alt`` attribute.
    alt_text: str
    # The ``mxc://`` URL taken from the tag's ``src`` attribute.
    mxc_url: str
    # The complete ``<img ...>`` tag as it appeared in the HTML.
    full_tag: str
def extract_matrix_emojis(formatted_body: str) -> list[MatrixEmojiMatch]:
    """Collect the distinct custom emojis in Matrix *formatted_body* HTML.

    Emojis are de-duplicated by their ``mxc://`` URL: only the first tag
    seen for a URL is kept, and the result follows the order of first
    appearance.  A tag without an ``alt`` attribute gets the alt text
    ``"emoji"``.
    """
    # A dict keeps insertion order, so it both de-duplicates by URL and
    # remembers the order in which each URL was first seen.
    first_by_url: dict[str, MatrixEmojiMatch] = {}
    for tag_match in _MATRIX_EMOJI_IMG_RE.finditer(formatted_body):
        tag, url = tag_match.group(0, 1)
        if url in first_by_url:
            continue
        alt = _MATRIX_IMG_ALT_RE.search(tag)
        first_by_url[url] = MatrixEmojiMatch(
            alt_text="emoji" if alt is None else alt.group(1),
            mxc_url=url,
            full_tag=tag,
        )
    return list(first_by_url.values())
def rewrite_matrix_emoji_text(
    body: str,
    matches: list[MatrixEmojiMatch],
) -> str:
    """Replace Matrix emoji shortcodes in *body* with ``[emoji: alt]``.

    Matrix clients typically put the shortcode (e.g. ``:wave:``) in the
    plain-text body.  For each match, the surrounding colons are stripped
    from its alt text and every ``:name:`` occurrence in *body* becomes
    ``[emoji: name]``.  An alt text that is empty once stripped falls back
    to the name ``emoji``.

    Args:
        body: Plain-text message body.
        matches: Emojis whose shortcodes should be rewritten.

    Returns:
        The body with all matching shortcodes rewritten; unchanged when no
        shortcode is present.
    """
    for em in matches:
        # The alt_text often looks like ":emoji_name:" -- clean it up.
        clean = em.alt_text.strip(":") or "emoji"
        # Replace every occurrence, not just the first: matches are unique
        # per emoji, so an emoji used several times in one message shows up
        # here only once.
        body = body.replace(f":{clean}:", f"[emoji: {clean}]")
    return body
async def download_matrix_emojis(
    matches: list[MatrixEmojiMatch],
    matrix_client: Any,
    *,
    max_emojis: int = 5,
    media_cache: Any | None = None,
) -> list[Attachment]:
    """Download up to *max_emojis* Matrix custom emojis in parallel.

    Uses the matrix-nio ``AsyncClient.download()`` method to fetch media
    from Matrix homeserver MXC URLs.

    Args:
        matches: Emojis to download; only the first *max_emojis* are used.
        matrix_client: Client whose ``download(mxc=...)`` coroutine is
            awaited for each emoji.
        max_emojis: Upper bound on the number of downloads.  Zero or a
            negative value means "download nothing".
        media_cache: Optional cache; when given, each download goes through
            its ``get_or_download(mxc_url, downloader)`` coroutine.

    Returns:
        The successfully downloaded :class:`Attachment` objects, in the
        order of *matches*.  Failed downloads are logged as warnings and
        left out of the result.
    """
    # Clamp at zero: a negative limit would otherwise turn the slice into
    # "everything except the last N" rather than a cap.
    bounded = matches[:max(max_emojis, 0)]
    if not bounded:
        return []

    # Imported lazily, and only once there is actually something to
    # download, so the empty case does not need matrix-nio at all.
    from nio.responses import DownloadError

    async def _download_one(em: MatrixEmojiMatch) -> Attachment | None:
        """Fetch one emoji; return ``None`` (after logging) on failure."""
        mxc_url = em.mxc_url
        clean_name = em.alt_text.strip(":") or "emoji"
        filename = f"{clean_name}.png"

        async def _fetch() -> tuple[bytes, str, str]:
            resp = await matrix_client.download(mxc=mxc_url)
            if isinstance(resp, DownloadError):
                raise RuntimeError(
                    f"Matrix download failed for {mxc_url}: {resp.message}",
                )
            # Fall back to PNG when the response reports no content type.
            return resp.body, resp.content_type or "image/png", filename

        try:
            if media_cache is not None:
                data, mimetype, fname = await media_cache.get_or_download(
                    mxc_url, _fetch,
                )
            else:
                data, mimetype, fname = await _fetch()
            return Attachment(
                data=data,
                mimetype=mimetype,
                filename=fname,
                source_url=mxc_url,
            )
        except Exception:
            logger.warning(
                "Failed to download Matrix emoji %s (%s)",
                em.alt_text, mxc_url, exc_info=True,
            )
            return None

    results = await asyncio.gather(
        *(_download_one(em) for em in bounded),
        return_exceptions=True,
    )
    # Drops both ``None`` (handled failures) and any exception objects
    # captured by ``return_exceptions=True``.
    return [r for r in results if isinstance(r, Attachment)]