core.stream_consumer module
Stream consumer workers for the Stargazer distributed architecture.
- class core.stream_consumer.InboundStreamConsumer(redis, consumer_name, process_fn, autoclaim_interval=30.0, autoclaim_min_idle=60000, state_machine=None)
Bases: object
Consumes inbound message envelopes and dispatches to the processor.
Runs as an asyncio task on worker nodes. Each worker instance has a unique consumer name within the WORKER_GROUP consumer group, ensuring Redis distributes messages across workers.
- __init__(redis, consumer_name, process_fn, autoclaim_interval=30.0, autoclaim_min_idle=60000, state_machine=None)
Configure an inbound consumer without starting any background work.
Stores the async Redis client, this worker’s unique consumer name within the WORKER_GROUP consumer group (so Redis Streams fans messages out across workers), the async process_fn callback that handles a single deserialized payload, and the autoclaim tuning knobs. An optional state_machine is retained so the consumer can emit trace lifecycle transitions (CLAIMED/COMPLETED/ERRORED) as messages move through. Per-message retry counters (_attempt_counts) and the live _active_tasks set are initialised empty, and the loop tasks are left as None until start() is called; no Redis I/O happens here. Constructed by inference_main.py (the inference worker) and by a wide range of tests under tests/core and tests/integration.
- Parameters:
  - redis (Redis) – Async Redis client used for XREADGROUP, XACK, XPENDING, XAUTOCLAIM, the autoclaim audit hash, and the per-channel distributed lock.
  - consumer_name (str) – Unique consumer name within WORKER_GROUP.
  - process_fn (Callable[[dict[str, Any]], Awaitable[None]]) – Async callback invoked with the deserialized payload dict for each successfully decoded message.
  - autoclaim_interval (float) – Seconds the autoclaim daemon sleeps between sweeps.
  - autoclaim_min_idle (int) – Minimum idle time in milliseconds before a pending message is eligible to be reclaimed from a crashed consumer.
  - state_machine (Optional[Any]) – Optional trace state machine whose transition is awaited / scheduled to record lifecycle hooks; None disables hooks.
- Return type: None
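
The constructor contract described above (store the collaborators, initialise empty bookkeeping, perform no I/O) can be sketched with a stand-in class. IdleConsumerSketch is hypothetical and is not the real InboundStreamConsumer: only the parameter names, the defaults, and the _attempt_counts / _active_tasks attributes come from the text above, while the _consume_task and _autoclaim_task attribute names are assumptions made for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class IdleConsumerSketch:
    """Hypothetical stand-in mirroring the documented constructor state."""

    def __init__(
        self,
        redis: Any,
        consumer_name: str,
        process_fn: Callable[[dict[str, Any]], Awaitable[None]],
        autoclaim_interval: float = 30.0,  # seconds between autoclaim sweeps
        autoclaim_min_idle: int = 60000,   # milliseconds a message must sit idle
        state_machine: Optional[Any] = None,
    ) -> None:
        # Only attribute assignment happens here: nothing is awaited and
        # the Redis client is stored without being called.
        self._redis = redis
        self._consumer_name = consumer_name
        self._process_fn = process_fn
        self._autoclaim_interval = autoclaim_interval
        self._autoclaim_min_idle = autoclaim_min_idle
        self._state_machine = state_machine
        self._running = False
        self._attempt_counts: dict[str, int] = {}
        self._active_tasks: set[asyncio.Task] = set()
        self._consume_task: Optional[asyncio.Task] = None    # assumed name
        self._autoclaim_task: Optional[asyncio.Task] = None  # assumed name


async def handle(payload: dict[str, Any]) -> None:
    """Example process_fn: receives one deserialized payload dict."""
    print("got", payload)


# object() stands in for the async Redis client; it is never called here.
consumer = IdleConsumerSketch(redis=object(), consumer_name="worker-1", process_fn=handle)
```

Note the mixed units in the defaults: autoclaim_interval is expressed in seconds, whereas autoclaim_min_idle is expressed in milliseconds.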
- async start()
Start the inbound consume loop and the autoclaim daemon.
Flips _running true and spawns two long-lived asyncio tasks: _consume_loop(), which blocks on XREADGROUP for new inbound envelopes, and _autoclaim_loop(), which periodically reclaims messages orphaned by crashed workers. This is the activation point that turns a freshly constructed, idle consumer into a running worker; both tasks keep references on self so stop() can cancel them. No Redis I/O happens directly here beyond the loop tasks it launches, and an informational log line records the consumer name and INBOUND_STREAM. Called by the inference worker in inference_main.py once Redis is connected, and by tests under tests/core.
- Return type:
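
The activation step described for start() might look roughly like the following. This is a minimal sketch, not the module's code: StartSketch is a hypothetical class, the loop bodies are placeholders that only sleep, and the _consume_task / _autoclaim_task attribute names are assumptions.

```python
import asyncio


class StartSketch:
    """Hypothetical stand-in; loop bodies are placeholders, not the real loops."""

    def __init__(self) -> None:
        self._running = False
        self._consume_task = None    # assumed attribute name
        self._autoclaim_task = None  # assumed attribute name

    async def _consume_loop(self) -> None:
        while self._running:
            await asyncio.sleep(0.01)  # stands in for a blocking XREADGROUP

    async def _autoclaim_loop(self) -> None:
        while self._running:
            await asyncio.sleep(0.01)  # stands in for the sleep between sweeps

    async def start(self) -> None:
        self._running = True
        # Keep references on self so a later stop() can cancel both tasks.
        self._consume_task = asyncio.create_task(self._consume_loop())
        self._autoclaim_task = asyncio.create_task(self._autoclaim_loop())


async def demo() -> bool:
    consumer = StartSketch()
    await consumer.start()
    await asyncio.sleep(0)  # yield once so both tasks begin running
    both_alive = not consumer._consume_task.done() and not consumer._autoclaim_task.done()
    # Minimal cleanup for the demo only.
    consumer._running = False
    for task in (consumer._consume_task, consumer._autoclaim_task):
        task.cancel()
    await asyncio.gather(
        consumer._consume_task, consumer._autoclaim_task, return_exceptions=True
    )
    return both_alive
```

Running asyncio.run(demo()) exercises the sketch end to end; holding the task references is what makes later cancellation possible.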
- async stop()
Gracefully shut down the inbound consumer and drain in-flight work.
Clears _running so neither loop schedules new work, cancels the consume-loop and autoclaim tasks started by start(), then awaits any per-message tasks still tracked in _active_tasks (gathering exceptions rather than raising) so that messages already mid-processing finish their XACK / DLQ path before the worker exits. This is the lifecycle counterpart to start(), invoked during graceful worker shutdown in inference_main.py and by tests; it touches Redis only indirectly via the tasks it drains.
- Return type:
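
The cancel-then-drain behaviour of stop() can be illustrated with plain asyncio. The sketch below is an assumption-laden illustration rather than the module's implementation: the message IDs, the handle() coroutine, and the decision to await the cancelled loop task are all hypothetical. It shows why exceptions are gathered instead of raised: one failing message does not prevent the others from finishing.

```python
import asyncio


async def drain_sketch() -> tuple:
    finished = []

    async def handle(msg_id: str) -> None:
        await asyncio.sleep(0.01)  # stands in for processing one message
        if msg_id == "2-0":
            raise RuntimeError("processing failed")
        finished.append(msg_id)    # stands in for the XACK step

    async def loop() -> None:
        await asyncio.sleep(3600)  # stands in for the long-lived consume loop

    loop_task = asyncio.create_task(loop())
    active_tasks = {asyncio.create_task(handle(m)) for m in ("1-0", "2-0", "3-0")}

    # Cancel the long-lived loop, but leave in-flight work untouched.
    loop_task.cancel()
    await asyncio.gather(loop_task, return_exceptions=True)

    # Gather exceptions rather than raising, so one failure cannot abort the drain.
    results = await asyncio.gather(*active_tasks, return_exceptions=True)
    errors = [r for r in results if isinstance(r, Exception)]
    return sorted(finished), len(errors)
```

With return_exceptions=True, the failing task's RuntimeError is returned as a result, and the two healthy messages still reach their acknowledgement step.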
- class core.stream_consumer.OutboundStreamConsumer(redis, consumer_name, process_fn, platforms)
Bases: object
Consumes outbound messages intended for platform adapters.
Runs on gateway nodes. Reads from sg:stream:outbound:{platform} for each configured platform.
- __init__(redis, consumer_name, process_fn, platforms)
Configure an outbound consumer for one or more platform streams.
Stores the async Redis client, this gateway’s unique consumer name (within the stargazer_gateways consumer group used by _consume_loop() / _handle_message()), and the async process_fn that hands a deserialized outbound payload to the platform adapter. From platforms it precomputes _streams, a mapping of sg:stream:outbound:{platform} to the ">" read cursor for each configured platform, which is passed straight to XREADGROUP. The running flag, loop task, and _active_tasks set are initialised so the consumer is idle until start() runs; no Redis I/O happens here. Constructed by gateway_main.py (the gateway service) and by tests under tests/core and tests/integration.
- Parameters:
  - redis (Redis) – Async Redis client used for XREADGROUP and XACK on the outbound platform streams.
  - consumer_name (str) – Unique consumer name within the stargazer_gateways group.
  - process_fn (Callable[[dict[str, Any]], Awaitable[None]]) – Async callback invoked with the deserialized outbound payload dict, which routes the send to the platform adapter.
  - platforms (list[str]) – Platform identifiers (e.g. "discord") whose outbound streams this consumer should read; an empty list causes start() to no-op with a warning.
- Return type: None
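
The precomputed _streams mapping described above is small enough to show directly. The helper name build_streams is hypothetical, and "matrix" is an invented second platform used only to show the shape; the key pattern and the ">" cursor come from the text.

```python
def build_streams(platforms: list[str]) -> dict[str, str]:
    """Map each platform's outbound stream key to the ">" read cursor."""
    # ">" is the special stream ID that asks a consumer group for messages
    # not yet delivered to any consumer in that group.
    return {f"sg:stream:outbound:{platform}": ">" for platform in platforms}


streams = build_streams(["discord", "matrix"])
```

A dictionary of stream name to ID is the shape an XREADGROUP call over several streams expects, which is why the mapping can be handed over unchanged. An empty platforms list yields an empty mapping.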
- async start()
Start the outbound consume loop unless no platform streams exist.
Activation point for the gateway-side consumer. If _streams is empty (no platforms were configured) it logs a warning and returns without doing anything, since there is nothing to read. Otherwise it flips _running true and spawns the _consume_loop() task that block-reads the sg:stream:outbound:{platform} streams; the task reference is kept on self for stop() to cancel. Unlike the inbound consumer there is no autoclaim daemon. Called by the gateway in gateway_main.py, importantly before each platform adapter.start() so responses are never missed, and by tests under tests/core.
- Return type:
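
The early-return guard in the outbound start() could be sketched as follows. OutboundStartSketch is a hypothetical stand-in, the logger name and warning text are invented, the _task attribute name is an assumption, and the loop body is a placeholder for the real block-read.

```python
import asyncio
import logging

logger = logging.getLogger("outbound_sketch")  # hypothetical logger name


class OutboundStartSketch:
    """Hypothetical stand-in showing only the empty-streams guard."""

    def __init__(self, platforms: list[str]) -> None:
        self._streams = {f"sg:stream:outbound:{p}": ">" for p in platforms}
        self._running = False
        self._task = None  # assumed attribute name

    async def _consume_loop(self) -> None:
        while self._running:
            await asyncio.sleep(0.01)  # stands in for a blocking XREADGROUP

    async def start(self) -> None:
        if not self._streams:
            # Nothing to read: warn and stay idle instead of spawning a task.
            logger.warning("no platforms configured; outbound consumer not started")
            return
        self._running = True
        self._task = asyncio.create_task(self._consume_loop())


async def demo() -> tuple:
    idle = OutboundStartSketch([])
    await idle.start()
    live = OutboundStartSketch(["discord"])
    await live.start()
    outcome = (idle._task is None, live._task is not None)
    # Minimal cleanup for the demo only.
    live._running = False
    live._task.cancel()
    await asyncio.gather(live._task, return_exceptions=True)
    return outcome
```

In this sketch the consumer with no platforms never creates a task, so a later stop() would have nothing to cancel.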
- async stop()
Gracefully shut down the outbound consumer and drain in-flight sends.
Clears _running so the loop stops scheduling work, cancels the consume-loop task launched by start(), and then awaits any per-message send tasks still tracked in _active_tasks (gathering exceptions) so outbound deliveries already underway complete their adapter call and XACK before the gateway exits. Lifecycle counterpart to start(), invoked during graceful gateway shutdown in gateway_main.py and by tests; reaches Redis only via the tasks it drains.
- Return type: