core.event_bus module
Redis Streams event bus for the Stargazer distributed architecture.
Defines RedisEventBus, the thin publish/provision layer that ties the
five microservices (gateway / inference / agents / consolidation / web)
together over Redis Streams. Gateways publish inbound message envelopes onto a
single shared worker stream, inference/agents workers publish responses onto
per-platform outbound streams, and each side reads with its own consumer
group; this module owns the stream and group names and the XGROUP /
XADD calls that create and feed them.
Stream and group names are resolved once at import time from environment
variables (SG_INBOUND_STREAM, SG_OUTBOUND_STREAM_PREFIX,
SG_WORKER_GROUP, SG_GATEWAY_GROUP_PREFIX) so every service agrees on
the topology without hard-coding it. Envelopes are msgpack-framed by
core.serialization before they hit the wire. The bus is constructed once
per service by the gateway_main / inference_main / agents_main
entrypoints.
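The import-time name resolution described above could look roughly like the sketch below. The environment variable names come from this module's description; the fallback values, the helper functions, and the way a platform name is appended to a prefix are assumptions made for illustration only.

```python
import os

# Fallback values are hypothetical: the module description names the
# variables but not their defaults.
INBOUND_STREAM = os.environ.get("SG_INBOUND_STREAM", "sg:inbound")
OUTBOUND_STREAM_PREFIX = os.environ.get("SG_OUTBOUND_STREAM_PREFIX", "sg:outbound:")
WORKER_GROUP = os.environ.get("SG_WORKER_GROUP", "sg:workers")
GATEWAY_GROUP_PREFIX = os.environ.get("SG_GATEWAY_GROUP_PREFIX", "sg:gateway:")


def outbound_stream(platform: str) -> str:
    """Per-platform outbound stream name (prefix + platform is an assumed scheme)."""
    return f"{OUTBOUND_STREAM_PREFIX}{platform}"


def gateway_group(platform: str) -> str:
    """Per-platform gateway consumer group name (same assumed scheme)."""
    return f"{GATEWAY_GROUP_PREFIX}{platform}"
```

Because the values are read once at import, changing an environment variable after the module is loaded has no effect on a running service.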
- class core.event_bus.RedisEventBus(redis, node_role, node_id, *, stream_maxlen=100000)[source]
Bases: object
Manages Redis Streams for inbound/outbound message routing.
Responsibilities:
- Publishing inbound message envelopes (gateway → worker)
- Publishing outbound response envelopes (worker → gateway)
- Creating and managing consumer groups
- Providing stream metadata (lag, pending counts)
- __init__(redis, node_role, node_id, *, stream_maxlen=100000)[source]
Initialize the event bus with its Redis client and node identity.
Stores the shared redis.asyncio.Redis client used for every subsequent XGROUP/XADD/XINFO/XPENDING/XLEN call, plus the node’s role and id, which decide (in ensure_streams()) which inbound/outbound streams and consumer groups this node provisions. The stream length cap is floored at 1000 to guard against a misconfiguration that would make the approximate MAXLEN trimming in publish_inbound()/publish_outbound() evict almost everything.
This is a pure in-memory initializer: it opens no connections and issues no Redis commands. It is constructed once per service by the gateway, inference, and agents entrypoints (gateway_main.py, inference_main.py, agents_main.py) and by tests/core/test_event_bus.py.
- Parameters:
redis (Redis) – Shared async Redis client backing every stream operation performed by this bus.
node_role (str) – One of "gateway", "worker", or "standalone"; determines which consumer groups ensure_streams() creates.
node_id (str) – Stable identifier for this process, used only in diagnostic log extra fields.
stream_maxlen (int) – Approximate maximum number of entries to retain per stream. Values below 1000 are clamped up to 1000. Defaults to 100_000.
- Return type:
None
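To make the clamping rule concrete, here is a minimal stand-in for the initializer's bookkeeping. It is not the real RedisEventBus: the class name BusConfigSketch and the constant MIN_STREAM_MAXLEN are hypothetical, and only the behaviour stated above (store the arguments, floor the cap at 1000, touch no network) is modelled.

```python
MIN_STREAM_MAXLEN = 1000  # floor described in the text above


class BusConfigSketch:
    """Illustrative stand-in; mirrors only the documented __init__ behaviour."""

    def __init__(self, redis, node_role: str, node_id: str, *,
                 stream_maxlen: int = 100_000) -> None:
        self.redis = redis          # stored only; no connection is opened here
        self.node_role = node_role  # "gateway", "worker", or "standalone"
        self.node_id = node_id      # used for diagnostics
        # Values below the floor are raised to it rather than rejected.
        self.stream_maxlen = max(stream_maxlen, MIN_STREAM_MAXLEN)
```

With this sketch, passing stream_maxlen=10 yields an effective cap of 1000, while the default of 100_000 passes through unchanged.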
- async ensure_streams(platforms=None)[source]
Create consumer groups for all known streams.
Called once at startup. Uses XGROUP CREATE with mkstream=True so the stream is auto-created if it doesn’t exist yet.
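A minimal sketch of the group-creation step follows, assuming a redis.asyncio-style client whose xgroup_create() accepts name, groupname, id, and mkstream. The helper name ensure_group, the choice of id="0", and the handling of an already-existing group are assumptions; the source only states that XGROUP CREATE is issued with mkstream=True.

```python
async def ensure_group(redis, stream: str, group: str) -> bool:
    """Create `group` on `stream`, creating the stream itself if needed.

    Returns True if the group was created, False if it already existed.
    """
    try:
        # id="0" starts the group at the beginning of the stream (assumed);
        # "$" would deliver only entries added after creation.
        await redis.xgroup_create(name=stream, groupname=group, id="0", mkstream=True)
        return True
    except Exception as exc:
        # Redis replies BUSYGROUP when the group already exists. Treating
        # that as success keeps repeated startups idempotent. Real code
        # would more likely catch redis.exceptions.ResponseError.
        if "BUSYGROUP" in str(exc):
            return False
        raise
```

Checking the error text rather than the exception class keeps the sketch independent of the installed client library.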
- async publish_inbound(envelope)[source]
Publish an inbound message envelope to the worker stream.
Called by gateway nodes when they receive a platform message. Returns the Redis Stream message ID.
- async publish_outbound(platform, envelope)[source]
Publish an outbound response envelope to the platform-specific stream.
Called by worker nodes after LLM inference completes. Returns the Redis Stream message ID.
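Both publish methods boil down to an XADD with approximate MAXLEN trimming. The sketch below shows that call against a redis.asyncio-style client. The helper name publish and the "data" field name are hypothetical, and the payload is assumed to have been framed to bytes already (the real bus delegates framing to core.serialization).

```python
async def publish(redis, stream: str, payload: bytes, *, maxlen: int = 100_000) -> str:
    """Append one framed envelope to `stream` and return its entry ID."""
    msg_id = await redis.xadd(
        stream,
        {"data": payload},  # field name is an assumption
        maxlen=maxlen,
        approximate=True,   # XADD ... MAXLEN ~ maxlen: cheap, inexact trimming
    )
    # redis-py returns bytes unless the client uses decode_responses=True.
    return msg_id.decode() if isinstance(msg_id, bytes) else msg_id
```

Approximate trimming lets Redis drop whole internal nodes instead of trimming to an exact count, so a stream may briefly hold somewhat more than maxlen entries.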
- async publish_tools_request(envelope)[source]
Publish a tool-execution request to the shared tools stream.
Called by the inference tier’s RemoteToolRegistry to delegate a non-pinned tool call. The tools service instances load-balance the stream via the sg:tools consumer group. The envelope must carry the reply_to stream name and the correlation_id so the result can be routed back. Returns the Redis Stream message ID.
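As an illustration of the routing fields a request must carry, the sketch below builds an envelope dict. Only reply_to, correlation_id, and the sg:tools:reply:{worker_id} naming are taken from this page; the function name and the "tool" and "arguments" fields are hypothetical and do not describe the real envelope schema.

```python
import uuid


def build_tools_request(worker_id: str, tool_name: str, arguments: dict) -> dict:
    """Build a request envelope carrying the fields needed to route the reply."""
    return {
        # Unique per call so the reply can be matched to the waiting caller.
        "correlation_id": uuid.uuid4().hex,
        # The tools service publishes its result onto this stream.
        "reply_to": f"sg:tools:reply:{worker_id}",
        "tool": tool_name,       # hypothetical field
        "arguments": arguments,  # hypothetical field
    }
```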
- async publish_tools_reply(reply_stream, envelope)[source]
Publish a tool-execution result onto a worker’s reply stream.
Called by the tools service after running a delegated tool. The reply_stream is the reply_to value from the originating request (sg:tools:reply:{worker_id}); a demux reader on that worker matches correlation_id and resolves the waiting call. The reply may contain raw media bytes (writeback.sent_files), which msgpack framing preserves. The reply stream is capped tighter than the request stream since results are consumed promptly. Returns the Redis Stream message ID.
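The demux step on the worker side can be pictured as a table of pending futures keyed by correlation_id. The class below is a sketch of that idea under stated assumptions: ReplyDemux, expect(), and dispatch() are hypothetical names, and the actual reader that pulls entries off the reply stream is omitted.

```python
import asyncio


class ReplyDemux:
    """Matches replies to waiting calls by correlation_id (illustrative only)."""

    def __init__(self) -> None:
        self._waiting: dict[str, asyncio.Future] = {}

    def expect(self, correlation_id: str) -> asyncio.Future:
        """Register a pending call; must run inside an active event loop."""
        fut = asyncio.get_running_loop().create_future()
        self._waiting[correlation_id] = fut
        return fut

    def dispatch(self, reply: dict) -> bool:
        """Resolve the waiter for this reply; False if nobody is waiting."""
        fut = self._waiting.pop(reply.get("correlation_id"), None)
        if fut is None or fut.done():
            return False  # unknown, late, or duplicate reply
        fut.set_result(reply)
        return True
```

A caller would register with expect() before publishing the request, then await the returned future (typically with a timeout) while the stream reader feeds each decoded reply to dispatch().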
- async get_stream_info(stream)[source]
Return XINFO STREAM metadata for a stream, or an empty dict on error.
Surfaces the raw Redis XINFO STREAM mapping (length, first/last entry ids, group count, and so on) so monitoring code can report on a stream’s depth and shape. Any error, most commonly the stream not existing yet, is swallowed and reported as an empty dict so a probe never crashes the caller.
Issues a single read-only XINFO STREAM against Redis. It has no known internal callers; it is consumed by monitoring/diagnostic code and by the bus tests in tests/core/test_event_bus.py.
- async get_pending_count(stream, group)[source]
Return the consumer-group backlog (unacknowledged message count).
Reports how many messages a consumer group has read but not yet XACK-ed (the size of the pending entries list, PEL), which is the primary signal that workers are falling behind. The result feeds the eviction-risk check in _check_stream_backlog() and any external monitoring. Errors (including a missing stream or group) are swallowed and reported as 0 so a diagnostic call never disrupts publishing.
Issues a single read-only XPENDING summary against Redis. Called by _check_stream_backlog() in this class and by the bus tests in tests/core/test_event_bus.py.
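The error-swallowing probe described above might be sketched as follows, assuming a redis.asyncio-style client whose xpending(name, groupname) returns a summary mapping with a "pending" key. The function name pending_count is hypothetical, and the real method may differ in what it logs or catches.

```python
async def pending_count(redis, stream: str, group: str) -> int:
    """Return the PEL size for `group` on `stream`, or 0 on any error."""
    try:
        # Summary form of XPENDING: one round trip, no per-entry details.
        summary = await redis.xpending(stream, group)
        return int(summary.get("pending", 0))
    except Exception:
        # A missing stream or group raises; report "no backlog" instead
        # so a diagnostic probe never interferes with publishing.
        return 0
```

One consequence of this design is that a return value of 0 is ambiguous: it can mean an idle group or a failed probe, so callers that need to tell the two apart would need a separate existence check.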