Source code for tools.discord_embed

"""Create and send a Discord Embed to a channel."""

from __future__ import annotations

import datetime
import json
import logging
from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

# Tool metadata: identifier, human-readable summary, and the JSON-schema
# style description of the arguments accepted by ``run``.
TOOL_NAME = "discord_embed"

TOOL_DESCRIPTION = " ".join((
    "Create and send a rich Discord Embed to a channel from a JSON",
    "specification.",
    "Supports title, description, color, author, footer, image,",
    "thumbnail, fields, and timestamps.",
))

# Keys understood inside ``embed_spec``, in the order they are advertised.
_EMBED_SPEC_KEYS = (
    "title",
    "description",
    "url",
    "color (int or '#RRGGBB')",
    "timestamp (ISO8601)",
    "author {name, url, icon_url}",
    "footer {text, icon_url}",
    "image {url}",
    "thumbnail {url}",
    "fields [{name, value, inline}]",
)

TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "channel_id": {
            "type": "string",
            "description": "Discord channel ID to send the embed to.",
        },
        "embed_spec": {
            "type": "string",
            "description": (
                "JSON string describing the embed. Keys: "
                + ", ".join(_EMBED_SPEC_KEYS)
                + "."
            ),
        },
        "content": {
            "type": "string",
            "description": "Optional plain text to accompany the embed.",
        },
    },
    "required": ["channel_id", "embed_spec"],
}


def _parse_color(value: Any):
    """Coerce a color value to a ``discord.Colour``, or ``None`` if unusable.

    Accepted inputs:

    * ``"#RRGGBB"``-style strings (parsed as hexadecimal after the ``#``),
    * decimal integer strings such as ``"16711680"``,
    * ``int`` / ``float`` numbers (floats are truncated with ``int()``).

    Every accepted form must land in ``0 .. 0xFFFFFF``; anything else --
    including ``None``, booleans, unparsable strings, NaN/infinite floats and
    unsupported types -- yields ``None`` so the caller simply builds the embed
    without a colour.

    Args:
        value: Raw ``color`` / ``colour`` entry taken from the embed spec.

    Returns:
        A ``discord.Colour`` for a valid 24-bit value, otherwise ``None``.
    """
    # ``bool`` is a subclass of ``int``; a JSON ``true``/``false`` is not a
    # colour and must not silently become colour 1 / 0.
    if value is None or isinstance(value, bool):
        return None

    try:
        if isinstance(value, str):
            text = value.strip()
            number = (
                int(text[1:], 16) if text.startswith("#") else int(text, 10)
            )
        elif isinstance(value, (int, float)):
            # int(nan) raises ValueError, int(inf) raises OverflowError.
            number = int(value)
        else:
            return None
    except (ValueError, OverflowError):
        return None

    # Apply the 24-bit range check to *every* input form, not only numbers.
    if not 0 <= number <= 0xFFFFFF:
        return None

    # Imported only once a colour is actually built, so rejecting bad input
    # does not depend on the discord library being importable.
    import discord
    return discord.Colour(number)


def _parse_timestamp(value: Any):
    """Turn a raw timestamp into a timezone-aware ``datetime`` or ``None``.

    Numbers are read as seconds since the Unix epoch (UTC). Strings are
    parsed with ``datetime.fromisoformat`` after trimming whitespace and
    rewriting a trailing ``Z`` as ``+00:00``; a result without an offset is
    tagged as UTC. Falsy values, other types and anything that fails to
    parse produce ``None``.
    """
    if not value:
        return None

    utc = datetime.timezone.utc

    if isinstance(value, (int, float)):
        try:
            return datetime.datetime.fromtimestamp(float(value), tz=utc)
        except Exception:
            return None

    if not isinstance(value, str):
        return None

    text = value.strip()
    if text.endswith("Z"):
        # Spell the "Zulu" suffix as an explicit UTC offset before parsing.
        text = f"{text[:-1]}+00:00"

    try:
        parsed = datetime.datetime.fromisoformat(text)
    except Exception:
        return None

    # Naive results are interpreted as UTC rather than local time.
    return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=utc)


