core.proxy_adapter module
Proxy PlatformAdapter that publishes to Redis instead of calling SDK APIs.
- class core.proxy_adapter.ProxyPlatformAdapter(event_bus, platform, trace_id=None)[source]
Bases: object
Drop-in replacement for PlatformAdapter on worker nodes.
Instead of holding a live Discord/Matrix client, this adapter serializes all send() calls into outbound stream envelopes. The gateway consumes these and dispatches to the real adapter.
- Parameters:
event_bus (RedisEventBus)
platform (str)
trace_id (str | None)
- __init__(event_bus, platform, trace_id=None)[source]
Initialize the proxy adapter for one platform on a worker node.
Stores the shared RedisEventBus used to publish outbound envelopes, the platform name (e.g. "discord") that selects the destination stream, and an optional trace_id used to derive deterministic per-action idempotency keys. A defaultdict(int) of per-action-type counters backs that key derivation in _maybe_inject_key().
This constructor has no side effects of its own. It is instantiated by the inference worker when it needs an adapter to hand to the message processor / tools (via ToolContext.adapter) so their send calls become Redis envelopes instead of live SDK calls.
- Parameters:
event_bus (RedisEventBus) – Shared event bus whose publish_outbound writes the outbound Redis Stream and whose _redis backs RPC replies.
platform (str) – Platform identifier selecting the outbound stream and gateway consumer group (e.g. "discord", "matrix").
trace_id (str | None) – Optional request trace id; when set, send actions get a derived message_key for gateway-side idempotency.
- Return type:
None
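The key derivation described above can be sketched as follows. This is a minimal illustration, not the module's actual code: the class name KeyDeriver, the "<trace_id>:<action>:<n>" key layout, and the rule that a caller-supplied message_key is left untouched are all assumptions made for the example.

```python
from collections import defaultdict


class KeyDeriver:
    """Hypothetical stand-in for the proxy's key-derivation state."""

    def __init__(self, trace_id=None):
        self.trace_id = trace_id
        # One counter per action type ("message", "file", "buttons", ...).
        self._counters = defaultdict(int)

    def maybe_inject_key(self, envelope, action):
        # Assumption: no trace id, or a key already present, means no change.
        if self.trace_id is None or "message_key" in envelope:
            return envelope
        index = self._counters[action]
        self._counters[action] += 1
        # Assumed key layout; the real format is not documented here.
        envelope["message_key"] = f"{self.trace_id}:{action}:{index}"
        return envelope
```

Because the counters are kept per action type, replaying the same request with the same trace_id yields the same sequence of keys, which is what lets the gateway recognise duplicates.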
- async send(channel_id, text, **kwargs)[source]
Publish a text message envelope for the Gateway to deliver.
Serializes a type: "message" envelope (channel, text, and any non-None extra kwargs such as reply_to) instead of calling a live SDK; the Gateway’s outbound consumer reconstructs and sends it on the real client.
It first calls _maybe_inject_key() to stamp a message_key for idempotency, then awaits event_bus.publish_outbound(platform, ...), which XADDs onto the sg:stream:outbound:{platform} stream. Called broadly across the worker: the message processor and command router (message_processor/processor.py, command_router.py), tools via ctx.adapter.send / platform.send, and task_manager.py all use it to emit chat output.
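A rough sketch of the publish path, using an in-memory stand-in for the event bus. FakeEventBus, the free function send, the envelope field names channel_id and text, and the second argument of publish_outbound are assumptions for illustration; only the stream name and the None-dropping rule come from the description above.

```python
import asyncio


class FakeEventBus:
    """In-memory stand-in for RedisEventBus; records instead of XADDing."""

    def __init__(self):
        self.published = []

    async def publish_outbound(self, platform, envelope):
        # Stream name as given in the description above.
        self.published.append((f"sg:stream:outbound:{platform}", envelope))


async def send(bus, platform, channel_id, text, **kwargs):
    # Field names here are assumed, not taken from the real envelope schema.
    envelope = {"type": "message", "channel_id": channel_id, "text": text}
    # Extra kwargs travel with the envelope unless their value is None.
    envelope.update({k: v for k, v in kwargs.items() if v is not None})
    await bus.publish_outbound(platform, envelope)
    return envelope


bus = FakeEventBus()
asyncio.run(send(bus, "discord", "123", "hi", reply_to="9", embed=None))
```

After the call, bus.published holds a single entry for the discord stream whose envelope contains reply_to but not embed.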
- async send_file(channel_id, file_data, filename='file', mimetype='application/octet-stream', **kwargs)[source]
Publish a file-attachment envelope for the Gateway to upload.
Base64-encodes the raw file_data so the bytes survive Redis Stream serialization, then emits a type: "file" envelope carrying the channel, encoded payload, filename, and mimetype.
It calls _maybe_inject_key() (action "file") for idempotency and awaits event_bus.publish_outbound, XADDing onto sg:stream:outbound:{platform}; the Gateway’s outbound consumer decodes and uploads via the live adapter’s send_file. Called by the outbound consumer’s own delegation path and by many media tools through ctx.adapter.send_file (e.g. tools/generate_image.py, tools/elevenlabs_tts.py, tools/render_mermaid.py) and background agents to attach generated artifacts.
- Parameters:
channel_id (str) – Destination channel/instance identifier.
file_data (bytes) – Raw file bytes to send (base64-encoded for transport).
filename (str) – Suggested attachment filename. Defaults to "file".
mimetype (str) – MIME type of the attachment. Defaults to "application/octet-stream".
**kwargs – Extra envelope fields (e.g. caption text, message_key); None values are dropped.
- Return type:
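The base64 step can be illustrated with a small round trip. The helper names build_file_envelope, to_wire, and decode_file_envelope, and the envelope field names, are hypothetical; the sketch only shows why raw bytes need encoding before they are serialized as text.

```python
import base64
import json


def build_file_envelope(channel_id, file_data, filename="file",
                        mimetype="application/octet-stream", **kwargs):
    envelope = {
        "type": "file",
        "channel_id": channel_id,  # assumed field name
        # Base64 turns arbitrary bytes into ASCII text that serializes safely.
        "file_data": base64.b64encode(file_data).decode("ascii"),
        "filename": filename,
        "mimetype": mimetype,
    }
    envelope.update({k: v for k, v in kwargs.items() if v is not None})
    return envelope


def to_wire(envelope):
    # Raw bytes would make json.dumps raise TypeError; the encoded text does not.
    return json.dumps(envelope)


def decode_file_envelope(wire):
    # Gateway side: recover the original bytes before uploading.
    envelope = json.loads(wire)
    return envelope, base64.b64decode(envelope["file_data"])
```

Base64 inflates the payload by roughly a third, which is worth keeping in mind for large attachments.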
- async send_with_buttons(channel_id, text, view=None, **kwargs)[source]
Publish a text-with-buttons envelope, normalizing the button spec.
Accepts either a plain list of button dicts or a Discord-style view object and flattens it into a serializable buttons list (label, custom_id, optional emoji, and a string-ified style) since live UI view objects cannot cross Redis. An explicit buttons kwarg, if present, overrides whatever was derived from view.
After building the list it calls _maybe_inject_key() (action "buttons") and awaits event_bus.publish_outbound to XADD a type: "buttons" envelope onto sg:stream:outbound:{platform}; the Gateway’s outbound consumer rebuilds the interactive view. Called by the message processor’s send path (message_processor/generate_and_send.py, processor.py) and the outbound consumer’s delegation branch.
- Parameters:
channel_id (str) – Destination channel/instance identifier.
text (str) – Message body shown above the buttons.
view (Any) – Either a list of button dicts or a view object exposing children with label / custom_id / emoji / style.
**kwargs – Extra envelope fields; an explicit buttons key replaces the derived list. None values are dropped.
- Return type:
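The button normalization might look roughly like this. flatten_buttons is a hypothetical helper, and SimpleNamespace objects stand in for a real view and its children; how the actual method string-ifies emoji values is an assumption.

```python
from types import SimpleNamespace


def flatten_buttons(view=None, **kwargs):
    buttons = []
    if isinstance(view, list):
        # Already a plain list of button dicts; copy each one.
        buttons = [dict(b) for b in view]
    elif view is not None:
        for child in getattr(view, "children", []):
            button = {
                "label": child.label,
                "custom_id": child.custom_id,
                "style": str(child.style),  # string-ified so it serializes
            }
            emoji = getattr(child, "emoji", None)
            if emoji is not None:
                button["emoji"] = str(emoji)  # assumed conversion
            buttons.append(button)
    # An explicit buttons kwarg wins over anything derived from view.
    explicit = kwargs.get("buttons")
    return explicit if explicit is not None else buttons


# Stand-in for a Discord-style view with a single button.
example_view = SimpleNamespace(
    children=[SimpleNamespace(label="Yes", custom_id="y", emoji=None, style=3)]
)
flat = flatten_buttons(example_view)
```

The result is a list of plain dicts, so it can be placed in an envelope and rebuilt into an interactive view on the gateway side.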
- property name: str
Return this adapter’s platform identifier.
Exposes the platform passed at construction (e.g. "discord") so the proxy satisfies the same read-only name contract as a live PlatformAdapter. Read-only with no side effects.
Read by the message processor and helpers that index adapters by name, e.g. the {a.name: a for a in adapters} maps built in message_processor/processor.py, generate_and_send.py, and channel_heartbeat.py.
- Returns:
The platform name.
- Return type:
- property bot_identity: dict[str, str]
Return the bot’s identity descriptor for this platform.
Provides the static identity the live PlatformAdapter would normally derive from its connected client, which the worker-side proxy cannot reach. The mapping carries this adapter’s platform plus the fixed "Stargazer" user_id and display name. Read-only and built fresh on each access with no side effects.
Satisfies the bot_identity slot of the adapter interface; no internal caller in this repo reads it on the proxy specifically.
- async send_typing(channel_id)[source]
Publish a one-shot typing indicator for the channel.
Emits a type: "typing" envelope (no idempotency key, since typing is transient/fire-and-forget) by awaiting event_bus.publish_outbound onto sg:stream:outbound:{platform}; the Gateway’s outbound consumer triggers a single typing pulse on the live client. Invoked from that same consumer’s delegation branch (core/outbound_consumer.py) as the fallback when continuous typing is not requested.
- async start_typing(channel_id)[source]
Publish a request to begin a sustained typing indicator.
Emits a type: "start_typing" envelope via event_bus.publish_outbound onto sg:stream:outbound:{platform}; the Gateway’s outbound consumer keeps a continuous typing loop alive on the live client until a matching stop_typing() arrives. Called by the message processor around long operations (message_processor/processor.py) and by the outbound consumer’s own delegation branch (core/outbound_consumer.py).
- async stop_typing(channel_id)[source]
Publish a request to stop a sustained typing indicator.
Emits a type: "stop_typing" envelope via event_bus.publish_outbound onto sg:stream:outbound:{platform}; the Gateway’s outbound consumer tears down the typing loop started by start_typing(). Called by the message processor when long operations finish (message_processor/processor.py).
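On the gateway side, the start_typing / stop_typing pair implies some registry of running typing loops. One possible shape is sketched below, purely as an illustration: the TypingLoops class, the pulse callback, and the pulse interval are all hypothetical and not taken from the gateway's code.

```python
import asyncio


class TypingLoops:
    """Hypothetical gateway-side registry of sustained typing loops."""

    def __init__(self, pulse, interval=0.01):
        self._pulse = pulse        # coroutine function sending one typing pulse
        self._interval = interval  # seconds between pulses (assumed value)
        self._tasks = {}

    async def _run(self, channel_id):
        while True:
            await self._pulse(channel_id)
            await asyncio.sleep(self._interval)

    def start(self, channel_id):
        # A second start for the same channel keeps the existing loop.
        if channel_id not in self._tasks:
            self._tasks[channel_id] = asyncio.create_task(self._run(channel_id))

    async def stop(self, channel_id):
        task = self._tasks.pop(channel_id, None)
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass  # expected: the loop was cancelled on purpose


async def demo():
    pulses = []

    async def pulse(channel_id):
        pulses.append(channel_id)

    loops = TypingLoops(pulse)
    loops.start("c1")
    await asyncio.sleep(0.05)
    await loops.stop("c1")
    seen = len(pulses)
    await asyncio.sleep(0.03)  # no further pulses should arrive
    return seen, len(pulses)
```

Keying the tasks by channel makes a stray stop_typing for a channel with no running loop a harmless no-op.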
- async add_reaction(channel_id, message_id, emoji, **kwargs)[source]
Publish a reaction envelope to add an emoji to a message.
Emits a type: "reaction" envelope carrying the target message_id and emoji; the Gateway’s outbound consumer adds the reaction on the live client.
It calls _maybe_inject_key() (action "reaction") for idempotency and awaits event_bus.publish_outbound onto sg:stream:outbound:{platform}. Invoked from the outbound consumer’s delegation branch (core/outbound_consumer.py); higher-level reaction tooling (e.g. tools/discord_react.py) reaches this path through the adapter interface.
- async set_presence(text, emoji=None)[source]
Publish a presence/status update for the Gateway to apply.
Presence is a global (non-channel) action: the Gateway’s outbound consumer calls adapter.set_presence on the live client. No channel_id or message_key is attached; it is fire-and-forget and must not go through the per-message idempotency/claim path.
- async fetch_history(channel_id, limit=10, **kwargs)[source]
Alias for fetch_channel_history().
Tools call the PlatformAdapter interface method name fetch_history; the proxy implements the RPC under fetch_channel_history. Without this alias those calls raise AttributeError on worker nodes.
- async fetch_channel_history(channel_id, limit=10, **kwargs)[source]
Fetch recent channel history from the Gateway over a Redis-Streams RPC.
A worker node has no live client, so it cannot read history locally. This publishes a type: "rpc_request" envelope (method fetch_channel_history) onto the platform’s outbound stream via event_bus.publish_outbound and then blocks on the event bus’s Redis client with blpop (30s) on a unique sg:rpc:reply:{uuid} list key; the Gateway’s core.outbound_consumer.OutboundStreamConsumer._handle_rpc_request() runs the real fetch_history and pushes a JSON result back, which is decoded and returned. On timeout or any error it logs and returns an empty list, and it always best-effort deletes the reply key in a finally so abandoned replies do not accumulate.
- Parameters:
- Returns:
Decoded history entries, or an empty list on timeout/error.
- Return type:
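The request/reply pattern described above can be sketched against an in-memory fake. FakeRedis mimics only the three list calls used here, and the envelope field names reply_key and kwargs, the publish callback, and the free-function form are assumptions; the real method goes through event_bus.publish_outbound and the bus's Redis client.

```python
import asyncio
import json
import uuid


class FakeRedis:
    """Tiny in-memory stand-in for the list commands used by the RPC."""

    def __init__(self):
        self.lists = {}

    async def rpush(self, key, value):
        self.lists.setdefault(key, []).append(value)

    async def blpop(self, key, timeout=0):
        # Poll until a value arrives or the timeout elapses.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while loop.time() < deadline:
            if self.lists.get(key):
                return key, self.lists[key].pop(0)
            await asyncio.sleep(0.005)
        return None

    async def delete(self, key):
        self.lists.pop(key, None)


async def fetch_channel_history(redis, publish, channel_id, limit=10, timeout=30):
    reply_key = f"sg:rpc:reply:{uuid.uuid4()}"
    try:
        await publish({
            "type": "rpc_request",
            "method": "fetch_channel_history",
            "reply_key": reply_key,  # assumed field name
            "kwargs": {"channel_id": channel_id, "limit": limit},
        })
        reply = await redis.blpop(reply_key, timeout=timeout)
        if reply is None:
            return []  # timed out
        return json.loads(reply[1])
    except Exception:
        return []  # the documented method also logs at this point
    finally:
        try:
            await redis.delete(reply_key)  # best-effort cleanup
        except Exception:
            pass


async def demo():
    redis = FakeRedis()

    async def gateway_publish(envelope):
        # Pretend the Gateway answered immediately.
        await redis.rpush(envelope["reply_key"], json.dumps([{"text": "hello"}]))

    async def silent_publish(envelope):
        pass  # nobody is listening, so the call below times out

    ok = await fetch_channel_history(redis, gateway_publish, "c1")
    timed_out = await fetch_channel_history(redis, silent_publish, "c1", timeout=0.05)
    return ok, timed_out, redis.lists
```

Using a fresh UUID per call keeps concurrent RPCs from reading each other's replies, and the cleanup in finally covers both the success and the timeout path.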
- async delegate_to_gateway(method, **kwargs)[source]
Run an administrative or destructive adapter action on the Gateway, with retries.
The escalated RPC path for privileged operations (e.g. banning a user) that must execute against the live client rather than be queued as an ordinary outbound message. For up to three attempts it publishes a type: "rpc_request" envelope (carrying method and kwargs) via event_bus.publish_outbound and blocks on a unique sg:rpc:reply:{uuid} list with blpop (10s) on the event bus’s Redis client; the Gateway’s _handle_rpc_request performs the action and pushes back a JSON result that is decoded and returned on the first reply.
Between failed or timed-out attempts it sleeps with exponential backoff (2 ** attempt seconds) and always best-effort deletes the reply key in a finally. Returns None if all attempts are exhausted. Driven by the RPC migration tests (tests/core/migration/test_proxy_adapter_rpc.py); tools invoke it through the adapter interface for privileged moderation actions.
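The retry loop can be sketched independently of Redis. In this illustration attempt_once stands in for one publish-and-blpop round trip and returns None on timeout; the helper name, the zero-based attempt index, and skipping the sleep after the final attempt are assumptions, since the description only states the 2 ** attempt formula and the three-attempt limit.

```python
import asyncio


async def delegate_with_retries(attempt_once, max_attempts=3, sleep=asyncio.sleep):
    for attempt in range(max_attempts):
        try:
            result = await attempt_once()
            if result is not None:
                return result  # first reply wins
        except Exception:
            pass  # a failed attempt is handled like a timeout
        if attempt < max_attempts - 1:
            # Exponential backoff; zero-based index is an assumption.
            await sleep(2 ** attempt)
    return None


async def demo():
    delays = []

    async def fake_sleep(seconds):
        delays.append(seconds)  # record instead of actually waiting

    calls = {"n": 0}

    async def flaky():
        calls["n"] += 1
        return {"ok": True} if calls["n"] == 3 else None

    async def dead():
        return None

    first = await delegate_with_retries(flaky, sleep=fake_sleep)
    second = await delegate_with_retries(dead, sleep=fake_sleep)
    return first, second, delays
```

Injecting the sleep function keeps the example fast and makes the backoff schedule directly observable.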
- async should_skip_channel_heartbeat(channel_id)[source]
Ask the Gateway over RPC whether this channel’s heartbeat should be skipped.
Heartbeat suppression depends on live presence/state that only the Gateway’s real client holds, so the worker proxies the decision. It publishes a type: "rpc_request" envelope (method should_skip_channel_heartbeat) via event_bus.publish_outbound and blocks on a unique sg:rpc:reply:{uuid} list with blpop (10s) on the event bus’s Redis client; the Gateway’s _handle_rpc_request calls the real adapter method and pushes back a JSON boolean. On timeout or any error it logs and conservatively returns False (do not skip), and it always best-effort deletes the reply key in a finally.
Called by the channel heartbeat loop (message_processor/channel_heartbeat.py) before emitting a heartbeat, and covered by the RPC migration tests (tests/core/migration/test_proxy_adapter_rpc.py).
- async get_channel_name(channel_id)[source]
Return a synthetic display name for a channel on the worker node.
Worker nodes do not hold a live client and cannot resolve real channel names cheaply, so this returns the deterministic placeholder "channel_{channel_id}" rather than performing an RPC. It has no side effects and makes no network calls. No internal callers were found in this repo; it satisfies the PlatformAdapter interface so code expecting get_channel_name does not raise on worker nodes.