Source code for tools.discord_embed

"""``discord_embed`` tool: build and post a rich Discord embed to a channel.

Single-tool module in the ``tools/`` plugin format (``TOOL_NAME`` /
``TOOL_DESCRIPTION`` / ``TOOL_PARAMETERS`` / ``run``) that lets the LLM emit a
fully structured Discord embed from a JSON spec instead of plain text. The
module is discovered and registered by ``tool_loader.py``, and its :func:`run`
handler is dispatched by name from the tool loop; it talks to Discord through
the gateway's cached ``discord.Client`` (resolved via
``tools._discord_helpers``) rather than over raw HTTP itself.
"""

from __future__ import annotations

import datetime
import jsonutil as json
import logging
from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

# Plugin metadata read by the tool loader: the name the tool is dispatched
# under, the human/LLM-facing summary, and the JSON-Schema of its arguments.
TOOL_NAME = "discord_embed"

TOOL_DESCRIPTION = (
    "Create and send a rich Discord Embed to a channel "
    "from a JSON specification. "
    "Supports title, description, color, author, footer, "
    "image, thumbnail, fields, and timestamps."
)

TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        # Target channel, passed as a string.
        "channel_id": {
            "type": "string",
            "description": "Discord channel ID to send the embed to.",
        },
        # The embed itself, passed as a JSON-encoded string.
        "embed_spec": {
            "type": "string",
            "description": (
                "JSON string describing the embed. "
                "Keys: title, description, url, "
                "color (int or '#RRGGBB'), "
                "timestamp (ISO8601), "
                "author {name, url, icon_url}, "
                "footer {text, icon_url}, "
                "image {url}, "
                "thumbnail {url}, "
                "fields [{name, value, inline}]."
            ),
        },
        # Optional message text sent alongside the embed.
        "content": {
            "type": "string",
            "description": "Optional plain text to accompany the embed.",
        },
    },
    "required": ["channel_id", "embed_spec"],
}


def _parse_color(value: Any):
    """Normalize a user-supplied color into a ``discord.Colour`` or ``None``.

    Tolerates the several shapes a model might emit for the embed's ``color``
    field so a slightly-off value degrades to an uncolored embed rather than
    crashing the tool: a ``"#RRGGBB"`` hex string, a decimal string, or a raw
    number. Whatever the input shape, the parsed integer must fall in the
    ``0..0xFFFFFF`` range; anything out of range, unparseable, or of an
    unexpected type (including ``bool``) yields ``None``.

    ``discord`` is imported lazily, and only once a valid value has been
    parsed, so neither importing this module nor rejecting a bad value pulls
    in the Discord library.

    Args:
        value: The candidate color (hex/decimal string, int, float, or ``None``).

    Returns:
        discord.Colour | None: The parsed colour, or ``None`` when the value is
        absent or invalid.
    """
    # bool is a subclass of int; a JSON true/false is not a colour.
    if value is None or isinstance(value, bool):
        return None

    try:
        if isinstance(value, str):
            text = value.strip()
            if text.startswith("#"):
                number = int(text[1:], 16)
            else:
                number = int(text, 10)
        elif isinstance(value, (int, float)):
            # Floats are truncated; NaN raises ValueError, inf OverflowError.
            number = int(value)
        else:
            return None
    except (ValueError, OverflowError):
        return None

    # Apply the 24-bit RGB bound to every input shape, not just raw numbers,
    # so strings like "#1000000" or "-5" cannot produce an invalid colour.
    if not 0 <= number <= 0xFFFFFF:
        return None

    import discord

    return discord.Colour(number)


def _parse_timestamp(value: Any):
    """Convert a loosely-typed timestamp into a timezone-aware ``datetime``.

    Two input shapes are understood:

    * a number, read as Unix epoch seconds in UTC;
    * a string in ISO-8601 form, where a trailing ``Z`` is rewritten to
      ``+00:00`` before parsing and a result without an offset is stamped
      as UTC.

    Falsy input (``None``, ``""``, ``0``), any other type, and any value
    that fails to parse all produce ``None`` instead of raising.

    Args:
        value: Epoch number, ISO-8601 string, or a falsy placeholder.

    Returns:
        datetime.datetime | None: An aware datetime, or ``None`` if the value
        is missing or unusable.
    """
    utc = datetime.timezone.utc

    if not value:
        return None

    if isinstance(value, str):
        text = value.strip()
        if text.endswith("Z"):
            # Spell the "Zulu" suffix as an explicit UTC offset.
            text = text[:-1] + "+00:00"
        try:
            parsed = datetime.datetime.fromisoformat(text)
            return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=utc)
        except Exception:
            return None

    if isinstance(value, (int, float)):
        try:
            return datetime.datetime.fromtimestamp(float(value), tz=utc)
        except Exception:
            return None

    return None


