# Source code for platforms.base
"""Abstract base types for platform adapters.
Every chat platform (Matrix, Discord, Slack, ...) implements
:class:`PlatformAdapter`. Incoming events are normalised into
:class:`IncomingMessage` and forwarded to the *message_handler*
callback supplied at construction time. In the gateway service that
callback publishes the message onto the ``sg:stream:inbound`` Redis
stream (via :class:`~core.event_bus.RedisEventBus`); the inference
worker later reconstructs the :class:`IncomingMessage` and feeds it to
:class:`~message_processor.MessageProcessor`.
"""
from __future__ import annotations
import abc
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Callable, Awaitable
# ------------------------------------------------------------------
# Unified message / attachment models
# ------------------------------------------------------------------
@dataclass
class Attachment:
    """A downloaded media attachment, already materialized as raw bytes.

    Represents a single piece of inbound media (image, audio, video, or
    generic file) that a platform adapter has already fetched off the wire so
    downstream code never has to re-download it. Adapters populate one per
    attachment while converting a native event into an
    :class:`IncomingMessage`, carrying the raw ``data`` together with its
    ``mimetype`` and ``filename`` so the inference worker's media pipeline can
    route it correctly. The ``source_url`` preserves the original location (a
    Matrix ``mxc://`` URI, a Discord CDN link, etc.) for re-fetching, caching,
    or display.

    Constructed by the platform adapters in ``platforms/discord.py``,
    ``platforms/matrix.py``, ``platforms/discord_self.py``, and
    ``platforms/emoji_resolver.py``, and reconstructed from the inbound Redis
    envelope in ``inference_main.py`` and ``message_queue.py``. Holds bytes in
    memory only; it performs no I/O of its own.
    """

    data: bytes
    """Raw bytes of the already-downloaded media."""

    mimetype: str
    """MIME type of ``data``."""

    filename: str
    """Filename associated with the attachment."""

    source_url: str = ""
    """Original URL the media was fetched from (MXC, CDN, …)."""
@dataclass
class IncomingMessage:
    """Platform-agnostic representation of an incoming chat message.

    Platform adapters construct one of these for every event and pass
    it to the *message_handler* callback (which, in the gateway service,
    publishes it onto the ``sg:stream:inbound`` Redis stream for the
    inference worker's :class:`~message_processor.MessageProcessor`).
    """

    platform: str
    """Short identifier for the originating platform.

    For example: ``"matrix"``, ``"discord"``.
    """

    channel_id: str
    """Platform-specific channel / room identifier."""

    user_id: str
    """Platform-specific sender identifier."""

    user_name: str
    """Human-readable display name of the sender."""

    text: str
    """Plain-text body of the message (may be empty for media-only messages)."""

    is_addressed: bool
    """Whether the bot was explicitly addressed (mention, DM, reply)."""

    attachments: list[Attachment] = field(default_factory=list)
    """Media attachments that have already been downloaded."""

    channel_name: str = ""
    """Human-readable channel / room name (for prompt context)."""

    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    """UTC timestamp of the message."""

    message_id: str = ""
    """Platform-specific message identifier."""

    reply_to_id: str = ""
    """ID of the message being replied to, if any."""

    extra: dict[str, Any] = field(default_factory=dict)
    """Arbitrary platform-specific metadata."""

    reactions: str = ""
    """Serialized reaction summary, e.g. ``"👍×3, 🔥×1"``."""

    unified_user_id: str | None = None
    """Resolved sg_uuid, OR None if unlinked."""

    user_aliases: list[str] = field(default_factory=list)
    """Complete active alias set."""

    def __post_init__(self) -> None:
        """Seed a default alias set when none was supplied.

        Dataclass post-initialization hook that runs automatically after
        :class:`IncomingMessage` field assignment. When ``user_aliases`` is
        left empty by the constructing platform adapter, it backfills a single
        canonical ``"{platform}:{user_id}"`` alias so that downstream identity
        resolution (the inference worker reconstructing this message from the
        ``sg:stream:inbound`` Redis stream and looking up a
        ``unified_user_id``) always has at least one stable handle to key on.
        Mutates only ``self.user_aliases`` and performs no I/O.

        It is invoked implicitly by the dataclass machinery on every
        :class:`IncomingMessage` instantiation, so it has no explicit internal
        callers.
        """
        # Any falsy value (empty list, None) counts as "no aliases supplied".
        if not self.user_aliases:
            self.user_aliases = [f"{self.platform}:{self.user_id}"]
