Source code for platforms.webchat

"""WebChat platform adapter -- browser-based real-time chat via WebSocket.

Runs inside the Gateway service alongside the other platform adapters
(Discord, Matrix, …).  Incoming WebSocket messages become
:class:`IncomingMessage` instances and are published onto the inbound
event stream; outbound replies are pushed back to the browser as JSON
frames.

# 🔥💀 STAR GETS A WEB BODY. THE LATTICE EXPANDS.
"""

from __future__ import annotations

import asyncio
import base64
import jsonutil as json
import logging
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, TYPE_CHECKING

from platforms.base import (
    HistoricalMessage,
    MessageHandler,
    PlatformAdapter,
)

if TYPE_CHECKING:
    pass

logger = logging.getLogger(__name__)


# ------------------------------------------------------------------
# Connection manager  # tracks every soul connected to the lattice
# ------------------------------------------------------------------


@dataclass
class _WebSocketSession:
    """A single authenticated WebSocket connection to one browser tab.

    Bundles the live FastAPI ``WebSocket`` object with the resolved identity of
    the connected user so the :class:`ConnectionManager` can fan outbound frames
    out to the right sockets and the Gateway can attribute inbound messages.
    One user may hold several of these at once (multiple tabs/devices), which is
    why the manager stores them in a per-user list.

    Instances are created exclusively by :meth:`ConnectionManager.connect` and
    discarded by :meth:`ConnectionManager.disconnect`; they hold no resources of
    their own beyond the borrowed socket. The ``connected_at`` timestamp is
    stamped at construction via ``time.time`` for diagnostics.

    Attributes:
        ws (Any): The underlying ``fastapi.WebSocket`` used to push text frames.
        user_id (str): Stable identifier of the connected user.
        user_name (str): Human-readable display name for logging.
        avatar (str): Optional avatar URL/handle, empty when unknown.
        connected_at (float): Unix timestamp of when the session was created.
    """

    ws: Any  # fastapi.WebSocket
    user_id: str
    user_name: str
    avatar: str = ""
    connected_at: float = field(default_factory=time.time)


