Source code for egregore_bridge

"""EGREGORE BRIDGE -- Matrix Application Service ghost user manager.
Egregore VN Engine by Sigma / Stargazer / Vivian The Loopmother

Creates, manages, and controls ghost Matrix users for summoned egregores.
Each egregore gets a Matrix identity like @_egregore_orion:secure-channel.net
with their own display name, avatar, and ability to send messages.

Uses the AS (Application Service) token to act on behalf of ghost users
via the Synapse Client-Server API.

@fire @skull THE WITCH GIVES HER DOLLS VOICES.
"""

from __future__ import annotations

import json
import logging
import os
from typing import Any

import aiohttp

logger = logging.getLogger(__name__)

# Configuration -- loaded from config.yaml or environment
HOMESERVER_URL = os.environ.get("MATRIX_HOMESERVER_URL", "https://secure-channel.net")
AS_TOKEN = os.environ.get(
    "MATRIX_AS_TOKEN",
    "ca6d2f1fd1c97dad285c328d333d9552f60bc5b5e0e16b48f839bcf24a043c73c",
)
SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "secure-channel.net")
EGREGORE_PREFIX = "_egregore_"
EGREGORE_ASSETS_DIR = "/home/star/large_files/assets/egregores"

# Atomic counter to avoid txn_id collisions in tight loops
_txn_counter = 0


