Source code for platforms.matrix
"""Matrix platform adapter using matrix-nio.
Wraps the matrix-nio ``AsyncClient`` and converts Matrix events into
:class:`~platforms.base.IncomingMessage` instances for the shared
:class:`~message_processor.MessageProcessor`.
"""
from __future__ import annotations
import asyncio
import base64
import io
import json as _stdlib_json
import jsonutil as json
import logging
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Union
import markdown as md
import olm
import aiohttp
import aiofiles
from nio import (
AsyncClient,
AsyncClientConfig,
InviteMemberEvent,
JoinError,
LoginResponse,
MatrixRoom,
MegolmEvent,
RoomMessageText,
RoomSendResponse,
)
from nio.api import Api as NioApi
from nio.crypto import Sas
from nio.crypto.attachments import decrypt_attachment
from nio.crypto.device import OlmDevice
from nio.events.room_events import (
RoomEncryptedAudio,
RoomEncryptedFile,
RoomEncryptedImage,
RoomEncryptedMedia,
RoomEncryptedVideo,
RoomMessageAudio,
RoomMessageFile,
RoomMessageImage,
RoomMessageMedia,
RoomMessageUnknown,
RoomMessageVideo,
UnknownEvent,
)
from nio.events.to_device import (
KeyVerificationCancel,
KeyVerificationKey,
KeyVerificationMac,
KeyVerificationStart,
UnknownToDeviceEvent,
)
from nio import ToDeviceMessage
from nio.responses import (
DownloadError,
ProfileGetDisplayNameError,
ProfileGetDisplayNameResponse,
RoomMessagesResponse,
)
from media_cache import MediaCache
from config import Config
from platforms.base import (
Attachment,
HistoricalMessage,
IncomingMessage,
MessageHandler,
PlatformAdapter,
)
from platforms.media_common import download_with_retry
logger = logging.getLogger(__name__)
# Union of all media event types handled by on_media
MediaEvent = Union[
RoomMessageImage,
RoomEncryptedImage,
RoomMessageAudio,
RoomEncryptedAudio,
RoomMessageVideo,
RoomEncryptedVideo,
RoomMessageFile,
RoomEncryptedFile,
]
def _get_reply_to_id(event: Any) -> str:
"""Extract the reply-to event ID from a Matrix event, or empty string.
Digs through the raw event ``source`` dict for the
``m.relates_to`` / ``m.in_reply_to`` envelope that Matrix uses to thread
replies, so the rest of the adapter can populate
:attr:`IncomingMessage.reply_to_id` and
:attr:`HistoricalMessage.reply_to_id` without reaching into nio internals
at every call site. Pure read-only dict traversal with no side effects.
Called by :meth:`MatrixPlatform.fetch_history`, :meth:`MatrixPlatform._on_message`,
and :meth:`MatrixPlatform._on_media` within this module; it has no external
callers.
Args:
event (Any): A matrix-nio room event (or anything exposing a
``source`` mapping).
Returns:
str: The referenced event ID, or an empty string when the message is
not a reply.
"""
source = getattr(event, "source", None) or {}
content = source.get("content", {})
relates = content.get("m.relates_to", {})
reply_to = relates.get("m.in_reply_to", {})
return reply_to.get("event_id", "")
_STRIP_HTML_RE = re.compile(r"<[^>]+>")
_MD_EXTENSIONS = ["fenced_code", "tables", "nl2br"]
# Matches Matrix user IDs like @localpart:server or @localpart:server:port
_MATRIX_MENTION_RE = re.compile(r"@[\w.=\-/+]+:[\w.\-]+(:\d+)?")
# Matches any HTML tag (used to skip mention replacement inside tags)
_HTML_TAG_RE = re.compile(r"<[^>]+>")
def _markdown_to_html(text: str) -> str:
"""Convert Markdown text to HTML for the Matrix ``formatted_body``.
Matrix renders rich messages from an HTML ``formatted_body`` alongside the
plain ``body``, so outbound text is run through the Python-Markdown library
(with the fenced-code, tables, and ``nl2br`` extensions enabled via
``_MD_EXTENSIONS``) before it is sent. Pure transformation with no I/O or
side effects.
Called by :meth:`MatrixPlatform.send` within this module to build the
formatted body for ``room_send``; it has no external callers.
Args:
text (str): The Markdown source text.
Returns:
str: The rendered HTML fragment.
"""
return md.markdown(text, extensions=_MD_EXTENSIONS)
def _strip_html(text: str) -> str:
"""Strip HTML tags to produce the plain-text ``body`` fallback.
Matrix messages carry both a rich ``formatted_body`` and a plain ``body``;
this removes every ``<tag>`` (via the module-level ``_STRIP_HTML_RE``
pattern) so clients without HTML rendering still show readable text. Pure
transformation with no I/O or side effects.
Called by :meth:`MatrixPlatform.send` within this module to derive the
plain ``body`` from the rendered HTML; it has no external callers.
Args:
text (str): The HTML (or Markdown) source whose tags should be removed.
Returns:
str: The text with all HTML tags stripped out.
"""
return _STRIP_HTML_RE.sub("", text)
def _linkify_matrix_mentions(html: str) -> tuple[str, list[str]]:
"""Replace bare @user:server mentions in HTML with matrix.to anchor links.
Operates only on text nodes (skips content inside HTML tags) so existing
links or attributes are never double-processed.
Returns ``(new_html, unique_user_ids)`` where ``unique_user_ids`` is the
ordered list of Matrix user IDs found, suitable for ``m.mentions.user_ids``.
"""
mentions: list[str] = []
def _replace(m: re.Match) -> str:
"""Turn one matched ``@user:server`` mention into an anchor link.
Substitution callback used by ``_MATRIX_MENTION_RE.sub`` inside the
enclosing :func:`_linkify_matrix_mentions`. Records the matched
Matrix user ID into the enclosing ``mentions`` list (so the caller
can later dedupe it for ``m.mentions.user_ids``) and returns the user
ID wrapped in a ``matrix.to`` HTML anchor.
This nested function is defined and used only within
:func:`_linkify_matrix_mentions`; it has no other internal callers.
Args:
m (re.Match): A regex match whose group 0 is the bare mention.
Returns:
str: The mention rewritten as an ``<a href="...">`` anchor.
"""
uid = m.group(0)
mentions.append(uid)
return f'<a href="https://matrix.to/#/{uid}">{uid}</a>'
parts: list[str] = []
pos = 0
for tag_match in _HTML_TAG_RE.finditer(html):
# Linkify text between tags, keep the tag itself verbatim
parts.append(_MATRIX_MENTION_RE.sub(_replace, html[pos : tag_match.start()]))
parts.append(tag_match.group(0))
pos = tag_match.end()
parts.append(_MATRIX_MENTION_RE.sub(_replace, html[pos:]))
seen: set[str] = set()
unique = [u for u in mentions if not (u in seen or seen.add(u))] # type: ignore[func-returns-value]
return "".join(parts), unique
# ------------------------------------------------------------------
# Matrix-specific media download
# ------------------------------------------------------------------
[docs]
async def download_matrix_media(
client: AsyncClient,
event: RoomMessageMedia | RoomEncryptedMedia,
) -> tuple[bytes, str, str]:
"""Download (and optionally decrypt) a media attachment from Matrix.
Returns
-------
tuple of (data, mimetype, filename)
"""
mxc_url: str = event.url
filename: str = event.body or "attachment"
resp = await client.download(mxc=mxc_url)
if isinstance(resp, DownloadError):
raise RuntimeError(f"Failed to download {mxc_url}: {resp.message}")
data: bytes = resp.body # type: ignore[assignment]
# Determine MIME type
if isinstance(event, RoomEncryptedMedia):
mimetype = event.mimetype or resp.content_type or "application/octet-stream"
else:
info = event.source.get("content", {}).get("info", {})
mimetype = (
info.get("mimetype") or resp.content_type or "application/octet-stream"
)
# Decrypt if necessary
if isinstance(event, RoomEncryptedMedia):
data = decrypt_attachment(
ciphertext=data,
key=event.key["k"],
hash=event.hashes["sha256"],
iv=event.iv,
)
return data, mimetype, filename
# ------------------------------------------------------------------
# Credential helpers
# ------------------------------------------------------------------
[docs]
async def save_matrix_credentials(
credentials_file: str,
homeserver: str,
client: AsyncClient,
) -> None:
"""Persist Matrix login credentials, preserving extra keys like seeds.
Writes the homeserver URL plus the live ``user_id``, ``device_id``, and
``access_token`` from a logged-in nio ``AsyncClient`` to the JSON
credentials file so the session can be restored on the next start without
re-authenticating. Reads any existing file first (via aiofiles) and merges
on top of it, so unrelated keys such as the cross-signing seeds written by
:func:`setup_cross_signing` survive. Touches the filesystem and logs an
info line on success.
Called by :meth:`MatrixPlatform.start` within this module right after a
fresh password login succeeds; it has no external callers.
Args:
credentials_file (str): Path to the JSON credentials file to write.
homeserver (str): The Matrix homeserver URL to record.
client (AsyncClient): The logged-in nio client supplying the identity
and access token.
"""
existing: dict[str, Any] = {}
cred_path = Path(credentials_file)
if cred_path.exists():
try:
async with aiofiles.open(credentials_file, "r") as f:
existing = json.loads(await f.read())
except (json.JSONDecodeError, OSError):
pass
existing.update(
{
"homeserver": homeserver,
"user_id": client.user_id,
"device_id": client.device_id,
"access_token": client.access_token,
}
)
async with aiofiles.open(credentials_file, "w") as f:
await f.write(json.dumps(existing, indent=2))
logger.info("Matrix credentials saved to %s", credentials_file)
[docs]
async def load_matrix_credentials(credentials_file: str) -> dict | None:
"""Load previously saved Matrix credentials, or return ``None``.
Reads and parses the JSON credentials file (via aiofiles) that
:func:`save_matrix_credentials` writes, so a restart can call
``client.restore_login`` instead of logging in with a password. Returns
``None`` when the file is absent or contains invalid JSON, signaling the
caller to fall back to a fresh login. Read-only filesystem access with no
other side effects.
Called by :meth:`MatrixPlatform.start` within this module to recover a
saved session; it has no external callers.
Args:
credentials_file (str): Path to the JSON credentials file to read.
Returns:
dict | None: The decoded credentials mapping, or ``None`` if the file
is missing or unparseable.
"""
cred_path = Path(credentials_file)
if not cred_path.exists():
return None
async with aiofiles.open(credentials_file, "r") as f:
content = await f.read()
try:
return json.loads(content)
except json.JSONDecodeError:
return None
# ------------------------------------------------------------------
# Auto-trust helper
# ------------------------------------------------------------------
[docs]
def trust_all_devices(client: AsyncClient) -> None:
"""Mark every known device of every tracked user as trusted.
Walks the nio client's ``device_store`` and calls ``verify_device`` on any
device the Olm layer does not already consider verified, so the bot will
encrypt to and accept messages from all participants without manual
verification. This is a deliberate trust-on-first-use policy for an
unattended bot; it mutates the client's in-memory device-trust state and
logs each newly trusted device at debug level.
Called by :meth:`MatrixPlatform.start` (after the initial sync) and
:meth:`MatrixPlatform._sync_loop` (after every successful sync) within this
module; it has no external callers.
Args:
client (AsyncClient): The connected nio client whose device store is
scanned and whose devices are verified in place.
"""
for user_id, devices in client.device_store.items():
for device_id, olm_device in devices.items():
if not client.olm.is_device_verified(olm_device):
client.verify_device(olm_device)
logger.debug(
"Auto-trusted device %s of %s",
device_id,
user_id,
)
# ------------------------------------------------------------------
# Cross-signing setup
# ------------------------------------------------------------------
def _sign_json(signing_key: olm.PkSigning, obj: dict) -> str:
"""Canonical-JSON-sign ``obj`` with ``signing_key`` and return the signature.
Serializes the object to Matrix canonical JSON (sorted keys, no
whitespace, via the stdlib ``json`` module aliased here as ``_stdlib_json``
so the project ``jsonutil`` shim is bypassed for byte-exact output) and
signs it with the given Olm ``PkSigning`` key. Used to build the
cross-signing key hierarchy and the self-signature on the bot's own device.
Pure computation with no I/O or side effects.
Called by :func:`setup_cross_signing` (to sign the self-signing and
user-signing keys with the master key) and :func:`_sign_own_device` (to
sign the device key with the self-signing key) within this module; it has
no external callers.
Args:
signing_key (olm.PkSigning): The Olm signing key to sign with.
obj (dict): The object to canonicalize and sign.
Returns:
str: The base64 Ed25519 signature over the canonical JSON of ``obj``.
"""
canonical = _stdlib_json.dumps(
obj, ensure_ascii=False, sort_keys=True, separators=(",", ":")
)
return signing_key.sign(canonical)
[docs]
async def setup_cross_signing(
client: AsyncClient,
password: str,
credentials_file: str,
saved_seeds: dict[str, str] | None = None,
) -> None:
"""Generate cross-signing keys, upload them, and self-sign the device.
If *saved_seeds* is provided the keys are re-derived from persisted
seeds instead of generating new ones. Seeds are persisted to
*credentials_file* for future restarts.
"""
user_id = client.user_id
device_id = client.device_id
assert user_id and device_id
# --- Derive or generate keys ----------------------------------------
if saved_seeds:
master_seed = base64.b64decode(saved_seeds["master"])
self_signing_seed = base64.b64decode(saved_seeds["self_signing"])
user_signing_seed = base64.b64decode(saved_seeds["user_signing"])
else:
master_seed = olm.PkSigning.generate_seed()
self_signing_seed = olm.PkSigning.generate_seed()
user_signing_seed = olm.PkSigning.generate_seed()
master_key = olm.PkSigning(master_seed)
self_signing_key = olm.PkSigning(self_signing_seed)
user_signing_key = olm.PkSigning(user_signing_seed)
# --- Check if master key is already published -----------------------
query_path = f"/_matrix/client/v3/keys/query" f"?access_token={client.access_token}"
query_body = json.dumps({"device_keys": {user_id: []}})
try:
resp = await client.send("POST", query_path, query_body)
if resp.status == 200:
data = json.loads(await resp.read())
existing_master = (
data.get("master_keys", {}).get(user_id, {}).get("keys", {})
)
if f"ed25519:{master_key.public_key}" in existing_master:
logger.info(
"Cross-signing master key already published — skipping upload",
)
await _sign_own_device(
client,
self_signing_key,
user_id,
device_id,
)
return
except Exception:
logger.debug("Could not query existing cross-signing keys", exc_info=True)
if not password:
logger.warning(
"Cross-signing setup requires a password for UIA — skipping. "
"Set up will be attempted on next fresh login.",
)
return
# --- Build the key upload payload -----------------------------------
master_key_obj = {
"user_id": user_id,
"usage": ["master"],
"keys": {f"ed25519:{master_key.public_key}": master_key.public_key},
}
self_signing_key_obj = {
"user_id": user_id,
"usage": ["self_signing"],
"keys": {
f"ed25519:{self_signing_key.public_key}": self_signing_key.public_key,
},
}
ss_sig = _sign_json(master_key, self_signing_key_obj)
self_signing_key_obj["signatures"] = {
user_id: {f"ed25519:{master_key.public_key}": ss_sig},
}
user_signing_key_obj = {
"user_id": user_id,
"usage": ["user_signing"],
"keys": {
f"ed25519:{user_signing_key.public_key}": user_signing_key.public_key,
},
}
us_sig = _sign_json(master_key, user_signing_key_obj)
user_signing_key_obj["signatures"] = {
user_id: {f"ed25519:{master_key.public_key}": us_sig},
}
upload_body: dict[str, Any] = {
"master_key": master_key_obj,
"self_signing_key": self_signing_key_obj,
"user_signing_key": user_signing_key_obj,
}
# --- Upload with UIA (password auth) --------------------------------
upload_path = (
f"/_matrix/client/v3/keys/device_signing/upload"
f"?access_token={client.access_token}"
)
# First request to get the session for UIA
resp = await client.send(
"POST",
upload_path,
json.dumps(upload_body),
)
if resp.status == 401:
uia_data = json.loads(await resp.read())
session = uia_data.get("session", "")
upload_body["auth"] = {
"type": "m.login.password",
"user": user_id,
"password": password,
"session": session,
}
resp = await client.send(
"POST",
upload_path,
json.dumps(upload_body),
)
if resp.status != 200:
body = await resp.read()
logger.error(
"Cross-signing key upload failed (HTTP %d): %s",
resp.status,
body.decode(errors="replace"),
)
return
logger.info("Cross-signing keys uploaded successfully")
# --- Sign own device ------------------------------------------------
await _sign_own_device(client, self_signing_key, user_id, device_id)
# --- Persist seeds --------------------------------------------------
seeds = {
"master": base64.b64encode(master_seed).decode(),
"self_signing": base64.b64encode(self_signing_seed).decode(),
"user_signing": base64.b64encode(user_signing_seed).decode(),
}
try:
creds: dict[str, Any] = {}
cred_path = Path(credentials_file)
if cred_path.exists():
async with aiofiles.open(credentials_file, "r") as f:
creds = json.loads(await f.read())
creds["cross_signing_seeds"] = seeds
async with aiofiles.open(credentials_file, "w") as f:
await f.write(json.dumps(creds, indent=2))
logger.info("Cross-signing seeds persisted to %s", credentials_file)
except Exception:
logger.warning("Failed to persist cross-signing seeds", exc_info=True)
async def _sign_own_device(
client: AsyncClient,
self_signing_key: olm.PkSigning,
user_id: str,
device_id: str,
) -> None:
"""Sign the bot's own device key with the cross-signing self-signing key.
Builds the device-key object from the Olm account's identity keys, signs it
with :func:`_sign_json`, and uploads the signature to the homeserver's
``/keys/signatures/upload`` endpoint (an HTTP POST via ``client.send``).
This is what lets other clients see the bot's device as cross-signed rather
than warning that it was "encrypted by a device not verified by its owner".
Performs network I/O and logs the upload outcome; swallows and logs any
exception rather than raising.
Called by :func:`setup_cross_signing` within this module, both when the
master key is already published and after a fresh key upload; it has no
external callers.
Args:
client (AsyncClient): The connected nio client used for the HTTP upload.
self_signing_key (olm.PkSigning): The self-signing key that signs the
device.
user_id (str): The bot's Matrix user ID.
device_id (str): The bot's device ID being signed.
"""
device_key_id = f"ed25519:{device_id}"
device_keys = client.olm.account.identity_keys
ed25519_key = device_keys["ed25519"]
curve25519_key = device_keys["curve25519"]
device_key_obj = {
"algorithms": [
"m.olm.v1.curve25519-aes-sha2-256",
"m.megolm.v1.aes-sha2",
],
"device_id": device_id,
"keys": {
f"curve25519:{device_id}": curve25519_key,
f"ed25519:{device_id}": ed25519_key,
},
"user_id": user_id,
}
dev_sig = _sign_json(self_signing_key, device_key_obj)
sig_upload = {
user_id: {
device_id: {
f"ed25519:{self_signing_key.public_key}": dev_sig,
},
},
}
sig_path = (
f"/_matrix/client/v3/keys/signatures/upload"
f"?access_token={client.access_token}"
)
try:
resp = await client.send(
"POST",
sig_path,
json.dumps(sig_upload),
)
if resp.status == 200:
logger.info(
"Device %s self-signed with cross-signing key",
device_id,
)
else:
body = await resp.read()
logger.warning(
"Device signature upload failed (HTTP %d): %s",
resp.status,
body.decode(errors="replace"),
)
except Exception:
logger.warning("Failed to upload device signature", exc_info=True)
# ------------------------------------------------------------------
# MatrixPlatform adapter
# ------------------------------------------------------------------
[docs]
class MatrixPlatform(PlatformAdapter):
"""Platform adapter for Matrix via matrix-nio.
Parameters
----------
message_handler:
Async callback that receives :class:`IncomingMessage` instances.
homeserver:
Matrix homeserver URL.
user_id:
Matrix user ID for the bot.
password:
Password (only needed for first login).
store_path:
Path to the nio E2EE key store.
credentials_file:
Path to the JSON file for persisting login credentials.
"""
[docs]
def __init__(
self,
message_handler: MessageHandler,
*,
homeserver: str,
user_id: str,
password: str = "",
store_path: str = "nio_store",
credentials_file: str = "matrix_credentials.json",
media_cache: MediaCache | None = None,
config: Config | None = None,
) -> None:
"""Store configuration and initialize per-room bookkeeping state.
Records the homeserver, identity, and E2EE store/credentials paths but
does no network I/O here; the nio ``AsyncClient`` is created and
connected later in :meth:`start`. Also forwards ``message_handler`` to
:class:`PlatformAdapter` and sets up the in-memory maps the adapter
relies on while running: ``_sent_events`` and ``_egregore_sent_events``
(so replies to the bot's own or ghost messages count as addressed in
:meth:`_is_addressed`), ``_typing_tasks`` (background typing loops), and
``_in_room_sas`` (active in-room SAS verifications). No external side
effects beyond constructing this object.
Constructed by the platform factory / gateway wiring that instantiates
each adapter from its platform config.
Args:
message_handler (MessageHandler): Async callback invoked with each
decoded :class:`IncomingMessage` and this adapter.
homeserver (str): Matrix homeserver URL to connect to.
user_id (str): The bot's Matrix user ID.
password (str): Login password, only needed for the first login.
store_path (str): Directory for the nio E2EE key store.
credentials_file (str): Path to the JSON file persisting login
credentials and cross-signing seeds.
media_cache (MediaCache | None): Optional shared cache used to avoid
re-downloading the same media.
config (Config | None): Optional runtime config controlling emoji
resolution and media-download retries.
"""
super().__init__(message_handler)
self._homeserver = homeserver
self._user_id = user_id
self._password = password
self._store_path = store_path
self._credentials_file = credentials_file
self._media_cache = media_cache
self._config = config
self.client: AsyncClient | None = None
self.credentials: dict | None = None
self._sync_task: asyncio.Task | None = None
self._stop_event = asyncio.Event()
# channel_id -> set of event IDs the bot has sent
self._sent_events: dict[str, set[str]] = {}
# channel_id -> event IDs from egregore ghosts (AS) for reply addressing
self._egregore_sent_events: dict[str, set[str]] = {}
self._typing_tasks: dict[str, asyncio.Task[None]] = {}
# request_event_id -> (Sas, OlmDevice, room_id) for in-room SAS verifications
self._in_room_sas: dict[str, tuple[Sas, OlmDevice, str]] = {}
# Profile display name for :meth:`bot_identity` (filled after login/sync).
self._profile_display_name: str = ""
# -- PlatformAdapter metadata --------------------------------------
@property
def name(self) -> str:
"""Return this adapter's stable platform identifier, ``"matrix"``.
Implements the abstract :attr:`PlatformAdapter.name` property. The
value is used as the registry key and routing/label tag for the
adapter: ``gateway_main`` reads it when wiring platforms,
``background_tasks`` keys its ``adapter_map`` and channel catalog by it,
and ``message_processor.channel_heartbeat`` uses it for log labels.
Constant with no side effects.
Returns:
str: The literal platform name ``"matrix"``.
"""
return "matrix"
@property
def is_running(self) -> bool:
"""Report whether the background Matrix sync loop is active.
Implements the abstract :attr:`PlatformAdapter.is_running` property by
checking that the ``_sync_task`` created in :meth:`start` exists and has
not finished. Used as a guard in :meth:`start` and :meth:`stop` and read
externally by the web admin surfaces (``web/bot_admin.py``,
``web/platforms_api.py``, ``web/deps.py``) and ``background_tasks`` to
decide whether the platform is live. Read-only with no side effects.
Returns:
bool: ``True`` while the sync task is running, ``False`` otherwise.
"""
return self._sync_task is not None and not self._sync_task.done()
@property
def bot_identity(self) -> dict[str, str]:
"""Describe this bot's Matrix identity for the pipeline.
Implements the abstract :attr:`PlatformAdapter.bot_identity` property.
Prefers the live ``user_id`` from the connected nio ``AsyncClient``,
falling back to the configured ``self._user_id`` before login, and
uses the cached ``self._profile_display_name`` (populated by
:meth:`_refresh_self_display_name`) for the display name, falling back
to the user ID when no profile name is known. On Matrix the user ID
itself doubles as the mention token.
Read across adapters by ``prompt_context`` (which gathers
``bot_identity`` for every adapter), ``message_processor.user_message_format``,
``message_processor.history_backfill``, ``background_tasks``, and
``web/bot_admin.py`` to label and attribute the bot's own messages.
Returns:
dict[str, str]: A mapping with ``platform``, ``user_id``,
``display_name``, and ``mention`` keys describing this bot.
"""
uid = (self.client.user_id if self.client else None) or self._user_id
display = (self._profile_display_name or "").strip() or uid
return {
"platform": "matrix",
"user_id": uid,
"display_name": display,
"mention": uid,
}
async def _refresh_self_display_name(self) -> None:
"""Fetch and cache the bot's profile display name from the homeserver.
Calls the nio client's ``get_displayname`` (an HTTP request to the
homeserver) and stores the result in ``self._profile_display_name`` so
:attr:`bot_identity` can present a human-readable name instead of the
bare user ID. Handles a missing client, request failure, or error
response by logging at debug level and leaving the cached name
unchanged; logs the resolved name at info level on success.
Called by :meth:`start` within this module after the initial sync; it
has no external callers.
"""
if not self.client:
return
try:
resp = await self.client.get_displayname()
except Exception:
logger.debug("Matrix: get_displayname request failed", exc_info=True)
return
if isinstance(resp, ProfileGetDisplayNameError):
logger.debug("Matrix: get_displayname error: %s", resp)
return
if isinstance(resp, ProfileGetDisplayNameResponse):
dn = (resp.displayname or "").strip()
self._profile_display_name = dn
if dn:
logger.info("Matrix: profile display name is %r", dn)
# -- PlatformAdapter lifecycle -------------------------------------
[docs]
async def start(self) -> None:
"""Connect to Matrix, perform the initial sync, and launch the sync loop.
Implements the abstract :meth:`PlatformAdapter.start` lifecycle hook.
Creates the E2EE-enabled nio ``AsyncClient``, then either restores a
saved session (via :func:`load_matrix_credentials` /
``restore_login``) or logs in with the configured password and persists
the new session through :func:`save_matrix_credentials`. Registers all
event callbacks (:meth:`_register_callbacks`), runs a full initial sync,
trusts known devices (:func:`trust_all_devices`), uploads/queries/claims
E2EE keys, refreshes the profile name
(:meth:`_refresh_self_display_name`), runs cross-signing setup
(:func:`setup_cross_signing`), and finally spawns the background
:meth:`_sync_loop` task. Touches the homeserver over HTTP, the local
E2EE store directory and credentials file, and the asyncio loop.
Invoked by the gateway service (``gateway_main``) when it brings each
configured platform adapter online.
Raises:
RuntimeError: If there are no saved credentials and no password to
log in with, or if the login attempt is rejected.
"""
if self.is_running:
logger.warning("Matrix platform is already running")
return
self._stop_event.clear()
# Ensure store directory exists
store_path = Path(self._store_path)
store_path.mkdir(parents=True, exist_ok=True)
# Configure nio with E2EE
client_config = AsyncClientConfig(
max_limit_exceeded=0,
max_timeouts=0,
store_sync_tokens=True,
encryption_enabled=True,
)
self.client = AsyncClient(
homeserver=self._homeserver,
user=self._user_id,
store_path=str(store_path),
config=client_config,
)
# Login or restore session
creds = self.credentials or await load_matrix_credentials(
self._credentials_file,
)
self.credentials = creds
if creds and creds.get("access_token"):
logger.info(
"Restoring Matrix session for %s on device %s",
creds["user_id"],
creds["device_id"],
)
self.client.restore_login(
user_id=creds["user_id"],
device_id=creds["device_id"],
access_token=creds["access_token"],
)
else:
if not self._password:
raise RuntimeError(
"No saved Matrix credentials and no password "
"configured – cannot login"
)
logger.info("Logging in to Matrix as %s …", self._user_id)
resp = await self.client.login(
self._password,
device_name="MatrixLLMBot",
)
if not isinstance(resp, LoginResponse):
await self.client.close()
raise RuntimeError(f"Matrix login failed: {resp}")
logger.info(
"Logged in as %s on device %s",
resp.user_id,
resp.device_id,
)
await save_matrix_credentials(
self._credentials_file,
self._homeserver,
self.client,
)
self.credentials = {
"homeserver": self._homeserver,
"user_id": resp.user_id,
"device_id": resp.device_id,
"access_token": resp.access_token,
}
# Register event callbacks
self._register_callbacks()
# Initial sync + auto-trust + key management
logger.info("Matrix: performing initial sync …")
await self.client.sync(timeout=30000, full_state=True)
trust_all_devices(self.client)
if self.client.should_upload_keys:
await self.client.keys_upload()
if self.client.should_query_keys:
await self.client.keys_query()
if self.client.should_claim_keys:
await self.client.keys_claim(self.client.get_users_for_key_claiming())
await self.client.send_to_device_messages()
await self._refresh_self_display_name()
# Cross-signing: self-sign our device so other users don't see
# "Encrypted by a device not verified by its owner"
try:
saved_seeds = (self.credentials or {}).get("cross_signing_seeds")
await setup_cross_signing(
self.client,
password=self._password,
credentials_file=self._credentials_file,
saved_seeds=saved_seeds,
)
except Exception:
logger.warning("Cross-signing setup failed", exc_info=True)
logger.info("Matrix platform is running. Listening for messages …")
self._sync_task = asyncio.create_task(self._sync_loop())
[docs]
async def stop(self) -> None:
"""Cancel the sync loop and close the Matrix client connection.
Implements the abstract :meth:`PlatformAdapter.stop` lifecycle hook.
Sets the ``_stop_event`` so :meth:`_sync_loop` will exit, cancels and
awaits the ``_sync_task``, then closes the nio ``AsyncClient`` and
clears the references so :attr:`is_running` reports false. Releases the
network connection and the asyncio task; logs when fully stopped. Safe
to call when not running (returns immediately).
Invoked by the gateway service (``gateway_main``) during shutdown and by
``web/platforms_api`` when an operator stops a platform from the admin
UI.
"""
if not self.is_running:
return
self._stop_event.set()
if self._sync_task:
self._sync_task.cancel()
try:
await self._sync_task
except asyncio.CancelledError:
pass
self._sync_task = None
if self.client:
await self.client.close()
self.client = None
logger.info("Matrix platform stopped")
[docs]
async def should_skip_channel_heartbeat(self, channel_id: str) -> bool:
"""Decide whether to suppress the periodic heartbeat for a room.
Implements the :class:`PlatformAdapter` hook consulted by
``message_processor.channel_heartbeat`` and ``core.outbound_consumer``
before they run an unsolicited background heartbeat. Looks the room up
in the connected nio client's ``rooms`` map and reports that the
heartbeat should be skipped for direct messages and other rooms with
two or fewer members, since unprompted activity there would feel
intrusive. Returns ``False`` (do not skip) when the client is not
connected or the room is unknown.
Args:
channel_id (str): The Matrix room ID to evaluate.
Returns:
bool: ``True`` if the heartbeat should be skipped for this room,
``False`` otherwise.
"""
if self.client is None:
return False
room = self.client.rooms.get(channel_id)
if room is None:
return False
return room.member_count <= 2
# -- PlatformAdapter outbound messaging ----------------------------
[docs]
async def send(self, channel_id: str, text: str) -> str:
"""Render and send a text message to a Matrix room, returning its event ID.
Implements the abstract :meth:`PlatformAdapter.send` outbound hook.
Converts the text to an HTML ``formatted_body`` (:func:`_markdown_to_html`),
linkifies bare mentions into ``matrix.to`` anchors and collects them into
``m.mentions`` (:func:`_linkify_matrix_mentions`), and derives the plain
``body`` (:func:`_strip_html`). Best-effort enriches the payload with the
current sprite/character state read from Redis (keys
``star:sprite:state:matrix:<channel>`` and ``star:sprite:chars:matrix:<channel>``)
under an ``sg.sprite`` field for per-message replay, swallowing any Redis
error. Sends via the nio client's ``room_send`` (network I/O) and records
the resulting event ID in ``_sent_events`` so replies to it are treated as
addressed by :meth:`_is_addressed`.
Driven by the outbound side of the pipeline -- ``message_processor``
(``generate_and_send``, ``command_router``, ``processor``) and the
``task_manager`` -- which call ``platform.send`` on the resolved adapter.
Args:
channel_id (str): The Matrix room ID to post into.
text (str): The Markdown message text to render and send.
Returns:
str: The sent event ID, or an empty string if the client is not
connected or the send fails.
"""
if self.client is None:
logger.error("Matrix client is not connected")
return ""
html = _markdown_to_html(text)
html, mentioned_users = _linkify_matrix_mentions(html)
content: dict[str, Any] = {
"msgtype": "m.text",
"body": _strip_html(text),
"format": "org.matrix.custom.html",
"formatted_body": html,
"m.mentions": {"user_ids": mentioned_users},
}
# 💀🔥 Embed current sprite state for per-message replay
try:
from config import config as app_config
import redis.asyncio as _aredis
_r = await _aredis.from_url(
app_config.get("REDIS_URL", "redis://localhost:6379"),
decode_responses=True,
socket_timeout=2,
)
_ck = f"matrix:{channel_id}"
_spr = await _r.get(f"star:sprite:state:{_ck}")
_chr = await _r.get(f"star:sprite:chars:{_ck}")
await _r.close()
sprite_meta: dict[str, Any] = {}
if _spr:
sprite_meta["sprite_state"] = json.loads(_spr)
if _chr:
_cd = json.loads(_chr)
sprite_meta["characters"] = (
list(_cd.values()) if isinstance(_cd, dict) else _cd
)
if sprite_meta:
content["sg.sprite"] = sprite_meta
except Exception:
pass # Non-critical -- don't break message sending
try:
resp = await self.client.room_send(
room_id=channel_id,
message_type="m.room.message",
content=content,
ignore_unverified_devices=True,
)
if isinstance(resp, RoomSendResponse):
self._sent_events.setdefault(
channel_id,
set(),
).add(resp.event_id)
return resp.event_id
logger.warning(
"room_send to %s returned non-success response: %s",
channel_id,
type(resp).__name__,
)
except Exception:
logger.exception(
"Failed to send message to Matrix room %s",
channel_id,
)
return ""
[docs]
async def add_reaction(self, channel_id: str, message_id: str, emoji: str) -> None:
"""Add an ``m.reaction`` annotation to a Matrix event.
Gateway side of the outbound ``type: "reaction"`` relay. Sends an
``m.reaction`` event relating to *message_id* (the target event id).
Errors are logged, not raised.
"""
if self.client is None:
logger.error("Matrix client is not connected (add_reaction)")
return
try:
await self.client.room_send(
room_id=channel_id,
message_type="m.reaction",
content={
"m.relates_to": {
"rel_type": "m.annotation",
"event_id": message_id,
"key": emoji,
}
},
ignore_unverified_devices=True,
)
except Exception:
logger.exception(
"Failed to add reaction %r to event %s in room %s",
emoji, message_id, channel_id,
)
[docs]
async def send_file(
self,
channel_id: str,
data: bytes,
filename: str,
mimetype: str = "application/octet-stream",
) -> str | None:
"""Upload a file to Matrix and post it into a room as a media message.
Implements the abstract :meth:`PlatformAdapter.send_file` outbound hook.
Uploads the bytes via the nio client's ``upload`` (encrypting the blob
when the target room is encrypted), picks the ``msgtype``
(``m.image`` / ``m.audio`` / ``m.video`` / ``m.file``) from the MIME
type, builds the message content with the returned ``mxc://`` URI (or
encryption keys for encrypted rooms), and sends it with ``room_send``.
Records the event ID in ``_sent_events`` like :meth:`send` so replies
count as addressed. Performs network I/O; logs and returns ``None`` on
failure instead of raising.
Driven by the outbound media path -- ``core.outbound_consumer`` and the
media-producing ``tools`` (image/video/audio generators) and
``background_agents`` -- which call ``adapter.send_file`` on the resolved
adapter.
Args:
channel_id (str): The Matrix room ID to post into.
data (bytes): The raw file contents to upload.
filename (str): The display/body filename for the attachment.
mimetype (str): The MIME type of the file.
Returns:
str | None: The ``mxc://`` content URI on success, or ``None`` if
the client is not connected or the upload/send fails.
"""
if self.client is None:
logger.error("Matrix client is not connected")
return None
try:
room = self.client.rooms.get(channel_id)
room_encrypted = room.encrypted if room else False
upload_resp, encryption_keys = await self.client.upload(
io.BytesIO(data),
content_type=mimetype,
filename=filename,
filesize=len(data),
encrypt=room_encrypted,
)
content_uri: str = upload_resp.content_uri
msgtype = "m.file"
if mimetype.startswith("image/"):
msgtype = "m.image"
elif mimetype.startswith("audio/"):
msgtype = "m.audio"
elif mimetype.startswith("video/"):
msgtype = "m.video"
content = {
"msgtype": msgtype,
"body": filename,
"info": {
"mimetype": mimetype,
"size": len(data),
},
}
if encryption_keys:
encryption_keys["url"] = content_uri
content["file"] = encryption_keys
else:
content["url"] = content_uri
resp = await self.client.room_send(
room_id=channel_id,
message_type="m.room.message",
content=content,
ignore_unverified_devices=True,
)
if isinstance(resp, RoomSendResponse):
self._sent_events.setdefault(
channel_id,
set(),
).add(resp.event_id)
return content_uri
except Exception:
logger.exception(
"Failed to send file to Matrix room %s",
channel_id,
)
return None
# -- Typing indicator ----------------------------------------------
[docs]
async def start_typing(self, channel_id: str) -> None:
"""Begin showing the bot's typing indicator in a room until cleared.
Implements the abstract :meth:`PlatformAdapter.start_typing` hook.
First clears any existing indicator for the room (:meth:`stop_typing`),
then spawns a background task (the nested ``_typing_loop``) that
repeatedly calls the nio client's ``room_typing`` with a 30 s timeout
and re-asserts it every 25 s, since Matrix typing notices expire. The
task handle is stored in ``_typing_tasks`` keyed by room so
:meth:`stop_typing` can cancel it. No-op when the client is not
connected.
Driven by the response pipeline -- ``message_processor.processor`` and
``core.outbound_consumer`` -- which call ``start_typing`` while a reply
is being generated.
Args:
channel_id (str): The Matrix room ID to show typing in.
"""
await self.stop_typing(channel_id)
if self.client is None:
return
async def _typing_loop(room_id: str) -> None:
"""Re-assert the typing indicator until the task is cancelled.
Background coroutine spawned by the enclosing :meth:`start_typing`.
Loops forever calling the nio client's ``room_typing`` with a 30 s
server timeout and sleeping 25 s between refreshes so the indicator
never lapses; exits quietly on ``CancelledError`` when
:meth:`stop_typing` cancels the stored task. Performs network I/O on
each iteration.
Defined and scheduled only within :meth:`start_typing`; it has no
other callers.
Args:
room_id (str): The Matrix room ID to keep the typing indicator
alive in.
"""
try:
while True:
await self.client.room_typing( # type: ignore[union-attr]
room_id,
typing_state=True,
timeout=30000,
)
await asyncio.sleep(25)
except asyncio.CancelledError:
pass
self._typing_tasks[channel_id] = asyncio.create_task(
_typing_loop(channel_id),
)
[docs]
async def stop_typing(self, channel_id: str) -> None:
"""Cancel the typing-indicator loop for a room and clear the indicator.
Implements the abstract :meth:`PlatformAdapter.stop_typing` hook. Pops
and cancels the background task started by :meth:`start_typing` from
``_typing_tasks``, then explicitly tells the homeserver the bot is no
longer typing via the nio client's ``room_typing`` with
``typing_state=False`` (logging at debug level if that call fails).
Performs network I/O; safe to call when nothing is typing.
Driven by the response pipeline -- ``message_processor.processor`` and
``core.outbound_consumer`` -- once a reply is sent; also called at the
top of :meth:`start_typing` to reset state.
Args:
channel_id (str): The Matrix room ID to stop the typing indicator in.
"""
task = self._typing_tasks.pop(channel_id, None)
if task is not None and not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
if self.client is not None:
try:
await self.client.room_typing(
channel_id,
typing_state=False,
)
except Exception:
logger.debug(
"Failed to clear typing indicator in %s",
channel_id,
exc_info=True,
)
# -- Channel validation --------------------------------------------
[docs]
async def is_channel_valid(self, channel_id: str) -> bool:
"""Return ``True`` if the room is one the bot is currently joined to.
Implements the abstract :meth:`PlatformAdapter.is_channel_valid` hook by
checking the channel ID against the connected nio client's in-memory
``rooms`` map, so callers can confirm a target room is reachable before
sending. Returns ``False`` when the client is not connected. Read-only
with no side effects.
Args:
channel_id (str): The Matrix room ID to validate.
Returns:
bool: ``True`` if the room is known/joined, ``False`` otherwise.
"""
if self.client is None:
return False
return channel_id in self.client.rooms
# -- Server/channel listing ----------------------------------------
[docs]
async def list_servers_and_channels(self) -> list[dict[str, Any]]:
"""Return all Matrix rooms the bot is in.
Matrix doesn't have a guild/server hierarchy in the same way
Discord does — each room is listed as a standalone entry.
"""
if self.client is None:
return []
rooms: list[dict[str, Any]] = []
for room_id, room in self.client.rooms.items():
rooms.append(
{
"server_name": room.display_name,
"server_id": room_id,
"member_count": room.member_count,
"channels": [], # Matrix rooms are flat
}
)
return rooms
# -- Channel history -----------------------------------------------
[docs]
async def fetch_history(
self,
channel_id: str,
limit: int = 100,
) -> list[HistoricalMessage]:
"""Fetch recent room history as normalized :class:`HistoricalMessage` items.
Implements the abstract :meth:`PlatformAdapter.fetch_history` hook.
Pages backward from the client's current ``next_batch`` token via the
nio client's ``room_messages`` (a homeserver HTTP request, filtered to
``m.room.message`` events), then maps each text or media event into a
:class:`HistoricalMessage` -- rendering media as ``[Image]`` / ``[Video]``
/ ``[Audio]`` / ``[File]`` placeholders, resolving sender display names,
flagging the bot's own messages, and capturing reply targets via
:func:`_get_reply_to_id`. Results are reversed into chronological order.
Performs network I/O; on error or a non-success response it logs at
debug level and returns an empty list.
Driven by history-backfill and cross-channel features --
``background_tasks``, ``message_processor.history_backfill``,
``core.outbound_consumer``, and the ``cross_channel_query`` /
``admin_whisper`` tools -- which call ``fetch_history`` on the resolved
adapter.
Args:
channel_id (str): The Matrix room ID to read history from.
limit (int): Maximum number of messages to fetch.
Returns:
list[HistoricalMessage]: The fetched messages in chronological
order, or an empty list if the client is disconnected or the request
fails.
"""
if self.client is None:
return []
start_token = self.client.next_batch
if not start_token:
return []
try:
resp = await self.client.room_messages(
room_id=channel_id,
start=start_token,
limit=limit,
message_filter={"types": ["m.room.message"]},
)
except Exception:
logger.debug(
"Failed to fetch history for Matrix room %s",
channel_id,
exc_info=True,
)
return []
if not isinstance(resp, RoomMessagesResponse):
return []
room = self.client.rooms.get(channel_id)
bot_user_id = self.client.user_id
messages: list[HistoricalMessage] = []
_media_types = (
RoomMessageImage,
RoomMessageVideo,
RoomMessageAudio,
RoomMessageFile,
RoomEncryptedImage,
RoomEncryptedVideo,
RoomEncryptedAudio,
RoomEncryptedFile,
)
for event in resp.chunk:
text: str | None = None
if isinstance(event, RoomMessageText):
text = event.body
elif isinstance(event, _media_types):
url = getattr(event, "url", "") or ""
body = getattr(event, "body", "") or ""
if isinstance(event, (RoomMessageImage, RoomEncryptedImage)):
text = f"[Image: {body}]" if body else "[Image]"
elif isinstance(event, (RoomMessageVideo, RoomEncryptedVideo)):
text = f"[Video: {body}]" if body else "[Video]"
elif isinstance(event, (RoomMessageAudio, RoomEncryptedAudio)):
text = f"[Audio: {body}]" if body else "[Audio]"
else:
text = f"[File: {body}]" if body else "[File]"
if url:
text += f" ({url})"
else:
continue
sender = event.sender
if room is not None:
display_name = room.user_name(sender) or sender
else:
display_name = sender
messages.append(
HistoricalMessage(
user_id=sender,
user_name=display_name,
text=text,
timestamp=datetime.fromtimestamp(
event.server_timestamp / 1000,
tz=timezone.utc,
),
message_id=event.event_id,
is_bot=(sender == bot_user_id),
reply_to_id=_get_reply_to_id(event),
)
)
messages.reverse()
return messages
# -- Internal: event callbacks -------------------------------------
def _register_callbacks(self) -> None:
"""Wire all the adapter's handlers onto the nio client's callback registry.
Registers room-event callbacks (text via :meth:`_on_message`, invites
via :meth:`_on_invite`, undecryptable Megolm events via
:meth:`_on_megolm_event`, and every media event type via
:meth:`_on_media`), the to-device SAS verification handlers
(:meth:`_on_unknown_to_device`, :meth:`_on_key_verification_start`,
:meth:`_on_key_verification_key`, :meth:`_on_key_verification_mac`,
:meth:`_on_key_verification_cancel`), and the in-room verification
handlers (:meth:`_on_room_verification_request`,
:meth:`_on_room_verification_event`). Mutates the nio client's
``add_event_callback`` / ``add_to_device_callback`` tables; the
registered handlers are later dispatched by nio during each sync.
Called by :meth:`start` within this module after login and before the
initial sync; it has no external callers.
"""
assert self.client is not None
self.client.add_event_callback(
self._on_message,
RoomMessageText,
)
self.client.add_event_callback(
self._on_invite,
InviteMemberEvent,
)
self.client.add_event_callback(
self._on_megolm_event,
MegolmEvent,
)
_media_types = (
RoomMessageImage,
RoomEncryptedImage,
RoomMessageAudio,
RoomEncryptedAudio,
RoomMessageVideo,
RoomEncryptedVideo,
RoomMessageFile,
RoomEncryptedFile,
)
for event_type in _media_types:
self.client.add_event_callback(
self._on_media,
event_type,
)
self.client.add_to_device_callback(
self._on_unknown_to_device,
UnknownToDeviceEvent,
)
self.client.add_to_device_callback(
self._on_key_verification_start,
KeyVerificationStart,
)
self.client.add_to_device_callback(
self._on_key_verification_key,
KeyVerificationKey,
)
self.client.add_to_device_callback(
self._on_key_verification_mac,
KeyVerificationMac,
)
self.client.add_to_device_callback(
self._on_key_verification_cancel,
KeyVerificationCancel,
)
self.client.add_event_callback(
self._on_room_verification_request,
RoomMessageUnknown,
)
self.client.add_event_callback(
self._on_room_verification_event,
UnknownEvent,
)
# -- In-room SAS verification helpers ---------------------------------
async def _in_room_verif_send(
self,
room_id: str,
event_type: str,
content: dict,
request_event_id: str,
) -> None:
"""Send an in-room verification event with the required m.relates_to.
In-room verification events must NOT include transaction_id in the
content body; the relation is carried by m.relates_to instead.
"""
assert self.client is not None
content.pop("transaction_id", None)
content["m.relates_to"] = {
"rel_type": "m.reference",
"event_id": request_event_id,
}
await self.client.room_send(
room_id,
event_type,
content,
ignore_unverified_devices=True,
)
@staticmethod
def _to_device_dict(source: dict, tx_id: str) -> dict:
"""Build a to-device-style dict from a room event source dict.
nio's KeyVerification* from_dict() methods expect a to-device
envelope with ``sender`` and ``content.transaction_id``.
"""
content = dict(source.get("content", {}))
content["transaction_id"] = tx_id
return {"sender": source["sender"], "content": content}
# -- In-room SAS handlers ---------------------------------------------
async def _on_room_verification_request(
self,
room: MatrixRoom,
event: RoomMessageUnknown,
) -> None:
"""Respond to an in-room SAS verification request with a ready event.
Handler registered for ``RoomMessageUnknown`` events and dispatched by
nio during sync. Ignores anything that is not an
``m.key.verification.request``, requests from the bot itself, or
requests that do not offer the ``m.sas.v1`` method; otherwise it replies
with an ``m.key.verification.ready`` event advertising SAS support, sent
through :meth:`_in_room_verif_send` (which attaches the required
``m.relates_to`` reference and performs the ``room_send`` network call).
No-op when the client is not connected.
Invoked by nio's event dispatch loop; not called directly elsewhere in
the codebase.
Args:
room (MatrixRoom): The room the request arrived in.
event (RoomMessageUnknown): The raw verification-request message.
"""
if self.client is None:
return
if event.msgtype != "m.key.verification.request":
return
if event.sender == self.client.user_id:
return
content = event.source.get("content", {})
methods: list[str] = content.get("methods", [])
request_event_id: str = event.event_id
if "m.sas.v1" not in methods:
logger.debug(
"Ignoring in-room verification request from %s: unsupported methods %s",
event.sender,
methods,
)
return
logger.debug(
"In-room SAS verification request from %s in %s (event %s)",
event.sender,
room.room_id,
request_event_id,
)
ready_content = {
"from_device": self.client.device_id,
"methods": ["m.sas.v1"],
}
await self._in_room_verif_send(
room.room_id,
"m.key.verification.ready",
ready_content,
request_event_id,
)
async def _on_room_verification_event(
self,
room: MatrixRoom,
event: UnknownEvent,
) -> None:
"""Drive the responder side of an in-room SAS verification exchange.
Handler registered for ``UnknownEvent`` events and dispatched by nio
during sync. Routes the various ``m.key.verification.*`` sub-events
keyed by the original request's event ID: on ``start`` it builds a nio
``Sas`` from the initiator's device, recomputes the commitment, and
sends an accept, stashing the live ``Sas`` in ``_in_room_sas``; on
``key`` it shares the bot's key, auto-accepts the emoji comparison, and
sends the MAC; on ``mac`` it verifies and, on success, marks the device
trusted (``verify_device``) and sends ``done``; ``cancel`` / ``done``
clean up state. All outbound events go through
:meth:`_in_room_verif_send` (network I/O); mutates ``_in_room_sas`` and
the client's device-trust state.
Invoked by nio's event dispatch loop; not called directly elsewhere in
the codebase.
Args:
room (MatrixRoom): The room the verification event arrived in.
event (UnknownEvent): The raw ``m.key.verification.*`` event.
"""
if self.client is None:
return
if event.sender == self.client.user_id:
return
ev_type: str = event.type
source: dict = event.source
content: dict = source.get("content", {})
# transaction_id for in-room verification is the original request event_id
relates_to: dict = content.get("m.relates_to", {})
request_event_id: str | None = relates_to.get("event_id")
if ev_type == "m.key.verification.done":
logger.debug(
"Received in-room m.key.verification.done from %s",
event.sender,
)
return
if ev_type == "m.key.verification.cancel":
logger.debug(
"In-room SAS canceled by %s (event %s): [%s] %s",
event.sender,
request_event_id,
content.get("code", ""),
content.get("reason", ""),
)
if request_event_id:
self._in_room_sas.pop(request_event_id, None)
return
if not ev_type.startswith("m.key.verification."):
return
if not request_event_id:
logger.debug("In-room verification event %s has no m.relates_to", ev_type)
return
# ---- start -------------------------------------------------------
if ev_type == "m.key.verification.start":
if content.get("method") != "m.sas.v1":
return
try:
user_devices = self.client.device_store[event.sender]
except KeyError:
user_devices = {}
from_device: str = content.get("from_device", "")
olm_device: OlmDevice | None = user_devices.get(from_device)
if olm_device is None:
logger.debug(
"In-room SAS start from unknown device %s/%s",
event.sender,
from_device,
)
return
fp_key: str = self.client.olm.account.identity_keys["ed25519"]
td = self._to_device_dict(source, request_event_id)
try:
typed_start = KeyVerificationStart.from_dict(td)
except Exception:
logger.exception("Failed to parse in-room KeyVerificationStart")
return
try:
sas = Sas.from_key_verification_start(
self.client.user_id,
self.client.device_id,
fp_key,
olm_device,
typed_start,
)
except Exception:
logger.exception("Failed to create Sas for in-room verification")
return
# Recompute commitment from the original room event content
# (without the injected transaction_id) so it matches what the
# initiator will verify against.
original_content = source.get("content", {})
sas.commitment = olm.sha256(
sas.pubkey + NioApi.to_canonical_json(original_content),
)
if sas.canceled:
logger.debug(
"In-room SAS start from %s/%s was invalid: %s",
event.sender,
from_device,
sas.cancel_reason,
)
return
accept_msg = sas.accept_verification()
await self._in_room_verif_send(
room.room_id,
accept_msg.type,
dict(accept_msg.content),
request_event_id,
)
self._in_room_sas[request_event_id] = (sas, olm_device, room.room_id)
logger.debug(
"In-room SAS started with %s/%s (event %s)",
event.sender,
from_device,
request_event_id,
)
return
# ---- key / mac ---------------------------------------------------
entry = self._in_room_sas.get(request_event_id)
if entry is None:
logger.debug(
"In-room verification event %s for unknown request %s",
ev_type,
request_event_id,
)
return
sas, olm_device, room_id = entry
if ev_type == "m.key.verification.key":
# As responder, send our key only after receiving theirs.
key_msg = sas.share_key()
await self._in_room_verif_send(
room_id,
key_msg.type,
dict(key_msg.content),
request_event_id,
)
td = self._to_device_dict(source, request_event_id)
try:
typed_key = KeyVerificationKey.from_dict(td)
except Exception:
logger.exception("Failed to parse in-room KeyVerificationKey")
return
sas.receive_key_event(typed_key)
if sas.canceled:
logger.debug(
"In-room SAS canceled after key exchange: %s",
sas.cancel_reason,
)
self._in_room_sas.pop(request_event_id, None)
return
emojis = sas.get_emoji()
emoji_str = " ".join(f"{e} {d}" for e, d in emojis)
logger.debug(
"In-room SAS emojis (event %s): %s — auto-accepting",
request_event_id,
emoji_str,
)
sas.accept_sas()
mac_msg = sas.get_mac()
await self._in_room_verif_send(
room_id,
mac_msg.type,
dict(mac_msg.content),
request_event_id,
)
elif ev_type == "m.key.verification.mac":
td = self._to_device_dict(source, request_event_id)
try:
typed_mac = KeyVerificationMac.from_dict(td)
except Exception:
logger.exception("Failed to parse in-room KeyVerificationMac")
return
sas.receive_mac_event(typed_mac)
if sas.verified:
self.client.verify_device(olm_device)
await self._in_room_verif_send(
room_id,
"m.key.verification.done",
{},
request_event_id,
)
logger.debug(
"In-room SAS verified device %s/%s (event %s)",
olm_device.user_id,
olm_device.id,
request_event_id,
)
elif sas.canceled:
logger.debug(
"In-room SAS MAC verification failed (event %s): %s",
request_event_id,
sas.cancel_reason,
)
self._in_room_sas.pop(request_event_id, None)
# -- SAS key-verification handlers ------------------------------------
async def _on_unknown_to_device(
self,
event: UnknownToDeviceEvent,
) -> None:
"""Handle untyped to-device verification events and offer SAS readiness.
Handler registered for ``UnknownToDeviceEvent`` and dispatched by nio
when it receives to-device traffic during sync. Logs and ignores
``m.key.verification.done`` and anything other than
``m.key.verification.request``; for a valid SAS request it replies with
an ``m.key.verification.ready`` ``ToDeviceMessage`` (advertising
``m.sas.v1``) sent via the nio client's ``to_device`` (network I/O).
This is the device-to-device counterpart to
:meth:`_on_room_verification_request`. No-op when the client is not
connected.
Invoked by nio's to-device dispatch loop; not called directly elsewhere
in the codebase.
Args:
event (UnknownToDeviceEvent): The raw to-device verification event.
"""
if event.type == "m.key.verification.done":
logger.debug(
"Received m.key.verification.done from %s (source: %s)",
event.sender,
event.source.get("content", {}),
)
return
if event.type != "m.key.verification.request":
return
if self.client is None:
return
content = event.source.get("content", {})
methods: list[str] = content.get("methods", [])
transaction_id: str | None = content.get("transaction_id")
from_device: str | None = content.get("from_device")
logger.debug("Verification request content: %s", content)
if "m.sas.v1" not in methods or not transaction_id or not from_device:
logger.debug(
"Ignoring verification request from %s: unsupported methods %s",
event.sender,
methods,
)
return
logger.debug(
"SAS verification request from %s (device %s, tx %s)",
event.sender,
from_device,
transaction_id,
)
ready_content = {
"from_device": self.client.device_id,
"methods": ["m.sas.v1"],
"transaction_id": transaction_id,
}
ready_msg = ToDeviceMessage(
"m.key.verification.ready",
event.sender,
from_device,
ready_content,
)
await self.client.to_device(ready_msg)
async def _on_key_verification_start(
self,
event: KeyVerificationStart,
) -> None:
"""Accept a to-device SAS verification start using nio's internal Sas.
Handler registered for ``KeyVerificationStart`` and dispatched by nio
during sync. For an ``m.sas.v1`` start it looks up the ``Sas`` object
nio already created for the transaction in
``client.olm.key_verifications`` and, if it is valid, sends the accept
via the client's ``to_device`` (network I/O). Bails out for unsupported
methods, an unknown/cancelled transaction, or a disconnected client.
Invoked by nio's to-device dispatch loop; not called directly elsewhere
in the codebase.
Args:
event (KeyVerificationStart): The parsed SAS start event.
"""
if self.client is None:
return
if event.method != "m.sas.v1":
return
sas = self.client.olm.key_verifications.get(event.transaction_id)
if sas is None or sas.canceled:
logger.debug(
"SAS start from %s/%s but no valid internal Sas (tx %s)",
event.sender,
event.from_device,
event.transaction_id,
)
return
await self.client.to_device(sas.accept_verification())
logger.debug(
"SAS verification accepted for %s/%s (tx %s)",
event.sender,
event.from_device,
event.transaction_id,
)
async def _on_key_verification_key(
self,
event: KeyVerificationKey,
) -> None:
"""Auto-accept the emoji comparison and send the MAC after key exchange.
Handler registered for ``KeyVerificationKey`` and dispatched by nio
during sync. nio's own handler has already recorded the peer's key and
queued the bot's; this flushes that with ``send_to_device_messages``,
logs the SAS emojis at debug level, auto-accepts the comparison
(``accept_sas`` -- this is an unattended bot, so it always trusts the
request), and sends the MAC via ``to_device``. All network I/O. Bails on
a disconnected client, an unknown transaction, or a cancelled ``Sas``.
Invoked by nio's to-device dispatch loop; not called directly elsewhere
in the codebase.
Args:
event (KeyVerificationKey): The parsed SAS key event.
"""
if self.client is None:
return
sas = self.client.olm.key_verifications.get(event.transaction_id)
if sas is None:
return
if sas.canceled:
logger.debug(
"SAS canceled after key exchange (tx %s): %s",
event.transaction_id,
sas.cancel_reason,
)
return
# nio's internal handler already called receive_key_event() and
# queued share_key() in outgoing_to_device_messages — flush it.
await self.client.send_to_device_messages()
emojis = sas.get_emoji()
emoji_str = " ".join(f"{e} {d}" for e, d in emojis)
logger.debug(
"SAS emojis for tx %s: %s — auto-accepting",
event.transaction_id,
emoji_str,
)
sas.accept_sas()
await self.client.to_device(sas.get_mac())
async def _on_key_verification_mac(
self,
event: KeyVerificationMac,
) -> None:
"""Finalize a to-device SAS verification once nio has checked the MAC.
Handler registered for ``KeyVerificationMac`` and dispatched by nio
during sync. nio's own handler has already processed the MAC and (if
valid) verified the device; this inspects the resulting ``Sas`` state
and, on success, sends an ``m.key.verification.done`` ``ToDeviceMessage``
via ``to_device`` (network I/O) to close out the exchange, logging the
verified peer device. On cancellation it logs the reason. Bails on a
disconnected client or an unknown transaction.
Invoked by nio's to-device dispatch loop; not called directly elsewhere
in the codebase.
Args:
event (KeyVerificationMac): The parsed SAS MAC event.
"""
if self.client is None:
return
sas = self.client.olm.key_verifications.get(event.transaction_id)
if sas is None:
return
# nio's internal handler already called receive_mac_event() and
# verify_device() if the MAC was valid.
if sas.verified:
done_content = {"transaction_id": event.transaction_id}
done_msg = ToDeviceMessage(
"m.key.verification.done",
sas.other_olm_device.user_id,
sas.other_olm_device.id,
done_content,
)
resp = await self.client.to_device(done_msg)
logger.debug(
"Device %s/%s verified via SAS (tx %s), done sent -> %s",
sas.other_olm_device.user_id,
sas.other_olm_device.id,
event.transaction_id,
type(resp).__name__,
)
elif sas.canceled:
logger.debug(
"SAS MAC verification failed (tx %s): %s",
event.transaction_id,
sas.cancel_reason,
)
async def _on_key_verification_cancel(
self,
event: KeyVerificationCancel,
) -> None:
"""Log that a peer cancelled an in-progress to-device SAS verification.
Handler registered for ``KeyVerificationCancel`` and dispatched by nio
during sync. nio has already torn down its internal ``Sas`` state for
the transaction; this only records the sender, transaction ID, and the
cancel code/reason at debug level so the cancellation is visible in
logs. No network I/O or state mutation of its own.
Invoked by nio's to-device dispatch loop; not called directly elsewhere
in the codebase.
Args:
event (KeyVerificationCancel): The parsed SAS cancel event.
"""
logger.debug(
"SAS verification canceled by %s (tx %s): [%s] %s",
event.sender,
event.transaction_id,
event.code,
event.reason,
)
async def _on_message(
self,
room: MatrixRoom,
event: RoomMessageText,
) -> None:
"""Convert an inbound Matrix text message into an :class:`IncomingMessage`.
Handler registered for ``RoomMessageText`` and dispatched by nio during
sync. Skips the bot's own echoes, then -- when enabled in config --
resolves custom Matrix emojis in the formatted body into image
attachments by downloading them (via ``platforms.emoji_resolver``, which
honors the shared ``MediaCache``). Builds the normalized
:class:`IncomingMessage` with channel/sender metadata, the addressed
decision (:meth:`_is_addressed`), reply target (:func:`_get_reply_to_id`),
and an ``extra`` dict carrying DM/member-count/admin flags, then hands it
to the adapter's ``_message_handler`` which feeds the inference pipeline.
Performs network I/O for emoji downloads.
Invoked by nio's event dispatch loop; not called directly elsewhere in
the codebase.
Args:
room (MatrixRoom): The room the message was sent in.
event (RoomMessageText): The inbound text event.
"""
assert self.client is not None
if event.sender == self.client.user_id:
return
text = event.body
attachments: list[Attachment] = []
# --- Resolve custom emojis as images --------------------------
cfg = self._config
if cfg is not None and cfg.resolve_emojis_as_images and text:
try:
source_content = (event.source or {}).get("content", {})
formatted_body = source_content.get("formatted_body", "")
if formatted_body:
from platforms.emoji_resolver import (
extract_matrix_emojis,
rewrite_matrix_emoji_text,
download_matrix_emojis,
)
emoji_matches = extract_matrix_emojis(formatted_body)
if emoji_matches:
emoji_atts = await download_matrix_emojis(
emoji_matches,
self.client,
max_emojis=cfg.max_emojis_per_message,
media_cache=self._media_cache,
)
if emoji_atts:
text = rewrite_matrix_emoji_text(
text, emoji_matches[: cfg.max_emojis_per_message]
)
attachments.extend(emoji_atts)
logger.info(
"Resolved %d/%d Matrix custom emojis as images",
len(emoji_atts),
len(emoji_matches),
)
except Exception:
logger.debug("Matrix emoji resolution failed", exc_info=True)
msg = IncomingMessage(
platform="matrix",
channel_id=room.room_id,
user_id=event.sender,
user_name=room.user_name(event.sender) or event.sender,
text=text,
is_addressed=self._is_addressed(room, event),
attachments=attachments,
channel_name=room.display_name,
timestamp=datetime.fromtimestamp(
event.server_timestamp / 1000,
tz=timezone.utc,
),
message_id=event.event_id,
reply_to_id=_get_reply_to_id(event),
extra={
"bot_id": self.client.user_id,
"member_count": room.member_count,
"is_dm": room.member_count <= 2,
"is_server_admin": (
room.power_levels.get_user_level(event.sender) >= 50
if room.power_levels
else False
),
},
)
logger.debug(
"[Matrix/%s] %s: %s",
room.display_name,
msg.user_name,
msg.text[:120],
)
await self._message_handler(msg, self)
async def _on_media(
self,
room: MatrixRoom,
event: MediaEvent,
) -> None:
"""Convert an inbound Matrix media message into an :class:`IncomingMessage`.
Handler registered for every media event type (image/audio/video/file,
encrypted and plain) and dispatched by nio during sync. Skips the bot's
own echoes, downloads and decrypts the attachment via
:func:`download_matrix_media` -- wrapped in the nested ``_fetch_media`` /
``_download`` helpers for bounded retry and routed through the shared
``MediaCache`` when configured -- and wraps the bytes in an
:class:`Attachment`. Builds the normalized :class:`IncomingMessage`
(caption as text, addressed decision via :meth:`_is_addressed`, reply
target via :func:`_get_reply_to_id`, plus the DM/member/admin ``extra``
flags) and hands it to ``_message_handler`` for the inference pipeline.
Performs network I/O; logs and proceeds with no attachment if the
download fails.
Invoked by nio's event dispatch loop; not called directly elsewhere in
the codebase.
Args:
room (MatrixRoom): The room the media message was sent in.
event (MediaEvent): The inbound media event (any supported type).
"""
assert self.client is not None
if event.sender == self.client.user_id:
return
is_addressed = self._is_addressed(room, event)
caption = event.body or ""
logger.debug(
"[Matrix/%s] %s: [media] %s",
room.display_name,
room.user_name(event.sender),
caption[:120],
)
# Download the attachment (using cache when available)
attachments: list[Attachment] = []
mxc_url: str = event.url
try:
retry_attempts = (
getattr(self._config, "media_download_retry_attempts", 3)
if self._config is not None
else 3
)
async def _fetch_media() -> tuple[bytes, str, str]:
"""Download and decrypt the current event's media in one shot.
Thin closure over :func:`download_matrix_media` for the event
being handled, returning ``(data, mimetype, filename)``. Defined
and used only within :meth:`_on_media` as the unit of work the
retry wrapper and media cache invoke; it has no other callers.
Performs network I/O against the homeserver.
Returns:
tuple[bytes, str, str]: The decrypted bytes, MIME type, and
filename of the attachment.
"""
return await download_matrix_media(self.client, event) # type: ignore[arg-type]
async def _download() -> tuple[bytes, str, str]:
"""Fetch the media with bounded retry on transient blips.
Wraps ``_fetch_media`` in the shared
:func:`platforms.media_common.download_with_retry` helper so a
flaky homeserver or CDN response is retried up to the configured
attempt count before giving up. Defined and used only within
:meth:`_on_media` -- it is the callable passed to the media
cache or invoked directly when no cache is configured; it has no
other callers. Performs network I/O.
Returns:
tuple[bytes, str, str]: The decrypted bytes, MIME type, and
filename of the attachment.
"""
return await download_with_retry(
_fetch_media,
attempts=retry_attempts,
label=event.body or "attachment",
)
if self._media_cache is not None:
data, mimetype, filename = await self._media_cache.get_or_download(
mxc_url,
_download,
)
else:
data, mimetype, filename = await _download()
attachments.append(
Attachment(
data=data,
mimetype=mimetype,
filename=filename,
source_url=mxc_url,
)
)
except Exception:
logger.exception(
"Failed to download Matrix media from %s",
event.sender,
)
msg = IncomingMessage(
platform="matrix",
channel_id=room.room_id,
user_id=event.sender,
user_name=room.user_name(event.sender) or event.sender,
text=caption,
is_addressed=is_addressed,
attachments=attachments,
channel_name=room.display_name,
timestamp=datetime.fromtimestamp(
event.server_timestamp / 1000,
tz=timezone.utc,
),
message_id=event.event_id,
reply_to_id=_get_reply_to_id(event),
extra={
"bot_id": self.client.user_id,
"member_count": room.member_count,
"is_dm": room.member_count <= 2,
"is_server_admin": (
room.power_levels.get_user_level(event.sender) >= 50
if room.power_levels
else False
),
},
)
await self._message_handler(msg, self)
async def _on_invite(
self,
room: MatrixRoom,
event: InviteMemberEvent,
) -> None:
"""Auto-accept a room invite that targets the bot.
Handler registered for ``InviteMemberEvent`` and dispatched by nio
during sync. Ignores invites whose ``state_key`` is not the bot's own
user ID, then schedules :meth:`_join_room` as a background task (so the
sync loop is not blocked) to actually join with retry. Logs the
incoming invite at info level.
Invoked by nio's event dispatch loop; not called directly elsewhere in
the codebase.
Args:
room (MatrixRoom): The room the bot is being invited to.
event (InviteMemberEvent): The membership-invite event.
"""
assert self.client is not None
if event.state_key != self.client.user_id:
return
logger.info(
"Matrix: received invite to %s from %s – joining",
room.room_id,
event.sender,
)
asyncio.ensure_future(self._join_room(room.room_id))
async def _join_room(self, room_id: str) -> None:
"""Join a room, retrying with backoff to ride out invite-commit races.
Calls the nio client's ``join`` (network I/O) up to four times with
escalating delays (1, 2, 4, 8 s), because a just-received invite may not
yet be committed server-side when the join is attempted. Stops as soon
as the join succeeds and logs the outcome of each attempt; logs an error
and gives up after the fourth failure.
Called by :meth:`_on_invite` within this module (scheduled as a
background task via ``asyncio.ensure_future``); it has no external
callers.
Args:
room_id (str): The Matrix room ID to join.
"""
assert self.client is not None
for attempt in range(4):
delay = 1 if attempt == 0 else 2**attempt # 1, 2, 4, 8 s
await asyncio.sleep(delay)
result = await self.client.join(room_id)
if not isinstance(result, JoinError):
logger.info("Joined Matrix room %s", room_id)
return
logger.warning(
"Join attempt %d for %s failed: %s",
attempt + 1,
room_id,
result,
)
logger.error("Giving up joining %s after 4 attempts", room_id)
async def _on_megolm_event(
self,
room: MatrixRoom,
event: MegolmEvent,
) -> None:
"""Request the missing room key when a Megolm message cannot be decrypted.
Handler registered for ``MegolmEvent`` and dispatched by nio during sync
whenever an encrypted message arrives that the bot lacks the session key
for. Logs the gap and asks the sender's devices for the key via the nio
client's ``request_room_key``, then flushes the outgoing to-device
request with ``send_to_device_messages`` (both network I/O); failures
are logged at debug level rather than raised. Successful key delivery
lets nio decrypt the backlog on a later sync.
Invoked by nio's event dispatch loop; not called directly elsewhere in
the codebase.
Args:
room (MatrixRoom): The room containing the undecryptable message.
event (MegolmEvent): The encrypted event whose room key is missing.
"""
logger.warning(
"Matrix: unable to decrypt message in %s from %s (session %s). "
"Requesting missing room key.",
room.room_id,
event.sender,
event.session_id,
)
if self.client is not None:
try:
await self.client.request_room_key(event)
await self.client.send_to_device_messages()
logger.debug(
"Sent key request for session %s in %s",
event.session_id,
room.room_id,
)
except Exception:
logger.debug(
"Could not send key request for session %s",
event.session_id,
exc_info=True,
)
# -- Internal: helpers ---------------------------------------------
def _is_addressed(
self,
room: MatrixRoom,
event: object,
) -> bool:
"""Return ``True`` when the bot should respond.
The bot responds if any of the following hold:
* The room has two or fewer members (DM / small room).
* The bot's user ID appears in ``m.mentions.user_ids``.
* The bot's user ID appears in the plain-text body.
* The message is a reply to an event the bot sent.
* The message is a reply to an egregore ghost message in this room.
"""
assert self.client is not None
# DM / small room -- always respond
if room.member_count <= 2:
return True
source = getattr(event, "source", None) or {}
content = source.get("content", {})
# Modern m.mentions spec
mentions = content.get("m.mentions", {})
if self.client.user_id in mentions.get("user_ids", []):
return True
# Fallback: user ID appears in body text
body = getattr(event, "body", "") or ""
if self.client.user_id in body:
return True
# Reply to one of the bot's own messages or an egregore ghost message
relates = content.get("m.relates_to", {})
reply_to = relates.get("m.in_reply_to", {})
reply_event_id = reply_to.get("event_id")
if reply_event_id:
sent = self._sent_events.get(room.room_id, set())
if reply_event_id in sent:
return True
ego = self._egregore_sent_events.get(room.room_id, set())
if reply_event_id in ego:
return True
return False
[docs]
def register_egregore_event_id(self, room_id: str, event_id: str) -> None:
"""Register an egregore ghost message so replies to it address the bot.
Egregore "ghost" messages are posted under application-service puppet
identities rather than the bot's own user, so a reply to one would not
otherwise be recognized. Recording the event ID in
``_egregore_sent_events`` lets :meth:`_is_addressed` treat replies to
that message as directed at the bot. Mutates in-memory state only;
no-ops on empty inputs.
Called by the egregore/ghost-addressing layer after it sends a ghost
message; exercised by ``tests/test_egregore_addressing.py``.
Args:
room_id (str): The Matrix room the ghost message was sent in.
event_id (str): The event ID of the ghost message to track.
"""
if not room_id or not event_id:
return
self._egregore_sent_events.setdefault(room_id, set()).add(event_id)
async def _sync_loop(self) -> None:
"""Run the long-lived Matrix sync loop until the adapter is stopped.
The background coroutine that keeps the bot live: it repeatedly calls
the nio client's ``sync`` (the network long-poll that drives every
registered event callback), and after each successful round re-trusts
devices (:func:`trust_all_devices`) and uploads/queries/claims E2EE keys
and flushes pending to-device messages. Transient network errors are
retried with exponential backoff (5 s up to 60 s); ``CancelledError``
unwinds cleanly and any other exception breaks the loop. Runs until
``_stop_event`` is set by :meth:`stop`.
Created as an asyncio task by :meth:`start`; it has no other callers.
"""
backoff = 5.0
max_backoff = 60.0
try:
while not self._stop_event.is_set():
try:
await self.client.sync( # type: ignore[union-attr]
timeout=30000,
)
backoff = 5.0 # reset on success
trust_all_devices(
self.client, # type: ignore[arg-type]
)
if self.client.should_upload_keys: # type: ignore[union-attr]
await self.client.keys_upload() # type: ignore[union-attr]
if self.client.should_query_keys: # type: ignore[union-attr]
await self.client.keys_query() # type: ignore[union-attr]
if self.client.should_claim_keys: # type: ignore[union-attr]
await self.client.keys_claim( # type: ignore[union-attr]
self.client.get_users_for_key_claiming(), # type: ignore[union-attr]
)
await self.client.send_to_device_messages() # type: ignore[union-attr]
except asyncio.CancelledError:
raise
except (
TimeoutError,
aiohttp.ClientError,
ConnectionError,
OSError,
) as e:
logger.warning(
"Matrix sync transient error, retrying in %.0fs: %s",
backoff,
e,
)
await asyncio.sleep(backoff)
backoff = min(backoff * 2, max_backoff)
continue
except Exception:
logger.exception("Matrix sync loop encountered an error")
break
except asyncio.CancelledError:
pass
finally:
logger.info("Matrix sync loop exited")