@dataclass
class HistoricalMessage:
    """Lightweight representation of a message fetched from platform history.

    Used by :meth:`PlatformAdapter.fetch_history` to return recent
    messages for backfilling conversation context after downtime.
    """

    user_id: str
    """Sender identifier of the historical message."""

    user_name: str
    """Display name of the sender."""

    text: str
    """Text body of the message."""

    timestamp: datetime
    """When the message was sent."""

    message_id: str = ""
    """Message identifier; empty when none is supplied."""

    is_bot: bool = False
    """``True`` when this message was sent by the bot itself."""

    reply_to_id: str = ""
    """ID of the message this one replies to, if any."""

    reactions: str = ""
    """Serialized reaction summary, e.g. ``"👍×3, 🔥×1"``."""
# ------------------------------------------------------------------
# Adapter interface
# ------------------------------------------------------------------
# Callback type a platform adapter awaits to deliver a normalised
# IncomingMessage (together with the adapter instance) to its host
# service; the gateway's handler publishes it onto the
# sg:stream:inbound Redis stream.
MessageHandler = Callable[[IncomingMessage, "PlatformAdapter"], Awaitable[None]]

# Callback type for the optional message-update handler slot: an awaitable
# taking eight positional ``str`` arguments.
# NOTE(review): the meaning of each argument is not visible in this module;
# confirm against the handler implementations.
MessageUpdateHandler = Callable[[str, str, str, str, str, str, str, str], Awaitable[None]]

