Source code for platforms.matrix

"""Matrix platform adapter using matrix-nio.

Wraps the matrix-nio ``AsyncClient`` and converts Matrix events into
:class:`~platforms.base.IncomingMessage` instances for the shared
:class:`~message_processor.MessageProcessor`.
"""

from __future__ import annotations

import asyncio
import base64
import io
import json as _stdlib_json
import jsonutil as json
import logging
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Union

import markdown as md
import olm

import aiohttp
import aiofiles
from nio import (
    AsyncClient,
    AsyncClientConfig,
    InviteMemberEvent,
    JoinError,
    LoginResponse,
    MatrixRoom,
    MegolmEvent,
    RoomMessageText,
    RoomSendResponse,
)
from nio.api import Api as NioApi
from nio.crypto import Sas
from nio.crypto.attachments import decrypt_attachment
from nio.crypto.device import OlmDevice
from nio.events.room_events import (
    RoomEncryptedAudio,
    RoomEncryptedFile,
    RoomEncryptedImage,
    RoomEncryptedMedia,
    RoomEncryptedVideo,
    RoomMessageAudio,
    RoomMessageFile,
    RoomMessageImage,
    RoomMessageMedia,
    RoomMessageUnknown,
    RoomMessageVideo,
    UnknownEvent,
)
from nio.events.to_device import (
    KeyVerificationCancel,
    KeyVerificationKey,
    KeyVerificationMac,
    KeyVerificationStart,
    UnknownToDeviceEvent,
)
from nio import ToDeviceMessage
from nio.responses import (
    DownloadError,
    ProfileGetDisplayNameError,
    ProfileGetDisplayNameResponse,
    RoomMessagesResponse,
)

from media_cache import MediaCache
from config import Config
from platforms.base import (
    Attachment,
    HistoricalMessage,
    IncomingMessage,
    MessageHandler,
    PlatformAdapter,
)
from platforms.media_common import download_with_retry

logger = logging.getLogger(__name__)

# Union of all media event types handled by on_media
MediaEvent = Union[
    RoomMessageImage,
    RoomEncryptedImage,
    RoomMessageAudio,
    RoomEncryptedAudio,
    RoomMessageVideo,
    RoomEncryptedVideo,
    RoomMessageFile,
    RoomEncryptedFile,
]


def _get_reply_to_id(event: Any) -> str:
    """Extract the reply-to event ID from a Matrix event, or empty string.

    Digs through the raw event ``source`` dict for the
    ``m.relates_to`` / ``m.in_reply_to`` envelope that Matrix uses to thread
    replies, so the rest of the adapter can populate
    :attr:`IncomingMessage.reply_to_id` and
    :attr:`HistoricalMessage.reply_to_id` without reaching into nio internals
    at every call site. Pure read-only dict traversal with no side effects.

    Called by :meth:`MatrixPlatform.fetch_history`, :meth:`MatrixPlatform._on_message`,
    and :meth:`MatrixPlatform._on_media` within this module; it has no external
    callers.

    Args:
        event (Any): A matrix-nio room event (or anything exposing a
            ``source`` mapping).

    Returns:
        str: The referenced event ID, or an empty string when the message is
        not a reply.
    """
    source = getattr(event, "source", None) or {}
    content = source.get("content", {})
    relates = content.get("m.relates_to", {})
    reply_to = relates.get("m.in_reply_to", {})
    return reply_to.get("event_id", "")


_STRIP_HTML_RE = re.compile(r"<[^>]+>")
_MD_EXTENSIONS = ["fenced_code", "tables", "nl2br"]

# Matches Matrix user IDs like @localpart:server or @localpart:server:port
_MATRIX_MENTION_RE = re.compile(r"@[\w.=\-/+]+:[\w.\-]+(:\d+)?")
# Matches any HTML tag (used to skip mention replacement inside tags)
_HTML_TAG_RE = re.compile(r"<[^>]+>")


def _markdown_to_html(text: str) -> str:
    """Convert Markdown text to HTML for the Matrix ``formatted_body``.

    Matrix renders rich messages from an HTML ``formatted_body`` alongside the
    plain ``body``, so outbound text is run through the Python-Markdown library
    (with the fenced-code, tables, and ``nl2br`` extensions enabled via
    ``_MD_EXTENSIONS``) before it is sent. Pure transformation with no I/O or
    side effects.

    Called by :meth:`MatrixPlatform.send` within this module to build the
    formatted body for ``room_send``; it has no external callers.

    Args:
        text (str): The Markdown source text.

    Returns:
        str: The rendered HTML fragment.
    """
    return md.markdown(text, extensions=_MD_EXTENSIONS)


def _strip_html(text: str) -> str:
    """Strip HTML tags to produce the plain-text ``body`` fallback.

    Matrix messages carry both a rich ``formatted_body`` and a plain ``body``;
    this removes every ``<tag>`` (via the module-level ``_STRIP_HTML_RE``
    pattern) so clients without HTML rendering still show readable text. Pure
    transformation with no I/O or side effects.

    Called by :meth:`MatrixPlatform.send` within this module to derive the
    plain ``body`` from the rendered HTML; it has no external callers.

    Args:
        text (str): The HTML (or Markdown) source whose tags should be removed.

    Returns:
        str: The text with all HTML tags stripped out.
    """
    return _STRIP_HTML_RE.sub("", text)


def _linkify_matrix_mentions(html: str) -> tuple[str, list[str]]:
    """Replace bare @user:server mentions in HTML with matrix.to anchor links.

    Operates only on text nodes (skips content inside HTML tags) so existing
    links or attributes are never double-processed.

    Returns ``(new_html, unique_user_ids)`` where ``unique_user_ids`` is the
    ordered list of Matrix user IDs found, suitable for ``m.mentions.user_ids``.
    """
    mentions: list[str] = []

    def _replace(m: re.Match) -> str:
        """Turn one matched ``@user:server`` mention into an anchor link.

        Substitution callback used by ``_MATRIX_MENTION_RE.sub`` inside the
        enclosing :func:`_linkify_matrix_mentions`. Records the matched
        Matrix user ID into the enclosing ``mentions`` list (so the caller
        can later dedupe it for ``m.mentions.user_ids``) and returns the user
        ID wrapped in a ``matrix.to`` HTML anchor.

        This nested function is defined and used only within
        :func:`_linkify_matrix_mentions`; it has no other internal callers.

        Args:
            m (re.Match): A regex match whose group 0 is the bare mention.

        Returns:
            str: The mention rewritten as an ``<a href="...">`` anchor.
        """
        uid = m.group(0)
        mentions.append(uid)
        return f'<a href="https://matrix.to/#/{uid}">{uid}</a>'

    parts: list[str] = []
    pos = 0
    for tag_match in _HTML_TAG_RE.finditer(html):
        # Linkify text between tags, keep the tag itself verbatim
        parts.append(_MATRIX_MENTION_RE.sub(_replace, html[pos : tag_match.start()]))
        parts.append(tag_match.group(0))
        pos = tag_match.end()
    parts.append(_MATRIX_MENTION_RE.sub(_replace, html[pos:]))

    seen: set[str] = set()
    unique = [u for u in mentions if not (u in seen or seen.add(u))]  # type: ignore[func-returns-value]
    return "".join(parts), unique


# ------------------------------------------------------------------
# Matrix-specific media download
# ------------------------------------------------------------------


