core.stream_consumer module

Stream consumer workers for Stargazer distributed architecture.

class core.stream_consumer.InboundStreamConsumer(redis, consumer_name, process_fn, autoclaim_interval=30.0, autoclaim_min_idle=60000, state_machine=None)

Bases: object

Consumes inbound message envelopes and dispatches to the processor.

Runs as an asyncio task on worker nodes. Each worker instance has a unique consumer name within the WORKER_GROUP consumer group, ensuring Redis distributes messages across workers.

__init__(redis, consumer_name, process_fn, autoclaim_interval=30.0, autoclaim_min_idle=60000, state_machine=None)

Configure an inbound consumer without starting any background work.

Stores the async Redis client, this worker’s unique consumer name within the WORKER_GROUP consumer group (so Redis Streams distributes messages across workers rather than delivering each one to every worker), the async process_fn callback that handles a single deserialized payload, and the autoclaim tuning knobs. An optional state_machine is retained so the consumer can emit trace lifecycle transitions (CLAIMED / COMPLETED / ERRORED) as messages move through. Per-message retry counters (_attempt_counts) and the live _active_tasks set are initialised empty, and the loop tasks are left as None until start() is called; no Redis I/O happens here. Constructed by inference_main.py (the inference worker) and by tests under tests/core and tests/integration.

Parameters:
  • redis (Redis) – Async Redis client used for XREADGROUP, XACK, XPENDING, XAUTOCLAIM, the autoclaim audit hash, and the per-channel distributed lock.

  • consumer_name (str) – Unique consumer name within WORKER_GROUP.

  • process_fn (Callable[[dict[str, Any]], Awaitable[None]]) – Async callback invoked with the deserialized payload dict for each successfully decoded message.

  • autoclaim_interval (float) – Seconds the autoclaim daemon sleeps between sweeps.

  • autoclaim_min_idle (int) – Minimum idle time in milliseconds before a pending message is eligible to be reclaimed from a crashed consumer.

  • state_machine (Optional[Any]) – Optional trace state machine whose transition method is awaited or scheduled to record lifecycle transitions; None disables these hooks.

Return type:

None
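
A minimal sketch of a callback with the process_fn shape described above. The "text" payload key, the handled list, and the "worker-1" consumer name in the trailing comment are hypothetical and not taken from the Stargazer source:

```python
import asyncio
from typing import Any, Awaitable, Callable

# Type alias matching the documented process_fn annotation.
ProcessFn = Callable[[dict[str, Any]], Awaitable[None]]

# Hypothetical sink so the sketch has an observable effect.
handled: list[str] = []


async def process_fn(payload: dict[str, Any]) -> None:
    """Handle one deserialized payload dict (the "text" key is assumed)."""
    handled.append(str(payload.get("text", "")))


async def demo() -> None:
    # The consumer would invoke the callback once per decoded message;
    # here it is called directly with a hand-built payload.
    callback: ProcessFn = process_fn
    await callback({"text": "hello"})


asyncio.run(demo())

# With a connected async Redis client, construction would look like:
#   InboundStreamConsumer(redis, "worker-1", process_fn)
# Per the description above, this performs no Redis I/O until start().
```

Note that autoclaim_interval is given in seconds while autoclaim_min_idle is given in milliseconds, so the defaults correspond to a sweep every 30 seconds over messages idle for 60 seconds.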

async start()

Start the inbound consume loop and the autoclaim daemon.

Sets _running to true and spawns two long-lived asyncio tasks: _consume_loop(), which blocks on XREADGROUP for new inbound envelopes, and _autoclaim_loop(), which periodically reclaims messages orphaned by crashed workers. This is the activation point that turns a freshly constructed, idle consumer into a running worker; references to both tasks are kept on self so stop() can cancel them. start() itself performs no Redis I/O; all Redis access happens inside the tasks it launches. An informational log line records the consumer name and INBOUND_STREAM. Called by the inference worker in inference_main.py once Redis is connected, and by tests under tests/core.

Return type:

None
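
The activation pattern described above can be sketched with plain asyncio. MiniConsumer is a toy stand-in, not the real class: the attribute names _consume_task and _autoclaim_task, the sweeps counter, and the sleep calls that replace Redis commands are all assumptions made for illustration.

```python
import asyncio
from typing import Optional


class MiniConsumer:
    """Toy stand-in for the start() pattern; it never talks to Redis."""

    def __init__(self, autoclaim_interval: float = 30.0) -> None:
        self._running = False
        self._autoclaim_interval = autoclaim_interval
        # Loop tasks stay None until start() is called.
        self._consume_task: Optional[asyncio.Task] = None
        self._autoclaim_task: Optional[asyncio.Task] = None
        self.sweeps = 0

    async def start(self) -> None:
        self._running = True
        # Keep references on self so a later stop() could cancel them.
        self._consume_task = asyncio.create_task(self._consume_loop())
        self._autoclaim_task = asyncio.create_task(self._autoclaim_loop())

    async def _consume_loop(self) -> None:
        while self._running:
            await asyncio.sleep(0.01)  # stands in for a blocking XREADGROUP

    async def _autoclaim_loop(self) -> None:
        while self._running:
            await asyncio.sleep(self._autoclaim_interval)
            self.sweeps += 1  # stands in for one XAUTOCLAIM sweep


async def demo() -> tuple[bool, bool]:
    consumer = MiniConsumer(autoclaim_interval=0.01)
    idle = consumer._consume_task is None and consumer._autoclaim_task is None
    await consumer.start()
    await asyncio.sleep(0.05)
    running = not consumer._consume_task.done() and not consumer._autoclaim_task.done()
    # Let both loops exit on their own so the sketch finishes cleanly.
    consumer._running = False
    await asyncio.gather(consumer._consume_task, consumer._autoclaim_task)
    return idle, running


idle_before, running_after = asyncio.run(demo())
```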

async stop()

Gracefully shut down the inbound consumer and drain in-flight work.

Clears _running so neither loop schedules new work, then cancels the consume-loop and autoclaim tasks started by start(). It then awaits any per-message tasks still tracked in _active_tasks, gathering exceptions rather than raising them, so that messages already mid-processing finish their XACK / DLQ path before the worker exits. This is the lifecycle counterpart to start(), invoked during graceful worker shutdown in inference_main.py and by tests; it touches Redis only indirectly, via the tasks it drains.

Return type:

None
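
A rough sketch of the cancel-then-drain shutdown described above, using only asyncio. The drain helper and the toy coroutines are hypothetical; in particular, awaiting the cancelled loop tasks before draining is an assumption about ordering, not something the source confirms.

```python
import asyncio
from typing import Any


async def drain(loop_tasks: list[asyncio.Task], active_tasks: list[asyncio.Task]) -> list[Any]:
    """Cancel long-lived loops, then let in-flight work finish."""
    for task in loop_tasks:
        task.cancel()
    # Wait for the cancellations to settle without propagating CancelledError.
    await asyncio.gather(*loop_tasks, return_exceptions=True)
    # return_exceptions=True collects handler failures instead of raising them.
    return await asyncio.gather(*active_tasks, return_exceptions=True)


async def demo() -> tuple[bool, list[Any]]:
    async def forever() -> None:
        while True:
            await asyncio.sleep(3600)  # stands in for a consume loop

    async def ok() -> str:
        await asyncio.sleep(0.01)
        return "acked"

    async def boom() -> None:
        await asyncio.sleep(0.01)
        raise RuntimeError("handler failed")

    loop_task = asyncio.create_task(forever())
    active = [asyncio.create_task(ok()), asyncio.create_task(boom())]
    results = await drain([loop_task], active)
    return loop_task.cancelled(), results


loop_cancelled, drained = asyncio.run(demo())
```

Both in-flight tasks run to completion even though one of them fails, which is the property that lets a message reach its acknowledgement or dead-letter path during shutdown.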

class core.stream_consumer.OutboundStreamConsumer(redis, consumer_name, process_fn, platforms)

Bases: object

Consumes outbound messages intended for platform adapters.

Runs on gateway nodes. Reads from sg:stream:outbound:{platform} for each configured platform.

__init__(redis, consumer_name, process_fn, platforms)

Configure an outbound consumer for one or more platform streams.

Stores the async Redis client, this gateway’s unique consumer name (within the stargazer_gateways consumer group used by _consume_loop() / _handle_message()), and the async process_fn that hands a deserialized outbound payload to the platform adapter. From platforms it precomputes _streams, a mapping of sg:stream:outbound:{platform} to the ">" read cursor for each configured platform, which is passed straight to XREADGROUP. The running flag, loop task, and _active_tasks set are initialised so the consumer is idle until start() runs; no Redis I/O happens here. Constructed by gateway_main.py (the gateway service) and by tests under tests/core and tests/integration.

Parameters:
  • redis (Redis) – Async Redis client used for XREADGROUP and XACK on the outbound platform streams.

  • consumer_name (str) – Unique consumer name within the stargazer_gateways group.

  • process_fn (Callable[[dict[str, Any]], Awaitable[None]]) – Async callback invoked with the deserialized outbound payload dict, which routes the send to the platform adapter.

  • platforms (list[str]) – Platform identifiers (e.g. "discord") whose outbound streams this consumer should read; an empty list causes start() to no-op with a warning.

Return type:

None
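
The _streams precomputation described above amounts to a small dictionary comprehension. The sketch below assumes exactly the key format given in this section; build_streams is a hypothetical helper name, and "telegram" is only an example platform identifier.

```python
def build_streams(platforms: list[str]) -> dict[str, str]:
    """Map each platform's outbound stream key to the ">" read cursor.

    In XREADGROUP, ">" asks for messages never delivered to any consumer
    in the group.
    """
    return {f"sg:stream:outbound:{platform}": ">" for platform in platforms}


streams = build_streams(["discord", "telegram"])
empty = build_streams([])  # no platforms configured: nothing to read
```

A mapping of this shape matches what the streams argument of an XREADGROUP client call typically expects (stream key to ID), which is why it can be passed through unchanged.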

async start()

Start the outbound consume loop unless no platform streams exist.

Activation point for the gateway-side consumer. If _streams is empty (no platforms were configured), it logs a warning and returns without doing anything, since there is nothing to read. Otherwise it sets _running to true and spawns the _consume_loop() task that block-reads the sg:stream:outbound:{platform} streams; the task reference is kept on self for stop() to cancel. Unlike the inbound consumer, there is no autoclaim daemon. Called by the gateway in gateway_main.py, which starts the consumer before each platform adapter's start() so that responses are never missed, and by tests under tests/core.

Return type:

None
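
The empty-platform guard described above can be illustrated with a toy class. MiniOutbound, its _task attribute, the logger name, and the warning text are all hypothetical; the sleep call stands in for the blocking stream read.

```python
import asyncio
import logging
from typing import Optional

logger = logging.getLogger("stream_consumer_sketch")  # hypothetical logger name


class MiniOutbound:
    """Toy stand-in for the outbound start() guard; it never talks to Redis."""

    def __init__(self, platforms: list[str]) -> None:
        self._streams = {f"sg:stream:outbound:{p}": ">" for p in platforms}
        self._running = False
        self._task: Optional[asyncio.Task] = None

    async def start(self) -> None:
        if not self._streams:
            # Nothing to read: warn and leave the consumer idle.
            logger.warning("no platforms configured; outbound consumer not started")
            return
        self._running = True
        self._task = asyncio.create_task(self._consume_loop())

    async def _consume_loop(self) -> None:
        while self._running:
            await asyncio.sleep(0.01)  # stands in for a blocking XREADGROUP


async def demo() -> tuple[bool, bool]:
    unconfigured = MiniOutbound([])
    await unconfigured.start()
    configured = MiniOutbound(["discord"])
    await configured.start()
    started = (unconfigured._task is not None, configured._task is not None)
    # Let the running loop exit so the sketch finishes cleanly.
    configured._running = False
    await configured._task
    return started


empty_started, discord_started = asyncio.run(demo())
```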

async stop()

Gracefully shut down the outbound consumer and drain in-flight sends.

Clears _running so the loop stops scheduling work, then cancels the consume-loop task launched by start(). It then awaits any per-message send tasks still tracked in _active_tasks, gathering exceptions rather than raising them, so that outbound deliveries already underway complete their adapter call and XACK before the gateway exits. This is the lifecycle counterpart to start(), invoked during graceful gateway shutdown in gateway_main.py and by tests; it reaches Redis only via the tasks it drains.

Return type:

None