# Callback type for the optional message-delete handler slot: an awaitable
# taking four positional ``str`` arguments.
# NOTE(review): argument meanings are not visible here either; confirm.
MessageDeleteHandler = Callable[[str, str, str, str], Awaitable[None]]
class PlatformAdapter(abc.ABC):
    """Interface that every platform must implement.

    Subclasses wire up their SDK's event loop, convert native events to
    :class:`IncomingMessage`, and forward them to the *message_handler*
    callback supplied at construction time.

    ``name``, ``is_running``, ``start``, ``stop``, ``send`` and ``send_file``
    are abstract; every other method has a safe default (a no-op or an
    empty / fallback result) that adapters override when the platform
    supports the feature.
    """

    def __init__(self, message_handler: MessageHandler) -> None:
        """Store the inbound callback and initialize optional-handler slots.

        Base constructor every concrete adapter chains up to via ``super()``.
        It captures the *message_handler* that the host service supplies so
        each adapter has exactly one place to deliver a normalized
        :class:`IncomingMessage` once a native event arrives. In the gateway
        service that handler publishes the message onto the
        ``sg:stream:inbound`` Redis stream (via
        :class:`~core.event_bus.RedisEventBus`) for the inference worker to
        pick up; in tests it may be a plain coroutine that collects messages.
        The remaining attributes are nulled out here and only wired up later
        by the host (edit/delete handlers and the cancel/revoke/purge/toggle
        button callbacks), so an adapter that never receives them simply has
        no interactive affordances.

        Invoked indirectly whenever a concrete adapter is instantiated — e.g.
        when ``gateway_main.py`` builds its platform adapters at startup.

        Args:
            message_handler: Coroutine the adapter awaits for every inbound
                message; receives the :class:`IncomingMessage` and the adapter
                instance itself so the host can reply on the right platform.
        """
        self._message_handler = message_handler
        # Optional hooks: left as None until the host assigns them.
        self._message_update_handler: MessageUpdateHandler | None = None
        self._message_delete_handler: MessageDeleteHandler | None = None
        self._cancel_callback: Callable[..., Awaitable[str]] | None = None
        self._revoke_callback: Callable[..., Awaitable[str]] | None = None
        self._purge_callback: Callable[..., Awaitable[str]] | None = None
        self._toggle_callback: Callable[..., Awaitable[str]] | None = None

    # -- Metadata ------------------------------------------------------

    @property
    @abc.abstractmethod
    def name(self) -> str:
        """Return the short lowercase platform identifier for this adapter.

        Abstract read-only property each concrete adapter must implement to
        report a stable slug such as ``"matrix"``, ``"discord"``, or
        ``"webchat"``. This string is the canonical key the rest of the system
        uses to identify the platform: it becomes the ``platform`` field
        stamped onto outbound envelopes in ``gateway_main.py``, the key for
        the adapter registry in ``background_tasks.py``, the value rendered in
        the web admin status payload in ``web/bot_admin.py``, and the
        running-status entries in ``web/platforms_api.py``. Must match the
        slug used on :class:`IncomingMessage` so routing stays consistent.

        Returns:
            The lowercase platform slug.
        """

    @property
    @abc.abstractmethod
    def is_running(self) -> bool:
        """Report whether the adapter's connection / event loop is live.

        Abstract read-only property every adapter implements to expose its
        connection state, typically tracking a flag toggled in :meth:`start`
        and :meth:`stop`. Callers treat it as the readiness signal for the
        platform: ``web/deps.py`` and ``background_tasks.py`` gate work on it
        before touching an adapter, ``web/platforms_api.py`` and
        ``web/bot_admin.py`` surface it as the per-platform running indicator,
        and ``web/bot_admin.py`` also uses it to decide whether the underlying
        client is safe to query. Should not perform I/O — a cheap state read.

        Returns:
            ``True`` while the platform event loop is connected and listening,
            ``False`` before startup or after shutdown.
        """

    @property
    def bot_identity(self) -> dict[str, str]:
        """Return the bot's own identity on this platform.

        Returns a dict with at minimum ``platform`` and ``user_id``.
        Adapters should override to provide ``display_name`` and
        ``mention`` where available. The default returns an empty
        ``user_id`` (safe to call before login completes).
        """
        return {"platform": self.name, "user_id": "", "display_name": ""}

    # -- Lifecycle -----------------------------------------------------

    @abc.abstractmethod
    async def start(self) -> None:
        """Connect to the platform, authenticate, and begin consuming events.

        Abstract lifecycle entry point each adapter implements to spin up its
        SDK's event loop, log in, and start dispatching native events through
        the stored *message_handler*. Implementations are expected to flip the
        state backing :attr:`is_running` to ``True`` once listening, and may
        open network sockets, background tasks, and platform sessions as side
        effects.

        Awaited once per adapter during gateway boot — ``GatewayService``
        calls ``adapter.start()`` for every created adapter in
        ``gateway_main.py`` (deliberately after the outbound consumer is up,
        per ``core/outbound_consumer.py``), and tests invoke it directly.
        """

    @abc.abstractmethod
    async def stop(self) -> None:
        """Gracefully disconnect from the platform and release its resources.

        Abstract counterpart to :meth:`start` that every adapter implements to
        tear down its connection: cancel background tasks, close the SDK
        client and sockets, and flip the state backing :attr:`is_running` to
        ``False``. Should be idempotent and safe to call even if startup never
        fully completed.

        Awaited on shutdown by ``GatewayService.on_stop()`` in
        ``gateway_main.py`` and when an operator stops a platform through
        ``web/platforms_api.py`` (note callers use ``stop()`` rather than a
        raw ``close()``).
        """

    # -- Outbound messaging --------------------------------------------

    @abc.abstractmethod
    async def send(self, channel_id: str, text: str) -> str:
        """Send a plain-text message to *channel_id*.

        Returns the platform message ID of the sent message, or ``""``
        if the send failed or the platform does not expose one.
        """

    @abc.abstractmethod
    async def send_file(
        self,
        channel_id: str,
        data: bytes,
        filename: str,
        mimetype: str = "application/octet-stream",
    ) -> str | None:
        """Send a file/media attachment to *channel_id*.

        Parameters
        ----------
        channel_id:
            Platform-specific channel / room identifier.
        data:
            Raw file bytes.
        filename:
            Suggested filename for the attachment.
        mimetype:
            MIME type of the file (used for determining how to
            present the attachment on platforms that distinguish
            images, audio, video, and generic files).

        Returns
        -------
        str | None
            A platform-specific content URL for the uploaded file
            (``mxc://`` on Matrix, CDN URL on Discord), or ``None``
            if the upload failed.
        """

    # -- Interactive buttons (optional) --------------------------------

    async def send_with_buttons(
        self,
        channel_id: str,
        text: str,
        view: Any = None,
    ) -> str:
        """Send a message with interactive buttons attached.

        Parameters
        ----------
        channel_id:
            Platform-specific channel / room identifier.
        text:
            Message text content.
        view:
            Platform-specific UI view object (e.g. ``discord.ui.View``).
            Platforms that don't support interactive components should
            ignore this and send plain text.

        Returns
        -------
        str
            The platform message ID, or ``""`` on failure. The default
            implementation falls back to :meth:`send`.
        """
        # Default: drop *view* and deliver the text through the plain path.
        return await self.send(channel_id, text)

    # -- Message editing (optional) ------------------------------------

    async def edit_message(
        self,
        channel_id: str,
        message_id: str,
        new_text: str,
    ) -> bool:
        """Edit an existing message sent by the bot.

        Parameters
        ----------
        channel_id:
            Platform-specific channel / room identifier.
        message_id:
            Platform-specific ID of the message to edit.
        new_text:
            Replacement text content.

        Returns ``True`` on success, ``False`` if the platform does not
        support editing or the operation failed. The default
        implementation is a no-op that returns ``False``.
        """
        return False

    # -- Typing indicator (optional) -----------------------------------

    async def start_typing(self, channel_id: str) -> None:
        """Begin showing a typing indicator in *channel_id*.

        Implementations should spawn a background task that periodically
        refreshes the indicator until :meth:`stop_typing` is called.
        The default implementation is a no-op.
        """

    async def stop_typing(self, channel_id: str) -> None:
        """Stop showing the typing indicator in *channel_id*.

        Must be safe to call even if :meth:`start_typing` was never
        called for the given channel. The default is a no-op.
        """

    # -- Presence / status (optional) ----------------------------------

    async def set_presence(self, text: str, emoji: str | None = None) -> None:
        """Set the bot's platform presence/status to *text*.

        Global (non-channel) action driven by :class:`StatusManager`.
        The default implementation is a no-op; platform adapters that
        support presence (e.g. Discord) should override it.
        """

    # -- Channel history (optional) ------------------------------------

    async def fetch_history(
        self,
        channel_id: str,
        limit: int = 100,
    ) -> list[HistoricalMessage]:
        """Fetch recent messages from the platform for *channel_id*.

        Returns up to *limit* messages in **chronological** order
        (oldest first). The default implementation returns an empty
        list; platform adapters should override when the underlying
        SDK supports history retrieval.
        """
        return []

    # -- Channel webhooks (optional) -----------------------------------

    async def get_channel_webhooks(
        self,
        channel_id: str,
    ) -> list[dict[str, Any]]:
        """Return webhooks configured for *channel_id*.

        The default implementation returns an empty list. Platform
        adapters that support webhooks (e.g. Discord) should override.
        """
        return []

    async def should_skip_channel_heartbeat(self, channel_id: str) -> bool:
        """Return True to skip background channel heartbeat (no LLM) for *channel_id*.

        Used by :func:`~message_processor.channel_heartbeat.execute_channel_heartbeat`
        before building context. Override on platforms where DMs or tiny
        Matrix rooms should not receive periodic nudges. The default never
        skips (returns ``False``).
        """
        return False

    # -- Server/channel listing (optional) -----------------------------

    async def list_servers_and_channels(self) -> list[dict[str, Any]]:
        """Return all servers/guilds and their channels.

        Each platform adapter should override this to return a list of
        dicts describing servers/guilds (or rooms) the bot is active in,
        along with their channels. The format is platform-specific but
        should include at minimum ``server_name``, ``server_id``, and
        ``channels`` (a list of channel dicts).

        The default implementation returns an empty list.
        """
        return []

    # -- Guild members (optional) --------------------------------------

    async def get_guild_members(
        self,
        guild_id: str,
    ) -> list[dict[str, str]]:
        """Return all members of *guild_id* with role information.

        The default implementation returns an empty list. Platform
        adapters with guild/server membership APIs (e.g. Discord)
        should override and implement caching.
        """
        return []

    # -- Channel validation (optional) ---------------------------------

    async def is_channel_valid(self, channel_id: str) -> bool:
        """Return True if the channel/room is valid and active on the platform.

        If a channel has been deleted or is no longer accessible to the bot,
        this method should return False. The default implementation returns
        True.
        """
        return True

    # -- Reactions (optional) ------------------------------------------

    async def add_reaction(
        self,
        channel_id: str,
        message_id: str,
        emoji: str,
    ) -> None:
        """Add an emoji reaction to a message.

        Counterpart to
        :class:`core.proxy_adapter.ProxyPlatformAdapter.add_reaction`, which
        publishes a ``type: "reaction"`` outbound envelope that the gateway
        replays by calling this method on the live adapter. The default is a
        no-op; platforms that support reactions (Discord, Matrix) override it.
        """

    # -- Slash / app command sync (optional) ---------------------------

    async def sync_command_tree(self) -> int:
        """Sync the platform's slash/app command tree, returning the count synced.

        Used by the gateway-handled ``!sync_tree`` admin command. The default
        is a no-op returning ``0`` (selfbots / Matrix have no slash-command
        tree).
        """
        return 0