Source code for platforms.base

"""Abstract base types for platform adapters.

Every chat platform (Matrix, Discord, Slack, ...) implements
:class:`PlatformAdapter`.  Incoming events are normalised into
:class:`IncomingMessage` before being handed to the shared
:class:`~message_processor.MessageProcessor`.
"""

from __future__ import annotations

import abc
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Callable, Awaitable


# ------------------------------------------------------------------
# Unified message / attachment models
# ------------------------------------------------------------------

[docs] @dataclass class Attachment: """A downloaded media attachment, already in raw bytes.""" data: bytes mimetype: str filename: str source_url: str = "" """Original URL the media was fetched from (MXC, CDN, …)."""
[docs] @dataclass class IncomingMessage: """Platform-agnostic representation of an incoming chat message. Platform adapters construct one of these for every event and pass it to the :class:`~message_processor.MessageProcessor`. """ platform: str """Short identifier for the originating platform. For example: ``"matrix"``, ``"discord"``. """ channel_id: str """Platform-specific channel / room identifier.""" user_id: str """Platform-specific sender identifier.""" user_name: str """Human-readable display name of the sender.""" text: str """Plain-text body of the message (may be empty for media-only messages).""" is_addressed: bool """Whether the bot was explicitly addressed (mention, DM, reply). """ attachments: list[Attachment] = field(default_factory=list) """Media attachments that have already been downloaded.""" channel_name: str = "" """Human-readable channel / room name (for prompt context).""" timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) """UTC timestamp of the message.""" message_id: str = "" """Platform-specific message identifier.""" reply_to_id: str = "" """ID of the message being replied to, if any.""" extra: dict[str, Any] = field(default_factory=dict) """Arbitrary platform-specific metadata.""" reactions: str = "" """Serialized reaction summary, e.g. ``"👍×3, 🔥×1"``."""
[docs] @dataclass class HistoricalMessage: """Lightweight representation of a message fetched from platform history. Used by :meth:`PlatformAdapter.fetch_history` to return recent messages for backfilling conversation context after downtime. """ user_id: str user_name: str text: str timestamp: datetime message_id: str = "" is_bot: bool = False """``True`` when this message was sent by the bot itself.""" reply_to_id: str = "" """ID of the message this one replies to, if any.""" reactions: str = "" """Serialized reaction summary, e.g. ``"👍×3, 🔥×1"``."""
# ------------------------------------------------------------------ # Adapter interface # ------------------------------------------------------------------ # Type alias for the callback a platform uses to deliver messages # to the MessageProcessor. MessageHandler = Callable[ [IncomingMessage, "PlatformAdapter"], Awaitable[None], ] MessageUpdateHandler = Callable[ [str, str, str, str, str, str, str, str], Awaitable[None], ] MessageDeleteHandler = Callable[ [str, str, str, str], Awaitable[None], ]
[docs] class PlatformAdapter(abc.ABC): """Interface that every platform must implement. Subclasses wire up their SDK's event loop, convert native events to :class:`IncomingMessage`, and forward them to the *message_handler* callback supplied at construction time. """
[docs] def __init__(self, message_handler: MessageHandler) -> None: """Initialize the instance. Args: message_handler (MessageHandler): The message handler value. """ self._message_handler = message_handler self._message_update_handler: MessageUpdateHandler | None = None self._message_delete_handler: MessageDeleteHandler | None = None self._cancel_callback: Callable[..., Awaitable[str]] | None = None
# -- Metadata ------------------------------------------------------ @property @abc.abstractmethod def name(self) -> str: """Short lowercase identifier, e.g. ``"matrix"`` or ``"discord"``.""" @property @abc.abstractmethod def is_running(self) -> bool: """Return ``True`` while the platform event loop is active.""" @property def bot_identity(self) -> dict[str, str]: """Return the bot's own identity on this platform. Returns a dict with at minimum ``platform`` and ``user_id``. Adapters should override to provide ``display_name`` and ``mention`` where available. The default returns an empty ``user_id`` (safe to call before login completes). """ return {"platform": self.name, "user_id": "", "display_name": ""} # -- Lifecycle -----------------------------------------------------
[docs] @abc.abstractmethod async def start(self) -> None: """Connect to the service, authenticate, and begin listening."""
[docs] @abc.abstractmethod async def stop(self) -> None: """Gracefully disconnect and release resources."""
# -- Outbound messaging --------------------------------------------
[docs] @abc.abstractmethod async def send(self, channel_id: str, text: str) -> str: """Send a plain-text message to *channel_id*. Returns the platform message ID of the sent message, or ``""`` if the send failed or the platform does not expose one. """
[docs] @abc.abstractmethod async def send_file( self, channel_id: str, data: bytes, filename: str, mimetype: str = "application/octet-stream", ) -> str | None: """Send a file/media attachment to *channel_id*. Parameters ---------- channel_id: Platform-specific channel / room identifier. data: Raw file bytes. filename: Suggested filename for the attachment. mimetype: MIME type of the file (used for determining how to present the attachment on platforms that distinguish images, audio, video, and generic files). Returns ------- str | None A platform-specific content URL for the uploaded file (``mxc://`` on Matrix, CDN URL on Discord), or ``None`` if the upload failed. """
# -- Interactive buttons (optional) --------------------------------
[docs] async def send_with_buttons( self, channel_id: str, text: str, view: Any = None, ) -> str: """Send a message with interactive buttons attached. Parameters ---------- channel_id: Platform-specific channel / room identifier. text: Message text content. view: Platform-specific UI view object (e.g. ``discord.ui.View``). Platforms that don't support interactive components should ignore this and send plain text. Returns the platform message ID, or ``""`` on failure. The default implementation falls back to :meth:`send`. """ return await self.send(channel_id, text)
# -- Message editing (optional) ------------------------------------
[docs] async def edit_message( self, channel_id: str, message_id: str, new_text: str, ) -> bool: """Edit an existing message sent by the bot. Parameters ---------- channel_id: Platform-specific channel / room identifier. message_id: Platform-specific ID of the message to edit. new_text: Replacement text content. Returns ``True`` on success, ``False`` if the platform does not support editing or the operation failed. The default implementation is a no-op that returns ``False``. """ return False
# -- Typing indicator (optional) -----------------------------------
[docs] async def start_typing(self, channel_id: str) -> None: """Begin showing a typing indicator in *channel_id*. Implementations should spawn a background task that periodically refreshes the indicator until :meth:`stop_typing` is called. The default implementation is a no-op. """
[docs] async def stop_typing(self, channel_id: str) -> None: """Stop showing the typing indicator in *channel_id*. Must be safe to call even if :meth:`start_typing` was never called for the given channel. The default is a no-op. """
# -- Channel history (optional) ------------------------------------
[docs] async def fetch_history( self, channel_id: str, limit: int = 100, ) -> list[HistoricalMessage]: """Fetch recent messages from the platform for *channel_id*. Returns up to *limit* messages in **chronological** order (oldest first). The default implementation returns an empty list; platform adapters should override when the underlying SDK supports history retrieval. """ return []
# -- Channel webhooks (optional) -----------------------------------
[docs] async def get_channel_webhooks( self, channel_id: str, ) -> list[dict[str, Any]]: """Return webhooks configured for *channel_id*. The default implementation returns an empty list. Platform adapters that support webhooks (e.g. Discord) should override. """ return []
# -- Server/channel listing (optional) -----------------------------
[docs] async def list_servers_and_channels(self) -> list[dict[str, Any]]: """Return all servers/guilds and their channels. Each platform adapter should override this to return a list of dicts describing servers/guilds (or rooms) the bot is active in, along with their channels. The format is platform-specific but should include at minimum ``server_name``, ``server_id``, and ``channels`` (a list of channel dicts). The default implementation returns an empty list. """ return []
# -- Guild members (optional) --------------------------------------
[docs] async def get_guild_members( self, guild_id: str, ) -> list[dict[str, str]]: """Return all members of *guild_id* with role information. The default implementation returns an empty list. Platform adapters with guild/server membership APIs (e.g. Discord) should override and implement caching. """ return []