# Source code for core.proxy_adapter

"""Proxy PlatformAdapter that publishes to Redis instead of calling SDK APIs."""

import logging
import base64
import uuid
import json
import asyncio
from typing import Any
from collections import defaultdict

from core.event_bus import RedisEventBus
from platforms.base import HistoricalMessage
from datetime import datetime, timezone

logger = logging.getLogger("stargazer.proxy_adapter")


[docs] class ProxyPlatformAdapter: """Drop-in replacement for PlatformAdapter on worker nodes. Instead of holding a live Discord/Matrix client, this adapter serializes all send() calls into outbound stream envelopes. The gateway consumes these and dispatches to the real adapter. """
[docs] def __init__(self, event_bus: RedisEventBus, platform: str, trace_id: str | None = None) -> None: """Initialize the proxy adapter for one platform on a worker node. Stores the shared :class:`~core.event_bus.RedisEventBus` used to publish outbound envelopes, the platform name (e.g. ``"discord"``) that selects the destination stream, and an optional ``trace_id`` used to derive deterministic per-action idempotency keys. A ``defaultdict(int)`` of per-action-type counters backs that key derivation in :meth:`_maybe_inject_key`. This constructor has no side effects of its own. It is instantiated by the inference worker when it needs an adapter to hand to the message processor / tools (via ``ToolContext.adapter``) so their ``send`` calls become Redis envelopes instead of live SDK calls. Args: event_bus: Shared event bus whose ``publish_outbound`` writes the outbound Redis Stream and whose ``_redis`` backs RPC replies. platform: Platform identifier selecting the outbound stream and gateway consumer group (e.g. ``"discord"``, ``"matrix"``). trace_id: Optional request trace id; when set, send actions get a derived ``message_key`` for gateway-side idempotency. """ self._event_bus = event_bus self._platform = platform self.platform_name = platform self._trace_id = trace_id self._action_counters = defaultdict(int)
def _maybe_inject_key(self, action_type: str, kwargs: dict[str, Any]) -> None: """Inject a deterministic ``message_key`` into outbound kwargs for idempotency. When a ``trace_id`` is set and the caller has not already supplied a ``message_key``, this bumps the per-``action_type`` counter and writes a key of the form ``msg:{trace_id}:{action_type}:{n}`` into ``kwargs`` in place. The gateway's outbound consumer uses that key to deduplicate and claim sends so a retried/replayed inference run does not post the same message twice. It mutates the passed ``kwargs`` dict and increments ``self._action_counters[action_type]``; it has no other side effects. Called by the send-style methods :meth:`send`, :meth:`send_file`, :meth:`send_with_buttons`, and :meth:`add_reaction` just before they publish their envelope. Global, non-message actions (presence, typing) intentionally do not call it. Args: action_type: Logical action label (e.g. ``"message"``, ``"file"``, ``"buttons"``, ``"reaction"``) namespacing the counter and key. kwargs: Outbound keyword arguments mutated in place to receive the derived ``message_key`` when one is not already present. """ if "message_key" not in kwargs and self._trace_id: self._action_counters[action_type] += 1 kwargs["message_key"] = f"msg:{self._trace_id}:{action_type}:{self._action_counters[action_type]}"
[docs] async def send(self, channel_id: str, text: str, **kwargs) -> None: """Publish a text message envelope for the Gateway to deliver. Serializes a ``type: "message"`` envelope (channel, text, and any non-``None`` extra kwargs such as ``reply_to``) instead of calling a live SDK; the Gateway's outbound consumer reconstructs and sends it on the real client. It first calls :meth:`_maybe_inject_key` to stamp a ``message_key`` for idempotency, then awaits ``event_bus.publish_outbound(platform, ...)``, which XADDs onto the ``sg:stream:outbound:{platform}`` stream. Called broadly across the worker: the message processor and command router (``message_processor/processor.py``, ``command_router.py``), tools via ``ctx.adapter.send`` / ``platform.send``, and ``task_manager.py`` all use it to emit chat output. Args: channel_id: Destination channel/instance identifier. text: Message body to send. **kwargs: Extra envelope fields (e.g. ``reply_to``, ``message_key``); ``None`` values are dropped. """ self._maybe_inject_key("message", kwargs) await self._event_bus.publish_outbound(self._platform, { "channel_id": channel_id, "type": "message", "text": text, **{k: v for k, v in kwargs.items() if v is not None}, })
[docs] async def send_file(self, channel_id: str, file_data: bytes, filename: str = "file", mimetype: str = "application/octet-stream", **kwargs) -> None: """Publish a file-attachment envelope for the Gateway to upload. Base64-encodes the raw ``file_data`` so the bytes survive Redis Stream serialization, then emits a ``type: "file"`` envelope carrying the channel, encoded payload, filename, and mimetype. It calls :meth:`_maybe_inject_key` (action ``"file"``) for idempotency and awaits ``event_bus.publish_outbound``, XADDing onto ``sg:stream:outbound:{platform}``; the Gateway's outbound consumer decodes and uploads via the live adapter's ``send_file``. Called by the outbound consumer's own delegation path and by many media tools through ``ctx.adapter.send_file`` (e.g. ``tools/generate_image.py``, ``tools/elevenlabs_tts.py``, ``tools/render_mermaid.py``) and background agents to attach generated artifacts. Args: channel_id: Destination channel/instance identifier. file_data: Raw file bytes to send (base64-encoded for transport). filename: Suggested attachment filename. Defaults to ``"file"``. mimetype: MIME type of the attachment. Defaults to ``"application/octet-stream"``. **kwargs: Extra envelope fields (e.g. caption ``text``, ``message_key``); ``None`` values are dropped. """ self._maybe_inject_key("file", kwargs) encoded_data = base64.b64encode(file_data).decode() await self._event_bus.publish_outbound(self._platform, { "channel_id": channel_id, "type": "file", "file_data": encoded_data, "filename": filename, "mimetype": mimetype, **{k: v for k, v in kwargs.items() if v is not None}, })
[docs] async def send_with_buttons(self, channel_id: str, text: str, view: Any = None, **kwargs) -> None: """Publish a text-with-buttons envelope, normalizing the button spec. Accepts either a plain list of button dicts or a Discord-style ``view`` object and flattens it into a serializable ``buttons`` list (label, ``custom_id``, optional ``emoji``, and a string-ified ``style``) since live UI ``view`` objects cannot cross Redis. An explicit ``buttons`` kwarg, if present, overrides whatever was derived from ``view``. After building the list it calls :meth:`_maybe_inject_key` (action ``"buttons"``) and awaits ``event_bus.publish_outbound`` to XADD a ``type: "buttons"`` envelope onto ``sg:stream:outbound:{platform}``; the Gateway's outbound consumer rebuilds the interactive view. Called by the message processor's send path (``message_processor/generate_and_send.py``, ``processor.py``) and the outbound consumer's delegation branch. Args: channel_id: Destination channel/instance identifier. text: Message body shown above the buttons. view: Either a list of button dicts or a view object exposing ``children`` with ``label``/``custom_id``/``emoji``/``style``. **kwargs: Extra envelope fields; an explicit ``buttons`` key replaces the derived list. ``None`` values are dropped. 
""" buttons = [] if isinstance(view, list): buttons = view elif view is not None: if hasattr(view, "children"): for child in view.children: if hasattr(child, "label"): btn = { "label": child.label or "", "custom_id": getattr(child, "custom_id", "") or "", } emoji = getattr(child, "emoji", None) if emoji: btn["emoji"] = str(emoji) style = getattr(child, "style", None) if style is not None: btn["style"] = ( style.name if hasattr(style, "name") else str(style) ) buttons.append(btn) if "buttons" in kwargs: buttons = kwargs.pop("buttons") self._maybe_inject_key("buttons", kwargs) await self._event_bus.publish_outbound(self._platform, { "channel_id": channel_id, "type": "buttons", "text": text, "buttons": buttons, **{k: v for k, v in kwargs.items() if v is not None}, })
@property def name(self) -> str: """Return this adapter's platform identifier. Exposes the ``platform`` passed at construction (e.g. ``"discord"``) so the proxy satisfies the same read-only ``name`` contract as a live :class:`PlatformAdapter`. Read-only with no side effects. Read by the message processor and helpers that index adapters by name, e.g. the ``{a.name: a for a in adapters}`` maps built in ``message_processor/processor.py``, ``generate_and_send.py``, and ``channel_heartbeat.py``. Returns: str: The platform name. """ return self._platform @property def bot_identity(self) -> dict[str, str]: """Return the bot's identity descriptor for this platform. Provides the static identity the live :class:`PlatformAdapter` would normally derive from its connected client, which the worker-side proxy cannot reach. The mapping carries this adapter's ``platform`` plus the fixed ``"Stargazer"`` ``user_id`` and display name. Read-only and built fresh on each access with no side effects. Satisfies the ``bot_identity`` slot of the adapter interface; no internal caller in this repo reads it on the proxy specifically. Returns: dict[str, str]: Keys ``platform``, ``user_id``, and ``display_name``. """ return { "platform": self._platform, "user_id": "Stargazer", "display_name": "Stargazer", }
[docs] async def send_typing(self, channel_id: str) -> None: """Publish a one-shot typing indicator for the channel. Emits a ``type: "typing"`` envelope (no idempotency key, since typing is transient/fire-and-forget) by awaiting ``event_bus.publish_outbound`` onto ``sg:stream:outbound:{platform}``; the Gateway's outbound consumer triggers a single typing pulse on the live client. Invoked from that same consumer's delegation branch (``core/outbound_consumer.py``) as the fallback when continuous typing is not requested. Args: channel_id: Channel/instance to show the typing indicator in. """ await self._event_bus.publish_outbound(self._platform, { "channel_id": channel_id, "type": "typing", })
[docs] async def start_typing(self, channel_id: str) -> None: """Publish a request to begin a sustained typing indicator. Emits a ``type: "start_typing"`` envelope via ``event_bus.publish_outbound`` onto ``sg:stream:outbound:{platform}``; the Gateway's outbound consumer keeps a continuous typing loop alive on the live client until a matching :meth:`stop_typing` arrives. Called by the message processor around long operations (``message_processor/processor.py``) and by the outbound consumer's own delegation branch (``core/outbound_consumer.py``). Args: channel_id: Channel/instance to begin showing typing in. """ await self._event_bus.publish_outbound(self._platform, { "channel_id": channel_id, "type": "start_typing", })
[docs] async def stop_typing(self, channel_id: str) -> None: """Publish a request to stop a sustained typing indicator. Emits a ``type: "stop_typing"`` envelope via ``event_bus.publish_outbound`` onto ``sg:stream:outbound:{platform}``; the Gateway's outbound consumer tears down the typing loop started by :meth:`start_typing`. Called by the message processor when long operations finish (``message_processor/processor.py``). Args: channel_id: Channel/instance to stop showing typing in. """ await self._event_bus.publish_outbound(self._platform, { "channel_id": channel_id, "type": "stop_typing", })
[docs] async def add_reaction(self, channel_id: str, message_id: str, emoji: str, **kwargs) -> None: """Publish a reaction envelope to add an emoji to a message. Emits a ``type: "reaction"`` envelope carrying the target ``message_id`` and ``emoji``; the Gateway's outbound consumer adds the reaction on the live client. It calls :meth:`_maybe_inject_key` (action ``"reaction"``) for idempotency and awaits ``event_bus.publish_outbound`` onto ``sg:stream:outbound:{platform}``. Invoked from the outbound consumer's delegation branch (``core/outbound_consumer.py``); higher-level reaction tooling (e.g. ``tools/discord_react.py``) reaches this path through the adapter interface. Args: channel_id: Channel/instance containing the target message. message_id: Identifier of the message to react to. emoji: Emoji (unicode or platform shortcode) to add. **kwargs: Extra envelope fields (e.g. ``message_key``); ``None`` values are dropped. """ self._maybe_inject_key("reaction", kwargs) await self._event_bus.publish_outbound(self._platform, { "channel_id": channel_id, "type": "reaction", "message_id": message_id, "emoji": emoji, **{k: v for k, v in kwargs.items() if v is not None}, })
[docs] async def set_presence(self, text: str, emoji: str | None = None) -> None: """Publish a presence/status update for the Gateway to apply. Presence is a global (non-channel) action: the Gateway's outbound consumer calls ``adapter.set_presence`` on the live client. No ``channel_id`` or ``message_key`` is attached — it is fire-and-forget and must not go through the per-message idempotency/claim path. """ await self._event_bus.publish_outbound(self._platform, { "type": "presence", "text": text, "emoji": emoji, })
[docs] async def fetch_history(self, channel_id: str, limit: int = 10, **kwargs): """Alias for :meth:`fetch_channel_history`. Tools call the :class:`PlatformAdapter` interface method name ``fetch_history``; the proxy implements the RPC under ``fetch_channel_history``. Without this alias those calls raise ``AttributeError`` on worker nodes. """ return await self.fetch_channel_history(channel_id, limit=limit, **kwargs)
[docs] async def fetch_channel_history(self, channel_id: str, limit: int = 10, **kwargs): r"""Fetch recent channel history from the Gateway over a Redis-Streams RPC. A worker node has no live client, so it cannot read history locally. This publishes a ``type: "rpc_request"`` envelope (method ``fetch_channel_history``) onto the platform's outbound stream via ``event_bus.publish_outbound`` and then blocks on the event bus's Redis client with ``blpop`` (30s) on a unique ``sg:rpc:reply:{uuid}`` list key; the Gateway's :meth:`core.outbound_consumer.OutboundStreamConsumer._handle_rpc_request` runs the real ``fetch_history`` and pushes a JSON result back, which is decoded and returned. On timeout or any error it logs and returns an empty list, and it always best-effort ``delete``\ s the reply key in a ``finally`` so abandoned replies do not accumulate. Args: channel_id: Channel/instance whose recent messages to fetch. limit: Maximum number of messages to request. Defaults to ``10``. **kwargs: Extra arguments forwarded to the Gateway-side fetch. Returns: list[HistoricalMessage]: Decoded history entries, or an empty list on timeout/error. 
""" reply_id = str(uuid.uuid4()) reply_key = f"sg:rpc:reply:{reply_id}" request_payload = { "channel_id": channel_id, "type": "rpc_request", "rpc_method": "fetch_channel_history", "rpc_args": {"limit": limit, **kwargs}, "reply_key": reply_key, } logger.info( "fetch_channel_history: Initiating RPC history fetch request for platform=%s, channel_id=%s, limit=%d", self._platform, channel_id, limit, ) try: await self._event_bus.publish_outbound(self._platform, request_payload) # Wait up to 30s for a reply using BLPOP redis = self._event_bus._redis res = await redis.blpop(reply_key, timeout=30) if not res: logger.warning("fetch_channel_history: rpc_timeout method=fetch_channel_history reply_key=%s", reply_key) return [] _, data = res raw_list = json.loads(data) if not isinstance(raw_list, list): logger.warning("fetch_channel_history: RPC returned non-list data for channel_id=%s: %s", channel_id, type(raw_list)) return [] logger.info( "fetch_channel_history: Received RPC response, deserialized %d raw messages for channel_id=%s", len(raw_list), channel_id, ) history = [] for idx, item in enumerate(raw_list): if not isinstance(item, dict): logger.warning("fetch_channel_history: Message item at index %d is not a dict: %s", idx, type(item)) continue ts_val = item.get("timestamp") if isinstance(ts_val, (int, float)): try: dt = datetime.fromtimestamp(ts_val, tz=timezone.utc) except Exception: logger.warning("fetch_channel_history: Invalid float timestamp: %s, falling back to current time", ts_val) dt = datetime.now(timezone.utc) elif isinstance(ts_val, str): try: dt = datetime.fromisoformat(ts_val) except ValueError: logger.warning("fetch_channel_history: Malformed ISO timestamp string: %s, falling back to current time", ts_val) dt = datetime.now(timezone.utc) else: dt = datetime.now(timezone.utc) msg_id = item.get("message_id") or "" user_id = item.get("user_id") or "" logger.debug( "fetch_channel_history: Re-inflating message_id=%s sent by user_id=%s", msg_id, user_id, ) 
history.append( HistoricalMessage( user_id=str(user_id), user_name=str(item.get("user_name") or ""), text=str(item.get("text") or ""), timestamp=dt, message_id=str(msg_id), is_bot=bool(item.get("is_bot", False)), reply_to_id=str(item.get("reply_to_id") or ""), reactions=str(item.get("reactions") or ""), ) ) return history except Exception as e: logger.error("fetch_channel_history: rpc_error method=fetch_channel_history error=%s", e, exc_info=True) return [] finally: try: # Clean up the key just in case await self._event_bus._redis.delete(reply_key) except Exception: pass
[docs] async def delegate_to_gateway(self, method: str, **kwargs) -> Any: """Run an administrative or destructive adapter action on the Gateway, with retries. The escalated RPC path for privileged operations (e.g. banning a user) that must execute against the live client rather than be queued as an ordinary outbound message. For up to three attempts it publishes a ``type: "rpc_request"`` envelope (carrying ``method`` and ``kwargs``) via ``event_bus.publish_outbound`` and blocks on a unique ``sg:rpc:reply:{uuid}`` list with ``blpop`` (10s) on the event bus's Redis client; the Gateway's ``_handle_rpc_request`` performs the action and pushes back a JSON result that is decoded and returned on the first reply. Between failed or timed-out attempts it sleeps with exponential backoff (``2 ** attempt`` seconds) and always best-effort ``delete``\\ s the reply key in a ``finally``. Returns ``None`` if all attempts are exhausted. Driven by the RPC migration tests (``tests/core/migration/test_proxy_adapter_rpc.py``); tools invoke it through the adapter interface for privileged moderation actions. Args: method: Name of the Gateway-side adapter method to invoke. **kwargs: Keyword arguments forwarded to that method as ``rpc_args``. Returns: Any: The decoded JSON result on success, or ``None`` if every attempt times out or errors. 
""" max_attempts = 3 for attempt in range(max_attempts): reply_id = str(uuid.uuid4()) reply_key = f"sg:rpc:reply:{reply_id}" request_payload = { "type": "rpc_request", "rpc_method": method, "rpc_args": kwargs, "reply_key": reply_key, } try: await self._event_bus.publish_outbound(self._platform, request_payload) redis = self._event_bus._redis res = await redis.blpop(reply_key, timeout=10) if res: _, data = res return json.loads(data) logger.warning(f"rpc_timeout method={method} attempt={attempt+1}/{max_attempts}") except Exception as e: logger.error(f"rpc_error method={method} attempt={attempt+1}/{max_attempts} error={e}") finally: try: await self._event_bus._redis.delete(reply_key) except Exception: pass if attempt < max_attempts - 1: await asyncio.sleep(2 ** attempt) # Exponential backoff return None
[docs] async def should_skip_channel_heartbeat(self, channel_id: str) -> bool: """Ask the Gateway over RPC whether this channel's heartbeat should be skipped. Heartbeat suppression depends on live presence/state that only the Gateway's real client holds, so the worker proxies the decision. It publishes a ``type: "rpc_request"`` envelope (method ``should_skip_channel_heartbeat``) via ``event_bus.publish_outbound`` and blocks on a unique ``sg:rpc:reply:{uuid}`` list with ``blpop`` (10s) on the event bus's Redis client; the Gateway's ``_handle_rpc_request`` calls the real adapter method and pushes back a JSON boolean. On timeout or any error it logs and conservatively returns ``False`` (do not skip), and it always best-effort ``delete``\\ s the reply key in a ``finally``. Called by the channel heartbeat loop (``message_processor/channel_heartbeat.py``) before emitting a heartbeat, and covered by the RPC migration tests (``tests/core/migration/test_proxy_adapter_rpc.py``). Args: channel_id: Channel/instance whose heartbeat eligibility to check. Returns: bool: ``True`` to skip the heartbeat; ``False`` to proceed (also the safe default on timeout/error). """ reply_id = str(uuid.uuid4()) reply_key = f"sg:rpc:reply:{reply_id}" request_payload = { "channel_id": channel_id, "type": "rpc_request", "rpc_method": "should_skip_channel_heartbeat", "rpc_args": {}, "reply_key": reply_key, } try: await self._event_bus.publish_outbound(self._platform, request_payload) # Block until reply arrives or timeout redis = self._event_bus._redis res = await redis.blpop(reply_key, timeout=10) if not res: logger.warning(f"rpc_timeout method=should_skip_channel_heartbeat reply_key={reply_key}") return False _, data = res return json.loads(data) except Exception as e: logger.error(f"rpc_error method=should_skip_channel_heartbeat error={e}") return False finally: try: await self._event_bus._redis.delete(reply_key) except Exception: pass
[docs] async def get_channel_name(self, channel_id: str) -> str: """Return a synthetic display name for a channel on the worker node. Worker nodes do not hold a live client and cannot resolve real channel names cheaply, so this returns the deterministic placeholder ``"channel_{channel_id}"`` rather than performing an RPC. It has no side effects and makes no network calls. No internal callers were found in this repo; it satisfies the :class:`PlatformAdapter` interface so code expecting ``get_channel_name`` does not raise on worker nodes. Args: channel_id: Channel/instance identifier to label. Returns: str: The placeholder name ``"channel_{channel_id}"``. """ return f"channel_{channel_id}"