async def run(
    channel_id: str,
    embed_spec: str,
    content: str | None = None,
    ctx: ToolContext | None = None,
) -> str:
    """Build a Discord embed from a JSON spec and send it to a channel.

    The spec is validated against the size limits enforced below (title 256,
    description 4096, author name 256, footer text 2048, at most 25 fields
    with name 256 / value 1024, and 6000 characters in total) before anything
    is sent. Every failure is reported as a returned string rather than a
    raised exception.

    Args:
        channel_id: Identifier of the channel to send to; resolved through
            ``get_channel``.
        embed_spec: JSON text describing the embed (an already-decoded
            mapping is also accepted). Recognised keys: ``title``,
            ``description``, ``url``, ``color``/``colour``, ``timestamp``,
            ``author``, ``footer``, ``image``, ``thumbnail``, ``fields``.
        content: Optional plain text sent alongside the embed; an empty
            string is treated the same as ``None``.
        ctx: Tool execution context handed to ``require_discord_client``.

    Returns:
        A success message naming the channel, or a message describing why
        the embed could not be built or sent.
    """
    import discord
    from tools._discord_helpers import (
        require_discord_client,
        get_channel,
    )

    # Both helpers signal failure by returning a string (presumably an error
    # message -- confirm in tools._discord_helpers); pass it back unchanged.
    client = require_discord_client(ctx)
    if isinstance(client, str):
        return client

    channel = await get_channel(client, channel_id)
    if isinstance(channel, str):
        return channel
    if not hasattr(channel, "send"):
        return (
            f"Error: Channel '{channel_id}' does not support "
            f"sending messages."
        )

    # Parse spec
    try:
        spec: dict[str, Any] = (
            json.loads(embed_spec)
            if isinstance(embed_spec, str) else embed_spec
        )
    except json.JSONDecodeError as exc:
        return f"Error: Failed to parse embed_spec JSON: {exc}"
    if not isinstance(spec, dict):
        return "Error: embed_spec must be a JSON object."

    title = spec.get("title")
    description = spec.get("description")
    url = spec.get("url")
    colour = _parse_color(spec.get("color", spec.get("colour")))
    timestamp_dt = _parse_timestamp(spec.get("timestamp"))

    if title and len(str(title)) > 256:
        return "Error: 'title' exceeds 256 characters."
    if description and len(str(description)) > 4096:
        return "Error: 'description' exceeds 4096 characters."

    try:
        embed = discord.Embed(
            title=title,
            description=description,
            url=url or None,
            colour=colour,
            timestamp=timestamp_dt,
        )
    except Exception as exc:
        return f"Error: Failed to construct Embed: {exc}"

    # Author
    author = spec.get("author")
    if isinstance(author, dict):
        name = author.get("name")
        # Without this guard a missing name would be stringified by
        # set_author and show up as the literal author name "None".
        if name is None or not str(name):
            return "Error: 'author.name' is required when 'author' is set."
        if len(str(name)) > 256:
            return "Error: 'author.name' exceeds 256 characters."
        embed.set_author(
            name=str(name),
            url=author.get("url"),
            icon_url=author.get("icon_url"),
        )

    # Footer
    footer = spec.get("footer")
    if isinstance(footer, dict):
        text = footer.get("text")
        if text and len(str(text)) > 2048:
            return "Error: 'footer.text' exceeds 2048 characters."
        embed.set_footer(
            text=text,
            icon_url=footer.get("icon_url"),
        )

    # Images
    image = spec.get("image")
    if isinstance(image, dict) and image.get("url"):
        embed.set_image(url=image["url"])

    thumbnail = spec.get("thumbnail")
    if isinstance(thumbnail, dict) and thumbnail.get("url"):
        embed.set_thumbnail(url=thumbnail["url"])

    # Fields
    fields = spec.get("fields")
    if fields is not None:
        if not isinstance(fields, list):
            return "Error: 'fields' must be a list of objects."
        if len(fields) > 25:
            return "Error: 'fields' exceeds maximum of 25."
        for i, field in enumerate(fields):
            if not isinstance(field, dict):
                return f"Error: Field at index {i} must be an object."
            fname = field.get("name")
            fvalue = field.get("value")
            if fname is None or fvalue is None:
                return (
                    f"Error: Field at index {i} requires "
                    f"'name' and 'value'."
                )
            if len(str(fname)) > 256:
                return (
                    f"Error: Field name at index {i} exceeds "
                    f"256 characters."
                )
            if len(str(fvalue)) > 1024:
                return (
                    f"Error: Field value at index {i} exceeds "
                    f"1024 characters."
                )
            embed.add_field(
                name=str(fname),
                value=str(fvalue),
                inline=bool(field.get("inline", False)),
            )

    # Total length check. Footer/author may exist without text/name (e.g. a
    # footer carrying only an icon), in which case the attribute is not a
    # string -- fall back to "" instead of calling len() on it.
    total = (
        len(embed.title or "")
        + len(embed.description or "")
        + len(getattr(embed.footer, "text", None) or "")
        + len(getattr(embed.author, "name", None) or "")
        + sum(len(f.name) + len(f.value) for f in embed.fields)
    )
    if total > 6000:
        return "Error: Total embed content exceeds 6000 characters."

    try:
        await channel.send(content=content or None, embed=embed)
    except discord.errors.Forbidden:
        return (
            f"Error: I don't have permission to send messages "
            f"to channel '{channel_id}'."
        )
    except Exception as exc:
        # Keep the traceback in the logs; the caller only gets the summary.
        logger.exception(
            "Failed to send embed to channel %s", channel_id,
        )
        return f"An unexpected error occurred: {exc}"
    else:
        ch_name = getattr(channel, "name", channel_id)
        return f"Successfully sent embed to channel '{ch_name}'."