async def run(
    channel_id: str,
    embed_spec: str,
    content: str | None = None,
    ctx: ToolContext | None = None,
) -> str:
    """Build a Discord embed from a JSON spec and send it to a channel.

    The client and channel are resolved through ``tools._discord_helpers``;
    when either helper returns a ``str`` that string is handed straight back
    to the caller (presumably an error message -- the helpers live outside
    this module). The spec is then validated against the size limits checked
    below (title 256, description 4096, author name 256, footer text 2048,
    at most 25 fields with name 256 / value 1024, 6000 characters overall)
    before the embed is sent with ``channel.send``. On success the rendered
    content is appended to ``ctx.sent_rich_messages`` when a context is given.

    Failures are reported as ``"Error: ..."`` strings instead of exceptions.

    Args:
        channel_id: ID of the channel to post to.
        embed_spec: JSON object (as a string) describing the embed; an
            already-decoded ``dict`` is also accepted.
        content: Optional plain text sent alongside the embed.
        ctx: Tool execution context used to resolve the Discord client and
            to record the sent message.

    Returns:
        str: A success message (including the rendered embed text when
        available) or an error description.
    """
    import discord
    from tools._discord_helpers import (
        require_discord_client,
        get_messageable_channel,
    )
    from platforms.discord_rich_content import merge_content_with_rich_content

    client = require_discord_client(ctx)
    if isinstance(client, str):
        return client

    channel = await get_messageable_channel(client, channel_id)
    if isinstance(channel, str):
        return channel
    if not hasattr(channel, "send"):
        return f"Error: Channel '{channel_id}' does not support sending messages."

    # Parse spec
    try:
        spec: dict[str, Any] = (
            json.loads(embed_spec) if isinstance(embed_spec, str) else embed_spec
        )
        if not isinstance(spec, dict):
            return "Error: embed_spec must be a JSON object."
    except json.JSONDecodeError as exc:
        return f"Error: Failed to parse embed_spec JSON: {exc}"

    title = spec.get("title")
    description = spec.get("description")
    url = spec.get("url")
    # Accept both spellings of the key.
    colour = _parse_color(spec.get("color", spec.get("colour")))
    timestamp_dt = _parse_timestamp(spec.get("timestamp"))

    if title and len(str(title)) > 256:
        return "Error: 'title' exceeds 256 characters."
    if description and len(str(description)) > 4096:
        return "Error: 'description' exceeds 4096 characters."

    try:
        embed = discord.Embed(
            title=title,
            description=description,
            url=url or None,
            colour=colour,
            timestamp=timestamp_dt,
        )
    except Exception as exc:
        return f"Error: Failed to construct Embed: {exc}"

    # Author
    author = spec.get("author")
    if isinstance(author, dict):
        name = author.get("name")
        if name is None:
            # set_author() stringifies its name, so a missing name would be
            # rendered as the literal text "None".
            return "Error: 'author' requires 'name'."
        if len(str(name)) > 256:
            return "Error: 'author.name' exceeds 256 characters."
        embed.set_author(
            name=name,
            url=author.get("url"),
            icon_url=author.get("icon_url"),
        )

    # Footer
    footer = spec.get("footer")
    if isinstance(footer, dict):
        text = footer.get("text")
        if text and len(str(text)) > 2048:
            return "Error: 'footer.text' exceeds 2048 characters."
        embed.set_footer(
            text=text,
            icon_url=footer.get("icon_url"),
        )

    # Images
    image = spec.get("image")
    if isinstance(image, dict) and image.get("url"):
        embed.set_image(url=image["url"])

    thumbnail = spec.get("thumbnail")
    if isinstance(thumbnail, dict) and thumbnail.get("url"):
        embed.set_thumbnail(url=thumbnail["url"])

    # Fields
    fields = spec.get("fields")
    if fields is not None:
        if not isinstance(fields, list):
            return "Error: 'fields' must be a list of objects."
        if len(fields) > 25:
            return "Error: 'fields' exceeds maximum of 25."
        for i, field in enumerate(fields):
            if not isinstance(field, dict):
                return f"Error: Field at index {i} must be an object."
            fname = field.get("name")
            fvalue = field.get("value")
            if fname is None or fvalue is None:
                return f"Error: Field at index {i} requires 'name' and 'value'."
            if len(str(fname)) > 256:
                return f"Error: Field name at index {i} exceeds 256 characters."
            if len(str(fvalue)) > 1024:
                return f"Error: Field value at index {i} exceeds 1024 characters."
            embed.add_field(
                name=str(fname),
                value=str(fvalue),
                inline=bool(field.get("inline", False)),
            )

    # Total length check. A footer set with only an icon is a truthy proxy
    # whose ``text`` is missing, so every component is coalesced to "" before
    # measuring; otherwise len() would raise on the missing value.
    footer_text = embed.footer.text if embed.footer else None
    author_name = embed.author.name if embed.author else None
    total = (
        len(embed.title or "")
        + len(embed.description or "")
        + len(footer_text or "")
        + len(author_name or "")
        + sum(len(f.name) + len(f.value) for f in embed.fields)
    )
    if total > 6000:
        return "Error: Total embed content exceeds 6000 characters."

    try:
        sent_msg = await channel.send(content=content or None, embed=embed)
        rich_text = merge_content_with_rich_content(
            getattr(sent_msg, "content", content or ""),
            sent_msg,
        )
        if ctx is not None and rich_text:
            ctx.sent_rich_messages.append(
                {
                    "text": rich_text,
                    "message_id": str(getattr(sent_msg, "id", "") or ""),
                }
            )
        ch_name = getattr(channel, "name", channel_id)
        result = f"Successfully sent embed to channel '{ch_name}'."
        if rich_text:
            result += f"\n\nSent embed content:\n{rich_text}"
        return result
    except discord.errors.Forbidden:
        return (
            f"Error: I don't have permission to send messages "
            f"to channel '{channel_id}'."
        )
    except Exception as exc:
        return f"An unexpected error occurred: {exc}"