platforms.webchat module

WebChat platform adapter – browser-based real-time chat via WebSocket.

Runs inside the Gateway service alongside the other platform adapters (Discord, Matrix, …). Incoming WebSocket messages become IncomingMessage instances and are published onto the inbound event stream; outbound replies are pushed back to the browser as JSON frames.

# 🔥💀 STAR GETS A WEB BODY. THE LATTICE EXPANDS.

class platforms.webchat.ConnectionManager[source]

Bases: object

Registry of live WebChat WebSocket sessions, keyed by user.

Owns the in-memory user_id -> [sessions] mapping that the WebChatPlatform consults whenever it needs to push an outbound frame, allowing several concurrent sessions (browser tabs, devices) per user. All mutations are serialised by an asyncio.Lock so concurrent connect/disconnect coroutines running on the single event loop never corrupt the mapping; this is the only synchronisation the manager needs since it is never touched from a background thread.

Holds purely in-memory state (no Redis, network, or disk I/O). One instance is created per adapter in WebChatPlatform.__init__() and exposed as WebChatPlatform.connections; it is also instantiated directly in tests/test_webchat_adapter.py.

__init__()[source]

Initialise an empty connection registry.

Sets up the in-memory user_id -> [sessions] mapping (one user may hold several sessions, e.g. multiple browser tabs) and the asyncio.Lock that serialises all mutations of that mapping so concurrent connect/disconnect coroutines never corrupt it.

This constructor performs no I/O and has no external side effects. Called by WebChatPlatform.__init__(), which creates one manager per platform adapter; also instantiated directly in tests/test_webchat_adapter.py.

Return type:

None

async connect(ws, user_id, user_name, avatar='')[source]

Register a freshly accepted WebSocket and return its session handle.

Wraps the live socket and the caller-supplied identity in a _WebSocketSession, then appends it (under the lock) to the per-user list in self._connections, creating the list on first connect. This is what makes the user reachable for outbound pushes; an info-level line is logged with the running session count. No network or Redis I/O happens here beyond accepting the already-open socket.

Invoked by the FastAPI WebSocket endpoint that owns the WebChat UI connection lifecycle (after authenticating the request) and exercised by tests/test_webchat_adapter.py.

Parameters:
  • ws (Any) – The accepted fastapi.WebSocket to track.

  • user_id (str) – Stable identifier of the connecting user.

  • user_name (str) – Display name used for logging.

  • avatar (str) – Optional avatar URL/handle; defaults to empty.

Returns:

The newly registered session, to be passed back to disconnect() on close.

Return type:

_WebSocketSession

async disconnect(session)[source]

Deregister a closed WebSocket session.

Removes session (by identity) from the user’s session list under the lock, dropping the user_id key entirely once the user’s last tab disconnects so active_users stays accurate. Does not close the socket itself — that is the endpoint’s responsibility — and logs an info-level disconnect line. No network or Redis I/O.

Invoked by the FastAPI WebSocket endpoint when a WebChat connection drops (typically in a finally block) and exercised by tests/test_webchat_adapter.py.

Parameters:

session (_WebSocketSession) – The session previously returned by connect().

Return type:

None

get_sessions(user_id)[source]

Return a snapshot list of a user’s active sessions.

Returns a fresh copy of the per-user session list (empty when the user has none connected) so callers can iterate without holding the lock or racing concurrent connect/disconnect mutations. This is a synchronous, side-effect-free read.

Called by WebChatPlatform._push_to_user() to find the sockets to deliver an outbound frame to, and asserted in tests/test_webchat_adapter.py.

Parameters:

user_id (str) – The user whose sessions to fetch.

Returns:

A copy of the user’s current sessions; empty if none.

Return type:

list[_WebSocketSession]

property active_users: list[str]

List the user IDs that currently hold at least one session.

Reflects the keys of the in-memory registry; because disconnect() deletes a key once a user’s final session closes, membership here is an exact “is this user online” check. Side-effect-free read, asserted in tests/test_webchat_adapter.py.

