core.outbound_consumer module

Outbound stream consumer for gateway nodes.

class core.outbound_consumer.OutboundStreamConsumer(redis, platform, adapter, consumer_name, autoclaim_interval=30.0, autoclaim_min_idle=60000, tools_dir=None)[source]

Bases: object

Consumes outbound response envelopes and dispatches to local adapters.

One instance per platform per gateway node.

Parameters:
  • redis (redis.asyncio.Redis)

  • platform (str)

  • adapter (Any)

  • consumer_name (str)

  • autoclaim_interval (float)

  • autoclaim_min_idle (int)

  • tools_dir (str | None)

__init__(redis, platform, adapter, consumer_name, autoclaim_interval=30.0, autoclaim_min_idle=60000, tools_dir=None)[source]

Initialize a per-platform outbound consumer and derive its stream/group names.

Stores the injected collaborators and computes the outbound stream name {OUTBOUND_STREAM_PREFIX}:{platform} and consumer-group name {GATEWAY_GROUP_PREFIX}:{platform} that the consume and autoclaim loops read from. Background-task handles and the active-dispatch task set are left empty here; nothing is scheduled until start() is called. The ToolRegistry is left unbuilt (None) and only lazily compiled when an execute_tool RPC first arrives.

Constructed once per platform per gateway node, primarily by gateway_main.GatewayRunner in its on_start adapter loop (and directly by the test suite under tests/core).

Parameters:
  • redis (Redis) – Async Redis client used for stream reads, ACKs, locks, the idempotency/claim keys, and RPC reply queues.

  • platform (str) – Platform name (e.g. "discord"); used to build the stream and group names and as a log dimension.

  • adapter (Any) – The concrete PlatformAdapter whose send/typing/reaction/ presence and RPC methods are invoked to actually reach the platform.

  • consumer_name (str) – Unique consumer name within the group (typically the gateway instance id) used for xreadgroup/xautoclaim.

  • autoclaim_interval (float) – Seconds to sleep between autoclaim sweeps.

  • autoclaim_min_idle (int) – Minimum idle time in milliseconds before a pending entry is eligible to be reclaimed from another consumer.

  • tools_dir (str | None) – Directory to load delegated tools from for execute_tool RPCs; defaults to "tools" when not provided.

Return type:

None

async start()[source]

Launch the background consume and autoclaim loops for this platform.

Flips _running true and schedules _consume_loop() (live stream reads) and _autoclaim_loop() (recovery of stranded entries) as named asyncio tasks, then emits a startup log line. Returns immediately; the loops run until stop() cancels them.

Called by gateway_main.GatewayRunner in its on_start adapter setup loop, deliberately before each adapter.start() so the response path exists before the first inbound message can arrive (also exercised directly in the test suite).

Return type:

None

async stop()[source]

Stop the consumer, cancelling its loops and draining in-flight dispatches.

Clears _running so the loops exit, cancels the consume and autoclaim tasks, awaits the (cancelled) consume task to absorb its CancelledError, and finally gathers any still-running per-message dispatch tasks tracked in _active_tasks so outbound sends that are mid-flight are allowed to finish before shutdown completes.

Called by gateway_main.GatewayRunner during graceful teardown, which iterates self.outbound_consumers and awaits consumer.stop() for each (also covered by tests/core/test_outbound_consumer.py).

Return type:

None