core.outbound_consumer module
Outbound stream consumer for gateway nodes.
- class core.outbound_consumer.OutboundStreamConsumer(redis, platform, adapter, consumer_name, autoclaim_interval=30.0, autoclaim_min_idle=60000, tools_dir=None)
Bases: object
Consumes outbound response envelopes and dispatches them to local adapters.
One instance per platform per gateway node.
- __init__(redis, platform, adapter, consumer_name, autoclaim_interval=30.0, autoclaim_min_idle=60000, tools_dir=None)
Initialize a per-platform outbound consumer and derive its stream/group names.
Stores the injected collaborators and computes the outbound stream name {OUTBOUND_STREAM_PREFIX}:{platform} and consumer-group name {GATEWAY_GROUP_PREFIX}:{platform} that the consume and autoclaim loops read from. Background-task handles and the active-dispatch task set are left empty here; nothing is scheduled until start() is called. The ToolRegistry is left unbuilt (None) and only lazily compiled when an execute_tool RPC first arrives.
Constructed once per platform per gateway node, primarily by gateway_main.GatewayRunner in its on_start adapter loop (and directly by the test suite under tests/core).
- Parameters:
  - redis (Redis) – Async Redis client used for stream reads, ACKs, locks, the idempotency/claim keys, and RPC reply queues.
  - platform (str) – Platform name (e.g. "discord"); used to build the stream and group names and as a log dimension.
  - adapter (Any) – The concrete PlatformAdapter whose send/typing/reaction/presence and RPC methods are invoked to actually reach the platform.
  - consumer_name (str) – Unique consumer name within the group (typically the gateway instance id) used for xreadgroup/xautoclaim.
  - autoclaim_interval (float) – Seconds to sleep between autoclaim sweeps.
  - autoclaim_min_idle (int) – Minimum idle time in milliseconds before a pending entry is eligible to be reclaimed from another consumer.
  - tools_dir (str | None) – Directory to load delegated tools from for execute_tool RPCs; defaults to "tools" when not provided.
- Return type:
None
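The name derivation and the "nothing scheduled yet" state described for __init__ can be sketched roughly as follows. This is an illustrative sketch, not the module's actual code: the prefix values "outbound" and "gateway", the class name ConsumerInitSketch, and every attribute name other than _running and _active_tasks are assumptions made for the example.

```python
# Hypothetical prefix values; the real constants live elsewhere in the project.
OUTBOUND_STREAM_PREFIX = "outbound"
GATEWAY_GROUP_PREFIX = "gateway"


class ConsumerInitSketch:
    """Illustrative stand-in for the constructor behaviour described above."""

    def __init__(self, platform, consumer_name, autoclaim_interval=30.0,
                 autoclaim_min_idle=60000, tools_dir=None):
        self.platform = platform
        self.consumer_name = consumer_name
        self.autoclaim_interval = autoclaim_interval  # seconds between sweeps
        self.autoclaim_min_idle = autoclaim_min_idle  # milliseconds
        # Stream and consumer-group names are derived from the platform.
        self.stream_name = f"{OUTBOUND_STREAM_PREFIX}:{platform}"
        self.group_name = f"{GATEWAY_GROUP_PREFIX}:{platform}"
        # Fall back to "tools" when no directory is provided.
        self.tools_dir = tools_dir if tools_dir is not None else "tools"
        # Nothing is scheduled in the constructor.
        self._running = False
        self._consume_task = None      # hypothetical attribute name
        self._autoclaim_task = None    # hypothetical attribute name
        self._active_tasks = set()
        self._tool_registry = None     # built lazily; hypothetical name


consumer = ConsumerInitSketch("discord", "gateway-1")
print(consumer.stream_name, consumer.group_name)
```

With the assumed prefixes, a "discord" consumer would read from outbound:discord in group gateway:discord; the real names depend on the project's constants.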
- async start()
Launch the background consume and autoclaim loops for this platform.
Flips _running to true and schedules _consume_loop() (live stream reads) and _autoclaim_loop() (recovery of stranded entries) as named asyncio tasks, then emits a startup log line. Returns immediately; the loops run until stop() cancels them.
Called by gateway_main.GatewayRunner in its on_start adapter setup loop, deliberately before each adapter.start() so the response path exists before the first inbound message can arrive (also exercised directly in the test suite).
- Return type:
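The "schedule two named tasks and return immediately" behaviour of start() might look something like the sketch below. The class name, the task-name format, and the loop bodies (short sleeps standing in for stream reads and autoclaim sweeps) are assumptions for illustration; only _running, _consume_loop() and _autoclaim_loop() come from the description above.

```python
import asyncio


class LoopStarterSketch:
    """Illustrative stand-in; loop bodies are placeholders, not Redis calls."""

    def __init__(self, platform):
        self.platform = platform
        self._running = False
        self._consume_task = None      # hypothetical attribute name
        self._autoclaim_task = None    # hypothetical attribute name

    async def _consume_loop(self):
        while self._running:
            await asyncio.sleep(0.01)  # placeholder for a live stream read

    async def _autoclaim_loop(self):
        while self._running:
            await asyncio.sleep(0.01)  # placeholder for an autoclaim sweep

    async def start(self):
        self._running = True
        # create_task only schedules the coroutines; start() does not wait.
        self._consume_task = asyncio.create_task(
            self._consume_loop(), name=f"outbound-consume-{self.platform}")
        self._autoclaim_task = asyncio.create_task(
            self._autoclaim_loop(), name=f"outbound-autoclaim-{self.platform}")


async def demo():
    consumer = LoopStarterSketch("discord")
    await consumer.start()
    names = (consumer._consume_task.get_name(),
             consumer._autoclaim_task.get_name())
    still_running = not consumer._consume_task.done()
    # Let the placeholder loops exit cleanly so the demo terminates.
    consumer._running = False
    await asyncio.gather(consumer._consume_task, consumer._autoclaim_task)
    return names, still_running


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Naming the tasks is what makes them identifiable in asyncio debug output and task dumps, which is presumably why the loops are scheduled as named tasks.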
- async stop()
Stop the consumer, cancelling its loops and draining in-flight dispatches.
Clears _running so the loops exit, cancels the consume and autoclaim tasks, awaits the (cancelled) consume task to absorb its CancelledError, and finally gathers any still-running per-message dispatch tasks tracked in _active_tasks so outbound sends that are mid-flight are allowed to finish before shutdown completes.
Called by gateway_main.GatewayRunner during graceful teardown, which iterates self.outbound_consumers and awaits consumer.stop() for each (also covered by tests/core/test_outbound_consumer.py).
- Return type:
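The shutdown order described for stop() (clear the flag, cancel the loops, absorb the consume task's CancelledError, then drain _active_tasks) can be sketched as below. The class, the track() helper, the simulated send delay, and the use of return_exceptions=True are assumptions for the example rather than confirmed details of the real implementation.

```python
import asyncio
import contextlib


class DrainOnStopSketch:
    """Illustrative stand-in showing cancel-then-drain shutdown."""

    def __init__(self):
        self._running = False
        self._consume_task = None      # hypothetical attribute name
        self._autoclaim_task = None    # hypothetical attribute name
        self._active_tasks = set()
        self.sent = []

    async def _loop(self):
        while self._running:
            await asyncio.sleep(3600)  # placeholder for a blocking read

    async def _dispatch(self, message):
        await asyncio.sleep(0.05)      # simulated in-flight outbound send
        self.sent.append(message)

    def track(self, message):
        # Hypothetical helper: run a dispatch and track it until it finishes.
        task = asyncio.create_task(self._dispatch(message))
        self._active_tasks.add(task)
        task.add_done_callback(self._active_tasks.discard)

    async def start(self):
        self._running = True
        self._consume_task = asyncio.create_task(self._loop())
        self._autoclaim_task = asyncio.create_task(self._loop())

    async def stop(self):
        self._running = False
        for task in (self._consume_task, self._autoclaim_task):
            if task is not None:
                task.cancel()
        if self._consume_task is not None:
            # Absorb the CancelledError raised by the cancelled loop.
            with contextlib.suppress(asyncio.CancelledError):
                await self._consume_task
        if self._active_tasks:
            # Dispatch tasks are not cancelled; they are allowed to finish.
            await asyncio.gather(*self._active_tasks, return_exceptions=True)


async def demo():
    consumer = DrainOnStopSketch()
    await consumer.start()
    await asyncio.sleep(0)             # let the loops begin running
    consumer.track("hello")
    await consumer.stop()
    return consumer.sent, consumer._consume_task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The key design point is the asymmetry: the long-running loops are cancelled outright, while per-message dispatches are only awaited, so a send that has already started is not cut off by shutdown.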