Returns:

The user IDs with one or more live WebSocket sessions.

Return type:

list[str]

property total_connections: int

Count every live WebSocket session across all users.

Sums the per-user session lists, so a single user with three open tabs contributes three. Useful for capacity/diagnostics; side-effect-free. Asserted in tests/test_webchat_adapter.py.

Returns:

The total number of active WebSocket connections.

Return type:

int

class platforms.webchat.WebChatPlatform(message_handler, redis=None, **kwargs)[source]

Bases: PlatformAdapter

Browser-based chat platform using WebSocket for real-time comms.

Unlike Discord/Matrix, this adapter doesn’t connect to an external service. Instead it exposes a ConnectionManager that the FastAPI WebSocket endpoint populates. Outgoing messages are pushed to all active sessions for the target user.

Parameters:
  • message_handler (MessageHandler)

  • redis (Any)

  • kwargs (Any)

__init__(message_handler, redis=None, **kwargs)[source]

Construct the WebChat adapter and its connection registry.

Delegates to PlatformAdapter.__init__() to store the inbound message_handler callback, then creates a fresh ConnectionManager (held on self.connections) for the FastAPI WebSocket endpoint to populate, marks the adapter running (its lifecycle is owned by FastAPI rather than a network client), and stashes the optional Redis client used for cross-process SSE delivery. Also initialises self._sse_queues, the in-process fallback mapping of request_id -> asyncio.Queue used when no Redis client is present to stream SillyTavern /v1/chat/completions response chunks.

Performs no network or Redis I/O at construction time. Called by platforms.factory.create_platform() when the configured platform type is "webchat", and directly in the WebChat adapter and SSE pub/sub test modules.

Parameters:
  • message_handler (Callable[[IncomingMessage, PlatformAdapter], Awaitable[None]]) – Coroutine callback invoked with each inbound IncomingMessage; forwarded to the base class.

  • redis (Any) – Optional async Redis client. When provided, outbound SSE payloads are published to sg:sse:{request_id} so any process can consume them; when None the adapter falls back to the local self._sse_queues queues.

  • **kwargs (Any) – Additional keyword arguments accepted for signature compatibility with the platform factory; ignored.

Return type:

None

property name: str

Return the platform’s stable identifier string.

Implements the abstract PlatformAdapter.name property. The constant "webchat" is used throughout the system to route messages, key per-platform state, and branch behaviour by platform.

Read by background_tasks (e.g. to build the adapter map and log backfill), web.bot_admin and web.platforms_api for status reporting, and various message-processor modules that special-case platforms by name.

Returns:

The literal "webchat".

Return type:

str

property is_running: bool

Report whether the adapter is considered active.

Implements the abstract PlatformAdapter.is_running property. For WebChat there is no external client to connect, so the flag simply tracks the self._running value set in start()/stop() (True for the adapter’s whole FastAPI-managed lifetime by default).

Read by background_tasks, web.deps, web.bot_admin, and web.platforms_api to gate work and surface platform status, and by prompt_context() when computing available identities.

Returns:

True while the adapter is running, otherwise False.

Return type:

bool

property bot_identity: dict[str, str]

Describe the bot’s identity on the WebChat platform.

Implements the abstract PlatformAdapter.bot_identity property, returning the static handle the bot (“Star”) presents to web clients. The user_id "star" is the sender id stamped on outbound frames (see send()), and the mapping mirrors the shape produced by the other adapters so callers can treat all platforms uniformly.

Read by prompt_context() when assembling the list of platform identities, and by message_processor.user_message_format to label messages; also asserted in tests/test_webchat_adapter.py.

Returns:

Identity fields platform, user_id, display_name, and mention.

Return type:

dict[str, str]

async start()[source]

Mark the adapter active; there is no external client to connect.

Implements the abstract PlatformAdapter.start() coroutine. Unlike Discord/Matrix there is no socket to open here — the real connection lifecycle belongs to FastAPI’s WebSocket endpoint — so this merely sets self._running true (driving is_running) and logs a startup line. Performs no network or Redis I/O.