[docs] async def download_matrix_media( client: AsyncClient, event: RoomMessageMedia | RoomEncryptedMedia, ) -> tuple[bytes, str, str]: """Download (and optionally decrypt) a media attachment from Matrix. Returns ------- tuple of (data, mimetype, filename) """ mxc_url: str = event.url filename: str = event.body or "attachment" resp = await client.download(mxc=mxc_url) if isinstance(resp, DownloadError): raise RuntimeError(f"Failed to download {mxc_url}: {resp.message}") data: bytes = resp.body # type: ignore[assignment] # Determine MIME type if isinstance(event, RoomEncryptedMedia): mimetype = event.mimetype or resp.content_type or "application/octet-stream" else: info = event.source.get("content", {}).get("info", {}) mimetype = ( info.get("mimetype") or resp.content_type or "application/octet-stream" ) # Decrypt if necessary if isinstance(event, RoomEncryptedMedia): data = decrypt_attachment( ciphertext=data, key=event.key["k"], hash=event.hashes["sha256"], iv=event.iv, ) return data, mimetype, filename
# ------------------------------------------------------------------ # Credential helpers # ------------------------------------------------------------------
[docs] async def save_matrix_credentials( credentials_file: str, homeserver: str, client: AsyncClient, ) -> None: """Persist Matrix login credentials, preserving extra keys like seeds. Writes the homeserver URL plus the live ``user_id``, ``device_id``, and ``access_token`` from a logged-in nio ``AsyncClient`` to the JSON credentials file so the session can be restored on the next start without re-authenticating. Reads any existing file first (via aiofiles) and merges on top of it, so unrelated keys such as the cross-signing seeds written by :func:`setup_cross_signing` survive. Touches the filesystem and logs an info line on success. Called by :meth:`MatrixPlatform.start` within this module right after a fresh password login succeeds; it has no external callers. Args: credentials_file (str): Path to the JSON credentials file to write. homeserver (str): The Matrix homeserver URL to record. client (AsyncClient): The logged-in nio client supplying the identity and access token. """ existing: dict[str, Any] = {} cred_path = Path(credentials_file) if cred_path.exists(): try: async with aiofiles.open(credentials_file, "r") as f: existing = json.loads(await f.read()) except (json.JSONDecodeError, OSError): pass existing.update( { "homeserver": homeserver, "user_id": client.user_id, "device_id": client.device_id, "access_token": client.access_token, } ) async with aiofiles.open(credentials_file, "w") as f: await f.write(json.dumps(existing, indent=2)) logger.info("Matrix credentials saved to %s", credentials_file)
[docs] async def load_matrix_credentials(credentials_file: str) -> dict | None: """Load previously saved Matrix credentials, or return ``None``. Reads and parses the JSON credentials file (via aiofiles) that :func:`save_matrix_credentials` writes, so a restart can call ``client.restore_login`` instead of logging in with a password. Returns ``None`` when the file is absent or contains invalid JSON, signaling the caller to fall back to a fresh login. Read-only filesystem access with no other side effects. Called by :meth:`MatrixPlatform.start` within this module to recover a saved session; it has no external callers. Args: credentials_file (str): Path to the JSON credentials file to read. Returns: dict | None: The decoded credentials mapping, or ``None`` if the file is missing or unparseable. """ cred_path = Path(credentials_file) if not cred_path.exists(): return None async with aiofiles.open(credentials_file, "r") as f: content = await f.read() try: return json.loads(content) except json.JSONDecodeError: return None
# ------------------------------------------------------------------ # Auto-trust helper # ------------------------------------------------------------------
[docs] def trust_all_devices(client: AsyncClient) -> None: """Mark every known device of every tracked user as trusted. Walks the nio client's ``device_store`` and calls ``verify_device`` on any device the Olm layer does not already consider verified, so the bot will encrypt to and accept messages from all participants without manual verification. This is a deliberate trust-on-first-use policy for an unattended bot; it mutates the client's in-memory device-trust state and logs each newly trusted device at debug level. Called by :meth:`MatrixPlatform.start` (after the initial sync) and :meth:`MatrixPlatform._sync_loop` (after every successful sync) within this module; it has no external callers. Args: client (AsyncClient): The connected nio client whose device store is scanned and whose devices are verified in place. """ for user_id, devices in client.device_store.items(): for device_id, olm_device in devices.items(): if not client.olm.is_device_verified(olm_device): client.verify_device(olm_device) logger.debug( "Auto-trusted device %s of %s", device_id, user_id, )
# ------------------------------------------------------------------ # Cross-signing setup # ------------------------------------------------------------------ def _sign_json(signing_key: olm.PkSigning, obj: dict) -> str: """Canonical-JSON-sign ``obj`` with ``signing_key`` and return the signature. Serializes the object to Matrix canonical JSON (sorted keys, no whitespace, via the stdlib ``json`` module aliased here as ``_stdlib_json`` so the project ``jsonutil`` shim is bypassed for byte-exact output) and signs it with the given Olm ``PkSigning`` key. Used to build the cross-signing key hierarchy and the self-signature on the bot's own device. Pure computation with no I/O or side effects. Called by :func:`setup_cross_signing` (to sign the self-signing and user-signing keys with the master key) and :func:`_sign_own_device` (to sign the device key with the self-signing key) within this module; it has no external callers. Args: signing_key (olm.PkSigning): The Olm signing key to sign with. obj (dict): The object to canonicalize and sign. Returns: str: The base64 Ed25519 signature over the canonical JSON of ``obj``. """ canonical = _stdlib_json.dumps( obj, ensure_ascii=False, sort_keys=True, separators=(",", ":") ) return signing_key.sign(canonical)
[docs] async def setup_cross_signing( client: AsyncClient, password: str, credentials_file: str, saved_seeds: dict[str, str] | None = None, ) -> None: """Generate cross-signing keys, upload them, and self-sign the device. If *saved_seeds* is provided the keys are re-derived from persisted seeds instead of generating new ones. Seeds are persisted to *credentials_file* for future restarts. """ user_id = client.user_id device_id = client.device_id assert user_id and device_id # --- Derive or generate keys ---------------------------------------- if saved_seeds: master_seed = base64.b64decode(saved_seeds["master"]) self_signing_seed = base64.b64decode(saved_seeds["self_signing"]) user_signing_seed = base64.b64decode(saved_seeds["user_signing"]) else: master_seed = olm.PkSigning.generate_seed() self_signing_seed = olm.PkSigning.generate_seed() user_signing_seed = olm.PkSigning.generate_seed() master_key = olm.PkSigning(master_seed) self_signing_key = olm.PkSigning(self_signing_seed) user_signing_key = olm.PkSigning(user_signing_seed) # --- Check if master key is already published ----------------------- query_path = f"/_matrix/client/v3/keys/query" f"?access_token={client.access_token}" query_body = json.dumps({"device_keys": {user_id: []}}) try: resp = await client.send("POST", query_path, query_body) if resp.status == 200: data = json.loads(await resp.read()) existing_master = ( data.get("master_keys", {}).get(user_id, {}).get("keys", {}) ) if f"ed25519:{master_key.public_key}" in existing_master: logger.info( "Cross-signing master key already published — skipping upload", ) await _sign_own_device( client, self_signing_key, user_id, device_id, ) return except Exception: logger.debug("Could not query existing cross-signing keys", exc_info=True) if not password: logger.warning( "Cross-signing setup requires a password for UIA — skipping. " "Set up will be attempted on next fresh login.", ) return # --- Build the key upload payload ----------------------------------- master_key_obj = { "user_id": user_id, "usage": ["master"], "keys": {f"ed25519:{master_key.public_key}": master_key.public_key}, } self_signing_key_obj = { "user_id": user_id, "usage": ["self_signing"], "keys": { f"ed25519:{self_signing_key.public_key}": self_signing_key.public_key, }, } ss_sig = _sign_json(master_key, self_signing_key_obj) self_signing_key_obj["signatures"] = { user_id: {f"ed25519:{master_key.public_key}": ss_sig}, } user_signing_key_obj = { "user_id": user_id, "usage": ["user_signing"], "keys": { f"ed25519:{user_signing_key.public_key}": user_signing_key.public_key, }, } us_sig = _sign_json(master_key, user_signing_key_obj) user_signing_key_obj["signatures"] = { user_id: {f"ed25519:{master_key.public_key}": us_sig}, } upload_body: dict[str, Any] = { "master_key": master_key_obj, "self_signing_key": self_signing_key_obj, "user_signing_key": user_signing_key_obj, } # --- Upload with UIA (password auth) -------------------------------- upload_path = ( f"/_matrix/client/v3/keys/device_signing/upload" f"?access_token={client.access_token}" ) # First request to get the session for UIA resp = await client.send( "POST", upload_path, json.dumps(upload_body), ) if resp.status == 401: uia_data = json.loads(await resp.read()) session = uia_data.get("session", "") upload_body["auth"] = { "type": "m.login.password", "user": user_id, "password": password, "session": session, } resp = await client.send( "POST", upload_path, json.dumps(upload_body), ) if resp.status != 200: body = await resp.read() logger.error( "Cross-signing key upload failed (HTTP %d): %s", resp.status, body.decode(errors="replace"), ) return logger.info("Cross-signing keys uploaded successfully") # --- Sign own device ------------------------------------------------ await _sign_own_device(client, self_signing_key, user_id, device_id) # --- Persist seeds -------------------------------------------------- seeds = { "master": base64.b64encode(master_seed).decode(), "self_signing": base64.b64encode(self_signing_seed).decode(), "user_signing": base64.b64encode(user_signing_seed).decode(), } try: creds: dict[str, Any] = {} cred_path = Path(credentials_file) if cred_path.exists(): async with aiofiles.open(credentials_file, "r") as f: creds = json.loads(await f.read()) creds["cross_signing_seeds"] = seeds async with aiofiles.open(credentials_file, "w") as f: await f.write(json.dumps(creds, indent=2)) logger.info("Cross-signing seeds persisted to %s", credentials_file) except Exception: logger.warning("Failed to persist cross-signing seeds", exc_info=True)
async def _sign_own_device( client: AsyncClient, self_signing_key: olm.PkSigning, user_id: str, device_id: str, ) -> None: """Sign the bot's own device key with the cross-signing self-signing key. Builds the device-key object from the Olm account's identity keys, signs it with :func:`_sign_json`, and uploads the signature to the homeserver's ``/keys/signatures/upload`` endpoint (an HTTP POST via ``client.send``). This is what lets other clients see the bot's device as cross-signed rather than warning that it was "encrypted by a device not verified by its owner". Performs network I/O and logs the upload outcome; swallows and logs any exception rather than raising. Called by :func:`setup_cross_signing` within this module, both when the master key is already published and after a fresh key upload; it has no external callers. Args: client (AsyncClient): The connected nio client used for the HTTP upload. self_signing_key (olm.PkSigning): The self-signing key that signs the device. user_id (str): The bot's Matrix user ID. device_id (str): The bot's device ID being signed. """ device_key_id = f"ed25519:{device_id}" device_keys = client.olm.account.identity_keys ed25519_key = device_keys["ed25519"] curve25519_key = device_keys["curve25519"] device_key_obj = { "algorithms": [ "m.olm.v1.curve25519-aes-sha2-256", "m.megolm.v1.aes-sha2", ], "device_id": device_id, "keys": { f"curve25519:{device_id}": curve25519_key, f"ed25519:{device_id}": ed25519_key, }, "user_id": user_id, } dev_sig = _sign_json(self_signing_key, device_key_obj) sig_upload = { user_id: { device_id: { f"ed25519:{self_signing_key.public_key}": dev_sig, }, }, } sig_path = ( f"/_matrix/client/v3/keys/signatures/upload" f"?access_token={client.access_token}" ) try: resp = await client.send( "POST", sig_path, json.dumps(sig_upload), ) if resp.status == 200: logger.info( "Device %s self-signed with cross-signing key", device_id, ) else: body = await resp.read() logger.warning( "Device signature upload failed (HTTP %d): %s", resp.status, body.decode(errors="replace"), ) except Exception: logger.warning("Failed to upload device signature", exc_info=True) # ------------------------------------------------------------------ # MatrixPlatform adapter # ------------------------------------------------------------------
[docs] class MatrixPlatform(PlatformAdapter): """Platform adapter for Matrix via matrix-nio. Parameters ---------- message_handler: Async callback that receives :class:`IncomingMessage` instances. homeserver: Matrix homeserver URL. user_id: Matrix user ID for the bot. password: Password (only needed for first login). store_path: Path to the nio E2EE key store. credentials_file: Path to the JSON file for persisting login credentials. """
[docs] def __init__( self, message_handler: MessageHandler, *, homeserver: str, user_id: str, password: str = "", store_path: str = "nio_store", credentials_file: str = "matrix_credentials.json", media_cache: MediaCache | None = None, config: Config | None = None, ) -> None: """Store configuration and initialize per-room bookkeeping state. Records the homeserver, identity, and E2EE store/credentials paths but does no network I/O here; the nio ``AsyncClient`` is created and connected later in :meth:`start`. Also forwards ``message_handler`` to :class:`PlatformAdapter` and sets up the in-memory maps the adapter relies on while running: ``_sent_events`` and ``_egregore_sent_events`` (so replies to the bot's own or ghost messages count as addressed in :meth:`_is_addressed`), ``_typing_tasks`` (background typing loops), and ``_in_room_sas`` (active in-room SAS verifications). No external side effects beyond constructing this object. Constructed by the platform factory / gateway wiring that instantiates each adapter from its platform config. Args: message_handler (MessageHandler): Async callback invoked with each decoded :class:`IncomingMessage` and this adapter. homeserver (str): Matrix homeserver URL to connect to. user_id (str): The bot's Matrix user ID. password (str): Login password, only needed for the first login. store_path (str): Directory for the nio E2EE key store. credentials_file (str): Path to the JSON file persisting login credentials and cross-signing seeds. media_cache (MediaCache | None): Optional shared cache used to avoid re-downloading the same media. config (Config | None): Optional runtime config controlling emoji resolution and media-download retries. """ super().__init__(message_handler) self._homeserver = homeserver self._user_id = user_id self._password = password self._store_path = store_path self._credentials_file = credentials_file self._media_cache = media_cache self._config = config self.client: AsyncClient | None = None self.credentials: dict | None = None self._sync_task: asyncio.Task | None = None self._stop_event = asyncio.Event() # channel_id -> set of event IDs the bot has sent self._sent_events: dict[str, set[str]] = {} # channel_id -> event IDs from egregore ghosts (AS) for reply addressing self._egregore_sent_events: dict[str, set[str]] = {} self._typing_tasks: dict[str, asyncio.Task[None]] = {} # request_event_id -> (Sas, OlmDevice, room_id) for in-room SAS verifications self._in_room_sas: dict[str, tuple[Sas, OlmDevice, str]] = {} # Profile display name for :meth:`bot_identity` (filled after login/sync). self._profile_display_name: str = ""
# -- PlatformAdapter metadata -------------------------------------- @property def name(self) -> str: """Return this adapter's stable platform identifier, ``"matrix"``. Implements the abstract :attr:`PlatformAdapter.name` property. The value is used as the registry key and routing/label tag for the adapter: ``gateway_main`` reads it when wiring platforms, ``background_tasks`` keys its ``adapter_map`` and channel catalog by it, and ``message_processor.channel_heartbeat`` uses it for log labels. Constant with no side effects. Returns: str: The literal platform name ``"matrix"``. """ return "matrix" @property def is_running(self) -> bool: """Report whether the background Matrix sync loop is active. Implements the abstract :attr:`PlatformAdapter.is_running` property by checking that the ``_sync_task`` created in :meth:`start` exists and has not finished. Used as a guard in :meth:`start` and :meth:`stop` and read externally by the web admin surfaces (``web/bot_admin.py``, ``web/platforms_api.py``, ``web/deps.py``) and ``background_tasks`` to decide whether the platform is live. Read-only with no side effects. Returns: bool: ``True`` while the sync task is running, ``False`` otherwise. """ return self._sync_task is not None and not self._sync_task.done() @property def bot_identity(self) -> dict[str, str]: """Describe this bot's Matrix identity for the pipeline. Implements the abstract :attr:`PlatformAdapter.bot_identity` property. Prefers the live ``user_id`` from the connected nio ``AsyncClient``, falling back to the configured ``self._user_id`` before login, and uses the cached ``self._profile_display_name`` (populated by :meth:`_refresh_self_display_name`) for the display name, falling back to the user ID when no profile name is known. On Matrix the user ID itself doubles as the mention token. Read across adapters by ``prompt_context`` (which gathers ``bot_identity`` for every adapter), ``message_processor.user_message_format``, ``message_processor.history_backfill``, ``background_tasks``, and ``web/bot_admin.py`` to label and attribute the bot's own messages. Returns: dict[str, str]: A mapping with ``platform``, ``user_id``, ``display_name``, and ``mention`` keys describing this bot. """ uid = (self.client.user_id if self.client else None) or self._user_id display = (self._profile_display_name or "").strip() or uid return { "platform": "matrix", "user_id": uid, "display_name": display, "mention": uid, } async def _refresh_self_display_name(self) -> None: """Fetch and cache the bot's profile display name from the homeserver. Calls the nio client's ``get_displayname`` (an HTTP request to the homeserver) and stores the result in ``self._profile_display_name`` so :attr:`bot_identity` can present a human-readable name instead of the bare user ID. Handles a missing client, request failure, or error response by logging at debug level and leaving the cached name unchanged; logs the resolved name at info level on success. Called by :meth:`start` within this module after the initial sync; it has no external callers. """ if not self.client: return try: resp = await self.client.get_displayname() except Exception: logger.debug("Matrix: get_displayname request failed", exc_info=True) return if isinstance(resp, ProfileGetDisplayNameError): logger.debug("Matrix: get_displayname error: %s", resp) return if isinstance(resp, ProfileGetDisplayNameResponse): dn = (resp.displayname or "").strip() self._profile_display_name = dn if dn: logger.info("Matrix: profile display name is %r", dn) # -- PlatformAdapter lifecycle -------------------------------------
[docs] async def start(self) -> None: """Connect to Matrix, perform the initial sync, and launch the sync loop. Implements the abstract :meth:`PlatformAdapter.start` lifecycle hook. Creates the E2EE-enabled nio ``AsyncClient``, then either restores a saved session (via :func:`load_matrix_credentials` / ``restore_login``) or logs in with the configured password and persists the new session through :func:`save_matrix_credentials`. Registers all event callbacks (:meth:`_register_callbacks`), runs a full initial sync, trusts known devices (:func:`trust_all_devices`), uploads/queries/claims E2EE keys, refreshes the profile name (:meth:`_refresh_self_display_name`), runs cross-signing setup (:func:`setup_cross_signing`), and finally spawns the background :meth:`_sync_loop` task. Touches the homeserver over HTTP, the local E2EE store directory and credentials file, and the asyncio loop. Invoked by the gateway service (``gateway_main``) when it brings each configured platform adapter online. Raises: RuntimeError: If there are no saved credentials and no password to log in with, or if the login attempt is rejected. """ if self.is_running: logger.warning("Matrix platform is already running") return self._stop_event.clear() # Ensure store directory exists store_path = Path(self._store_path) store_path.mkdir(parents=True, exist_ok=True) # Configure nio with E2EE client_config = AsyncClientConfig( max_limit_exceeded=0, max_timeouts=0, store_sync_tokens=True, encryption_enabled=True, ) self.client = AsyncClient( homeserver=self._homeserver, user=self._user_id, store_path=str(store_path), config=client_config, ) # Login or restore session creds = self.credentials or await load_matrix_credentials( self._credentials_file, ) self.credentials = creds if creds and creds.get("access_token"): logger.info( "Restoring Matrix session for %s on device %s", creds["user_id"], creds["device_id"], ) self.client.restore_login( user_id=creds["user_id"], device_id=creds["device_id"], access_token=creds["access_token"], ) else: if not self._password: raise RuntimeError( "No saved Matrix credentials and no password " "configured – cannot login" ) logger.info("Logging in to Matrix as %s …", self._user_id) resp = await self.client.login( self._password, device_name="MatrixLLMBot", ) if not isinstance(resp, LoginResponse): await self.client.close() raise RuntimeError(f"Matrix login failed: {resp}") logger.info( "Logged in as %s on device %s", resp.user_id, resp.device_id, ) await save_matrix_credentials( self._credentials_file, self._homeserver, self.client, ) self.credentials = { "homeserver": self._homeserver, "user_id": resp.user_id, "device_id": resp.device_id, "access_token": resp.access_token, } # Register event callbacks self._register_callbacks() # Initial sync + auto-trust + key management logger.info("Matrix: performing initial sync …") await self.client.sync(timeout=30000, full_state=True) trust_all_devices(self.client) if self.client.should_upload_keys: await self.client.keys_upload() if self.client.should_query_keys: await self.client.keys_query() if self.client.should_claim_keys: await self.client.keys_claim(self.client.get_users_for_key_claiming()) await self.client.send_to_device_messages() await self._refresh_self_display_name() # Cross-signing: self-sign our device so other users don't see # "Encrypted by a device not verified by its owner" try: saved_seeds = (self.credentials or {}).get("cross_signing_seeds") await setup_cross_signing( self.client, password=self._password, credentials_file=self._credentials_file, saved_seeds=saved_seeds, ) except Exception: logger.warning("Cross-signing setup failed", exc_info=True) logger.info("Matrix platform is running. Listening for messages …") self._sync_task = asyncio.create_task(self._sync_loop())
[docs] async def stop(self) -> None: """Cancel the sync loop and close the Matrix client connection. Implements the abstract :meth:`PlatformAdapter.stop` lifecycle hook. Sets the ``_stop_event`` so :meth:`_sync_loop` will exit, cancels and awaits the ``_sync_task``, then closes the nio ``AsyncClient`` and clears the references so :attr:`is_running` reports false. Releases the network connection and the asyncio task; logs when fully stopped. Safe to call when not running (returns immediately). Invoked by the gateway service (``gateway_main``) during shutdown and by ``web/platforms_api`` when an operator stops a platform from the admin UI. """ if not self.is_running: return self._stop_event.set() if self._sync_task: self._sync_task.cancel() try: await self._sync_task except asyncio.CancelledError: pass self._sync_task = None if self.client: await self.client.close() self.client = None logger.info("Matrix platform stopped")
[docs] async def should_skip_channel_heartbeat(self, channel_id: str) -> bool: """Decide whether to suppress the periodic heartbeat for a room. Implements the :class:`PlatformAdapter` hook consulted by ``message_processor.channel_heartbeat`` and ``core.outbound_consumer`` before they run an unsolicited background heartbeat. Looks the room up in the connected nio client's ``rooms`` map and reports that the heartbeat should be skipped for direct messages and other rooms with two or fewer members, since unprompted activity there would feel intrusive. Returns ``False`` (do not skip) when the client is not connected or the room is unknown. Args: channel_id (str): The Matrix room ID to evaluate. Returns: bool: ``True`` if the heartbeat should be skipped for this room, ``False`` otherwise. """ if self.client is None: return False room = self.client.rooms.get(channel_id) if room is None: return False return room.member_count <= 2
# -- PlatformAdapter outbound messaging ----------------------------
[docs] async def send(self, channel_id: str, text: str) -> str: """Render and send a text message to a Matrix room, returning its event ID. Implements the abstract :meth:`PlatformAdapter.send` outbound hook. Converts the text to an HTML ``formatted_body`` (:func:`_markdown_to_html`), linkifies bare mentions into ``matrix.to`` anchors and collects them into ``m.mentions`` (:func:`_linkify_matrix_mentions`), and derives the plain ``body`` (:func:`_strip_html`). Best-effort enriches the payload with the current sprite/character state read from Redis (keys ``star:sprite:state:matrix:<channel>`` and ``star:sprite:chars:matrix:<channel>``) under an ``sg.sprite`` field for per-message replay, swallowing any Redis error. Sends via the nio client's ``room_send`` (network I/O) and records the resulting event ID in ``_sent_events`` so replies to it are treated as addressed by :meth:`_is_addressed`. Driven by the outbound side of the pipeline -- ``message_processor`` (``generate_and_send``, ``command_router``, ``processor``) and the ``task_manager`` -- which call ``platform.send`` on the resolved adapter. Args: channel_id (str): The Matrix room ID to post into. text (str): The Markdown message text to render and send. Returns: str: The sent event ID, or an empty string if the client is not connected or the send fails. """ if self.client is None: logger.error("Matrix client is not connected") return "" html = _markdown_to_html(text) html, mentioned_users = _linkify_matrix_mentions(html) content: dict[str, Any] = { "msgtype": "m.text", "body": _strip_html(text), "format": "org.matrix.custom.html", "formatted_body": html, "m.mentions": {"user_ids": mentioned_users}, } # 💀🔥 Embed current sprite state for per-message replay try: from config import config as app_config import redis.asyncio as _aredis _r = await _aredis.from_url( app_config.get("REDIS_URL", "redis://localhost:6379"), decode_responses=True, socket_timeout=2, ) _ck = f"matrix:{channel_id}" _spr = await _r.get(f"star:sprite:state:{_ck}") _chr = await _r.get(f"star:sprite:chars:{_ck}") await _r.close() sprite_meta: dict[str, Any] = {} if _spr: sprite_meta["sprite_state"] = json.loads(_spr) if _chr: _cd = json.loads(_chr) sprite_meta["characters"] = ( list(_cd.values()) if isinstance(_cd, dict) else _cd ) if sprite_meta: content["sg.sprite"] = sprite_meta except Exception: pass # Non-critical -- don't break message sending try: resp = await self.client.room_send( room_id=channel_id, message_type="m.room.message", content=content, ignore_unverified_devices=True, ) if isinstance(resp, RoomSendResponse): self._sent_events.setdefault( channel_id, set(), ).add(resp.event_id) return resp.event_id logger.warning( "room_send to %s returned non-success response: %s", channel_id, type(resp).__name__, ) except Exception: logger.exception( "Failed to send message to Matrix room %s", channel_id, ) return ""
[docs] async def add_reaction(self, channel_id: str, message_id: str, emoji: str) -> None: """Add an ``m.reaction`` annotation to a Matrix event. Gateway side of the outbound ``type: "reaction"`` relay. Sends an ``m.reaction`` event relating to *message_id* (the target event id). Errors are logged, not raised. """ if self.client is None: logger.error("Matrix client is not connected (add_reaction)") return try: await self.client.room_send( room_id=channel_id, message_type="m.reaction", content={ "m.relates_to": { "rel_type": "m.annotation", "event_id": message_id, "key": emoji, } }, ignore_unverified_devices=True, ) except Exception: logger.exception( "Failed to add reaction %r to event %s in room %s", emoji, message_id, channel_id, )
[docs] async def send_file( self, channel_id: str, data: bytes, filename: str, mimetype: str = "application/octet-stream", ) -> str | None: """Upload a file to Matrix and post it into a room as a media message. Implements the abstract :meth:`PlatformAdapter.send_file` outbound hook. Uploads the bytes via the nio client's ``upload`` (encrypting the blob when the target room is encrypted), picks the ``msgtype`` (``m.image`` / ``m.audio`` / ``m.video`` / ``m.file``) from the MIME type, builds the message content with the returned ``mxc://`` URI (or encryption keys for encrypted rooms), and sends it with ``room_send``. Records the event ID in ``_sent_events`` like :meth:`send` so replies count as addressed. Performs network I/O; logs and returns ``None`` on failure instead of raising. Driven by the outbound media path -- ``core.outbound_consumer`` and the media-producing ``tools`` (image/video/audio generators) and ``background_agents`` -- which call ``adapter.send_file`` on the resolved adapter. Args: channel_id (str): The Matrix room ID to post into. data (bytes): The raw file contents to upload. filename (str): The display/body filename for the attachment. mimetype (str): The MIME type of the file. Returns: str | None: The ``mxc://`` content URI on success, or ``None`` if the client is not connected or the upload/send fails. """ if self.client is None: logger.error("Matrix client is not connected") return None try: room = self.client.rooms.get(channel_id) room_encrypted = room.encrypted if room else False upload_resp, encryption_keys = await self.client.upload( io.BytesIO(data), content_type=mimetype, filename=filename, filesize=len(data), encrypt=room_encrypted, ) content_uri: str = upload_resp.content_uri msgtype = "m.file" if mimetype.startswith("image/"): msgtype = "m.image" elif mimetype.startswith("audio/"): msgtype = "m.audio" elif mimetype.startswith("video/"): msgtype = "m.video" content = { "msgtype": msgtype, "body": filename, "info": { "mimetype": mimetype, "size": len(data), }, } if encryption_keys: encryption_keys["url"] = content_uri content["file"] = encryption_keys else: content["url"] = content_uri resp = await self.client.room_send( room_id=channel_id, message_type="m.room.message", content=content, ignore_unverified_devices=True, ) if isinstance(resp, RoomSendResponse): self._sent_events.setdefault( channel_id, set(), ).add(resp.event_id) return content_uri except Exception: logger.exception( "Failed to send file to Matrix room %s", channel_id, ) return None
# -- Typing indicator ----------------------------------------------
[docs] async def start_typing(self, channel_id: str) -> None: """Begin showing the bot's typing indicator in a room until cleared. Implements the abstract :meth:`PlatformAdapter.start_typing` hook. First clears any existing indicator for the room (:meth:`stop_typing`), then spawns a background task (the nested ``_typing_loop``) that repeatedly calls the nio client's ``room_typing`` with a 30 s timeout and re-asserts it every 25 s, since Matrix typing notices expire. The task handle is stored in ``_typing_tasks`` keyed by room so :meth:`stop_typing` can cancel it. No-op when the client is not connected. Driven by the response pipeline -- ``message_processor.processor`` and ``core.outbound_consumer`` -- which call ``start_typing`` while a reply is being generated. Args: channel_id (str): The Matrix room ID to show typing in. """ await self.stop_typing(channel_id) if self.client is None: return async def _typing_loop(room_id: str) -> None: """Re-assert the typing indicator until the task is cancelled. Background coroutine spawned by the enclosing :meth:`start_typing`. Loops forever calling the nio client's ``room_typing`` with a 30 s server timeout and sleeping 25 s between refreshes so the indicator never lapses; exits quietly on ``CancelledError`` when :meth:`stop_typing` cancels the stored task. Performs network I/O on each iteration. Defined and scheduled only within :meth:`start_typing`; it has no other callers. Args: room_id (str): The Matrix room ID to keep the typing indicator alive in. """ try: while True: await self.client.room_typing( # type: ignore[union-attr] room_id, typing_state=True, timeout=30000, ) await asyncio.sleep(25) except asyncio.CancelledError: pass self._typing_tasks[channel_id] = asyncio.create_task( _typing_loop(channel_id), )
[docs] async def stop_typing(self, channel_id: str) -> None: """Cancel the typing-indicator loop for a room and clear the indicator. Implements the abstract :meth:`PlatformAdapter.stop_typing` hook. Pops and cancels the background task started by :meth:`start_typing` from ``_typing_tasks``, then explicitly tells the homeserver the bot is no longer typing via the nio client's ``room_typing`` with ``typing_state=False`` (logging at debug level if that call fails). Performs network I/O; safe to call when nothing is typing. Driven by the response pipeline -- ``message_processor.processor`` and ``core.outbound_consumer`` -- once a reply is sent; also called at the top of :meth:`start_typing` to reset state. Args: channel_id (str): The Matrix room ID to stop the typing indicator in. """ task = self._typing_tasks.pop(channel_id, None) if task is not None and not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass if self.client is not None: try: await self.client.room_typing( channel_id, typing_state=False, ) except Exception: logger.debug( "Failed to clear typing indicator in %s", channel_id, exc_info=True, )
# -- Channel validation --------------------------------------------
[docs] async def is_channel_valid(self, channel_id: str) -> bool: """Return ``True`` if the room is one the bot is currently joined to. Implements the abstract :meth:`PlatformAdapter.is_channel_valid` hook by checking the channel ID against the connected nio client's in-memory ``rooms`` map, so callers can confirm a target room is reachable before sending. Returns ``False`` when the client is not connected. Read-only with no side effects. Args: channel_id (str): The Matrix room ID to validate. Returns: bool: ``True`` if the room is known/joined, ``False`` otherwise. """ if self.client is None: return False return channel_id in self.client.rooms
# -- Server/channel listing ----------------------------------------
[docs] async def list_servers_and_channels(self) -> list[dict[str, Any]]: """Return all Matrix rooms the bot is in. Matrix doesn't have a guild/server hierarchy in the same way Discord does — each room is listed as a standalone entry. """ if self.client is None: return [] rooms: list[dict[str, Any]] = [] for room_id, room in self.client.rooms.items(): rooms.append( { "server_name": room.display_name, "server_id": room_id, "member_count": room.member_count, "channels": [], # Matrix rooms are flat } ) return rooms
# -- Channel history -----------------------------------------------
[docs] async def fetch_history( self, channel_id: str, limit: int = 100, ) -> list[HistoricalMessage]: """Fetch recent room history as normalized :class:`HistoricalMessage` items. Implements the abstract :meth:`PlatformAdapter.fetch_history` hook. Pages backward from the client's current ``next_batch`` token via the nio client's ``room_messages`` (a homeserver HTTP request, filtered to ``m.room.message`` events), then maps each text or media event into a :class:`HistoricalMessage` -- rendering media as ``[Image]`` / ``[Video]`` / ``[Audio]`` / ``[File]`` placeholders, resolving sender display names, flagging the bot's own messages, and capturing reply targets via :func:`_get_reply_to_id`. Results are reversed into chronological order. Performs network I/O; on error or a non-success response it logs at debug level and returns an empty list. Driven by history-backfill and cross-channel features -- ``background_tasks``, ``message_processor.history_backfill``, ``core.outbound_consumer``, and the ``cross_channel_query`` / ``admin_whisper`` tools -- which call ``fetch_history`` on the resolved adapter. Args: channel_id (str): The Matrix room ID to read history from. limit (int): Maximum number of messages to fetch. Returns: list[HistoricalMessage]: The fetched messages in chronological order, or an empty list if the client is disconnected or the request fails. """ if self.client is None: return [] start_token = self.client.next_batch if not start_token: return [] try: resp = await self.client.room_messages( room_id=channel_id, start=start_token, limit=limit, message_filter={"types": ["m.room.message"]}, ) except Exception: logger.debug( "Failed to fetch history for Matrix room %s", channel_id, exc_info=True, ) return [] if not isinstance(resp, RoomMessagesResponse): return [] room = self.client.rooms.get(channel_id) bot_user_id = self.client.user_id messages: list[HistoricalMessage] = [] _media_types = ( RoomMessageImage, RoomMessageVideo, RoomMessageAudio, RoomMessageFile, RoomEncryptedImage, RoomEncryptedVideo, RoomEncryptedAudio, RoomEncryptedFile, ) for event in resp.chunk: text: str | None = None if isinstance(event, RoomMessageText): text = event.body elif isinstance(event, _media_types): url = getattr(event, "url", "") or "" body = getattr(event, "body", "") or "" if isinstance(event, (RoomMessageImage, RoomEncryptedImage)): text = f"[Image: {body}]" if body else "[Image]" elif isinstance(event, (RoomMessageVideo, RoomEncryptedVideo)): text = f"[Video: {body}]" if body else "[Video]" elif isinstance(event, (RoomMessageAudio, RoomEncryptedAudio)): text = f"[Audio: {body}]" if body else "[Audio]" else: text = f"[File: {body}]" if body else "[File]" if url: text += f" ({url})" else: continue sender = event.sender if room is not None: display_name = room.user_name(sender) or sender else: display_name = sender messages.append( HistoricalMessage( user_id=sender, user_name=display_name, text=text, timestamp=datetime.fromtimestamp( event.server_timestamp / 1000, tz=timezone.utc, ), message_id=event.event_id, is_bot=(sender == bot_user_id), reply_to_id=_get_reply_to_id(event), ) ) messages.reverse() return messages
# -- Internal: event callbacks ------------------------------------- def _register_callbacks(self) -> None: """Wire all the adapter's handlers onto the nio client's callback registry. Registers room-event callbacks (text via :meth:`_on_message`, invites via :meth:`_on_invite`, undecryptable Megolm events via :meth:`_on_megolm_event`, and every media event type via :meth:`_on_media`), the to-device SAS verification handlers (:meth:`_on_unknown_to_device`, :meth:`_on_key_verification_start`, :meth:`_on_key_verification_key`, :meth:`_on_key_verification_mac`, :meth:`_on_key_verification_cancel`), and the in-room verification handlers (:meth:`_on_room_verification_request`, :meth:`_on_room_verification_event`). Mutates the nio client's ``add_event_callback`` / ``add_to_device_callback`` tables; the registered handlers are later dispatched by nio during each sync. Called by :meth:`start` within this module after login and before the initial sync; it has no external callers. """ assert self.client is not None self.client.add_event_callback( self._on_message, RoomMessageText, ) self.client.add_event_callback( self._on_invite, InviteMemberEvent, ) self.client.add_event_callback( self._on_megolm_event, MegolmEvent, ) _media_types = ( RoomMessageImage, RoomEncryptedImage, RoomMessageAudio, RoomEncryptedAudio, RoomMessageVideo, RoomEncryptedVideo, RoomMessageFile, RoomEncryptedFile, ) for event_type in _media_types: self.client.add_event_callback( self._on_media, event_type, ) self.client.add_to_device_callback( self._on_unknown_to_device, UnknownToDeviceEvent, ) self.client.add_to_device_callback( self._on_key_verification_start, KeyVerificationStart, ) self.client.add_to_device_callback( self._on_key_verification_key, KeyVerificationKey, ) self.client.add_to_device_callback( self._on_key_verification_mac, KeyVerificationMac, ) self.client.add_to_device_callback( self._on_key_verification_cancel, KeyVerificationCancel, ) self.client.add_event_callback( self._on_room_verification_request, RoomMessageUnknown, ) self.client.add_event_callback( self._on_room_verification_event, UnknownEvent, ) # -- In-room SAS verification helpers --------------------------------- async def _in_room_verif_send( self, room_id: str, event_type: str, content: dict, request_event_id: str, ) -> None: """Send an in-room verification event with the required m.relates_to. In-room verification events must NOT include transaction_id in the content body; the relation is carried by m.relates_to instead. """ assert self.client is not None content.pop("transaction_id", None) content["m.relates_to"] = { "rel_type": "m.reference", "event_id": request_event_id, } await self.client.room_send( room_id, event_type, content, ignore_unverified_devices=True, ) @staticmethod def _to_device_dict(source: dict, tx_id: str) -> dict: """Build a to-device-style dict from a room event source dict. nio's KeyVerification* from_dict() methods expect a to-device envelope with ``sender`` and ``content.transaction_id``. """ content = dict(source.get("content", {})) content["transaction_id"] = tx_id return {"sender": source["sender"], "content": content} # -- In-room SAS handlers --------------------------------------------- async def _on_room_verification_request( self, room: MatrixRoom, event: RoomMessageUnknown, ) -> None: """Respond to an in-room SAS verification request with a ready event. Handler registered for ``RoomMessageUnknown`` events and dispatched by nio during sync. Ignores anything that is not an ``m.key.verification.request``, requests from the bot itself, or requests that do not offer the ``m.sas.v1`` method; otherwise it replies with an ``m.key.verification.ready`` event advertising SAS support, sent through :meth:`_in_room_verif_send` (which attaches the required ``m.relates_to`` reference and performs the ``room_send`` network call). No-op when the client is not connected. Invoked by nio's event dispatch loop; not called directly elsewhere in the codebase. Args: room (MatrixRoom): The room the request arrived in. event (RoomMessageUnknown): The raw verification-request message. """ if self.client is None: return if event.msgtype != "m.key.verification.request": return if event.sender == self.client.user_id: return content = event.source.get("content", {}) methods: list[str] = content.get("methods", []) request_event_id: str = event.event_id if "m.sas.v1" not in methods: logger.debug( "Ignoring in-room verification request from %s: unsupported methods %s", event.sender, methods, ) return logger.debug( "In-room SAS verification request from %s in %s (event %s)", event.sender, room.room_id, request_event_id, ) ready_content = { "from_device": self.client.device_id, "methods": ["m.sas.v1"], } await self._in_room_verif_send( room.room_id, "m.key.verification.ready", ready_content, request_event_id, ) async def _on_room_verification_event( self, room: MatrixRoom, event: UnknownEvent, ) -> None: """Drive the responder side of an in-room SAS verification exchange. Handler registered for ``UnknownEvent`` events and dispatched by nio during sync. Routes the various ``m.key.verification.*`` sub-events keyed by the original request's event ID: on ``start`` it builds a nio ``Sas`` from the initiator's device, recomputes the commitment, and sends an accept, stashing the live ``Sas`` in ``_in_room_sas``; on ``key`` it shares the bot's key, auto-accepts the emoji comparison, and sends the MAC; on ``mac`` it verifies and, on success, marks the device trusted (``verify_device``) and sends ``done``; ``cancel`` / ``done`` clean up state. All outbound events go through :meth:`_in_room_verif_send` (network I/O); mutates ``_in_room_sas`` and the client's device-trust state. Invoked by nio's event dispatch loop; not called directly elsewhere in the codebase. Args: room (MatrixRoom): The room the verification event arrived in. event (UnknownEvent): The raw ``m.key.verification.*`` event. """ if self.client is None: return if event.sender == self.client.user_id: return ev_type: str = event.type source: dict = event.source content: dict = source.get("content", {}) # transaction_id for in-room verification is the original request event_id relates_to: dict = content.get("m.relates_to", {}) request_event_id: str | None = relates_to.get("event_id") if ev_type == "m.key.verification.done": logger.debug( "Received in-room m.key.verification.done from %s", event.sender, ) return if ev_type == "m.key.verification.cancel": logger.debug( "In-room SAS canceled by %s (event %s): [%s] %s", event.sender, request_event_id, content.get("code", ""), content.get("reason", ""), ) if request_event_id: self._in_room_sas.pop(request_event_id, None) return if not ev_type.startswith("m.key.verification."): return if not request_event_id: logger.debug("In-room verification event %s has no m.relates_to", ev_type) return # ---- start ------------------------------------------------------- if ev_type == "m.key.verification.start": if content.get("method") != "m.sas.v1": return try: user_devices = self.client.device_store[event.sender] except KeyError: user_devices = {} from_device: str = content.get("from_device", "") olm_device: OlmDevice | None = user_devices.get(from_device) if olm_device is None: logger.debug( "In-room SAS start from unknown device %s/%s", event.sender, from_device, ) return fp_key: str = self.client.olm.account.identity_keys["ed25519"] td = self._to_device_dict(source, request_event_id) try: typed_start = KeyVerificationStart.from_dict(td) except Exception: logger.exception("Failed to parse in-room KeyVerificationStart") return try: sas = Sas.from_key_verification_start( self.client.user_id, self.client.device_id, fp_key, olm_device, typed_start, ) except Exception: logger.exception("Failed to create Sas for in-room verification") return # Recompute commitment from the original room event content # (without the injected transaction_id) so it matches what the # initiator will verify against. original_content = source.get("content", {}) sas.commitment = olm.sha256( sas.pubkey + NioApi.to_canonical_json(original_content), ) if sas.canceled: logger.debug( "In-room SAS start from %s/%s was invalid: %s", event.sender, from_device, sas.cancel_reason, ) return accept_msg = sas.accept_verification() await self._in_room_verif_send( room.room_id, accept_msg.type, dict(accept_msg.content), request_event_id, ) self._in_room_sas[request_event_id] = (sas, olm_device, room.room_id) logger.debug( "In-room SAS started with %s/%s (event %s)", event.sender, from_device, request_event_id, ) return # ---- key / mac --------------------------------------------------- entry = self._in_room_sas.get(request_event_id) if entry is None: logger.debug( "In-room verification event %s for unknown request %s", ev_type, request_event_id, ) return sas, olm_device, room_id = entry if ev_type == "m.key.verification.key": # As responder, send our key only after receiving theirs. key_msg = sas.share_key() await self._in_room_verif_send( room_id, key_msg.type, dict(key_msg.content), request_event_id, ) td = self._to_device_dict(source, request_event_id) try: typed_key = KeyVerificationKey.from_dict(td) except Exception: logger.exception("Failed to parse in-room KeyVerificationKey") return sas.receive_key_event(typed_key) if sas.canceled: logger.debug( "In-room SAS canceled after key exchange: %s", sas.cancel_reason, ) self._in_room_sas.pop(request_event_id, None) return emojis = sas.get_emoji() emoji_str = " ".join(f"{e} {d}" for e, d in emojis) logger.debug( "In-room SAS emojis (event %s): %s — auto-accepting", request_event_id, emoji_str, ) sas.accept_sas() mac_msg = sas.get_mac() await self._in_room_verif_send( room_id, mac_msg.type, dict(mac_msg.content), request_event_id, ) elif ev_type == "m.key.verification.mac": td = self._to_device_dict(source, request_event_id) try: typed_mac = KeyVerificationMac.from_dict(td) except Exception: logger.exception("Failed to parse in-room KeyVerificationMac") return sas.receive_mac_event(typed_mac) if sas.verified: self.client.verify_device(olm_device) await self._in_room_verif_send( room_id, "m.key.verification.done", {}, request_event_id, ) logger.debug( "In-room SAS verified device %s/%s (event %s)", olm_device.user_id, olm_device.id, request_event_id, ) elif sas.canceled: logger.debug( "In-room SAS MAC verification failed (event %s): %s", request_event_id, sas.cancel_reason, ) self._in_room_sas.pop(request_event_id, None) # -- SAS key-verification handlers ------------------------------------ async def _on_unknown_to_device( self, event: UnknownToDeviceEvent, ) -> None: """Handle untyped to-device verification events and offer SAS readiness. Handler registered for ``UnknownToDeviceEvent`` and dispatched by nio when it receives to-device traffic during sync. Logs and ignores ``m.key.verification.done`` and anything other than ``m.key.verification.request``; for a valid SAS request it replies with an ``m.key.verification.ready`` ``ToDeviceMessage`` (advertising ``m.sas.v1``) sent via the nio client's ``to_device`` (network I/O). This is the device-to-device counterpart to :meth:`_on_room_verification_request`. No-op when the client is not connected. Invoked by nio's to-device dispatch loop; not called directly elsewhere in the codebase. Args: event (UnknownToDeviceEvent): The raw to-device verification event. """ if event.type == "m.key.verification.done": logger.debug( "Received m.key.verification.done from %s (source: %s)", event.sender, event.source.get("content", {}), ) return if event.type != "m.key.verification.request": return if self.client is None: return content = event.source.get("content", {}) methods: list[str] = content.get("methods", []) transaction_id: str | None = content.get("transaction_id") from_device: str | None = content.get("from_device") logger.debug("Verification request content: %s", content) if "m.sas.v1" not in methods or not transaction_id or not from_device: logger.debug( "Ignoring verification request from %s: unsupported methods %s", event.sender, methods, ) return logger.debug( "SAS verification request from %s (device %s, tx %s)", event.sender, from_device, transaction_id, ) ready_content = { "from_device": self.client.device_id, "methods": ["m.sas.v1"], "transaction_id": transaction_id, } ready_msg = ToDeviceMessage( "m.key.verification.ready", event.sender, from_device, ready_content, ) await self.client.to_device(ready_msg) async def _on_key_verification_start( self, event: KeyVerificationStart, ) -> None: """Accept a to-device SAS verification start using nio's internal Sas. Handler registered for ``KeyVerificationStart`` and dispatched by nio during sync. For an ``m.sas.v1`` start it looks up the ``Sas`` object nio already created for the transaction in ``client.olm.key_verifications`` and, if it is valid, sends the accept via the client's ``to_device`` (network I/O). Bails out for unsupported methods, an unknown/cancelled transaction, or a disconnected client. Invoked by nio's to-device dispatch loop; not called directly elsewhere in the codebase. Args: event (KeyVerificationStart): The parsed SAS start event. """ if self.client is None: return if event.method != "m.sas.v1": return sas = self.client.olm.key_verifications.get(event.transaction_id) if sas is None or sas.canceled: logger.debug( "SAS start from %s/%s but no valid internal Sas (tx %s)", event.sender, event.from_device, event.transaction_id, ) return await self.client.to_device(sas.accept_verification()) logger.debug( "SAS verification accepted for %s/%s (tx %s)", event.sender, event.from_device, event.transaction_id, ) async def _on_key_verification_key( self, event: KeyVerificationKey, ) -> None: """Auto-accept the emoji comparison and send the MAC after key exchange. Handler registered for ``KeyVerificationKey`` and dispatched by nio during sync. nio's own handler has already recorded the peer's key and queued the bot's; this flushes that with ``send_to_device_messages``, logs the SAS emojis at debug level, auto-accepts the comparison (``accept_sas`` -- this is an unattended bot, so it always trusts the request), and sends the MAC via ``to_device``. All network I/O. Bails on a disconnected client, an unknown transaction, or a cancelled ``Sas``. Invoked by nio's to-device dispatch loop; not called directly elsewhere in the codebase. Args: event (KeyVerificationKey): The parsed SAS key event. """ if self.client is None: return sas = self.client.olm.key_verifications.get(event.transaction_id) if sas is None: return if sas.canceled: logger.debug( "SAS canceled after key exchange (tx %s): %s", event.transaction_id, sas.cancel_reason, ) return # nio's internal handler already called receive_key_event() and # queued share_key() in outgoing_to_device_messages — flush it. await self.client.send_to_device_messages() emojis = sas.get_emoji() emoji_str = " ".join(f"{e} {d}" for e, d in emojis) logger.debug( "SAS emojis for tx %s: %s — auto-accepting", event.transaction_id, emoji_str, ) sas.accept_sas() await self.client.to_device(sas.get_mac()) async def _on_key_verification_mac( self, event: KeyVerificationMac, ) -> None: """Finalize a to-device SAS verification once nio has checked the MAC. Handler registered for ``KeyVerificationMac`` and dispatched by nio during sync. nio's own handler has already processed the MAC and (if valid) verified the device; this inspects the resulting ``Sas`` state and, on success, sends an ``m.key.verification.done`` ``ToDeviceMessage`` via ``to_device`` (network I/O) to close out the exchange, logging the verified peer device. On cancellation it logs the reason. Bails on a disconnected client or an unknown transaction. Invoked by nio's to-device dispatch loop; not called directly elsewhere in the codebase. Args: event (KeyVerificationMac): The parsed SAS MAC event. """ if self.client is None: return sas = self.client.olm.key_verifications.get(event.transaction_id) if sas is None: return # nio's internal handler already called receive_mac_event() and # verify_device() if the MAC was valid. if sas.verified: done_content = {"transaction_id": event.transaction_id} done_msg = ToDeviceMessage( "m.key.verification.done", sas.other_olm_device.user_id, sas.other_olm_device.id, done_content, ) resp = await self.client.to_device(done_msg) logger.debug( "Device %s/%s verified via SAS (tx %s), done sent -> %s", sas.other_olm_device.user_id, sas.other_olm_device.id, event.transaction_id, type(resp).__name__, ) elif sas.canceled: logger.debug( "SAS MAC verification failed (tx %s): %s", event.transaction_id, sas.cancel_reason, ) async def _on_key_verification_cancel( self, event: KeyVerificationCancel, ) -> None: """Log that a peer cancelled an in-progress to-device SAS verification. Handler registered for ``KeyVerificationCancel`` and dispatched by nio during sync. nio has already torn down its internal ``Sas`` state for the transaction; this only records the sender, transaction ID, and the cancel code/reason at debug level so the cancellation is visible in logs. No network I/O or state mutation of its own. Invoked by nio's to-device dispatch loop; not called directly elsewhere in the codebase. Args: event (KeyVerificationCancel): The parsed SAS cancel event. """ logger.debug( "SAS verification canceled by %s (tx %s): [%s] %s", event.sender, event.transaction_id, event.code, event.reason, ) async def _on_message( self, room: MatrixRoom, event: RoomMessageText, ) -> None: """Convert an inbound Matrix text message into an :class:`IncomingMessage`. Handler registered for ``RoomMessageText`` and dispatched by nio during sync. Skips the bot's own echoes, then -- when enabled in config -- resolves custom Matrix emojis in the formatted body into image attachments by downloading them (via ``platforms.emoji_resolver``, which honors the shared ``MediaCache``). Builds the normalized :class:`IncomingMessage` with channel/sender metadata, the addressed decision (:meth:`_is_addressed`), reply target (:func:`_get_reply_to_id`), and an ``extra`` dict carrying DM/member-count/admin flags, then hands it to the adapter's ``_message_handler`` which feeds the inference pipeline. Performs network I/O for emoji downloads. Invoked by nio's event dispatch loop; not called directly elsewhere in the codebase. Args: room (MatrixRoom): The room the message was sent in. event (RoomMessageText): The inbound text event. """ assert self.client is not None if event.sender == self.client.user_id: return text = event.body attachments: list[Attachment] = [] # --- Resolve custom emojis as images -------------------------- cfg = self._config if cfg is not None and cfg.resolve_emojis_as_images and text: try: source_content = (event.source or {}).get("content", {}) formatted_body = source_content.get("formatted_body", "") if formatted_body: from platforms.emoji_resolver import ( extract_matrix_emojis, rewrite_matrix_emoji_text, download_matrix_emojis, ) emoji_matches = extract_matrix_emojis(formatted_body) if emoji_matches: emoji_atts = await download_matrix_emojis( emoji_matches, self.client, max_emojis=cfg.max_emojis_per_message, media_cache=self._media_cache, ) if emoji_atts: text = rewrite_matrix_emoji_text( text, emoji_matches[: cfg.max_emojis_per_message] ) attachments.extend(emoji_atts) logger.info( "Resolved %d/%d Matrix custom emojis as images", len(emoji_atts), len(emoji_matches), ) except Exception: logger.debug("Matrix emoji resolution failed", exc_info=True) msg = IncomingMessage( platform="matrix", channel_id=room.room_id, user_id=event.sender, user_name=room.user_name(event.sender) or event.sender, text=text, is_addressed=self._is_addressed(room, event), attachments=attachments, channel_name=room.display_name, timestamp=datetime.fromtimestamp( event.server_timestamp / 1000, tz=timezone.utc, ), message_id=event.event_id, reply_to_id=_get_reply_to_id(event), extra={ "bot_id": self.client.user_id, "member_count": room.member_count, "is_dm": room.member_count <= 2, "is_server_admin": ( room.power_levels.get_user_level(event.sender) >= 50 if room.power_levels else False ), }, ) logger.debug( "[Matrix/%s] %s: %s", room.display_name, msg.user_name, msg.text[:120], ) await self._message_handler(msg, self) async def _on_media( self, room: MatrixRoom, event: MediaEvent, ) -> None: """Convert an inbound Matrix media message into an :class:`IncomingMessage`. Handler registered for every media event type (image/audio/video/file, encrypted and plain) and dispatched by nio during sync. Skips the bot's own echoes, downloads and decrypts the attachment via :func:`download_matrix_media` -- wrapped in the nested ``_fetch_media`` / ``_download`` helpers for bounded retry and routed through the shared ``MediaCache`` when configured -- and wraps the bytes in an :class:`Attachment`. Builds the normalized :class:`IncomingMessage` (caption as text, addressed decision via :meth:`_is_addressed`, reply target via :func:`_get_reply_to_id`, plus the DM/member/admin ``extra`` flags) and hands it to ``_message_handler`` for the inference pipeline. Performs network I/O; logs and proceeds with no attachment if the download fails. Invoked by nio's event dispatch loop; not called directly elsewhere in the codebase. Args: room (MatrixRoom): The room the media message was sent in. event (MediaEvent): The inbound media event (any supported type). """ assert self.client is not None if event.sender == self.client.user_id: return is_addressed = self._is_addressed(room, event) caption = event.body or "" logger.debug( "[Matrix/%s] %s: [media] %s", room.display_name, room.user_name(event.sender), caption[:120], ) # Download the attachment (using cache when available) attachments: list[Attachment] = [] mxc_url: str = event.url try: retry_attempts = ( getattr(self._config, "media_download_retry_attempts", 3) if self._config is not None else 3 ) async def _fetch_media() -> tuple[bytes, str, str]: """Download and decrypt the current event's media in one shot. Thin closure over :func:`download_matrix_media` for the event being handled, returning ``(data, mimetype, filename)``. Defined and used only within :meth:`_on_media` as the unit of work the retry wrapper and media cache invoke; it has no other callers. Performs network I/O against the homeserver. Returns: tuple[bytes, str, str]: The decrypted bytes, MIME type, and filename of the attachment. """ return await download_matrix_media(self.client, event) # type: ignore[arg-type] async def _download() -> tuple[bytes, str, str]: """Fetch the media with bounded retry on transient blips. Wraps ``_fetch_media`` in the shared :func:`platforms.media_common.download_with_retry` helper so a flaky homeserver or CDN response is retried up to the configured attempt count before giving up. Defined and used only within :meth:`_on_media` -- it is the callable passed to the media cache or invoked directly when no cache is configured; it has no other callers. Performs network I/O. Returns: tuple[bytes, str, str]: The decrypted bytes, MIME type, and filename of the attachment. """ return await download_with_retry( _fetch_media, attempts=retry_attempts, label=event.body or "attachment", ) if self._media_cache is not None: data, mimetype, filename = await self._media_cache.get_or_download( mxc_url, _download, ) else: data, mimetype, filename = await _download() attachments.append( Attachment( data=data, mimetype=mimetype, filename=filename, source_url=mxc_url, ) ) except Exception: logger.exception( "Failed to download Matrix media from %s", event.sender, ) msg = IncomingMessage( platform="matrix", channel_id=room.room_id, user_id=event.sender, user_name=room.user_name(event.sender) or event.sender, text=caption, is_addressed=is_addressed, attachments=attachments, channel_name=room.display_name, timestamp=datetime.fromtimestamp( event.server_timestamp / 1000, tz=timezone.utc, ), message_id=event.event_id, reply_to_id=_get_reply_to_id(event), extra={ "bot_id": self.client.user_id, "member_count": room.member_count, "is_dm": room.member_count <= 2, "is_server_admin": ( room.power_levels.get_user_level(event.sender) >= 50 if room.power_levels else False ), }, ) await self._message_handler(msg, self) async def _on_invite( self, room: MatrixRoom, event: InviteMemberEvent, ) -> None: """Auto-accept a room invite that targets the bot. Handler registered for ``InviteMemberEvent`` and dispatched by nio during sync. Ignores invites whose ``state_key`` is not the bot's own user ID, then schedules :meth:`_join_room` as a background task (so the sync loop is not blocked) to actually join with retry. Logs the incoming invite at info level. Invoked by nio's event dispatch loop; not called directly elsewhere in the codebase. Args: room (MatrixRoom): The room the bot is being invited to. event (InviteMemberEvent): The membership-invite event. """ assert self.client is not None if event.state_key != self.client.user_id: return logger.info( "Matrix: received invite to %s from %s – joining", room.room_id, event.sender, ) asyncio.ensure_future(self._join_room(room.room_id)) async def _join_room(self, room_id: str) -> None: """Join a room, retrying with backoff to ride out invite-commit races. Calls the nio client's ``join`` (network I/O) up to four times with escalating delays (1, 2, 4, 8 s), because a just-received invite may not yet be committed server-side when the join is attempted. Stops as soon as the join succeeds and logs the outcome of each attempt; logs an error and gives up after the fourth failure. Called by :meth:`_on_invite` within this module (scheduled as a background task via ``asyncio.ensure_future``); it has no external callers. Args: room_id (str): The Matrix room ID to join. """ assert self.client is not None for attempt in range(4): delay = 1 if attempt == 0 else 2**attempt # 1, 2, 4, 8 s await asyncio.sleep(delay) result = await self.client.join(room_id) if not isinstance(result, JoinError): logger.info("Joined Matrix room %s", room_id) return logger.warning( "Join attempt %d for %s failed: %s", attempt + 1, room_id, result, ) logger.error("Giving up joining %s after 4 attempts", room_id) async def _on_megolm_event( self, room: MatrixRoom, event: MegolmEvent, ) -> None: """Request the missing room key when a Megolm message cannot be decrypted. Handler registered for ``MegolmEvent`` and dispatched by nio during sync whenever an encrypted message arrives that the bot lacks the session key for. Logs the gap and asks the sender's devices for the key via the nio client's ``request_room_key``, then flushes the outgoing to-device request with ``send_to_device_messages`` (both network I/O); failures are logged at debug level rather than raised. Successful key delivery lets nio decrypt the backlog on a later sync. Invoked by nio's event dispatch loop; not called directly elsewhere in the codebase. Args: room (MatrixRoom): The room containing the undecryptable message. event (MegolmEvent): The encrypted event whose room key is missing. """ logger.warning( "Matrix: unable to decrypt message in %s from %s (session %s). " "Requesting missing room key.", room.room_id, event.sender, event.session_id, ) if self.client is not None: try: await self.client.request_room_key(event) await self.client.send_to_device_messages() logger.debug( "Sent key request for session %s in %s", event.session_id, room.room_id, ) except Exception: logger.debug( "Could not send key request for session %s", event.session_id, exc_info=True, ) # -- Internal: helpers --------------------------------------------- def _is_addressed( self, room: MatrixRoom, event: object, ) -> bool: """Return ``True`` when the bot should respond. The bot responds if any of the following hold: * The room has two or fewer members (DM / small room). * The bot's user ID appears in ``m.mentions.user_ids``. * The bot's user ID appears in the plain-text body. * The message is a reply to an event the bot sent. * The message is a reply to an egregore ghost message in this room. """ assert self.client is not None # DM / small room -- always respond if room.member_count <= 2: return True source = getattr(event, "source", None) or {} content = source.get("content", {}) # Modern m.mentions spec mentions = content.get("m.mentions", {}) if self.client.user_id in mentions.get("user_ids", []): return True # Fallback: user ID appears in body text body = getattr(event, "body", "") or "" if self.client.user_id in body: return True # Reply to one of the bot's own messages or an egregore ghost message relates = content.get("m.relates_to", {}) reply_to = relates.get("m.in_reply_to", {}) reply_event_id = reply_to.get("event_id") if reply_event_id: sent = self._sent_events.get(room.room_id, set()) if reply_event_id in sent: return True ego = self._egregore_sent_events.get(room.room_id, set()) if reply_event_id in ego: return True return False
[docs] def register_egregore_event_id(self, room_id: str, event_id: str) -> None: """Register an egregore ghost message so replies to it address the bot. Egregore "ghost" messages are posted under application-service puppet identities rather than the bot's own user, so a reply to one would not otherwise be recognized. Recording the event ID in ``_egregore_sent_events`` lets :meth:`_is_addressed` treat replies to that message as directed at the bot. Mutates in-memory state only; no-ops on empty inputs. Called by the egregore/ghost-addressing layer after it sends a ghost message; exercised by ``tests/test_egregore_addressing.py``. Args: room_id (str): The Matrix room the ghost message was sent in. event_id (str): The event ID of the ghost message to track. """ if not room_id or not event_id: return self._egregore_sent_events.setdefault(room_id, set()).add(event_id)
async def _sync_loop(self) -> None: """Run the long-lived Matrix sync loop until the adapter is stopped. The background coroutine that keeps the bot live: it repeatedly calls the nio client's ``sync`` (the network long-poll that drives every registered event callback), and after each successful round re-trusts devices (:func:`trust_all_devices`) and uploads/queries/claims E2EE keys and flushes pending to-device messages. Transient network errors are retried with exponential backoff (5 s up to 60 s); ``CancelledError`` unwinds cleanly and any other exception breaks the loop. Runs until ``_stop_event`` is set by :meth:`stop`. Created as an asyncio task by :meth:`start`; it has no other callers. """ backoff = 5.0 max_backoff = 60.0 try: while not self._stop_event.is_set(): try: await self.client.sync( # type: ignore[union-attr] timeout=30000, ) backoff = 5.0 # reset on success trust_all_devices( self.client, # type: ignore[arg-type] ) if self.client.should_upload_keys: # type: ignore[union-attr] await self.client.keys_upload() # type: ignore[union-attr] if self.client.should_query_keys: # type: ignore[union-attr] await self.client.keys_query() # type: ignore[union-attr] if self.client.should_claim_keys: # type: ignore[union-attr] await self.client.keys_claim( # type: ignore[union-attr] self.client.get_users_for_key_claiming(), # type: ignore[union-attr] ) await self.client.send_to_device_messages() # type: ignore[union-attr] except asyncio.CancelledError: raise except ( TimeoutError, aiohttp.ClientError, ConnectionError, OSError, ) as e: logger.warning( "Matrix sync transient error, retrying in %.0fs: %s", backoff, e, ) await asyncio.sleep(backoff) backoff = min(backoff * 2, max_backoff) continue except Exception: logger.exception("Matrix sync loop encountered an error") break except asyncio.CancelledError: pass finally: logger.info("Matrix sync loop exited")