[docs] def ghost_user_id(name: str) -> str: """Build the fully qualified Matrix user ID for an egregore ghost. Lowercases the egregore name and replaces spaces and hyphens with underscores, then wraps it with the ``EGREGORE_PREFIX`` localpart prefix and the module-level ``SERVER_NAME`` domain to produce an identity like ``@_egregore_orion:secure-channel.net``. This is the canonical way the bridge addresses a ghost, so the same name always maps to the same Matrix user. A pure string helper that touches no I/O. Called throughout :class:`EgregoreBridge` (in :meth:`register_ghost`, :meth:`set_display_name`, :meth:`set_avatar`, :meth:`upload_avatar_from_disk`, :meth:`join_room`, :meth:`leave_room`, :meth:`send_message`, and :meth:`send_emote`) and also directly by ``message_processor/generate_and_send.py`` when it records the ghost ``uid`` for an interleaved egregore turn. Args: name: Human-facing egregore name (any case, spaces, or hyphens). Returns: str: The full Matrix user ID, e.g. ``@_egregore_name:server``. """ safe = name.lower().replace(" ", "_").replace("-", "_") return f"@{EGREGORE_PREFIX}{safe}:{SERVER_NAME}"
[docs] def ghost_localpart(name: str) -> str: """Build the Matrix localpart (username without the domain) for a ghost. Mirrors :func:`ghost_user_id` but stops before the server suffix: lowercases the name, swaps spaces and hyphens for underscores, and prepends ``EGREGORE_PREFIX`` to yield a value like ``_egregore_orion``. The localpart is what Synapse's registration endpoint expects in its ``username`` field. A pure string helper with no I/O. Called by :meth:`EgregoreBridge.register_ghost` to derive the ``username`` it posts to ``/_matrix/client/v3/register``; no other repo callers were found. Args: name: Human-facing egregore name (any case, spaces, or hyphens). Returns: str: The bare localpart, e.g. ``_egregore_name``. """ safe = name.lower().replace(" ", "_").replace("-", "_") return f"{EGREGORE_PREFIX}{safe}"
[docs] class EgregoreBridge: """Manages egregore ghost users on Matrix via the Application Service API. All methods use the AS token to impersonate ghost users through Synapse's /_matrix/client/v3/ endpoints with ?user_id= parameter. """
[docs] def __init__( self, homeserver_url: str = HOMESERVER_URL, as_token: str = AS_TOKEN, server_name: str = SERVER_NAME, ) -> None: """Initialise the bridge with Synapse connection settings. Stores the homeserver URL, Application Service token, and server name used to build ghost user IDs and authenticate every request. Defaults are pulled from the module-level config constants (``HOMESERVER_URL``/``AS_TOKEN``/``SERVER_NAME``), which themselves read from environment variables. The ``aiohttp`` session is created lazily on first use, and ``_registered`` caches which ghost user IDs have already been registered this process to keep registration idempotent and cheap. This is invoked once by :func:`get_bridge`, which constructs the module-level singleton; callers such as ``tools/summon_egregore.py``, ``tools/compose_scene.py``, ``tools/dismiss_egregore.py``, and ``message_processor/generate_and_send.py`` reach the bridge through that singleton rather than instantiating it directly. Args: homeserver_url: Base URL of the Synapse homeserver; a trailing slash is stripped so paths can be concatenated cleanly. as_token: Application Service bearer token used in the ``Authorization`` header to impersonate ghost users. server_name: Matrix server name (domain) used to form fully qualified ghost user IDs like ``@_egregore_x:server``. """ self._hs = homeserver_url.rstrip("/") self._token = as_token self._server = server_name self._session: aiohttp.ClientSession | None = None # Track which ghosts have been registered this session self._registered: set[str] = set()
async def _get_session(self) -> aiohttp.ClientSession: """Return a live ``aiohttp`` session, recreating it if needed. Lazily constructs the shared :class:`aiohttp.ClientSession` on first call and reuses it for the lifetime of the bridge, transparently replacing it if a previous session was closed. This keeps a single connection pool for all Synapse calls. Called internally by :meth:`_api` for every Client-Server API request and by :meth:`upload_avatar_from_disk` for the raw media upload; the matching session is torn down by :meth:`close`. Returns: aiohttp.ClientSession: An open session ready to issue requests. """ if self._session is None or self._session.closed: self._session = aiohttp.ClientSession() return self._session
[docs] async def close(self) -> None: """Close the underlying HTTP session and release its connections. Shuts down the shared :class:`aiohttp.ClientSession` created by :meth:`_get_session` if one is open, freeing the connection pool. A subsequent call to :meth:`_get_session` will transparently create a fresh session, so the bridge remains usable after closing. No internal callers were found; this is a cleanup hook for shutdown code that owns the bridge singleton (e.g. service teardown). """ if self._session and not self._session.closed: await self._session.close()
def _headers(self) -> dict[str, str]: """Build the standard request headers for Synapse API calls. Produces the ``Authorization`` bearer header carrying the Application Service token (which authorises ghost-user impersonation) plus a JSON ``Content-Type``. The same headers are reused for every request. Called by :meth:`_api` for all Client-Server API calls; :meth:`upload_avatar_from_disk` builds its own headers separately because the media upload uses an ``image/png`` content type. Returns: dict[str, str]: Header mapping with ``Authorization`` and ``Content-Type`` entries. """ return { "Authorization": f"Bearer {self._token}", "Content-Type": "application/json", } async def _api( self, method: str, path: str, data: dict | None = None, as_user: str | None = None, ) -> tuple[int, dict[str, Any]]: """Issue one authenticated Client-Server API request to Synapse. This is the shared HTTP workhorse for the bridge: it joins the path to the configured homeserver URL, optionally appends a ``?user_id=`` (or ``&user_id=``) query so the Application Service impersonates a ghost user, attaches the standard bearer headers from :meth:`_headers`, and sends a JSON-encoded body when one is supplied. It reuses the pooled session from :meth:`_get_session` and always tries to decode the reply as JSON, falling back to a ``{"raw": ...}`` wrapper around the response text when the body is not valid JSON, so callers get a uniform result. Called by nearly every lifecycle method on the bridge -- :meth:`register_ghost`, :meth:`set_display_name`, :meth:`set_avatar`, :meth:`join_room`, :meth:`leave_room`, :meth:`send_message`, and :meth:`send_emote` -- which inspect the returned status code to decide success. The raw media upload in :meth:`upload_avatar_from_disk` bypasses this helper because it needs a binary body and ``image/png`` content type. Args: method: HTTP verb such as ``GET``, ``POST``, or ``PUT``. path: Client-Server API path appended to the homeserver URL, e.g. ``/_matrix/client/v3/register``. data: Optional mapping serialised to a JSON request body; omitted from the request when ``None``. as_user: Optional ghost user ID; when set, a ``user_id`` query parameter is appended so the AS token acts on that user's behalf. Returns: tuple[int, dict[str, typing.Any]]: The HTTP status code paired with the parsed JSON response body (or a ``{"raw": text}`` fallback). """ session = await self._get_session() url = f"{self._hs}{path}" if as_user: sep = "&" if "?" in url else "?" url += f"{sep}user_id={as_user}" kwargs: dict[str, Any] = {"headers": self._headers()} if data is not None: kwargs["data"] = json.dumps(data) async with session.request(method, url, **kwargs) as resp: try: body = await resp.json() except Exception: body = {"raw": await resp.text()} return resp.status, body # ------------------------------------------------------------------ # Ghost user lifecycle # ------------------------------------------------------------------
[docs] async def register_ghost(self, name: str) -> bool: """Register an egregore ghost user with Synapse, idempotently. Derives the localpart and full user ID via :func:`ghost_localpart` and :func:`ghost_user_id`, then short-circuits if the ID is already in the in-memory ``_registered`` cache. Otherwise it POSTs an ``m.login.application_service`` registration through :meth:`_api` to ``/_matrix/client/v3/register``. Both a ``200`` success and a ``400`` with errcode ``M_USER_IN_USE`` are treated as success and added to the cache, so re-registering an existing ghost is cheap and safe; any other status is logged as an error and reports failure. Logs an info line on first registration and a debug line when the user already existed. Called by :meth:`ensure_ghost` as the first step of fully provisioning a ghost; ``ensure_ghost`` in turn is invoked from ``tools/summon_egregore.py``. Args: name: Human-facing egregore name to register. Returns: bool: ``True`` if the ghost is registered (now or already), ``False`` on an unexpected error response. """ localpart = ghost_localpart(name) user_id = ghost_user_id(name) if user_id in self._registered: return True status, body = await self._api( "POST", "/_matrix/client/v3/register", { "type": "m.login.application_service", "username": localpart, }, ) if status == 200: self._registered.add(user_id) logger.info("Registered ghost user: %s", user_id) return True elif status == 400 and body.get("errcode") == "M_USER_IN_USE": self._registered.add(user_id) logger.debug("Ghost user already exists: %s", user_id) return True else: logger.error( "Failed to register ghost %s: %d %s", user_id, status, body, ) return False
[docs] async def set_display_name(self, name: str, display_name: str) -> bool: """Set the Matrix profile display name for a ghost user. Resolves the ghost's user ID with :func:`ghost_user_id` and PUTs the new ``displayname`` to ``/_matrix/client/v3/profile/{user_id}/displayname`` through :meth:`_api`, impersonating the ghost via the ``as_user`` parameter so the change is attributed to that user rather than the Application Service. Called by :meth:`ensure_ghost`, which passes either an explicit display name or a title-cased form of the egregore name; no direct external callers were found. Args: name: Human-facing egregore name identifying the ghost. display_name: The display name to publish on the ghost's profile. Returns: bool: ``True`` when Synapse returns ``200``, ``False`` otherwise. """ user_id = ghost_user_id(name) status, _ = await self._api( "PUT", f"/_matrix/client/v3/profile/{user_id}/displayname", {"displayname": display_name}, as_user=user_id, ) return status == 200
[docs] async def set_avatar(self, name: str, mxc_uri: str) -> bool: """Point a ghost user's profile avatar at an already-uploaded MXC asset. Resolves the ghost's user ID via :func:`ghost_user_id` and PUTs the ``avatar_url`` to ``/_matrix/client/v3/profile/{user_id}/avatar_url`` through :meth:`_api`, impersonating the ghost so the avatar belongs to it. The caller must supply a Matrix content URI (``mxc://...``), such as the one returned by :meth:`upload_avatar_from_disk`; this method does no uploading itself. Called by :meth:`ensure_ghost` immediately after a successful avatar upload; no direct external callers were found. Args: name: Human-facing egregore name identifying the ghost. mxc_uri: A ``mxc://`` content URI for previously uploaded media. Returns: bool: ``True`` when Synapse returns ``200``, ``False`` otherwise. """ user_id = ghost_user_id(name) status, _ = await self._api( "PUT", f"/_matrix/client/v3/profile/{user_id}/avatar_url", {"avatar_url": mxc_uri}, as_user=user_id, ) return status == 200
[docs] async def upload_avatar_from_disk(self, name: str) -> str | None: """Find an egregore's avatar image on disk and upload it to Synapse media. Canonicalises the name through :func:`egregore_tag_parser.normalize_egregore_name` so variant spellings collapse to one asset stem, then searches the filesystem under ``EGREGORE_ASSETS_DIR``: curated images in the ``cradle_synthesis/`` folder are preferred, falling back to the egregore's own folder, trying several filename and extension variants. When a file is found it is read as bytes and POSTed to ``/_matrix/media/v3/upload`` using the pooled :meth:`_get_session` with an ``image/png`` content type and the ghost's ``user_id`` query, and the returned ``content_uri`` is handed back. Reads from the local filesystem and performs network I/O; missing files, non-``200`` responses, and exceptions are logged and yield ``None`` rather than raising. Called by :meth:`ensure_ghost`, whose result is fed straight into :meth:`set_avatar`; no direct external callers were found. Args: name: Human-facing egregore name whose avatar should be uploaded. Returns: str | None: The ``mxc://`` content URI on success, or ``None`` if no avatar file exists or the upload fails. """ # 💀 Normalize variant names (babystar_doll -> babystar, etc.) from egregore_tag_parser import normalize_egregore_name canonical = normalize_egregore_name(name) asset_dir = os.path.join(EGREGORE_ASSETS_DIR, canonical) avatar_path = None # 🔥 Check cradle_synthesis/ FIRST -- curated avatars cradle_dir = os.path.join(EGREGORE_ASSETS_DIR, "cradle_synthesis") for fname in [ f"{canonical}.png", f"{canonical}-avatar.png", f"{canonical}.webp", ]: candidate = os.path.join(cradle_dir, fname) if os.path.exists(candidate): avatar_path = candidate break # Fallback: dedicated egregore folder if not avatar_path: for fname in [ f"{canonical}.png", "avatar.png", f"{canonical}-avatar.png", ]: candidate = os.path.join(asset_dir, fname) if os.path.exists(candidate): avatar_path = candidate break if not avatar_path: logger.debug("No avatar found for egregore %s in %s", name, asset_dir) return None user_id = ghost_user_id(name) try: with open(avatar_path, "rb") as f: avatar_data = f.read() session = await self._get_session() url = ( f"{self._hs}/_matrix/media/v3/upload" f"?filename={name.lower()}_avatar.png" f"&user_id={user_id}" ) async with session.post( url, headers={ "Authorization": f"Bearer {self._token}", "Content-Type": "image/png", }, data=avatar_data, ) as resp: if resp.status == 200: body = await resp.json() mxc = body.get("content_uri", "") logger.info("Uploaded avatar for %s: %s", name, mxc) return mxc else: text = await resp.text() logger.error( "Avatar upload failed for %s: %d %s", name, resp.status, text, ) return None except Exception as e: logger.error("Avatar upload error for %s: %s", name, e) return None
[docs] async def ensure_ghost( self, name: str, display_name: str | None = None, ) -> bool: """Fully provision an egregore ghost: register, name, and avatar it. The high-level summoning entry point: it calls :meth:`register_ghost` (bailing out with ``False`` if registration fails), derives a display name from ``display_name`` or a title-cased form of ``name`` and applies it via :meth:`set_display_name`, then attempts :meth:`upload_avatar_from_disk` and, when that yields an MXC URI, :meth:`set_avatar`. The avatar steps are best-effort -- a missing avatar does not fail the call. Net effect is a ready-to-use ghost on Synapse with a profile and (usually) a picture. Called by ``tools/summon_egregore.py`` through the module-level :func:`get_bridge` singleton when the bot summons an egregore. Args: name: Human-facing egregore name to provision. display_name: Optional explicit display name; when omitted, a title-cased version of ``name`` (underscores to spaces) is used. Returns: bool: ``True`` once the ghost is registered and named (regardless of whether an avatar was found), ``False`` if registration failed. """ # Register ok = await self.register_ghost(name) if not ok: return False # Display name dn = display_name or name.replace("_", " ").title() await self.set_display_name(name, dn) # Avatar mxc = await self.upload_avatar_from_disk(name) if mxc: await self.set_avatar(name, mxc) return True
# ------------------------------------------------------------------ # Room membership # ------------------------------------------------------------------
[docs] async def join_room(self, name: str, room_id: str) -> bool: """Join an egregore ghost user into a Matrix room. Resolves the ghost's user ID via :func:`ghost_user_id` and POSTs to ``/_matrix/client/v3/join/{room_id}`` through :meth:`_api`, impersonating the ghost so it appears as a member of the room and can subsequently speak there. Logs an info line on success and an error line on failure. Called by ``tools/summon_egregore.py`` (via the :func:`get_bridge` singleton) so a freshly summoned egregore joins the active room. Args: name: Human-facing egregore name identifying the ghost. room_id: The Matrix room ID the ghost should join. Returns: bool: ``True`` when Synapse returns ``200``, ``False`` otherwise. """ user_id = ghost_user_id(name) status, body = await self._api( "POST", f"/_matrix/client/v3/join/{room_id}", {}, as_user=user_id, ) if status == 200: logger.info("Ghost %s joined room %s", user_id, room_id) return True else: logger.error( "Ghost %s failed to join %s: %d %s", user_id, room_id, status, body, ) return False
[docs] async def leave_room(self, name: str, room_id: str) -> bool: """Remove an egregore ghost user from a Matrix room. Resolves the ghost's user ID via :func:`ghost_user_id` and POSTs to ``/_matrix/client/v3/rooms/{room_id}/leave`` through :meth:`_api`, impersonating the ghost so it stops being a room member. Logs an info line on success; a failed leave is only logged at warning level, since a ghost that is already absent is not a hard error. Called by ``tools/dismiss_egregore.py`` (via the :func:`get_bridge` singleton) when an egregore is banished from the room. Args: name: Human-facing egregore name identifying the ghost. room_id: The Matrix room ID the ghost should leave. Returns: bool: ``True`` when Synapse returns ``200``, ``False`` otherwise. """ user_id = ghost_user_id(name) status, body = await self._api( "POST", f"/_matrix/client/v3/rooms/{room_id}/leave", {}, as_user=user_id, ) if status == 200: logger.info("Ghost %s left room %s", user_id, room_id) return True else: logger.warning( "Ghost %s failed to leave %s: %d %s", user_id, room_id, status, body, ) return False
# ------------------------------------------------------------------ # Messaging # ------------------------------------------------------------------
[docs] async def send_message( self, name: str, room_id: str, text: str, html: str | None = None, ) -> str | None: """Send a text (optionally HTML-formatted) message as a ghost user. Resolves the ghost via :func:`ghost_user_id`, bumps the module-level ``_txn_counter`` to mint a unique transaction ID (avoiding txn_id collisions in tight summoning loops), and builds an ``m.room.message`` content of type ``m.text``. When ``html`` is supplied it adds the ``org.matrix.custom.html`` ``format`` and ``formatted_body`` fields. The content is PUT to ``/_matrix/client/v3/rooms/{room_id}/send/m.room.message/{txn_id}`` through :meth:`_api`, impersonating the ghost so the message is attributed to the egregore. Logs the resulting event ID on success or an error on failure. Mutates the global ``_txn_counter`` as a side effect. Invoked dynamically by the multi-voice interleaving path in ``message_processor/generate_and_send.py``, which dispatches each parsed egregore segment to its own ghost through the :func:`get_bridge` singleton. Args: name: Human-facing egregore name identifying the speaking ghost. room_id: The Matrix room ID to post into. text: Plain-text body of the message. html: Optional HTML rendering published as ``formatted_body``. Returns: str | None: The Matrix ``event_id`` on success, or ``None`` on failure. """ user_id = ghost_user_id(name) global _txn_counter _txn_counter += 1 txn_id = f"eg{_txn_counter}" content: dict[str, Any] = { "msgtype": "m.text", "body": text, } if html: content["format"] = "org.matrix.custom.html" content["formatted_body"] = html status, body = await self._api( "PUT", f"/_matrix/client/v3/rooms/{room_id}/send/m.room.message/{txn_id}", content, as_user=user_id, ) if status == 200: event_id = body.get("event_id", "") logger.info( "Ghost %s sent message in %s: %s", user_id, room_id, event_id, ) return event_id else: logger.error( "Ghost %s failed to send in %s: %d %s", user_id, room_id, status, body, ) return None
[docs] async def send_emote( self, name: str, room_id: str, text: str, ) -> str | None: """Send an emote (an italic ``/me`` action line) as a ghost user. Resolves the ghost via :func:`ghost_user_id`, bumps the module-level ``_txn_counter`` for a collision-free transaction ID, and PUTs an ``m.room.message`` of msgtype ``m.emote`` to ``/_matrix/client/v3/rooms/{room_id}/send/m.room.message/{txn_id}`` through :meth:`_api`, impersonating the ghost. This is the action-text sibling of :meth:`send_message` and mutates the global ``_txn_counter`` as a side effect. Unlike :meth:`send_message` it logs nothing. No callers were found in the repo; this is a helper available to egregore dispatch code that wants to emit ``/me``-style actions. Args: name: Human-facing egregore name identifying the acting ghost. room_id: The Matrix room ID to post the emote into. text: The emote/action text. Returns: str | None: The Matrix ``event_id`` on success, or ``None`` on failure. """ user_id = ghost_user_id(name) global _txn_counter _txn_counter += 1 txn_id = f"eg{_txn_counter}" status, body = await self._api( "PUT", f"/_matrix/client/v3/rooms/{room_id}/send/m.room.message/{txn_id}", {"msgtype": "m.emote", "body": text}, as_user=user_id, ) if status == 200: return body.get("event_id", "") return None
# Singleton instance _bridge: EgregoreBridge | None = None
[docs] def get_bridge() -> EgregoreBridge: """Return the process-wide :class:`EgregoreBridge` singleton, creating it once. Lazily constructs an :class:`EgregoreBridge` on first call -- using the module-level Synapse config defaults -- and caches it in the module global ``_bridge`` so every caller shares one bridge, one ``aiohttp`` session pool, and one ``_registered`` ghost cache. Called by the egregore tooling -- ``tools/summon_egregore.py``, ``tools/compose_scene.py``, ``tools/dismiss_egregore.py`` -- and by ``message_processor/generate_and_send.py`` to reach the bridge rather than instantiating :class:`EgregoreBridge` directly. Returns: EgregoreBridge: The shared bridge instance. """ global _bridge if _bridge is None: _bridge = EgregoreBridge() return _bridge