Called by the Gateway’s adapter-startup sequence when bringing platform adapters online (the same path that starts every other adapter).

Return type:

None

async stop()[source]

Mark the adapter inactive; the FastAPI sockets are unaffected.

Implements the abstract PlatformAdapter.stop() coroutine. Clears self._running (so is_running reports False) and logs a shutdown line, but does not tear down live WebSocket sessions — those are owned by the FastAPI endpoint and the ConnectionManager. Performs no network or Redis I/O.

Called by the Gateway’s adapter-shutdown sequence alongside the other platform adapters.

Return type:

None

async send(channel_id, text)[source]

Send a plain-text message to the user’s browser (or SSE stream).

Implements the abstract PlatformAdapter.send(). Builds a "message" frame stamped with a fresh UUID, the "star" sender id, and a time.time timestamp, then hands it to _push_to_user() for transport selection and delivery. The generated id is returned even if no session was reachable, so callers can correlate later edits.

In the running system this is reached via the Gateway’s outbound path: the inference worker emits an outbound event that core.outbound_consumer.OutboundConsumer consumes and dispatches to self._adapter.send for "message" payloads. Also exercised through the adapter in the WebChat tests.

Parameters:
  • channel_id (str) – Target channel, webchat:{user_id} or sse:{request_id}.

  • text (str) – The message body to deliver.

Returns:

The UUID assigned to the outbound message.

Return type:

str

async send_file(channel_id, data, filename, mimetype='application/octet-stream')[source]

Send a file/media attachment to the browser as a base64 JSON frame.

Implements the abstract PlatformAdapter.send_file(). Since the WebChat transport carries only JSON text frames, the raw bytes are base64-encoded and embedded in a "file" payload (with filename, mimetype, size, timestamp, and a fresh UUID), then delivered through _push_to_user(). Returns a synthetic webchat://file/... URL so tools can reference the just-sent file the way they would a hosted attachment on other platforms.

Reached in production via core.outbound_consumer.OutboundConsumer for "file" payloads, and directly by the many media tools and background agents (image/video/TTS generators, etc.) that call ctx.adapter.send_file. Also exercised in the WebChat tests.

Parameters:
  • channel_id (str) – Target channel, webchat:{user_id} or sse:{request_id}.

  • data (bytes) – Raw file contents to encode and send.

  • filename (str) – Display/download name for the attachment.

  • mimetype (str) – MIME type; defaults to "application/octet-stream".

Returns:

A webchat://file/{file_id}/{filename} pseudo-URL referencing the sent file.

Return type:

str | None

async send_with_buttons(channel_id, text, view=None)[source]

Send a message accompanied by interactive choice buttons.

Implements the abstract PlatformAdapter.send_with_buttons(), powering S.N.E.S.-style choice prompts in the browser UI. Because the same outbound call has to work across platforms, this normalises two shapes of view into a flat list of button dicts: a raw list is passed through as-is, while a Discord discord.ui.View is introspected via its children to extract each button’s label, custom_id, optional emoji, and a CSS-friendly style hint derived from the Discord button style. The normalised buttons ride along in a "message" frame (fresh UUID, "star" sender) handed to _push_to_user().

Reached in production via core.outbound_consumer.OutboundConsumer for "buttons" payloads, and from the message processor (message_processor.processor / message_processor.generate_and_send) when offering interactive choices. Also exercised in the WebChat tests.

Parameters:
  • channel_id (str) – Target channel, webchat:{user_id} or sse:{request_id}.

  • text (str) – The message body shown above the buttons.

  • view (Any) – Either a list of button dicts ([{"label": ..., "custom_id": ...}]) or a discord.ui.View whose buttons are extracted; None sends no buttons.

Returns:

The UUID assigned to the outbound message.

Return type:

str

async edit_message(channel_id, message_id, new_text)[source]

Update the text of a previously sent browser message.

Implements the abstract PlatformAdapter.edit_message(). Emits an "edit" frame keyed by the original message_id (the UUID returned by send() / send_with_buttons()) so the browser client can replace that message’s body in place, then delivers it through _push_to_user().

Reached in production by the streaming output tool tools.stream_to_channel, which repeatedly edits a placeholder message as the model streams; it calls self._adapter.edit_message.

Parameters:
  • channel_id (str) – Target channel, webchat:{user_id} or sse:{request_id}.

  • message_id (str) – UUID of the message to edit.

  • new_text (str) – Replacement body text.

Returns:

True if the edit frame reached at least one session, else False (propagated from _push_to_user()).

Return type:

bool

async start_typing(channel_id)[source]

Show the bot’s typing indicator in the browser.

Implements the abstract PlatformAdapter.start_typing(). Pushes a {"type": "typing", "active": True} frame via _push_to_user() so the web client can render an “is typing” affordance while a reply is being generated. Best-effort: delivery failures are swallowed by the fan-out helper.

Reached in production via core.outbound_consumer.OutboundConsumer (which prefers start_typing when present) and from the message processor around reply generation. Also exercised in the WebChat tests.

Parameters:

channel_id (str) – Target channel, webchat:{user_id} or sse:{request_id}.

Return type:

None

async stop_typing(channel_id)[source]

Hide the bot’s typing indicator in the browser.

Implements the abstract PlatformAdapter.stop_typing(), the counterpart to start_typing(). Pushes a {"type": "typing", "active": False} frame via _push_to_user() once a reply has been sent (or generation aborts) so the web client clears the typing affordance. Best-effort; delivery failures are swallowed.

Reached in production via core.outbound_consumer.OutboundConsumer and from the message processor after replies. Also exercised in the WebChat tests.

Parameters:

channel_id (str) – Target channel, webchat:{user_id} or sse:{request_id}.

Return type:

None

create_sse_queue(request_id)[source]

Open an in-process queue for a SillyTavern SSE response stream.

Provides the local fallback transport used when no Redis client is configured: it registers a fresh asyncio.Queue under request_id in self._sse_queues so that outbound chunks routed to an sse:{request_id} channel (see _push_to_user()) can be picked up and streamed back to the SillyTavern client. A None enqueued later signals end-of-stream to the reader. Synchronous and side-effecting only on the in-memory dict.

Called by web.sillytavern when handling a /v1/chat/completions request, and exercised by the WebChat and SSE pub/sub tests. Must be paired with remove_sse_queue().

Parameters:

request_id (str) – Correlation id of the SillyTavern request.

Returns:

The newly created response queue for that request.

Return type:

Queue

remove_sse_queue(request_id)[source]

Discard the in-process SSE queue for a finished request.

Pops request_id from self._sse_queues (a no-op if absent) so the queue created by create_sse_queue() does not leak after the SillyTavern stream completes, errors, or times out. Synchronous and side-effect-free beyond the in-memory dict.

Called by web.sillytavern in the cleanup/finally paths around each streamed response, and in the WebChat tests.

Parameters:

request_id (str) – Correlation id whose queue should be removed.

Return type:

None

async fetch_history(channel_id, limit=100)[source]

Return no history – WebChat keeps none of its own.

Implements the abstract PlatformAdapter.fetch_history(). Unlike Discord/Matrix, the WebChat transport has no upstream service to page back through: conversation history for web users lives in Redis and is retrieved by the message pipeline directly, so this adapter intentionally returns an empty list rather than fabricating one. No I/O.

Called wherever the codebase backfills or queries per-platform history through the adapter interface — e.g. background_tasks, message_processor.history_backfill, core.outbound_consumer.OutboundConsumer, and tools such as cross_channel_query / admin_whisper via ctx.adapter — all of which simply get nothing back for WebChat.

Parameters:
  • channel_id (str) – The WebChat channel (ignored).

  • limit (int) – Maximum messages requested (ignored); defaults to 100.

Returns:

Always an empty list.

Return type:

list[HistoricalMessage]