core.event_bus module

Redis Streams event bus for the Stargazer distributed architecture.

Defines RedisEventBus, the thin publish/provision layer that ties the five microservices (gateway / inference / agents / consolidation / web) together over Redis Streams. Gateways publish inbound message envelopes onto a single shared worker stream, inference/agents workers publish responses onto per-platform outbound streams, and each side reads with its own consumer group; this module owns the stream and group names and the XGROUP / XADD calls that create and feed them.

Stream and group names are resolved once at import time from environment variables (SG_INBOUND_STREAM, SG_OUTBOUND_STREAM_PREFIX, SG_WORKER_GROUP, SG_GATEWAY_GROUP_PREFIX) so every service agrees on the topology without hard-coding it. Envelopes are msgpack-framed by core.serialization before they hit the wire. The bus is constructed once per service by the gateway_main / inference_main / agents_main entrypoints.

class core.event_bus.RedisEventBus(redis, node_role, node_id, *, stream_maxlen=100000)[source]

Bases: object

Manages Redis Streams for inbound/outbound message routing.

Responsibilities: - Publishing inbound message envelopes (gateway → worker) - Publishing outbound response envelopes (worker → gateway) - Creating and managing consumer groups - Providing stream metadata (lag, pending counts)

Parameters:
  • redis (redis.asyncio.Redis)

  • node_role (str)

  • node_id (str)

  • stream_maxlen (int)

__init__(redis, node_role, node_id, *, stream_maxlen=100000)[source]

Initialize the event bus with its Redis client and node identity.

Stores the shared redis.asyncio.Redis client used for every subsequent XGROUP/XADD/XINFO/XPENDING/XLEN call, plus the node’s role and id, which decide (in ensure_streams()) which inbound/outbound streams and consumer groups this node provisions. The stream length cap is floored at 1000 to guard against a misconfiguration that would make the approximate MAXLEN trimming in publish_inbound() / publish_outbound() evict almost everything.

This is a pure in-memory initializer: it opens no connections and issues no Redis commands. It is constructed once per service by the gateway, inference, and agents entrypoints (gateway_main.py, inference_main.py, agents_main.py) and by tests/core/test_event_bus.py.

Parameters:
  • redis (Redis) – Shared async Redis client backing every stream operation performed by this bus.

  • node_role (str) – One of "gateway", "worker", or "standalone"; determines which consumer groups ensure_streams() creates.

  • node_id (str) – Stable identifier for this process, used only in diagnostic log extra fields.

  • stream_maxlen (int) – Approximate maximum number of entries to retain per stream. Values below 1000 are clamped up to 1000. Defaults to 100_000.

Return type:

None

async ensure_streams(platforms=None)[source]

Create consumer groups for all known streams.

Called once at startup. Uses XGROUP CREATE with mkstream=True so the stream is auto-created if it doesn’t exist yet.

Return type:

None

Parameters:

platforms (list[str] | None)

async publish_inbound(envelope)[source]

Publish an inbound message envelope to the worker stream.

Called by gateway nodes when they receive a platform message. Returns the Redis Stream message ID.

Return type:

str

Parameters:

envelope (dict[str, Any])

async publish_outbound(platform, envelope)[source]

Publish an outbound response envelope to the platform-specific stream.

Called by worker nodes after LLM inference completes. Returns the Redis Stream message ID.

Return type:

str

Parameters:
async publish_tools_request(envelope)[source]

Publish a tool-execution request to the shared tools stream.

Called by the inference tier’s RemoteToolRegistry to delegate a non-pinned tool call. The tools service instances load-balance the stream via the sg:tools consumer group. The envelope must carry the reply_to stream name + correlation_id so the result can be routed back. Returns the Redis Stream message ID.

Return type:

str

Parameters:

envelope (dict[str, Any])

async publish_tools_reply(reply_stream, envelope)[source]

Publish a tool-execution result onto a worker’s reply stream.

Called by the tools service after running a delegated tool. The reply_stream is the reply_to value from the originating request (sg:tools:reply:{worker_id}); a demux reader on that worker matches correlation_id and resolves the waiting call. The reply may contain raw media bytes (writeback.sent_files) — msgpack framing preserves them. The reply stream is capped tighter than the request stream since results are consumed promptly. Returns the Redis Stream message ID.

Return type:

str

Parameters:
async get_stream_info(stream)[source]

Return XINFO STREAM metadata for a stream, or empty on error.

Surfaces the raw Redis XINFO STREAM mapping (length, first/last entry ids, group count, and so on) so monitoring code can report on a stream’s depth and shape. Any error – most commonly the stream not existing yet – is swallowed and reported as an empty dict so a probe never crashes the caller.

Issues a single read-only XINFO STREAM against Redis. No internal callers were found by grep; it is consumed by monitoring/diagnostic code and the bus tests in tests/core/test_event_bus.py.

Parameters:

stream (str) – The Redis Stream name to introspect.

Return type:

dict[str, Any]

Returns:

The XINFO STREAM field mapping, or an empty dict if the stream is missing or the call fails.

async get_pending_count(stream, group)[source]

Return the consumer-group backlog (unacknowledged message count).

Reports how many messages a consumer group has read but not yet XACK-ed – the pending entries list (PEL) size – which is the primary signal that workers are falling behind. The result feeds the eviction-risk check in _check_stream_backlog() and any external monitoring. Errors (including a missing stream or group) are swallowed and reported as 0 so a diagnostic call never disrupts publishing.

Issues a single read-only XPENDING summary against Redis. Called by _check_stream_backlog() in this class and by the bus tests in tests/core/test_event_bus.py.

Parameters:
  • stream (str) – The Redis Stream to inspect.

  • group (str) – The consumer group whose pending count is requested.

Return type:

int

Returns:

The number of pending (read but unacknowledged) messages, or 0 when the information is unavailable.