Source code for platforms.base
"""Abstract base types for platform adapters.
Every chat platform (Matrix, Discord, Slack, ...) implements
:class:`PlatformAdapter`. Incoming events are normalised into
:class:`IncomingMessage` before being handed to the shared
:class:`~message_processor.MessageProcessor`.
"""
from __future__ import annotations
import abc
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Callable, Awaitable
# ------------------------------------------------------------------
# Unified message / attachment models
# ------------------------------------------------------------------
[docs]
@dataclass
class Attachment:
"""A downloaded media attachment, already in raw bytes."""
data: bytes
mimetype: str
filename: str
source_url: str = ""
"""Original URL the media was fetched from (MXC, CDN, …)."""
[docs]
@dataclass
class IncomingMessage:
"""Platform-agnostic representation of an incoming chat message.
Platform adapters construct one of these for every event and pass
it to the :class:`~message_processor.MessageProcessor`.
"""
platform: str
"""Short identifier for the originating platform.
For example: ``"matrix"``, ``"discord"``.
"""
channel_id: str
"""Platform-specific channel / room identifier."""
user_id: str
"""Platform-specific sender identifier."""
user_name: str
"""Human-readable display name of the sender."""
text: str
"""Plain-text body of the message (may be empty for media-only messages)."""
is_addressed: bool
"""Whether the bot was explicitly addressed (mention, DM, reply).
"""
attachments: list[Attachment] = field(default_factory=list)
"""Media attachments that have already been downloaded."""
channel_name: str = ""
"""Human-readable channel / room name (for prompt context)."""
timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
"""UTC timestamp of the message."""
message_id: str = ""
"""Platform-specific message identifier."""
reply_to_id: str = ""
"""ID of the message being replied to, if any."""
extra: dict[str, Any] = field(default_factory=dict)
"""Arbitrary platform-specific metadata."""
reactions: str = ""
"""Serialized reaction summary, e.g. ``"👍×3, 🔥×1"``."""
[docs]
@dataclass
class HistoricalMessage:
"""Lightweight representation of a message fetched from platform history.
Used by :meth:`PlatformAdapter.fetch_history` to return recent
messages for backfilling conversation context after downtime.
"""
user_id: str
user_name: str
text: str
timestamp: datetime
message_id: str = ""
is_bot: bool = False
"""``True`` when this message was sent by the bot itself."""
reply_to_id: str = ""
"""ID of the message this one replies to, if any."""
reactions: str = ""
"""Serialized reaction summary, e.g. ``"👍×3, 🔥×1"``."""
# ------------------------------------------------------------------
# Adapter interface
# ------------------------------------------------------------------
# Type alias for the callback a platform uses to deliver messages
# to the MessageProcessor.
MessageHandler = Callable[
[IncomingMessage, "PlatformAdapter"], Awaitable[None],
]
MessageUpdateHandler = Callable[
[str, str, str, str, str, str, str, str], Awaitable[None],
]
MessageDeleteHandler = Callable[
[str, str, str, str], Awaitable[None],
]
[docs]
class PlatformAdapter(abc.ABC):
"""Interface that every platform must implement.
Subclasses wire up their SDK's event loop, convert native events to
:class:`IncomingMessage`, and forward them to the *message_handler*
callback supplied at construction time.
"""
[docs]
def __init__(self, message_handler: MessageHandler) -> None:
"""Initialize the instance.
Args:
message_handler (MessageHandler): The message handler value.
"""
self._message_handler = message_handler
self._message_update_handler: MessageUpdateHandler | None = None
self._message_delete_handler: MessageDeleteHandler | None = None
self._cancel_callback: Callable[..., Awaitable[str]] | None = None
# -- Metadata ------------------------------------------------------
@property
@abc.abstractmethod
def name(self) -> str:
"""Short lowercase identifier, e.g. ``"matrix"`` or ``"discord"``."""
@property
@abc.abstractmethod
def is_running(self) -> bool:
"""Return ``True`` while the platform event loop is active."""
@property
def bot_identity(self) -> dict[str, str]:
"""Return the bot's own identity on this platform.
Returns a dict with at minimum ``platform`` and ``user_id``.
Adapters should override to provide ``display_name`` and
``mention`` where available. The default returns an empty
``user_id`` (safe to call before login completes).
"""
return {"platform": self.name, "user_id": "", "display_name": ""}
# -- Lifecycle -----------------------------------------------------
[docs]
@abc.abstractmethod
async def start(self) -> None:
"""Connect to the service, authenticate, and begin listening."""
[docs]
@abc.abstractmethod
async def stop(self) -> None:
"""Gracefully disconnect and release resources."""
# -- Outbound messaging --------------------------------------------
[docs]
@abc.abstractmethod
async def send(self, channel_id: str, text: str) -> str:
"""Send a plain-text message to *channel_id*.
Returns the platform message ID of the sent message, or ``""``
if the send failed or the platform does not expose one.
"""
[docs]
@abc.abstractmethod
async def send_file(
self,
channel_id: str,
data: bytes,
filename: str,
mimetype: str = "application/octet-stream",
) -> str | None:
"""Send a file/media attachment to *channel_id*.
Parameters
----------
channel_id:
Platform-specific channel / room identifier.
data:
Raw file bytes.
filename:
Suggested filename for the attachment.
mimetype:
MIME type of the file (used for determining how to
present the attachment on platforms that distinguish
images, audio, video, and generic files).
Returns
-------
str | None
A platform-specific content URL for the uploaded file
(``mxc://`` on Matrix, CDN URL on Discord), or ``None``
if the upload failed.
"""
# -- Interactive buttons (optional) --------------------------------
[docs]
async def send_with_buttons(
self,
channel_id: str,
text: str,
view: Any = None,
) -> str:
"""Send a message with interactive buttons attached.
Parameters
----------
channel_id:
Platform-specific channel / room identifier.
text:
Message text content.
view:
Platform-specific UI view object (e.g. ``discord.ui.View``).
Platforms that don't support interactive components should
ignore this and send plain text.
Returns the platform message ID, or ``""`` on failure.
The default implementation falls back to :meth:`send`.
"""
return await self.send(channel_id, text)
# -- Message editing (optional) ------------------------------------
[docs]
async def edit_message(
self,
channel_id: str,
message_id: str,
new_text: str,
) -> bool:
"""Edit an existing message sent by the bot.
Parameters
----------
channel_id:
Platform-specific channel / room identifier.
message_id:
Platform-specific ID of the message to edit.
new_text:
Replacement text content.
Returns ``True`` on success, ``False`` if the platform does not
support editing or the operation failed. The default
implementation is a no-op that returns ``False``.
"""
return False
# -- Typing indicator (optional) -----------------------------------
[docs]
async def start_typing(self, channel_id: str) -> None:
"""Begin showing a typing indicator in *channel_id*.
Implementations should spawn a background task that periodically
refreshes the indicator until :meth:`stop_typing` is called.
The default implementation is a no-op.
"""
[docs]
async def stop_typing(self, channel_id: str) -> None:
"""Stop showing the typing indicator in *channel_id*.
Must be safe to call even if :meth:`start_typing` was never
called for the given channel. The default is a no-op.
"""
# -- Channel history (optional) ------------------------------------
[docs]
async def fetch_history(
self,
channel_id: str,
limit: int = 100,
) -> list[HistoricalMessage]:
"""Fetch recent messages from the platform for *channel_id*.
Returns up to *limit* messages in **chronological** order
(oldest first). The default implementation returns an empty
list; platform adapters should override when the underlying
SDK supports history retrieval.
"""
return []
# -- Channel webhooks (optional) -----------------------------------
[docs]
async def get_channel_webhooks(
self, channel_id: str,
) -> list[dict[str, Any]]:
"""Return webhooks configured for *channel_id*.
The default implementation returns an empty list. Platform
adapters that support webhooks (e.g. Discord) should override.
"""
return []
# -- Server/channel listing (optional) -----------------------------
[docs]
async def list_servers_and_channels(self) -> list[dict[str, Any]]:
"""Return all servers/guilds and their channels.
Each platform adapter should override this to return a list of
dicts describing servers/guilds (or rooms) the bot is active in,
along with their channels. The format is platform-specific but
should include at minimum ``server_name``, ``server_id``, and
``channels`` (a list of channel dicts).
The default implementation returns an empty list.
"""
return []
# -- Guild members (optional) --------------------------------------
[docs]
async def get_guild_members(
self, guild_id: str,
) -> list[dict[str, str]]:
"""Return all members of *guild_id* with role information.
The default implementation returns an empty list. Platform
adapters with guild/server membership APIs (e.g. Discord)
should override and implement caching.
"""
return []