core.proxy_adapter module

Proxy PlatformAdapter that publishes to Redis instead of calling SDK APIs.

class core.proxy_adapter.ProxyPlatformAdapter(event_bus, platform, trace_id=None)[source]

Bases: object

Drop-in replacement for PlatformAdapter on worker nodes.

Instead of holding a live Discord/Matrix client, this adapter serializes every send-style call (send(), send_file(), send_with_buttons(), and so on) into an outbound stream envelope. The Gateway consumes these envelopes and dispatches them to the real adapter.

__init__(event_bus, platform, trace_id=None)[source]

Initialize the proxy adapter for one platform on a worker node.

Stores the shared RedisEventBus used to publish outbound envelopes, the platform name (e.g. "discord") that selects the destination stream, and an optional trace_id used to derive deterministic per-action idempotency keys. A defaultdict(int) of per-action-type counters backs that key derivation in _maybe_inject_key().

The constructor has no side effects of its own. The inference worker instantiates the class when it needs an adapter to hand to the message processor and tools (via ToolContext.adapter), so that their send calls become Redis envelopes instead of live SDK calls.

Parameters:
  • event_bus (RedisEventBus) – Shared event bus whose publish_outbound writes the outbound Redis Stream and whose _redis backs RPC replies.

  • platform (str) – Platform identifier selecting the outbound stream and gateway consumer group (e.g. "discord", "matrix").

  • trace_id (str | None) – Optional request trace id; when set, send actions get a derived message_key for gateway-side idempotency.

Return type:

None
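The per-action counters described above can be sketched as follows. This is a minimal illustration, not the module's actual code: the class name KeyDeriver, the "{trace_id}:{action}:{sequence}" key format, and the rule that a caller-supplied message_key is left untouched are all assumptions.

```python
from collections import defaultdict


class KeyDeriver:
    """Hypothetical stand-in for the proxy's key logic; the key format is assumed."""

    def __init__(self, trace_id=None):
        self.trace_id = trace_id
        self._counters = defaultdict(int)  # one counter per action type

    def maybe_inject_key(self, action, envelope):
        # Without a trace id there is nothing deterministic to derive from.
        # An explicit caller-supplied key is left alone (assumed behavior).
        if self.trace_id is None or "message_key" in envelope:
            return envelope
        sequence = self._counters[action]
        self._counters[action] += 1
        envelope["message_key"] = f"{self.trace_id}:{action}:{sequence}"
        return envelope


deriver = KeyDeriver(trace_id="trace-1")
first = deriver.maybe_inject_key("message", {"type": "message"})
second = deriver.maybe_inject_key("message", {"type": "message"})
print(first["message_key"], second["message_key"])
```

Because the key depends only on the trace id, the action type, and the call order, replaying the same request would reproduce the same keys, which is what lets a consumer detect duplicates.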

async send(channel_id, text, **kwargs)[source]

Publish a text message envelope for the Gateway to deliver.

Serializes a type: "message" envelope (channel, text, and any non-None extra kwargs such as reply_to) instead of calling a live SDK; the Gateway’s outbound consumer reconstructs and sends it on the real client.

It first calls _maybe_inject_key() to stamp a message_key for idempotency, then awaits event_bus.publish_outbound(platform, ...), which XADDs the envelope onto the sg:stream:outbound:{platform} stream. It is used throughout the worker to emit chat output: by the message processor and command router (message_processor/processor.py, command_router.py), by tools via ctx.adapter.send / platform.send, and by task_manager.py.

Parameters:
  • channel_id (str) – Destination channel/instance identifier.

  • text (str) – Message body to send.

  • **kwargs – Extra envelope fields (e.g. reply_to, message_key); None values are dropped.

Return type:

None
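A rough sketch of the envelope that send() publishes, using an in-memory stand-in for the event bus. FakeEventBus, send_sketch, the publish_outbound(platform, envelope) call shape, and the channel_id/text field names are assumptions for illustration; only the type: "message" marker, the stream name, and the dropping of None values come from the description above.

```python
import asyncio


class FakeEventBus:
    """In-memory stand-in for RedisEventBus (hypothetical; records instead of XADD)."""

    def __init__(self):
        self.published = []

    async def publish_outbound(self, platform, envelope):
        self.published.append((f"sg:stream:outbound:{platform}", envelope))


async def send_sketch(bus, platform, channel_id, text, **kwargs):
    # Field names other than "type" are assumed for this sketch.
    envelope = {"type": "message", "channel_id": channel_id, "text": text}
    # Extra kwargs ride along, but None values are dropped.
    envelope.update({k: v for k, v in kwargs.items() if v is not None})
    await bus.publish_outbound(platform, envelope)


bus = FakeEventBus()
asyncio.run(send_sketch(bus, "discord", "123", "hello", reply_to="42", embed=None))
print(bus.published)
```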

async send_file(channel_id, file_data, filename='file', mimetype='application/octet-stream', **kwargs)[source]

Publish a file-attachment envelope for the Gateway to upload.

Base64-encodes the raw file_data so the bytes survive Redis Stream serialization, then emits a type: "file" envelope carrying the channel, encoded payload, filename, and mimetype.

It calls _maybe_inject_key() (action "file") for idempotency and awaits event_bus.publish_outbound, which XADDs the envelope onto sg:stream:outbound:{platform}; the Gateway’s outbound consumer decodes the payload and uploads it via the live adapter’s send_file. Callers include the outbound consumer’s own delegation path, many media tools through ctx.adapter.send_file (e.g. tools/generate_image.py, tools/elevenlabs_tts.py, tools/render_mermaid.py), and background agents that attach generated artifacts.

Parameters:
  • channel_id (str) – Destination channel/instance identifier.

  • file_data (bytes) – Raw file bytes to send (base64-encoded for transport).

  • filename (str) – Suggested attachment filename. Defaults to "file".

  • mimetype (str) – MIME type of the attachment. Defaults to "application/octet-stream".

  • **kwargs – Extra envelope fields (e.g. caption text, message_key); None values are dropped.

Return type:

None
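The base64 round trip can be illustrated with the standard library alone. The helper names and the file_data_b64 field name below are hypothetical; the point is only that raw bytes are turned into ASCII text before they enter the envelope and are recovered unchanged on the consuming side.

```python
import base64


def build_file_envelope(channel_id, file_data, filename="file",
                        mimetype="application/octet-stream", **kwargs):
    """Build a file envelope; the field names are assumed for this sketch."""
    envelope = {
        "type": "file",
        "channel_id": channel_id,
        # Raw bytes become ASCII text so they survive stream serialization.
        "file_data_b64": base64.b64encode(file_data).decode("ascii"),
        "filename": filename,
        "mimetype": mimetype,
    }
    envelope.update({k: v for k, v in kwargs.items() if v is not None})
    return envelope


def decode_file_envelope(envelope):
    """What a consumer would do before uploading: recover the original bytes."""
    return base64.b64decode(envelope["file_data_b64"])


raw = b"\x89PNG\r\n\x1a\n"
file_envelope = build_file_envelope("123", raw, filename="chart.png", mimetype="image/png")
print(file_envelope["file_data_b64"], decode_file_envelope(file_envelope) == raw)
```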

async send_with_buttons(channel_id, text, view=None, **kwargs)[source]

Publish a text-with-buttons envelope, normalizing the button spec.

Accepts either a plain list of button dicts or a Discord-style view object and flattens it into a serializable buttons list (label, custom_id, optional emoji, and a string-ified style) since live UI view objects cannot cross Redis. An explicit buttons kwarg, if present, overrides whatever was derived from view.

After building the list it calls _maybe_inject_key() (action "buttons") and awaits event_bus.publish_outbound to XADD a type: "buttons" envelope onto sg:stream:outbound:{platform}; the Gateway’s outbound consumer rebuilds the interactive view. Called by the message processor’s send path (message_processor/generate_and_send.py, processor.py) and the outbound consumer’s delegation branch.

Parameters:
  • channel_id (str) – Destination channel/instance identifier.

  • text (str) – Message body shown above the buttons.

  • view (Any) – Either a list of button dicts or a view object exposing children with label/custom_id/emoji/style.

  • **kwargs – Extra envelope fields; an explicit buttons key replaces the derived list. None values are dropped.

Return type:

None
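The normalization step might look roughly like the sketch below. It assumes a view is anything exposing a children sequence whose items have label, custom_id, emoji, and style attributes; normalize_buttons and build_buttons_envelope are hypothetical helper names, and SimpleNamespace stands in for a real UI view object.

```python
from types import SimpleNamespace


def normalize_buttons(view):
    """Flatten a list of dicts or a view-like object into plain button dicts."""
    if view is None:
        return []
    if isinstance(view, list):
        return [dict(button) for button in view]
    buttons = []
    for child in getattr(view, "children", []):
        button = {
            "label": child.label,
            "custom_id": child.custom_id,
            "style": str(getattr(child, "style", "")),  # string-ified for transport
        }
        emoji = getattr(child, "emoji", None)
        if emoji is not None:
            button["emoji"] = str(emoji)
        buttons.append(button)
    return buttons


def build_buttons_envelope(channel_id, text, view=None, **kwargs):
    envelope = {
        "type": "buttons",
        "channel_id": channel_id,
        "text": text,
        "buttons": normalize_buttons(view),
    }
    # An explicit buttons kwarg replaces the derived list; None values are dropped.
    envelope.update({k: v for k, v in kwargs.items() if v is not None})
    return envelope


view = SimpleNamespace(children=[
    SimpleNamespace(label="Yes", custom_id="yes", emoji=None, style="success"),
    SimpleNamespace(label="No", custom_id="no", emoji="x", style="danger"),
])
buttons_envelope = build_buttons_envelope("123", "Proceed?", view=view)
print(buttons_envelope["buttons"])
```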

property name: str

Return this adapter’s platform identifier.

Exposes the platform passed at construction (e.g. "discord") so the proxy satisfies the same read-only name contract as a live PlatformAdapter. Read-only with no side effects.

Read by the message processor and helpers that index adapters by name, e.g. the {a.name: a for a in adapters} maps built in message_processor/processor.py, generate_and_send.py, and channel_heartbeat.py.

Returns:

The platform name.

Return type:

str

property bot_identity: dict[str, str]

Return the bot’s identity descriptor for this platform.

Provides the static identity the live PlatformAdapter would normally derive from its connected client, which the worker-side proxy cannot reach. The mapping carries this adapter’s platform plus the fixed "Stargazer" user_id and display name. Read-only and built fresh on each access with no side effects.

Satisfies the bot_identity slot of the adapter interface; no internal caller in this repo reads it on the proxy specifically.

Returns:

Keys platform, user_id, and display_name.

Return type:

dict[str, str]

async send_typing(channel_id)[source]

Publish a one-shot typing indicator for the channel.

Emits a type: "typing" envelope (no idempotency key, since typing is transient/fire-and-forget) by awaiting event_bus.publish_outbound onto sg:stream:outbound:{platform}; the Gateway’s outbound consumer triggers a single typing pulse on the live client. Invoked from that same consumer’s delegation branch (core/outbound_consumer.py) as the fallback when continuous typing is not requested.

Parameters:

channel_id (str) – Channel/instance to show the typing indicator in.

Return type:

None

async start_typing(channel_id)[source]

Publish a request to begin a sustained typing indicator.

Emits a type: "start_typing" envelope via event_bus.publish_outbound onto sg:stream:outbound:{platform}; the Gateway’s outbound consumer keeps a continuous typing loop alive on the live client until a matching stop_typing() arrives. Called by the message processor around long operations (message_processor/processor.py) and by the outbound consumer’s own delegation branch (core/outbound_consumer.py).

Parameters:

channel_id (str) – Channel/instance to begin showing typing in.

Return type:

None

async stop_typing(channel_id)[source]

Publish a request to stop a sustained typing indicator.

Emits a type: "stop_typing" envelope via event_bus.publish_outbound onto sg:stream:outbound:{platform}; the Gateway’s outbound consumer tears down the typing loop started by start_typing(). Called by the message processor when long operations finish (message_processor/processor.py).

Parameters:

channel_id (str) – Channel/instance to stop showing typing in.

Return type:

None
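Since start_typing() and stop_typing() are meant to be paired around long operations, a caller could wrap them in an async context manager so the stop request is published even when the operation fails. This is a usage sketch only: typing_indicator and RecordingAdapter are hypothetical names and are not part of this module.

```python
import asyncio
from contextlib import asynccontextmanager


class RecordingAdapter:
    """Hypothetical adapter stand-in that records calls instead of publishing."""

    def __init__(self):
        self.events = []

    async def start_typing(self, channel_id):
        self.events.append(("start_typing", channel_id))

    async def stop_typing(self, channel_id):
        self.events.append(("stop_typing", channel_id))


@asynccontextmanager
async def typing_indicator(adapter, channel_id):
    await adapter.start_typing(channel_id)
    try:
        yield
    finally:
        # Runs on success and on error, so the typing loop is always torn down.
        await adapter.stop_typing(channel_id)


async def long_operation(adapter):
    async with typing_indicator(adapter, "123"):
        await asyncio.sleep(0)  # placeholder for slow work such as inference


recorder = RecordingAdapter()
asyncio.run(long_operation(recorder))
print(recorder.events)
```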

async add_reaction(channel_id, message_id, emoji, **kwargs)[source]

Publish a reaction envelope to add an emoji to a message.

Emits a type: "reaction" envelope carrying the target message_id and emoji; the Gateway’s outbound consumer adds the reaction on the live client.

It calls _maybe_inject_key() (action "reaction") for idempotency and awaits event_bus.publish_outbound onto sg:stream:outbound:{platform}. Invoked from the outbound consumer’s delegation branch (core/outbound_consumer.py); higher-level reaction tooling (e.g. tools/discord_react.py) reaches this path through the adapter interface.

Parameters:
  • channel_id (str) – Channel/instance containing the target message.

  • message_id (str) – Identifier of the message to react to.

  • emoji (str) – Emoji (unicode or platform shortcode) to add.

  • **kwargs – Extra envelope fields (e.g. message_key); None values are dropped.

Return type:

None

async set_presence(text, emoji=None)[source]

Publish a presence/status update for the Gateway to apply.

Presence is a global (non-channel) action: the Gateway’s outbound consumer calls adapter.set_presence on the live client. No channel_id or message_key is attached; the call is fire-and-forget and must not go through the per-message idempotency/claim path.

Parameters:
  • text (str) – Presence/status text to display.

  • emoji (str | None) – Optional emoji to show with the status. Defaults to None.

Return type:

None

async fetch_history(channel_id, limit=10, **kwargs)[source]

Alias for fetch_channel_history().

Tools call the PlatformAdapter interface method name fetch_history; the proxy implements the RPC under fetch_channel_history. Without this alias those calls raise AttributeError on worker nodes.

Parameters:
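  • channel_id (str) – Channel/instance whose recent messages to fetch.

  • limit (int) – Maximum number of messages to request. Defaults to 10.

  • **kwargs – Extra arguments passed through to fetch_channel_history().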
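The alias amounts to plain delegation. In the sketch below, AliasSketch is a hypothetical class and its fetch_channel_history is a stub that fabricates placeholder strings where the real method performs an RPC.

```python
import asyncio


class AliasSketch:
    """Hypothetical class showing the alias shape only."""

    async def fetch_channel_history(self, channel_id, limit=10, **kwargs):
        # Stub: the real method performs an RPC to the Gateway.
        return [f"{channel_id}:{i}" for i in range(limit)]

    async def fetch_history(self, channel_id, limit=10, **kwargs):
        # Interface-facing name; forwards everything unchanged.
        return await self.fetch_channel_history(channel_id, limit=limit, **kwargs)


history = asyncio.run(AliasSketch().fetch_history("c", limit=2))
print(history)
```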

async fetch_channel_history(channel_id, limit=10, **kwargs)[source]

Fetch recent channel history from the Gateway over a Redis-Streams RPC.

A worker node has no live client, so it cannot read history locally. Instead, this method publishes a type: "rpc_request" envelope (method fetch_channel_history) onto the platform’s outbound stream via event_bus.publish_outbound, then blocks with blpop for up to 30 seconds on a unique sg:rpc:reply:{uuid} list key, using the event bus’s Redis client. The Gateway’s core.outbound_consumer.OutboundStreamConsumer._handle_rpc_request() runs the real fetch_history and pushes a JSON result back, which is decoded and returned. On timeout or any error the method logs and returns an empty list. It always makes a best-effort attempt to delete the reply key in a finally block so that abandoned replies do not accumulate.

Parameters:
  • channel_id (str) – Channel/instance whose recent messages to fetch.

  • limit (int) – Maximum number of messages to request. Defaults to 10.

  • **kwargs – Extra arguments forwarded to the Gateway-side fetch.

Returns:

Decoded history entries, or an empty list on timeout/error.

Return type:

list[HistoricalMessage]
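The request/reply pattern can be sketched against an in-memory stand-in for Redis. Everything named here is an assumption made so the example runs without a server: FakeRedis, rpc_call, the publish callback, and the reply_key and rpc_args envelope fields. The fake blpop mirrors the convention of returning a (key, value) pair, or None on timeout.

```python
import asyncio
import json
import uuid


class FakeRedis:
    """Tiny in-memory stand-in for an async Redis client (hypothetical)."""

    def __init__(self):
        self.lists = {}

    async def rpush(self, key, value):
        self.lists.setdefault(key, asyncio.Queue()).put_nowait(value)

    async def blpop(self, key, timeout):
        queue = self.lists.setdefault(key, asyncio.Queue())
        try:
            value = await asyncio.wait_for(queue.get(), timeout)
        except asyncio.TimeoutError:
            return None  # nothing arrived in time
        return (key, value)

    async def delete(self, key):
        self.lists.pop(key, None)


async def rpc_call(redis, publish, method, args, timeout=30):
    reply_key = f"sg:rpc:reply:{uuid.uuid4()}"
    try:
        await publish({"type": "rpc_request", "method": method,
                       "rpc_args": args, "reply_key": reply_key})
        reply = await redis.blpop(reply_key, timeout=timeout)
        return [] if reply is None else json.loads(reply[1])
    except Exception:
        return []  # the real method also logs here
    finally:
        try:
            await redis.delete(reply_key)  # best-effort cleanup
        except Exception:
            pass


async def demo():
    redis = FakeRedis()

    async def answering_gateway(envelope):
        payload = json.dumps([{"author": "a", "text": "hi"}])
        await redis.rpush(envelope["reply_key"], payload)

    async def silent_gateway(envelope):
        return None  # never replies, forcing a timeout

    args = {"channel_id": "1", "limit": 10}
    ok = await rpc_call(redis, answering_gateway, "fetch_channel_history", args)
    timed_out = await rpc_call(redis, silent_gateway, "fetch_channel_history",
                               args, timeout=0.05)
    return ok, timed_out, dict(redis.lists)


ok, timed_out, leftover = asyncio.run(demo())
print(ok, timed_out, leftover)
```

Using a fresh reply key per call keeps concurrent requests from reading each other's answers, and deleting the key in finally covers the case where a reply lands after the caller has already given up.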

async delegate_to_gateway(method, **kwargs)[source]

Run an administrative or destructive adapter action on the Gateway, with retries.

This is the escalated RPC path for privileged operations (e.g. banning a user) that must execute against the live client rather than be queued as an ordinary outbound message. On each of up to three attempts it publishes a type: "rpc_request" envelope (carrying method and kwargs) via event_bus.publish_outbound, then blocks with blpop for up to 10 seconds on a unique sg:rpc:reply:{uuid} list, using the event bus’s Redis client. The Gateway’s _handle_rpc_request performs the action and pushes back a JSON result; the first reply received is decoded and returned.

Between failed or timed-out attempts it sleeps with exponential backoff (2 ** attempt seconds) and always best-effort deletes the reply key in a finally. Returns None if all attempts are exhausted. Driven by the RPC migration tests (tests/core/migration/test_proxy_adapter_rpc.py); tools invoke it through the adapter interface for privileged moderation actions.

Parameters:
  • method (str) – Name of the Gateway-side adapter method to invoke.

  • **kwargs – Keyword arguments forwarded to that method as rpc_args.

Returns:

The decoded JSON result on success, or None if every attempt times out or errors.

Return type:

Any
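The retry loop with exponential backoff can be sketched independently of Redis. In this illustration call_with_retries and attempt_once are hypothetical names, a failed attempt is modeled as a raised exception, attempts are assumed to be numbered from 0, and no sleep is assumed after the final attempt. The sleep function is injectable so the example finishes instantly.

```python
import asyncio


async def call_with_retries(attempt_once, max_attempts=3, sleep=asyncio.sleep):
    """Run attempt_once up to max_attempts times, backing off 2 ** attempt seconds."""
    for attempt in range(max_attempts):
        try:
            return await attempt_once()
        except Exception:
            # Real code would log here. Back off only if another attempt follows.
            if attempt < max_attempts - 1:
                await sleep(2 ** attempt)
    return None  # every attempt failed


async def demo_retries():
    delays = []
    calls = {"count": 0}

    async def fake_sleep(seconds):
        delays.append(seconds)  # record instead of actually waiting

    async def flaky():
        calls["count"] += 1
        if calls["count"] < 3:
            raise TimeoutError("no reply")
        return {"ok": True}

    result = await call_with_retries(flaky, sleep=fake_sleep)
    return result, delays


retry_result, retry_delays = asyncio.run(demo_retries())
print(retry_result, retry_delays)
```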

async should_skip_channel_heartbeat(channel_id)[source]

Ask the Gateway over RPC whether this channel’s heartbeat should be skipped.

Heartbeat suppression depends on live presence/state that only the Gateway’s real client holds, so the worker proxies the decision. It publishes a type: "rpc_request" envelope (method should_skip_channel_heartbeat) via event_bus.publish_outbound, then blocks with blpop for up to 10 seconds on a unique sg:rpc:reply:{uuid} list, using the event bus’s Redis client. The Gateway’s _handle_rpc_request calls the real adapter method and pushes back a JSON boolean. On timeout or any error the method logs and conservatively returns False (do not skip). It always makes a best-effort attempt to delete the reply key in a finally block.

Called by the channel heartbeat loop (message_processor/channel_heartbeat.py) before emitting a heartbeat, and covered by the RPC migration tests (tests/core/migration/test_proxy_adapter_rpc.py).

Parameters:

channel_id (str) – Channel/instance whose heartbeat eligibility to check.

Returns:

True to skip the heartbeat; False to proceed (also the safe default on timeout/error).

Return type:

bool
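The conservative default can be shown in isolation. The helper below is hypothetical: it takes the value a blpop-style call would hand back, either a (key, payload) pair or None on timeout, and falls back to False whenever the reply is missing or malformed.

```python
import json


def decode_skip_reply(reply):
    """Turn a blpop-style reply into a bool, defaulting to False (do not skip)."""
    if reply is None:  # timed out
        return False
    try:
        return bool(json.loads(reply[1]))
    except (ValueError, TypeError, IndexError):
        return False  # malformed reply: fall back to the safe default


print(decode_skip_reply(("sg:rpc:reply:abc", "true")))
print(decode_skip_reply(None))
```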

async get_channel_name(channel_id)[source]

Return a synthetic display name for a channel on the worker node.

Worker nodes do not hold a live client and cannot resolve real channel names cheaply, so this returns the deterministic placeholder "channel_{channel_id}" rather than performing an RPC. It has no side effects and makes no network calls. No internal callers were found in this repo; it satisfies the PlatformAdapter interface so code expecting get_channel_name does not raise on worker nodes.

Parameters:

channel_id (str) – Channel/instance identifier to label.

Returns:

The placeholder name "channel_{channel_id}".

Return type:

str