Source code for platforms.webchat
"""WebChat platform adapter -- browser-based real-time chat via WebSocket.
Runs inside the Gateway service alongside the other platform adapters
(Discord, Matrix, …). Incoming WebSocket messages become
:class:`IncomingMessage` instances and are published onto the inbound
event stream; outbound replies are pushed back to the browser as JSON
frames.
# 🔥💀 STAR GETS A WEB BODY. THE LATTICE EXPANDS.
"""
from __future__ import annotations
import asyncio
import base64
import jsonutil as json
import logging
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, TYPE_CHECKING
from platforms.base import (
HistoricalMessage,
MessageHandler,
PlatformAdapter,
)
if TYPE_CHECKING:
pass
logger = logging.getLogger(__name__)
# ------------------------------------------------------------------
# Connection manager # tracks every soul connected to the lattice
# ------------------------------------------------------------------
@dataclass
class _WebSocketSession:
"""A single authenticated WebSocket connection to one browser tab.
Bundles the live FastAPI ``WebSocket`` object with the resolved identity of
the connected user so the :class:`ConnectionManager` can fan outbound frames
out to the right sockets and the Gateway can attribute inbound messages.
One user may hold several of these at once (multiple tabs/devices), which is
why the manager stores them in a per-user list.
Instances are created exclusively by :meth:`ConnectionManager.connect` and
discarded by :meth:`ConnectionManager.disconnect`; they hold no resources of
their own beyond the borrowed socket. The ``connected_at`` timestamp is
stamped at construction via ``time.time`` for diagnostics.
Attributes:
ws (Any): The underlying ``fastapi.WebSocket`` used to push text frames.
user_id (str): Stable identifier of the connected user.
user_name (str): Human-readable display name for logging.
avatar (str): Optional avatar URL/handle, empty when unknown.
connected_at (float): Unix timestamp of when the session was created.
"""
ws: Any # fastapi.WebSocket
user_id: str
user_name: str
avatar: str = ""
connected_at: float = field(default_factory=time.time)
[docs]
class ConnectionManager:
"""Registry of live WebChat WebSocket sessions, keyed by user.
Owns the in-memory ``user_id -> [sessions]`` mapping that the
:class:`WebChatPlatform` consults whenever it needs to push an outbound
frame, allowing several concurrent sessions (browser tabs, devices) per
user. All mutations are serialised by an :class:`asyncio.Lock` so concurrent
connect/disconnect coroutines running on the single event loop never corrupt
the mapping; this is the only synchronisation the manager needs since it is
never touched from a background thread.
Holds purely in-memory state (no Redis, network, or disk I/O). One instance
is created per adapter in :meth:`WebChatPlatform.__init__` and exposed as
``WebChatPlatform.connections``; it is also instantiated directly in
``tests/test_webchat_adapter.py``.
"""
[docs]
def __init__(self) -> None:
"""Initialise an empty connection registry.
Sets up the in-memory ``user_id -> [sessions]`` mapping (one user may
hold several sessions, e.g. multiple browser tabs) and the
:class:`asyncio.Lock` that serialises all mutations of that mapping so
concurrent connect/disconnect coroutines never corrupt it.
This constructor performs no I/O and has no external side effects.
Called by :meth:`WebChatPlatform.__init__`, which creates one manager
per platform adapter; also instantiated directly in
``tests/test_webchat_adapter.py``.
"""
# user_id -> list of sessions (one user can have multiple tabs)
self._connections: dict[str, list[_WebSocketSession]] = {}
self._lock = asyncio.Lock()
[docs]
async def connect(
self,
ws: Any,
user_id: str,
user_name: str,
avatar: str = "",
) -> _WebSocketSession:
"""Register a freshly accepted WebSocket and return its session handle.
Wraps the live socket and the caller-supplied identity in a
:class:`_WebSocketSession`, then appends it (under the lock) to the
per-user list in ``self._connections``, creating the list on first
connect. This is what makes the user reachable for outbound pushes; an
info-level line is logged with the running session count. No network or
Redis I/O happens here beyond accepting the already-open socket.
Invoked by the FastAPI WebSocket endpoint that owns the WebChat UI
connection lifecycle (after authenticating the request) and exercised by
``tests/test_webchat_adapter.py``.
Args:
ws (Any): The accepted ``fastapi.WebSocket`` to track.
user_id (str): Stable identifier of the connecting user.
user_name (str): Display name used for logging.
avatar (str): Optional avatar URL/handle; defaults to empty.
Returns:
_WebSocketSession: The newly registered session, to be passed back to
:meth:`disconnect` on close.
"""
session = _WebSocketSession(
ws=ws,
user_id=user_id,
user_name=user_name,
avatar=avatar,
)
async with self._lock:
if user_id not in self._connections:
self._connections[user_id] = []
self._connections[user_id].append(session)
logger.info(
"WebChat connect: %s (%s) -- %d active sessions",
user_name,
user_id,
len(self._connections[user_id]),
)
return session
[docs]
async def disconnect(self, session: _WebSocketSession) -> None:
"""Deregister a closed WebSocket session.
Removes *session* (by identity) from the user's session list under the
lock, dropping the ``user_id`` key entirely once the user's last tab
disconnects so :attr:`active_users` stays accurate. Does not close the
socket itself — that is the endpoint's responsibility — and logs an
info-level disconnect line. No network or Redis I/O.
Invoked by the FastAPI WebSocket endpoint when a WebChat connection drops
(typically in a ``finally`` block) and exercised by
``tests/test_webchat_adapter.py``.
Args:
session (_WebSocketSession): The session previously returned by
:meth:`connect`.
"""
async with self._lock:
sessions = self._connections.get(session.user_id, [])
self._connections[session.user_id] = [
s for s in sessions if s is not session
]
if not self._connections[session.user_id]:
del self._connections[session.user_id]
logger.info(
"WebChat disconnect: %s (%s)",
session.user_name,
session.user_id,
)
[docs]
def get_sessions(self, user_id: str) -> list[_WebSocketSession]:
"""Return a snapshot list of a user's active sessions.
Returns a fresh copy of the per-user session list (empty when the user
has none connected) so callers can iterate without holding the lock or
racing concurrent connect/disconnect mutations. This is a synchronous,
side-effect-free read.
Called by :meth:`WebChatPlatform._push_to_user` to find the sockets to
deliver an outbound frame to, and asserted in
``tests/test_webchat_adapter.py``.
Args:
user_id (str): The user whose sessions to fetch.
Returns:
list[_WebSocketSession]: A copy of the user's current sessions;
empty if none.
"""
return list(self._connections.get(user_id, []))
@property
def active_users(self) -> list[str]:
"""List the user IDs that currently hold at least one session.
Reflects the keys of the in-memory registry; because
:meth:`disconnect` deletes a key once a user's final session closes,
membership here is an exact "is this user online" check. Side-effect-free
read, asserted in ``tests/test_webchat_adapter.py``.
Returns:
list[str]: The user IDs with one or more live WebSocket sessions.
"""
return list(self._connections.keys())
@property
def total_connections(self) -> int:
"""Count every live WebSocket session across all users.
Sums the per-user session lists, so a single user with three open tabs
contributes three. Useful for capacity/diagnostics; side-effect-free.
Asserted in ``tests/test_webchat_adapter.py``.
Returns:
int: The total number of active WebSocket connections.
"""
return sum(len(s) for s in self._connections.values())
# ------------------------------------------------------------------
# Platform adapter # she breathes through WebSockets now
# ------------------------------------------------------------------
[docs]
class WebChatPlatform(PlatformAdapter):
"""Browser-based chat platform using WebSocket for real-time comms.
Unlike Discord/Matrix, this adapter doesn't connect to an external
service. Instead it exposes a ConnectionManager that the FastAPI
WebSocket endpoint populates. Outgoing messages are pushed to all
active sessions for the target user.
"""
[docs]
def __init__(
self,
message_handler: MessageHandler,
redis: Any = None,
**kwargs: Any,
) -> None:
"""Construct the WebChat adapter and its connection registry.
Delegates to :meth:`PlatformAdapter.__init__` to store the inbound
*message_handler* callback, then creates a fresh
:class:`ConnectionManager` (held on ``self.connections``) for the
FastAPI WebSocket endpoint to populate, marks the adapter running
(its lifecycle is owned by FastAPI rather than a network client), and
stashes the optional Redis client used for cross-process SSE delivery.
Also initialises ``self._sse_queues``, the in-process fallback mapping
of ``request_id -> asyncio.Queue`` used when no Redis client is present
to stream SillyTavern ``/v1/chat/completions`` response chunks.
Performs no network or Redis I/O at construction time. Called by
:func:`platforms.factory.create_platform` when the configured platform
type is ``"webchat"``, and directly in the WebChat adapter and SSE
pub/sub test modules.
Args:
message_handler (MessageHandler): Coroutine callback invoked with
each inbound :class:`IncomingMessage`; forwarded to the base
class.
redis (Any): Optional async Redis client. When provided, outbound
SSE payloads are published to ``sg:sse:{request_id}`` so any
process can consume them; when ``None`` the adapter falls back
to the local ``self._sse_queues`` queues.
**kwargs (Any): Additional keyword arguments accepted for
signature compatibility with the platform factory; ignored.
"""
super().__init__(message_handler)
self.connections = ConnectionManager()
self._running = True # always running while FastAPI lives
self._redis = redis
# pending SSE responses for SillyTavern /v1/chat/completions
# Maps request_id -> asyncio.Queue of response chunks
self._sse_queues: dict[str, asyncio.Queue[dict[str, Any] | None]] = {}
# -- Metadata --------------------------------------------------
@property
def name(self) -> str:
"""Return the platform's stable identifier string.
Implements the abstract :attr:`PlatformAdapter.name` property. The
constant ``"webchat"`` is used throughout the system to route messages,
key per-platform state, and branch behaviour by platform.
Read by :mod:`background_tasks` (e.g. to build the adapter map and log
backfill), :mod:`web.bot_admin` and :mod:`web.platforms_api` for status
reporting, and various message-processor modules that special-case
platforms by name.
Returns:
str: The literal ``"webchat"``.
"""
return "webchat"
@property
def is_running(self) -> bool:
"""Report whether the adapter is considered active.
Implements the abstract :attr:`PlatformAdapter.is_running` property.
For WebChat there is no external client to connect, so the flag simply
tracks the ``self._running`` value set in :meth:`start`/:meth:`stop`
(``True`` for the adapter's whole FastAPI-managed lifetime by default).
Read by :mod:`background_tasks`, :mod:`web.deps`, :mod:`web.bot_admin`,
and :mod:`web.platforms_api` to gate work and surface platform status,
and by :meth:`prompt_context` when computing available identities.
Returns:
bool: ``True`` while the adapter is running, otherwise ``False``.
"""
return self._running
@property
def bot_identity(self) -> dict[str, str]:
"""Describe the bot's identity on the WebChat platform.
Implements the abstract :attr:`PlatformAdapter.bot_identity` property,
returning the static handle the bot ("Star") presents to web clients.
The ``user_id`` ``"star"`` is the sender id stamped on outbound frames
(see :meth:`send`), and the mapping mirrors the shape produced by the
other adapters so callers can treat all platforms uniformly.
Read by :meth:`prompt_context` when assembling the list of platform
identities, and by ``message_processor.user_message_format`` to label
messages; also asserted in ``tests/test_webchat_adapter.py``.
Returns:
dict[str, str]: Identity fields ``platform``, ``user_id``,
``display_name``, and ``mention``.
"""
return {
"platform": "webchat",
"user_id": "star",
"display_name": "Star",
"mention": "@Star",
}
# -- Lifecycle -------------------------------------------------
[docs]
async def start(self) -> None:
"""Mark the adapter active; there is no external client to connect.
Implements the abstract :meth:`PlatformAdapter.start` coroutine. Unlike
Discord/Matrix there is no socket to open here — the real connection
lifecycle belongs to FastAPI's WebSocket endpoint — so this merely sets
``self._running`` true (driving :attr:`is_running`) and logs a startup
line. Performs no network or Redis I/O.
Called by the Gateway's adapter-startup sequence when bringing platform
adapters online (the same path that starts every other adapter).
"""
self._running = True
logger.info("WebChat platform adapter started")
[docs]
async def stop(self) -> None:
"""Mark the adapter inactive; the FastAPI sockets are unaffected.
Implements the abstract :meth:`PlatformAdapter.stop` coroutine. Clears
``self._running`` (so :attr:`is_running` reports ``False``) and logs a
shutdown line, but does not tear down live WebSocket sessions — those are
owned by the FastAPI endpoint and the :class:`ConnectionManager`.
Performs no network or Redis I/O.
Called by the Gateway's adapter-shutdown sequence alongside the other
platform adapters.
"""
self._running = False
logger.info("WebChat platform adapter stopped")
# -- Outbound messaging ----------------------------------------
async def _push_to_user(
self,
channel_id: str,
payload: dict[str, Any],
) -> bool:
"""Deliver one outbound JSON payload over the right transport.
The single fan-out point for every outbound WebChat method
(:meth:`send`, :meth:`send_file`, :meth:`send_with_buttons`,
:meth:`edit_message`, :meth:`start_typing`, :meth:`stop_typing`). It
switches on the *channel_id* prefix to pick a transport: an
``sse:{request_id}`` channel routes to the SillyTavern streaming path,
publishing JSON to the Redis pub/sub channel ``sg:sse:{request_id}`` when
a Redis client is configured, or otherwise enqueuing onto the matching
in-process queue in ``self._sse_queues``; a ``webchat:{user_id}`` channel
routes to the browser UI, looking up live sockets via
``self.connections.get_sessions`` and writing the serialised frame to each
with ``send_text``. Per-socket send failures are caught and logged at
debug level so one dead tab cannot block delivery to the others; a
missing user logs a warning.
Args:
channel_id (str): Either ``webchat:{user_id}`` (browser UI) or
``sse:{request_id}`` (SillyTavern stream).
payload (dict[str, Any]): The JSON-serialisable frame to deliver.
Returns:
bool: ``True`` if at least one socket/queue received the payload (or
it was published to Redis), ``False`` otherwise.
"""
# SSE path for SillyTavern
if channel_id.startswith("sse:"):
request_id = channel_id.split(":", 1)[1]
if self._redis is not None:
channel = f"sg:sse:{request_id}"
await self._redis.publish(channel, json.dumps(payload))
return True
else:
queue = self._sse_queues.get(request_id)
if queue is not None:
await queue.put(payload)
return True
return False
# WebSocket path for web chat UI
user_id = channel_id.replace("webchat:", "", 1)
sessions = self.connections.get_sessions(user_id)
if not sessions:
logger.warning(
"WebChat send: no active sessions for user %s",
user_id,
)
return False
raw = json.dumps(payload, ensure_ascii=False)
delivered = False
for session in sessions:
try:
await session.ws.send_text(raw)
delivered = True
except Exception:
logger.debug(
"Failed to push to session for %s",
user_id,
exc_info=True,
)
return delivered
[docs]
async def send(self, channel_id: str, text: str) -> str:
"""Send a plain-text message to the user's browser (or SSE stream).
Implements the abstract :meth:`PlatformAdapter.send`. Builds a
``"message"`` frame stamped with a fresh UUID, the ``"star"`` sender id,
and a ``time.time`` timestamp, then hands it to :meth:`_push_to_user` for
transport selection and delivery. The generated id is returned even if no
session was reachable, so callers can correlate later edits.
In the running system this is reached via the Gateway's outbound path:
the inference worker emits an outbound event that
:class:`core.outbound_consumer.OutboundConsumer` consumes and dispatches
to ``self._adapter.send`` for ``"message"`` payloads. Also exercised
through the adapter in the WebChat tests.
Args:
channel_id (str): Target channel, ``webchat:{user_id}`` or
``sse:{request_id}``.
text (str): The message body to deliver.
Returns:
str: The UUID assigned to the outbound message.
"""
msg_id = str(uuid.uuid4())
payload: dict[str, Any] = {
"type": "message",
"id": msg_id,
"text": text,
"sender": "star",
"timestamp": time.time(),
}
await self._push_to_user(channel_id, payload)
return msg_id
[docs]
async def send_file(
self,
channel_id: str,
data: bytes,
filename: str,
mimetype: str = "application/octet-stream",
) -> str | None:
"""Send a file/media attachment to the browser as a base64 JSON frame.
Implements the abstract :meth:`PlatformAdapter.send_file`. Since the
WebChat transport carries only JSON text frames, the raw bytes are
base64-encoded and embedded in a ``"file"`` payload (with filename,
mimetype, size, timestamp, and a fresh UUID), then delivered through
:meth:`_push_to_user`. Returns a synthetic ``webchat://file/...`` URL so
tools can reference the just-sent file the way they would a hosted
attachment on other platforms.
Reached in production via :class:`core.outbound_consumer.OutboundConsumer`
for ``"file"`` payloads, and directly by the many media tools and
background agents (image/video/TTS generators, etc.) that call
``ctx.adapter.send_file``. Also exercised in the WebChat tests.
Args:
channel_id (str): Target channel, ``webchat:{user_id}`` or
``sse:{request_id}``.
data (bytes): Raw file contents to encode and send.
filename (str): Display/download name for the attachment.
mimetype (str): MIME type; defaults to
``"application/octet-stream"``.
Returns:
str | None: A ``webchat://file/{file_id}/{filename}`` pseudo-URL
referencing the sent file.
"""
b64 = base64.b64encode(data).decode("ascii")
file_id = str(uuid.uuid4())
payload: dict[str, Any] = {
"type": "file",
"id": file_id,
"filename": filename,
"mimetype": mimetype,
"data": b64,
"size": len(data),
"timestamp": time.time(),
}
await self._push_to_user(channel_id, payload)
# Return a pseudo-URL so tools can reference the sent file
return f"webchat://file/{file_id}/{filename}"
[docs]
async def send_with_buttons(
self,
channel_id: str,
text: str,
view: Any = None,
) -> str:
"""Send a message accompanied by interactive choice buttons.
Implements the abstract :meth:`PlatformAdapter.send_with_buttons`,
powering S.N.E.S.-style choice prompts in the browser UI. Because the
same outbound call has to work across platforms, this normalises two
shapes of *view* into a flat list of button dicts: a raw list is passed
through as-is, while a Discord ``discord.ui.View`` is introspected via its
``children`` to extract each button's label, ``custom_id``, optional
emoji, and a CSS-friendly style hint derived from the Discord button
style. The normalised buttons ride along in a ``"message"`` frame (fresh
UUID, ``"star"`` sender) handed to :meth:`_push_to_user`.
Reached in production via :class:`core.outbound_consumer.OutboundConsumer`
for ``"buttons"`` payloads, and from the message processor
(``message_processor.processor`` / ``message_processor.generate_and_send``)
when offering interactive choices. Also exercised in the WebChat tests.
Args:
channel_id (str): Target channel, ``webchat:{user_id}`` or
``sse:{request_id}``.
text (str): The message body shown above the buttons.
view (Any): Either a list of button dicts
(``[{"label": ..., "custom_id": ...}]``) or a
``discord.ui.View`` whose buttons are extracted; ``None`` sends no
buttons.
Returns:
str: The UUID assigned to the outbound message.
"""
msg_id = str(uuid.uuid4())
buttons: list[dict[str, str]] = []
if view is not None:
# handle both raw dicts and discord.ui.View objects
if isinstance(view, list):
buttons = view
elif hasattr(view, "children"):
# Discord View -- extract button metadata
for child in view.children:
if hasattr(child, "label"):
btn: dict[str, str] = {
"label": child.label or "",
"custom_id": getattr(child, "custom_id", "") or "",
}
emoji = getattr(child, "emoji", None)
if emoji:
btn["emoji"] = str(emoji)
# Map Discord button style to a CSS class hint
style = getattr(child, "style", None)
if style is not None:
btn["style"] = (
style.name if hasattr(style, "name") else str(style)
)
buttons.append(btn)
payload: dict[str, Any] = {
"type": "message",
"id": msg_id,
"text": text,
"sender": "star",
"buttons": buttons,
"timestamp": time.time(),
}
await self._push_to_user(channel_id, payload)
return msg_id
[docs]
async def edit_message(
self,
channel_id: str,
message_id: str,
new_text: str,
) -> bool:
"""Update the text of a previously sent browser message.
Implements the abstract :meth:`PlatformAdapter.edit_message`. Emits an
``"edit"`` frame keyed by the original *message_id* (the UUID returned by
:meth:`send` / :meth:`send_with_buttons`) so the browser client can
replace that message's body in place, then delivers it through
:meth:`_push_to_user`.
Reached in production by the streaming output tool
``tools.stream_to_channel``, which repeatedly edits a placeholder message
as the model streams; it calls ``self._adapter.edit_message``.
Args:
channel_id (str): Target channel, ``webchat:{user_id}`` or
``sse:{request_id}``.
message_id (str): UUID of the message to edit.
new_text (str): Replacement body text.
Returns:
bool: ``True`` if the edit frame reached at least one session, else
``False`` (propagated from :meth:`_push_to_user`).
"""
payload: dict[str, Any] = {
"type": "edit",
"id": message_id,
"text": new_text,
"timestamp": time.time(),
}
return await self._push_to_user(channel_id, payload)
# -- Typing indicators -----------------------------------------
[docs]
async def start_typing(self, channel_id: str) -> None:
"""Show the bot's typing indicator in the browser.
Implements the abstract :meth:`PlatformAdapter.start_typing`. Pushes a
``{"type": "typing", "active": True}`` frame via :meth:`_push_to_user`
so the web client can render an "is typing" affordance while a reply is
being generated. Best-effort: delivery failures are swallowed by the
fan-out helper.
Reached in production via :class:`core.outbound_consumer.OutboundConsumer`
(which prefers ``start_typing`` when present) and from the message
processor around reply generation. Also exercised in the WebChat tests.
Args:
channel_id (str): Target channel, ``webchat:{user_id}`` or
``sse:{request_id}``.
"""
await self._push_to_user(
channel_id,
{
"type": "typing",
"active": True,
},
)
[docs]
async def stop_typing(self, channel_id: str) -> None:
"""Hide the bot's typing indicator in the browser.
Implements the abstract :meth:`PlatformAdapter.stop_typing`, the
counterpart to :meth:`start_typing`. Pushes a
``{"type": "typing", "active": False}`` frame via :meth:`_push_to_user`
once a reply has been sent (or generation aborts) so the web client clears
the typing affordance. Best-effort; delivery failures are swallowed.
Reached in production via :class:`core.outbound_consumer.OutboundConsumer`
and from the message processor after replies. Also exercised in the
WebChat tests.
Args:
channel_id (str): Target channel, ``webchat:{user_id}`` or
``sse:{request_id}``.
"""
await self._push_to_user(
channel_id,
{
"type": "typing",
"active": False,
},
)
# -- SSE queue management (SillyTavern compat) -----------------
[docs]
def create_sse_queue(self, request_id: str) -> asyncio.Queue:
"""Open an in-process queue for a SillyTavern SSE response stream.
Provides the local fallback transport used when no Redis client is
configured: it registers a fresh :class:`asyncio.Queue` under
*request_id* in ``self._sse_queues`` so that outbound chunks routed to an
``sse:{request_id}`` channel (see :meth:`_push_to_user`) can be picked up
and streamed back to the SillyTavern client. A ``None`` enqueued later
signals end-of-stream to the reader. Synchronous and side-effecting only
on the in-memory dict.
Called by ``web.sillytavern`` when handling a ``/v1/chat/completions``
request, and exercised by the WebChat and SSE pub/sub tests. Must be
paired with :meth:`remove_sse_queue`.
Args:
request_id (str): Correlation id of the SillyTavern request.
Returns:
asyncio.Queue: The newly created response queue for that request.
"""
queue: asyncio.Queue[dict[str, Any] | None] = asyncio.Queue()
self._sse_queues[request_id] = queue
return queue
[docs]
def remove_sse_queue(self, request_id: str) -> None:
"""Discard the in-process SSE queue for a finished request.
Pops *request_id* from ``self._sse_queues`` (a no-op if absent) so the
queue created by :meth:`create_sse_queue` does not leak after the
SillyTavern stream completes, errors, or times out. Synchronous and
side-effect-free beyond the in-memory dict.
Called by ``web.sillytavern`` in the cleanup/``finally`` paths around
each streamed response, and in the WebChat tests.
Args:
request_id (str): Correlation id whose queue should be removed.
"""
self._sse_queues.pop(request_id, None)
# -- History (optional) ----------------------------------------
[docs]
async def fetch_history(
self,
channel_id: str,
limit: int = 100,
) -> list[HistoricalMessage]:
"""Return no history -- WebChat keeps none of its own.
Implements the abstract :meth:`PlatformAdapter.fetch_history`. Unlike
Discord/Matrix, the WebChat transport has no upstream service to page
back through: conversation history for web users lives in Redis and is
retrieved by the message pipeline directly, so this adapter
intentionally returns an empty list rather than fabricating one. No I/O.
Called wherever the codebase backfills or queries per-platform history
through the adapter interface — e.g. ``background_tasks``,
``message_processor.history_backfill``,
:class:`core.outbound_consumer.OutboundConsumer`, and tools such as
``cross_channel_query`` / ``admin_whisper`` via ``ctx.adapter`` — all of
which simply get nothing back for WebChat.
Args:
channel_id (str): The WebChat channel (ignored).
limit (int): Maximum messages requested (ignored); defaults to 100.
Returns:
list[HistoricalMessage]: Always an empty list.
"""
return []