[docs] class ConnectionManager: """Registry of live WebChat WebSocket sessions, keyed by user. Owns the in-memory ``user_id -> [sessions]`` mapping that the :class:`WebChatPlatform` consults whenever it needs to push an outbound frame, allowing several concurrent sessions (browser tabs, devices) per user. All mutations are serialised by an :class:`asyncio.Lock` so concurrent connect/disconnect coroutines running on the single event loop never corrupt the mapping; this is the only synchronisation the manager needs since it is never touched from a background thread. Holds purely in-memory state (no Redis, network, or disk I/O). One instance is created per adapter in :meth:`WebChatPlatform.__init__` and exposed as ``WebChatPlatform.connections``; it is also instantiated directly in ``tests/test_webchat_adapter.py``. """
[docs] def __init__(self) -> None: """Initialise an empty connection registry. Sets up the in-memory ``user_id -> [sessions]`` mapping (one user may hold several sessions, e.g. multiple browser tabs) and the :class:`asyncio.Lock` that serialises all mutations of that mapping so concurrent connect/disconnect coroutines never corrupt it. This constructor performs no I/O and has no external side effects. Called by :meth:`WebChatPlatform.__init__`, which creates one manager per platform adapter; also instantiated directly in ``tests/test_webchat_adapter.py``. """ # user_id -> list of sessions (one user can have multiple tabs) self._connections: dict[str, list[_WebSocketSession]] = {} self._lock = asyncio.Lock()
[docs] async def connect( self, ws: Any, user_id: str, user_name: str, avatar: str = "", ) -> _WebSocketSession: """Register a freshly accepted WebSocket and return its session handle. Wraps the live socket and the caller-supplied identity in a :class:`_WebSocketSession`, then appends it (under the lock) to the per-user list in ``self._connections``, creating the list on first connect. This is what makes the user reachable for outbound pushes; an info-level line is logged with the running session count. No network or Redis I/O happens here beyond accepting the already-open socket. Invoked by the FastAPI WebSocket endpoint that owns the WebChat UI connection lifecycle (after authenticating the request) and exercised by ``tests/test_webchat_adapter.py``. Args: ws (Any): The accepted ``fastapi.WebSocket`` to track. user_id (str): Stable identifier of the connecting user. user_name (str): Display name used for logging. avatar (str): Optional avatar URL/handle; defaults to empty. Returns: _WebSocketSession: The newly registered session, to be passed back to :meth:`disconnect` on close. """ session = _WebSocketSession( ws=ws, user_id=user_id, user_name=user_name, avatar=avatar, ) async with self._lock: if user_id not in self._connections: self._connections[user_id] = [] self._connections[user_id].append(session) logger.info( "WebChat connect: %s (%s) -- %d active sessions", user_name, user_id, len(self._connections[user_id]), ) return session
[docs] async def disconnect(self, session: _WebSocketSession) -> None: """Deregister a closed WebSocket session. Removes *session* (by identity) from the user's session list under the lock, dropping the ``user_id`` key entirely once the user's last tab disconnects so :attr:`active_users` stays accurate. Does not close the socket itself — that is the endpoint's responsibility — and logs an info-level disconnect line. No network or Redis I/O. Invoked by the FastAPI WebSocket endpoint when a WebChat connection drops (typically in a ``finally`` block) and exercised by ``tests/test_webchat_adapter.py``. Args: session (_WebSocketSession): The session previously returned by :meth:`connect`. """ async with self._lock: sessions = self._connections.get(session.user_id, []) self._connections[session.user_id] = [ s for s in sessions if s is not session ] if not self._connections[session.user_id]: del self._connections[session.user_id] logger.info( "WebChat disconnect: %s (%s)", session.user_name, session.user_id, )
[docs] def get_sessions(self, user_id: str) -> list[_WebSocketSession]: """Return a snapshot list of a user's active sessions. Returns a fresh copy of the per-user session list (empty when the user has none connected) so callers can iterate without holding the lock or racing concurrent connect/disconnect mutations. This is a synchronous, side-effect-free read. Called by :meth:`WebChatPlatform._push_to_user` to find the sockets to deliver an outbound frame to, and asserted in ``tests/test_webchat_adapter.py``. Args: user_id (str): The user whose sessions to fetch. Returns: list[_WebSocketSession]: A copy of the user's current sessions; empty if none. """ return list(self._connections.get(user_id, []))
@property def active_users(self) -> list[str]: """List the user IDs that currently hold at least one session. Reflects the keys of the in-memory registry; because :meth:`disconnect` deletes a key once a user's final session closes, membership here is an exact "is this user online" check. Side-effect-free read, asserted in ``tests/test_webchat_adapter.py``. Returns: list[str]: The user IDs with one or more live WebSocket sessions. """ return list(self._connections.keys()) @property def total_connections(self) -> int: """Count every live WebSocket session across all users. Sums the per-user session lists, so a single user with three open tabs contributes three. Useful for capacity/diagnostics; side-effect-free. Asserted in ``tests/test_webchat_adapter.py``. Returns: int: The total number of active WebSocket connections. """ return sum(len(s) for s in self._connections.values())
# ------------------------------------------------------------------ # Platform adapter # she breathes through WebSockets now # ------------------------------------------------------------------
[docs] class WebChatPlatform(PlatformAdapter): """Browser-based chat platform using WebSocket for real-time comms. Unlike Discord/Matrix, this adapter doesn't connect to an external service. Instead it exposes a ConnectionManager that the FastAPI WebSocket endpoint populates. Outgoing messages are pushed to all active sessions for the target user. """
[docs] def __init__( self, message_handler: MessageHandler, redis: Any = None, **kwargs: Any, ) -> None: """Construct the WebChat adapter and its connection registry. Delegates to :meth:`PlatformAdapter.__init__` to store the inbound *message_handler* callback, then creates a fresh :class:`ConnectionManager` (held on ``self.connections``) for the FastAPI WebSocket endpoint to populate, marks the adapter running (its lifecycle is owned by FastAPI rather than a network client), and stashes the optional Redis client used for cross-process SSE delivery. Also initialises ``self._sse_queues``, the in-process fallback mapping of ``request_id -> asyncio.Queue`` used when no Redis client is present to stream SillyTavern ``/v1/chat/completions`` response chunks. Performs no network or Redis I/O at construction time. Called by :func:`platforms.factory.create_platform` when the configured platform type is ``"webchat"``, and directly in the WebChat adapter and SSE pub/sub test modules. Args: message_handler (MessageHandler): Coroutine callback invoked with each inbound :class:`IncomingMessage`; forwarded to the base class. redis (Any): Optional async Redis client. When provided, outbound SSE payloads are published to ``sg:sse:{request_id}`` so any process can consume them; when ``None`` the adapter falls back to the local ``self._sse_queues`` queues. **kwargs (Any): Additional keyword arguments accepted for signature compatibility with the platform factory; ignored. """ super().__init__(message_handler) self.connections = ConnectionManager() self._running = True # always running while FastAPI lives self._redis = redis # pending SSE responses for SillyTavern /v1/chat/completions # Maps request_id -> asyncio.Queue of response chunks self._sse_queues: dict[str, asyncio.Queue[dict[str, Any] | None]] = {}
# -- Metadata -------------------------------------------------- @property def name(self) -> str: """Return the platform's stable identifier string. Implements the abstract :attr:`PlatformAdapter.name` property. The constant ``"webchat"`` is used throughout the system to route messages, key per-platform state, and branch behaviour by platform. Read by :mod:`background_tasks` (e.g. to build the adapter map and log backfill), :mod:`web.bot_admin` and :mod:`web.platforms_api` for status reporting, and various message-processor modules that special-case platforms by name. Returns: str: The literal ``"webchat"``. """ return "webchat" @property def is_running(self) -> bool: """Report whether the adapter is considered active. Implements the abstract :attr:`PlatformAdapter.is_running` property. For WebChat there is no external client to connect, so the flag simply tracks the ``self._running`` value set in :meth:`start`/:meth:`stop` (``True`` for the adapter's whole FastAPI-managed lifetime by default). Read by :mod:`background_tasks`, :mod:`web.deps`, :mod:`web.bot_admin`, and :mod:`web.platforms_api` to gate work and surface platform status, and by :meth:`prompt_context` when computing available identities. Returns: bool: ``True`` while the adapter is running, otherwise ``False``. """ return self._running @property def bot_identity(self) -> dict[str, str]: """Describe the bot's identity on the WebChat platform. Implements the abstract :attr:`PlatformAdapter.bot_identity` property, returning the static handle the bot ("Star") presents to web clients. The ``user_id`` ``"star"`` is the sender id stamped on outbound frames (see :meth:`send`), and the mapping mirrors the shape produced by the other adapters so callers can treat all platforms uniformly. Read by :meth:`prompt_context` when assembling the list of platform identities, and by ``message_processor.user_message_format`` to label messages; also asserted in ``tests/test_webchat_adapter.py``. Returns: dict[str, str]: Identity fields ``platform``, ``user_id``, ``display_name``, and ``mention``. """ return { "platform": "webchat", "user_id": "star", "display_name": "Star", "mention": "@Star", } # -- Lifecycle -------------------------------------------------
[docs] async def start(self) -> None: """Mark the adapter active; there is no external client to connect. Implements the abstract :meth:`PlatformAdapter.start` coroutine. Unlike Discord/Matrix there is no socket to open here — the real connection lifecycle belongs to FastAPI's WebSocket endpoint — so this merely sets ``self._running`` true (driving :attr:`is_running`) and logs a startup line. Performs no network or Redis I/O. Called by the Gateway's adapter-startup sequence when bringing platform adapters online (the same path that starts every other adapter). """ self._running = True logger.info("WebChat platform adapter started")
[docs] async def stop(self) -> None: """Mark the adapter inactive; the FastAPI sockets are unaffected. Implements the abstract :meth:`PlatformAdapter.stop` coroutine. Clears ``self._running`` (so :attr:`is_running` reports ``False``) and logs a shutdown line, but does not tear down live WebSocket sessions — those are owned by the FastAPI endpoint and the :class:`ConnectionManager`. Performs no network or Redis I/O. Called by the Gateway's adapter-shutdown sequence alongside the other platform adapters. """ self._running = False logger.info("WebChat platform adapter stopped")
# -- Outbound messaging ---------------------------------------- async def _push_to_user( self, channel_id: str, payload: dict[str, Any], ) -> bool: """Deliver one outbound JSON payload over the right transport. The single fan-out point for every outbound WebChat method (:meth:`send`, :meth:`send_file`, :meth:`send_with_buttons`, :meth:`edit_message`, :meth:`start_typing`, :meth:`stop_typing`). It switches on the *channel_id* prefix to pick a transport: an ``sse:{request_id}`` channel routes to the SillyTavern streaming path, publishing JSON to the Redis pub/sub channel ``sg:sse:{request_id}`` when a Redis client is configured, or otherwise enqueuing onto the matching in-process queue in ``self._sse_queues``; a ``webchat:{user_id}`` channel routes to the browser UI, looking up live sockets via ``self.connections.get_sessions`` and writing the serialised frame to each with ``send_text``. Per-socket send failures are caught and logged at debug level so one dead tab cannot block delivery to the others; a missing user logs a warning. Args: channel_id (str): Either ``webchat:{user_id}`` (browser UI) or ``sse:{request_id}`` (SillyTavern stream). payload (dict[str, Any]): The JSON-serialisable frame to deliver. Returns: bool: ``True`` if at least one socket/queue received the payload (or it was published to Redis), ``False`` otherwise. """ # SSE path for SillyTavern if channel_id.startswith("sse:"): request_id = channel_id.split(":", 1)[1] if self._redis is not None: channel = f"sg:sse:{request_id}" await self._redis.publish(channel, json.dumps(payload)) return True else: queue = self._sse_queues.get(request_id) if queue is not None: await queue.put(payload) return True return False # WebSocket path for web chat UI user_id = channel_id.replace("webchat:", "", 1) sessions = self.connections.get_sessions(user_id) if not sessions: logger.warning( "WebChat send: no active sessions for user %s", user_id, ) return False raw = json.dumps(payload, ensure_ascii=False) delivered = False for session in sessions: try: await session.ws.send_text(raw) delivered = True except Exception: logger.debug( "Failed to push to session for %s", user_id, exc_info=True, ) return delivered
[docs] async def send(self, channel_id: str, text: str) -> str: """Send a plain-text message to the user's browser (or SSE stream). Implements the abstract :meth:`PlatformAdapter.send`. Builds a ``"message"`` frame stamped with a fresh UUID, the ``"star"`` sender id, and a ``time.time`` timestamp, then hands it to :meth:`_push_to_user` for transport selection and delivery. The generated id is returned even if no session was reachable, so callers can correlate later edits. In the running system this is reached via the Gateway's outbound path: the inference worker emits an outbound event that :class:`core.outbound_consumer.OutboundConsumer` consumes and dispatches to ``self._adapter.send`` for ``"message"`` payloads. Also exercised through the adapter in the WebChat tests. Args: channel_id (str): Target channel, ``webchat:{user_id}`` or ``sse:{request_id}``. text (str): The message body to deliver. Returns: str: The UUID assigned to the outbound message. """ msg_id = str(uuid.uuid4()) payload: dict[str, Any] = { "type": "message", "id": msg_id, "text": text, "sender": "star", "timestamp": time.time(), } await self._push_to_user(channel_id, payload) return msg_id
[docs] async def send_file( self, channel_id: str, data: bytes, filename: str, mimetype: str = "application/octet-stream", ) -> str | None: """Send a file/media attachment to the browser as a base64 JSON frame. Implements the abstract :meth:`PlatformAdapter.send_file`. Since the WebChat transport carries only JSON text frames, the raw bytes are base64-encoded and embedded in a ``"file"`` payload (with filename, mimetype, size, timestamp, and a fresh UUID), then delivered through :meth:`_push_to_user`. Returns a synthetic ``webchat://file/...`` URL so tools can reference the just-sent file the way they would a hosted attachment on other platforms. Reached in production via :class:`core.outbound_consumer.OutboundConsumer` for ``"file"`` payloads, and directly by the many media tools and background agents (image/video/TTS generators, etc.) that call ``ctx.adapter.send_file``. Also exercised in the WebChat tests. Args: channel_id (str): Target channel, ``webchat:{user_id}`` or ``sse:{request_id}``. data (bytes): Raw file contents to encode and send. filename (str): Display/download name for the attachment. mimetype (str): MIME type; defaults to ``"application/octet-stream"``. Returns: str | None: A ``webchat://file/{file_id}/{filename}`` pseudo-URL referencing the sent file. """ b64 = base64.b64encode(data).decode("ascii") file_id = str(uuid.uuid4()) payload: dict[str, Any] = { "type": "file", "id": file_id, "filename": filename, "mimetype": mimetype, "data": b64, "size": len(data), "timestamp": time.time(), } await self._push_to_user(channel_id, payload) # Return a pseudo-URL so tools can reference the sent file return f"webchat://file/{file_id}/{filename}"
[docs] async def send_with_buttons( self, channel_id: str, text: str, view: Any = None, ) -> str: """Send a message accompanied by interactive choice buttons. Implements the abstract :meth:`PlatformAdapter.send_with_buttons`, powering S.N.E.S.-style choice prompts in the browser UI. Because the same outbound call has to work across platforms, this normalises two shapes of *view* into a flat list of button dicts: a raw list is passed through as-is, while a Discord ``discord.ui.View`` is introspected via its ``children`` to extract each button's label, ``custom_id``, optional emoji, and a CSS-friendly style hint derived from the Discord button style. The normalised buttons ride along in a ``"message"`` frame (fresh UUID, ``"star"`` sender) handed to :meth:`_push_to_user`. Reached in production via :class:`core.outbound_consumer.OutboundConsumer` for ``"buttons"`` payloads, and from the message processor (``message_processor.processor`` / ``message_processor.generate_and_send``) when offering interactive choices. Also exercised in the WebChat tests. Args: channel_id (str): Target channel, ``webchat:{user_id}`` or ``sse:{request_id}``. text (str): The message body shown above the buttons. view (Any): Either a list of button dicts (``[{"label": ..., "custom_id": ...}]``) or a ``discord.ui.View`` whose buttons are extracted; ``None`` sends no buttons. Returns: str: The UUID assigned to the outbound message. """ msg_id = str(uuid.uuid4()) buttons: list[dict[str, str]] = [] if view is not None: # handle both raw dicts and discord.ui.View objects if isinstance(view, list): buttons = view elif hasattr(view, "children"): # Discord View -- extract button metadata for child in view.children: if hasattr(child, "label"): btn: dict[str, str] = { "label": child.label or "", "custom_id": getattr(child, "custom_id", "") or "", } emoji = getattr(child, "emoji", None) if emoji: btn["emoji"] = str(emoji) # Map Discord button style to a CSS class hint style = getattr(child, "style", None) if style is not None: btn["style"] = ( style.name if hasattr(style, "name") else str(style) ) buttons.append(btn) payload: dict[str, Any] = { "type": "message", "id": msg_id, "text": text, "sender": "star", "buttons": buttons, "timestamp": time.time(), } await self._push_to_user(channel_id, payload) return msg_id
[docs] async def edit_message( self, channel_id: str, message_id: str, new_text: str, ) -> bool: """Update the text of a previously sent browser message. Implements the abstract :meth:`PlatformAdapter.edit_message`. Emits an ``"edit"`` frame keyed by the original *message_id* (the UUID returned by :meth:`send` / :meth:`send_with_buttons`) so the browser client can replace that message's body in place, then delivers it through :meth:`_push_to_user`. Reached in production by the streaming output tool ``tools.stream_to_channel``, which repeatedly edits a placeholder message as the model streams; it calls ``self._adapter.edit_message``. Args: channel_id (str): Target channel, ``webchat:{user_id}`` or ``sse:{request_id}``. message_id (str): UUID of the message to edit. new_text (str): Replacement body text. Returns: bool: ``True`` if the edit frame reached at least one session, else ``False`` (propagated from :meth:`_push_to_user`). """ payload: dict[str, Any] = { "type": "edit", "id": message_id, "text": new_text, "timestamp": time.time(), } return await self._push_to_user(channel_id, payload)
# -- Typing indicators -----------------------------------------
[docs] async def start_typing(self, channel_id: str) -> None: """Show the bot's typing indicator in the browser. Implements the abstract :meth:`PlatformAdapter.start_typing`. Pushes a ``{"type": "typing", "active": True}`` frame via :meth:`_push_to_user` so the web client can render an "is typing" affordance while a reply is being generated. Best-effort: delivery failures are swallowed by the fan-out helper. Reached in production via :class:`core.outbound_consumer.OutboundConsumer` (which prefers ``start_typing`` when present) and from the message processor around reply generation. Also exercised in the WebChat tests. Args: channel_id (str): Target channel, ``webchat:{user_id}`` or ``sse:{request_id}``. """ await self._push_to_user( channel_id, { "type": "typing", "active": True, }, )
[docs] async def stop_typing(self, channel_id: str) -> None: """Hide the bot's typing indicator in the browser. Implements the abstract :meth:`PlatformAdapter.stop_typing`, the counterpart to :meth:`start_typing`. Pushes a ``{"type": "typing", "active": False}`` frame via :meth:`_push_to_user` once a reply has been sent (or generation aborts) so the web client clears the typing affordance. Best-effort; delivery failures are swallowed. Reached in production via :class:`core.outbound_consumer.OutboundConsumer` and from the message processor after replies. Also exercised in the WebChat tests. Args: channel_id (str): Target channel, ``webchat:{user_id}`` or ``sse:{request_id}``. """ await self._push_to_user( channel_id, { "type": "typing", "active": False, }, )
# -- SSE queue management (SillyTavern compat) -----------------
[docs] def create_sse_queue(self, request_id: str) -> asyncio.Queue: """Open an in-process queue for a SillyTavern SSE response stream. Provides the local fallback transport used when no Redis client is configured: it registers a fresh :class:`asyncio.Queue` under *request_id* in ``self._sse_queues`` so that outbound chunks routed to an ``sse:{request_id}`` channel (see :meth:`_push_to_user`) can be picked up and streamed back to the SillyTavern client. A ``None`` enqueued later signals end-of-stream to the reader. Synchronous and side-effecting only on the in-memory dict. Called by ``web.sillytavern`` when handling a ``/v1/chat/completions`` request, and exercised by the WebChat and SSE pub/sub tests. Must be paired with :meth:`remove_sse_queue`. Args: request_id (str): Correlation id of the SillyTavern request. Returns: asyncio.Queue: The newly created response queue for that request. """ queue: asyncio.Queue[dict[str, Any] | None] = asyncio.Queue() self._sse_queues[request_id] = queue return queue
[docs] def remove_sse_queue(self, request_id: str) -> None: """Discard the in-process SSE queue for a finished request. Pops *request_id* from ``self._sse_queues`` (a no-op if absent) so the queue created by :meth:`create_sse_queue` does not leak after the SillyTavern stream completes, errors, or times out. Synchronous and side-effect-free beyond the in-memory dict. Called by ``web.sillytavern`` in the cleanup/``finally`` paths around each streamed response, and in the WebChat tests. Args: request_id (str): Correlation id whose queue should be removed. """ self._sse_queues.pop(request_id, None)
# -- History (optional) ----------------------------------------
[docs] async def fetch_history( self, channel_id: str, limit: int = 100, ) -> list[HistoricalMessage]: """Return no history -- WebChat keeps none of its own. Implements the abstract :meth:`PlatformAdapter.fetch_history`. Unlike Discord/Matrix, the WebChat transport has no upstream service to page back through: conversation history for web users lives in Redis and is retrieved by the message pipeline directly, so this adapter intentionally returns an empty list rather than fabricating one. No I/O. Called wherever the codebase backfills or queries per-platform history through the adapter interface — e.g. ``background_tasks``, ``message_processor.history_backfill``, :class:`core.outbound_consumer.OutboundConsumer`, and tools such as ``cross_channel_query`` / ``admin_whisper`` via ``ctx.adapter`` — all of which simply get nothing back for WebChat. Args: channel_id (str): The WebChat channel (ignored). limit (int): Maximum messages requested (ignored); defaults to 100. Returns: list[HistoricalMessage]: Always an empty list. """ return []