# Source code for core.proxy_adapter
"""Proxy PlatformAdapter that publishes to Redis instead of calling SDK APIs."""
import logging
import base64
import uuid
import json
import asyncio
from typing import Any
from collections import defaultdict
from core.event_bus import RedisEventBus
from platforms.base import HistoricalMessage
from datetime import datetime, timezone
logger = logging.getLogger("stargazer.proxy_adapter")
# (Sphinx viewcode "[docs]" link marker — documentation-page artifact, not code.)
class ProxyPlatformAdapter:
"""Drop-in replacement for PlatformAdapter on worker nodes.
Instead of holding a live Discord/Matrix client, this adapter
serializes all send() calls into outbound stream envelopes.
The gateway consumes these and dispatches to the real adapter.
"""
[docs]
def __init__(self, event_bus: RedisEventBus, platform: str, trace_id: str | None = None) -> None:
"""Initialize the proxy adapter for one platform on a worker node.
Stores the shared :class:`~core.event_bus.RedisEventBus` used to
publish outbound envelopes, the platform name (e.g. ``"discord"``)
that selects the destination stream, and an optional ``trace_id``
used to derive deterministic per-action idempotency keys. A
``defaultdict(int)`` of per-action-type counters backs that key
derivation in :meth:`_maybe_inject_key`.
This constructor has no side effects of its own. It is instantiated
by the inference worker when it needs an adapter to hand to the
message processor / tools (via ``ToolContext.adapter``) so their
``send`` calls become Redis envelopes instead of live SDK calls.
Args:
event_bus: Shared event bus whose ``publish_outbound`` writes the
outbound Redis Stream and whose ``_redis`` backs RPC replies.
platform: Platform identifier selecting the outbound stream and
gateway consumer group (e.g. ``"discord"``, ``"matrix"``).
trace_id: Optional request trace id; when set, send actions get a
derived ``message_key`` for gateway-side idempotency.
"""
self._event_bus = event_bus
self._platform = platform
self.platform_name = platform
self._trace_id = trace_id
self._action_counters = defaultdict(int)
def _maybe_inject_key(self, action_type: str, kwargs: dict[str, Any]) -> None:
"""Inject a deterministic ``message_key`` into outbound kwargs for idempotency.
When a ``trace_id`` is set and the caller has not already supplied a
``message_key``, this bumps the per-``action_type`` counter and writes
a key of the form ``msg:{trace_id}:{action_type}:{n}`` into ``kwargs``
in place. The gateway's outbound consumer uses that key to deduplicate
and claim sends so a retried/replayed inference run does not post the
same message twice.
It mutates the passed ``kwargs`` dict and increments
``self._action_counters[action_type]``; it has no other side effects.
Called by the send-style methods :meth:`send`, :meth:`send_file`,
:meth:`send_with_buttons`, and :meth:`add_reaction` just before they
publish their envelope. Global, non-message actions (presence, typing)
intentionally do not call it.
Args:
action_type: Logical action label (e.g. ``"message"``, ``"file"``,
``"buttons"``, ``"reaction"``) namespacing the counter and key.
kwargs: Outbound keyword arguments mutated in place to receive the
derived ``message_key`` when one is not already present.
"""
if "message_key" not in kwargs and self._trace_id:
self._action_counters[action_type] += 1
kwargs["message_key"] = f"msg:{self._trace_id}:{action_type}:{self._action_counters[action_type]}"
[docs]
async def send(self, channel_id: str, text: str, **kwargs) -> None:
"""Publish a text message envelope for the Gateway to deliver.
Serializes a ``type: "message"`` envelope (channel, text, and any
non-``None`` extra kwargs such as ``reply_to``) instead of calling a
live SDK; the Gateway's outbound consumer reconstructs and sends it on
the real client.
It first calls :meth:`_maybe_inject_key` to stamp a ``message_key`` for
idempotency, then awaits ``event_bus.publish_outbound(platform, ...)``,
which XADDs onto the ``sg:stream:outbound:{platform}`` stream. Called
broadly across the worker: the message processor and command router
(``message_processor/processor.py``, ``command_router.py``), tools via
``ctx.adapter.send`` / ``platform.send``, and ``task_manager.py`` all
use it to emit chat output.
Args:
channel_id: Destination channel/instance identifier.
text: Message body to send.
**kwargs: Extra envelope fields (e.g. ``reply_to``,
``message_key``); ``None`` values are dropped.
"""
self._maybe_inject_key("message", kwargs)
await self._event_bus.publish_outbound(self._platform, {
"channel_id": channel_id,
"type": "message",
"text": text,
**{k: v for k, v in kwargs.items() if v is not None},
})
[docs]
async def send_file(self, channel_id: str, file_data: bytes,
filename: str = "file", mimetype: str = "application/octet-stream", **kwargs) -> None:
"""Publish a file-attachment envelope for the Gateway to upload.
Base64-encodes the raw ``file_data`` so the bytes survive Redis Stream
serialization, then emits a ``type: "file"`` envelope carrying the
channel, encoded payload, filename, and mimetype.
It calls :meth:`_maybe_inject_key` (action ``"file"``) for idempotency
and awaits ``event_bus.publish_outbound``, XADDing onto
``sg:stream:outbound:{platform}``; the Gateway's outbound consumer
decodes and uploads via the live adapter's ``send_file``. Called by
the outbound consumer's own delegation path and by many media tools
through ``ctx.adapter.send_file`` (e.g. ``tools/generate_image.py``,
``tools/elevenlabs_tts.py``, ``tools/render_mermaid.py``) and
background agents to attach generated artifacts.
Args:
channel_id: Destination channel/instance identifier.
file_data: Raw file bytes to send (base64-encoded for transport).
filename: Suggested attachment filename. Defaults to ``"file"``.
mimetype: MIME type of the attachment. Defaults to
``"application/octet-stream"``.
**kwargs: Extra envelope fields (e.g. caption ``text``,
``message_key``); ``None`` values are dropped.
"""
self._maybe_inject_key("file", kwargs)
encoded_data = base64.b64encode(file_data).decode()
await self._event_bus.publish_outbound(self._platform, {
"channel_id": channel_id,
"type": "file",
"file_data": encoded_data,
"filename": filename,
"mimetype": mimetype,
**{k: v for k, v in kwargs.items() if v is not None},
})
[docs]
async def send_with_buttons(self, channel_id: str, text: str,
view: Any = None, **kwargs) -> None:
"""Publish a text-with-buttons envelope, normalizing the button spec.
Accepts either a plain list of button dicts or a Discord-style ``view``
object and flattens it into a serializable ``buttons`` list (label,
``custom_id``, optional ``emoji``, and a string-ified ``style``) since
live UI ``view`` objects cannot cross Redis. An explicit ``buttons``
kwarg, if present, overrides whatever was derived from ``view``.
After building the list it calls :meth:`_maybe_inject_key` (action
``"buttons"``) and awaits ``event_bus.publish_outbound`` to XADD a
``type: "buttons"`` envelope onto ``sg:stream:outbound:{platform}``;
the Gateway's outbound consumer rebuilds the interactive view. Called
by the message processor's send path
(``message_processor/generate_and_send.py``, ``processor.py``) and the
outbound consumer's delegation branch.
Args:
channel_id: Destination channel/instance identifier.
text: Message body shown above the buttons.
view: Either a list of button dicts or a view object exposing
``children`` with ``label``/``custom_id``/``emoji``/``style``.
**kwargs: Extra envelope fields; an explicit ``buttons`` key
replaces the derived list. ``None`` values are dropped.
"""
buttons = []
if isinstance(view, list):
buttons = view
elif view is not None:
if hasattr(view, "children"):
for child in view.children:
if hasattr(child, "label"):
btn = {
"label": child.label or "",
"custom_id": getattr(child, "custom_id", "") or "",
}
emoji = getattr(child, "emoji", None)
if emoji:
btn["emoji"] = str(emoji)
style = getattr(child, "style", None)
if style is not None:
btn["style"] = (
style.name if hasattr(style, "name") else str(style)
)
buttons.append(btn)
if "buttons" in kwargs:
buttons = kwargs.pop("buttons")
self._maybe_inject_key("buttons", kwargs)
await self._event_bus.publish_outbound(self._platform, {
"channel_id": channel_id,
"type": "buttons",
"text": text,
"buttons": buttons,
**{k: v for k, v in kwargs.items() if v is not None},
})
@property
def name(self) -> str:
"""Return this adapter's platform identifier.
Exposes the ``platform`` passed at construction (e.g. ``"discord"``) so
the proxy satisfies the same read-only ``name`` contract as a live
:class:`PlatformAdapter`. Read-only with no side effects.
Read by the message processor and helpers that index adapters by name,
e.g. the ``{a.name: a for a in adapters}`` maps built in
``message_processor/processor.py``, ``generate_and_send.py``, and
``channel_heartbeat.py``.
Returns:
str: The platform name.
"""
return self._platform
@property
def bot_identity(self) -> dict[str, str]:
"""Return the bot's identity descriptor for this platform.
Provides the static identity the live :class:`PlatformAdapter` would
normally derive from its connected client, which the worker-side proxy
cannot reach. The mapping carries this adapter's ``platform`` plus the
fixed ``"Stargazer"`` ``user_id`` and display name. Read-only and built
fresh on each access with no side effects.
Satisfies the ``bot_identity`` slot of the adapter interface; no internal
caller in this repo reads it on the proxy specifically.
Returns:
dict[str, str]: Keys ``platform``, ``user_id``, and ``display_name``.
"""
return {
"platform": self._platform,
"user_id": "Stargazer",
"display_name": "Stargazer",
}
[docs]
async def send_typing(self, channel_id: str) -> None:
"""Publish a one-shot typing indicator for the channel.
Emits a ``type: "typing"`` envelope (no idempotency key, since typing
is transient/fire-and-forget) by awaiting ``event_bus.publish_outbound``
onto ``sg:stream:outbound:{platform}``; the Gateway's outbound consumer
triggers a single typing pulse on the live client. Invoked from that
same consumer's delegation branch (``core/outbound_consumer.py``) as
the fallback when continuous typing is not requested.
Args:
channel_id: Channel/instance to show the typing indicator in.
"""
await self._event_bus.publish_outbound(self._platform, {
"channel_id": channel_id,
"type": "typing",
})
[docs]
async def start_typing(self, channel_id: str) -> None:
"""Publish a request to begin a sustained typing indicator.
Emits a ``type: "start_typing"`` envelope via
``event_bus.publish_outbound`` onto ``sg:stream:outbound:{platform}``;
the Gateway's outbound consumer keeps a continuous typing loop alive on
the live client until a matching :meth:`stop_typing` arrives. Called by
the message processor around long operations
(``message_processor/processor.py``) and by the outbound consumer's own
delegation branch (``core/outbound_consumer.py``).
Args:
channel_id: Channel/instance to begin showing typing in.
"""
await self._event_bus.publish_outbound(self._platform, {
"channel_id": channel_id,
"type": "start_typing",
})
[docs]
async def stop_typing(self, channel_id: str) -> None:
"""Publish a request to stop a sustained typing indicator.
Emits a ``type: "stop_typing"`` envelope via
``event_bus.publish_outbound`` onto ``sg:stream:outbound:{platform}``;
the Gateway's outbound consumer tears down the typing loop started by
:meth:`start_typing`. Called by the message processor when long
operations finish (``message_processor/processor.py``).
Args:
channel_id: Channel/instance to stop showing typing in.
"""
await self._event_bus.publish_outbound(self._platform, {
"channel_id": channel_id,
"type": "stop_typing",
})
[docs]
async def add_reaction(self, channel_id: str, message_id: str,
emoji: str, **kwargs) -> None:
"""Publish a reaction envelope to add an emoji to a message.
Emits a ``type: "reaction"`` envelope carrying the target
``message_id`` and ``emoji``; the Gateway's outbound consumer adds the
reaction on the live client.
It calls :meth:`_maybe_inject_key` (action ``"reaction"``) for
idempotency and awaits ``event_bus.publish_outbound`` onto
``sg:stream:outbound:{platform}``. Invoked from the outbound consumer's
delegation branch (``core/outbound_consumer.py``); higher-level reaction
tooling (e.g. ``tools/discord_react.py``) reaches this path through the
adapter interface.
Args:
channel_id: Channel/instance containing the target message.
message_id: Identifier of the message to react to.
emoji: Emoji (unicode or platform shortcode) to add.
**kwargs: Extra envelope fields (e.g. ``message_key``); ``None``
values are dropped.
"""
self._maybe_inject_key("reaction", kwargs)
await self._event_bus.publish_outbound(self._platform, {
"channel_id": channel_id,
"type": "reaction",
"message_id": message_id,
"emoji": emoji,
**{k: v for k, v in kwargs.items() if v is not None},
})
[docs]
async def set_presence(self, text: str, emoji: str | None = None) -> None:
"""Publish a presence/status update for the Gateway to apply.
Presence is a global (non-channel) action: the Gateway's outbound
consumer calls ``adapter.set_presence`` on the live client. No
``channel_id`` or ``message_key`` is attached — it is fire-and-forget
and must not go through the per-message idempotency/claim path.
"""
await self._event_bus.publish_outbound(self._platform, {
"type": "presence",
"text": text,
"emoji": emoji,
})
[docs]
async def fetch_history(self, channel_id: str, limit: int = 10, **kwargs):
"""Alias for :meth:`fetch_channel_history`.
Tools call the :class:`PlatformAdapter` interface method name
``fetch_history``; the proxy implements the RPC under
``fetch_channel_history``. Without this alias those calls raise
``AttributeError`` on worker nodes.
"""
return await self.fetch_channel_history(channel_id, limit=limit, **kwargs)
[docs]
async def fetch_channel_history(self, channel_id: str, limit: int = 10, **kwargs):
r"""Fetch recent channel history from the Gateway over a Redis-Streams RPC.
A worker node has no live client, so it cannot read history locally. This
publishes a ``type: "rpc_request"`` envelope (method
``fetch_channel_history``) onto the platform's outbound stream via
``event_bus.publish_outbound`` and then blocks on the event bus's Redis
client with ``blpop`` (30s) on a unique ``sg:rpc:reply:{uuid}`` list key;
the Gateway's :meth:`core.outbound_consumer.OutboundStreamConsumer._handle_rpc_request`
runs the real ``fetch_history`` and pushes a JSON result back, which is
decoded and returned. On timeout or any error it logs and returns an empty
list, and it always best-effort ``delete``\ s the reply key in a
``finally`` so abandoned replies do not accumulate.
Args:
channel_id: Channel/instance whose recent messages to fetch.
limit: Maximum number of messages to request. Defaults to ``10``.
**kwargs: Extra arguments forwarded to the Gateway-side fetch.
Returns:
list[HistoricalMessage]: Decoded history entries, or an empty list on timeout/error.
"""
reply_id = str(uuid.uuid4())
reply_key = f"sg:rpc:reply:{reply_id}"
request_payload = {
"channel_id": channel_id,
"type": "rpc_request",
"rpc_method": "fetch_channel_history",
"rpc_args": {"limit": limit, **kwargs},
"reply_key": reply_key,
}
logger.info(
"fetch_channel_history: Initiating RPC history fetch request for platform=%s, channel_id=%s, limit=%d",
self._platform,
channel_id,
limit,
)
try:
await self._event_bus.publish_outbound(self._platform, request_payload)
# Wait up to 30s for a reply using BLPOP
redis = self._event_bus._redis
res = await redis.blpop(reply_key, timeout=30)
if not res:
logger.warning("fetch_channel_history: rpc_timeout method=fetch_channel_history reply_key=%s", reply_key)
return []
_, data = res
raw_list = json.loads(data)
if not isinstance(raw_list, list):
logger.warning("fetch_channel_history: RPC returned non-list data for channel_id=%s: %s", channel_id, type(raw_list))
return []
logger.info(
"fetch_channel_history: Received RPC response, deserialized %d raw messages for channel_id=%s",
len(raw_list),
channel_id,
)
history = []
for idx, item in enumerate(raw_list):
if not isinstance(item, dict):
logger.warning("fetch_channel_history: Message item at index %d is not a dict: %s", idx, type(item))
continue
ts_val = item.get("timestamp")
if isinstance(ts_val, (int, float)):
try:
dt = datetime.fromtimestamp(ts_val, tz=timezone.utc)
except Exception:
logger.warning("fetch_channel_history: Invalid float timestamp: %s, falling back to current time", ts_val)
dt = datetime.now(timezone.utc)
elif isinstance(ts_val, str):
try:
dt = datetime.fromisoformat(ts_val)
except ValueError:
logger.warning("fetch_channel_history: Malformed ISO timestamp string: %s, falling back to current time", ts_val)
dt = datetime.now(timezone.utc)
else:
dt = datetime.now(timezone.utc)
msg_id = item.get("message_id") or ""
user_id = item.get("user_id") or ""
logger.debug(
"fetch_channel_history: Re-inflating message_id=%s sent by user_id=%s",
msg_id,
user_id,
)
history.append(
HistoricalMessage(
user_id=str(user_id),
user_name=str(item.get("user_name") or ""),
text=str(item.get("text") or ""),
timestamp=dt,
message_id=str(msg_id),
is_bot=bool(item.get("is_bot", False)),
reply_to_id=str(item.get("reply_to_id") or ""),
reactions=str(item.get("reactions") or ""),
)
)
return history
except Exception as e:
logger.error("fetch_channel_history: rpc_error method=fetch_channel_history error=%s", e, exc_info=True)
return []
finally:
try:
# Clean up the key just in case
await self._event_bus._redis.delete(reply_key)
except Exception:
pass
[docs]
async def delegate_to_gateway(self, method: str, **kwargs) -> Any:
"""Run an administrative or destructive adapter action on the Gateway, with retries.
The escalated RPC path for privileged operations (e.g. banning a user)
that must execute against the live client rather than be queued as an
ordinary outbound message. For up to three attempts it publishes a
``type: "rpc_request"`` envelope (carrying ``method`` and ``kwargs``) via
``event_bus.publish_outbound`` and blocks on a unique
``sg:rpc:reply:{uuid}`` list with ``blpop`` (10s) on the event bus's Redis
client; the Gateway's ``_handle_rpc_request`` performs the action and
pushes back a JSON result that is decoded and returned on the first reply.
Between failed or timed-out attempts it sleeps with exponential backoff
(``2 ** attempt`` seconds) and always best-effort ``delete``\\ s the reply
key in a ``finally``. Returns ``None`` if all attempts are exhausted.
Driven by the RPC migration tests
(``tests/core/migration/test_proxy_adapter_rpc.py``); tools invoke it
through the adapter interface for privileged moderation actions.
Args:
method: Name of the Gateway-side adapter method to invoke.
**kwargs: Keyword arguments forwarded to that method as ``rpc_args``.
Returns:
Any: The decoded JSON result on success, or ``None`` if every attempt
times out or errors.
"""
max_attempts = 3
for attempt in range(max_attempts):
reply_id = str(uuid.uuid4())
reply_key = f"sg:rpc:reply:{reply_id}"
request_payload = {
"type": "rpc_request",
"rpc_method": method,
"rpc_args": kwargs,
"reply_key": reply_key,
}
try:
await self._event_bus.publish_outbound(self._platform, request_payload)
redis = self._event_bus._redis
res = await redis.blpop(reply_key, timeout=10)
if res:
_, data = res
return json.loads(data)
logger.warning(f"rpc_timeout method={method} attempt={attempt+1}/{max_attempts}")
except Exception as e:
logger.error(f"rpc_error method={method} attempt={attempt+1}/{max_attempts} error={e}")
finally:
try:
await self._event_bus._redis.delete(reply_key)
except Exception:
pass
if attempt < max_attempts - 1:
await asyncio.sleep(2 ** attempt) # Exponential backoff
return None
[docs]
async def should_skip_channel_heartbeat(self, channel_id: str) -> bool:
"""Ask the Gateway over RPC whether this channel's heartbeat should be skipped.
Heartbeat suppression depends on live presence/state that only the
Gateway's real client holds, so the worker proxies the decision. It
publishes a ``type: "rpc_request"`` envelope (method
``should_skip_channel_heartbeat``) via ``event_bus.publish_outbound`` and
blocks on a unique ``sg:rpc:reply:{uuid}`` list with ``blpop`` (10s) on the
event bus's Redis client; the Gateway's ``_handle_rpc_request`` calls the
real adapter method and pushes back a JSON boolean. On timeout or any
error it logs and conservatively returns ``False`` (do not skip), and it
always best-effort ``delete``\\ s the reply key in a ``finally``.
Called by the channel heartbeat loop
(``message_processor/channel_heartbeat.py``) before emitting a heartbeat,
and covered by the RPC migration tests
(``tests/core/migration/test_proxy_adapter_rpc.py``).
Args:
channel_id: Channel/instance whose heartbeat eligibility to check.
Returns:
bool: ``True`` to skip the heartbeat; ``False`` to proceed (also the
safe default on timeout/error).
"""
reply_id = str(uuid.uuid4())
reply_key = f"sg:rpc:reply:{reply_id}"
request_payload = {
"channel_id": channel_id,
"type": "rpc_request",
"rpc_method": "should_skip_channel_heartbeat",
"rpc_args": {},
"reply_key": reply_key,
}
try:
await self._event_bus.publish_outbound(self._platform, request_payload)
# Block until reply arrives or timeout
redis = self._event_bus._redis
res = await redis.blpop(reply_key, timeout=10)
if not res:
logger.warning(f"rpc_timeout method=should_skip_channel_heartbeat reply_key={reply_key}")
return False
_, data = res
return json.loads(data)
except Exception as e:
logger.error(f"rpc_error method=should_skip_channel_heartbeat error={e}")
return False
finally:
try:
await self._event_bus._redis.delete(reply_key)
except Exception:
pass
[docs]
async def get_channel_name(self, channel_id: str) -> str:
"""Return a synthetic display name for a channel on the worker node.
Worker nodes do not hold a live client and cannot resolve real channel
names cheaply, so this returns the deterministic placeholder
``"channel_{channel_id}"`` rather than performing an RPC. It has no side
effects and makes no network calls. No internal callers were found in
this repo; it satisfies the :class:`PlatformAdapter` interface so code
expecting ``get_channel_name`` does not raise on worker nodes.
Args:
channel_id: Channel/instance identifier to label.
Returns:
str: The placeholder name ``"channel_{channel_id}"``.
"""
return f"channel_{channel_id}"