Source code for core.event_bus

"""Redis Streams event bus for the Stargazer distributed architecture.

Defines :class:`RedisEventBus`, the thin publish/provision layer that ties the
five microservices (gateway / inference / agents / consolidation / web)
together over Redis Streams. Gateways publish inbound message envelopes onto a
single shared worker stream, inference/agents workers publish responses onto
per-platform outbound streams, and each side reads with its own consumer
group; this module owns the stream and group names and the ``XGROUP`` /
``XADD`` calls that create and feed them.

Stream and group names are resolved once at import time from environment
variables (``SG_INBOUND_STREAM``, ``SG_OUTBOUND_STREAM_PREFIX``,
``SG_WORKER_GROUP``, ``SG_GATEWAY_GROUP_PREFIX``, plus ``SG_TOOLS_STREAM``,
``SG_TOOLS_GROUP`` and ``SG_TOOLS_REPLY_STREAM_PREFIX`` for the tool-execution
streams) so every service agrees on the topology without hard-coding it.
Envelopes are msgpack-framed by ``core.serialization`` before they hit the
wire. The bus is constructed once per service by the ``gateway_main`` /
``inference_main`` / ``agents_main`` entrypoints.
"""

import asyncio
import logging
import os
import time
from typing import Any, Optional

from redis.asyncio import Redis

from core.serialization import serialize_stream_payload, deserialize_stream_payload

logger = logging.getLogger("stargazer.event_bus")

# ── Stream keys ─────────────────────────────────────────────────
# One shared inbound stream; outbound streams are derived per platform by
# appending ``:{platform}`` to the prefix (e.g. ``sg:stream:outbound:discord``).
INBOUND_STREAM = os.getenv("SG_INBOUND_STREAM", "sg:stream:inbound")
OUTBOUND_STREAM_PREFIX = os.getenv("SG_OUTBOUND_STREAM_PREFIX", "sg:stream:outbound")

# ── Consumer groups ─────────────────────────────────────────────
# Gateway groups are likewise derived per platform (e.g. ``sg:gateway:discord``).
WORKER_GROUP = os.getenv("SG_WORKER_GROUP", "sg:workers")
GATEWAY_GROUP_PREFIX = os.getenv("SG_GATEWAY_GROUP_PREFIX", "sg:gateway")

# ── Tool-execution service ──────────────────────────────────────
# Inference workers publish tool-exec requests onto one shared stream, which
# the ``tools`` instances load-balance through a single consumer group. Every
# request names a per-worker reply stream that the tools service XADDs the
# result onto; a demux reader on the worker resolves it. These envelopes are
# msgpack-framed like all other streams, so a reply can carry raw media bytes
# (``serialize_stream_payload`` uses ``use_bin_type=True``).
TOOLS_STREAM = os.getenv("SG_TOOLS_STREAM", "sg:stream:tools")
TOOLS_GROUP = os.getenv("SG_TOOLS_GROUP", "sg:tools")
# Reply streams are named ``sg:tools:reply:{worker_id}``.
TOOLS_REPLY_STREAM_PREFIX = os.getenv("SG_TOOLS_REPLY_STREAM_PREFIX", "sg:tools:reply")


class RedisEventBus:
    """Manages Redis Streams for inbound/outbound message routing.

    Responsibilities:

    - Publishing inbound message envelopes (gateway → worker)
    - Publishing outbound response envelopes (worker → gateway)
    - Publishing tool-execution requests and replies (worker ↔ tools)
    - Creating and managing consumer groups
    - Providing stream metadata (lag, pending counts)
    """

    # Lower bound applied to ``stream_maxlen`` so a misconfigured cap cannot
    # make approximate ``MAXLEN`` trimming evict almost everything.
    _MIN_STREAM_MAXLEN = 1000
    # Cap for per-worker tool reply streams (tighter than the request cap).
    _REPLY_STREAM_MAXLEN = 10_000
    # Thresholds at which ``_check_stream_backlog`` emits its warning.
    _BACKLOG_FILL_RATIO = 0.8
    _BACKLOG_PENDING_LIMIT = 1000
    # Outbound platforms provisioned when ``ensure_streams`` is given no list.
    _DEFAULT_PLATFORMS = ("discord", "matrix", "webchat")

    def __init__(
        self,
        redis: Redis,
        node_role: str,
        node_id: str,
        *,
        stream_maxlen: int = 100_000,
    ) -> None:
        """Initialize the event bus with its Redis client and node identity.

        Stores the shared :class:`redis.asyncio.Redis` client used for every
        subsequent ``XGROUP``/``XADD``/``XINFO``/``XPENDING``/``XLEN`` call,
        plus the node's role and id, which decide (in :meth:`ensure_streams`)
        which streams and consumer groups this node provisions. The stream
        length cap is floored at 1000 to guard against a misconfiguration that
        would make the approximate ``MAXLEN`` trimming in the publish methods
        evict almost everything.

        This is a pure in-memory initializer: it opens no connections and
        issues no Redis commands. It is constructed once per service by the
        ``gateway``, ``inference``, and ``agents`` entrypoints
        (``gateway_main.py``, ``inference_main.py``, ``agents_main.py``) and by
        ``tests/core/test_event_bus.py``.

        Args:
            redis: Shared async Redis client backing every stream operation
                performed by this bus.
            node_role: ``"gateway"``, ``"worker"``, ``"standalone"`` or
                ``"tools"``; determines which consumer groups
                :meth:`ensure_streams` creates. Any other value provisions
                nothing.
            node_id: Stable identifier for this process, used only in
                diagnostic log ``extra`` fields.
            stream_maxlen: Approximate maximum number of entries to retain per
                stream. Values below 1000 are clamped up to 1000.
        """
        self._redis = redis
        self._node_role = node_role
        self._node_id = node_id
        self._stream_maxlen = max(self._MIN_STREAM_MAXLEN, int(stream_maxlen))

    # ── Initialization ────────────────────────────────────────────

    async def ensure_streams(self, platforms: list[str] | None = None) -> None:
        """Create the consumer groups this node's role needs.

        Called once at startup. Uses ``XGROUP CREATE`` with ``mkstream=True``
        so the stream is auto-created if it doesn't exist yet.

        Provisioning by role:

        - ``worker`` / ``standalone``: the worker group on the inbound stream.
        - ``gateway`` / ``standalone``: one gateway group per platform on that
          platform's outbound stream.
        - ``tools``: the tools group on the tools stream. Standalone/dev runs
          tools in-process and needs no group.

        Args:
            platforms: Platform names to provision outbound streams for.
                ``None`` selects the built-in default set; an explicit empty
                list provisions no outbound streams.

        Raises:
            redis.exceptions.ResponseError: If Redis rejects ``XGROUP CREATE``
                for any reason other than the group already existing.
            Exception: Any other client error (connection, timeout, ...) is
                propagated rather than being mistaken for "already exists".
        """
        # Function-scope import: the file already depends on ``redis``; this
        # keeps the new name local to the one method that needs it.
        from redis.exceptions import ResponseError

        groups_to_create: list[tuple[str, str]] = []

        if self._node_role in ("worker", "standalone"):
            groups_to_create.append((INBOUND_STREAM, WORKER_GROUP))

        if self._node_role in ("gateway", "standalone"):
            resolved_platforms = (
                platforms if platforms is not None else self._DEFAULT_PLATFORMS
            )
            for platform in resolved_platforms:
                stream = f"{OUTBOUND_STREAM_PREFIX}:{platform}"
                group = f"{GATEWAY_GROUP_PREFIX}:{platform}"
                groups_to_create.append((stream, group))

        if self._node_role == "tools":
            # Only the dedicated tools service consumes the tools stream;
            # standalone/dev runs tools in-process and needs no group.
            groups_to_create.append((TOOLS_STREAM, TOOLS_GROUP))

        logger.debug(
            "Initializing event bus streams check",
            extra={
                "node_role": self._node_role,
                "node_id": self._node_id,
                "streams_to_provision": len(groups_to_create),
            },
        )

        for stream, group in groups_to_create:
            try:
                await self._redis.xgroup_create(stream, group, id="$", mkstream=True)
            except ResponseError as exc:
                # Redis answers BUSYGROUP when the group already exists, which
                # is expected on restart. Every other rejection is a genuine
                # provisioning failure and must not be hidden.
                if "BUSYGROUP" not in str(exc):
                    raise
                logger.debug("Consumer group already exists: %s/%s", stream, group)
            else:
                logger.info(
                    "Created consumer group",
                    extra={"stream": stream, "group": group},
                )

    # ── Publishing ────────────────────────────────────────────────

    @staticmethod
    def _decode_msg_id(res: Any) -> str:
        """Normalize an ``XADD`` reply (bytes or str) into a ``str`` id."""
        return res.decode() if isinstance(res, bytes) else str(res)

    @staticmethod
    def _trace_id_for_log(raw: Any, envelope: dict[str, Any]) -> str:
        """Return the trace id for a log record without ever raising.

        Looks in the serialized payload first and falls back to the original
        envelope. Accepts bytes or text, so a log-only field can never turn an
        already-completed publish into an exception.
        """
        value = raw.get("trace_id")
        if value is None:
            value = envelope.get("trace_id")
        if value is None:
            return ""
        if isinstance(value, (bytes, bytearray)):
            return bytes(value).decode("utf-8", errors="replace")
        return str(value)

    async def _xadd(self, stream: str, raw: Any, *, maxlen: int | None = None) -> str:
        """``XADD`` *raw* onto *stream* with approximate trimming.

        Args:
            stream: Target stream key.
            raw: Already-serialized field mapping.
            maxlen: Trim cap; ``None`` uses this bus's ``stream_maxlen``.

        Returns:
            The Redis Stream message ID as a string.
        """
        res = await self._redis.xadd(
            stream,
            raw,
            maxlen=self._stream_maxlen if maxlen is None else maxlen,
            approximate=True,
        )
        return self._decode_msg_id(res)

    async def publish_inbound(self, envelope: dict[str, Any]) -> str:
        """Publish an inbound message envelope to the worker stream.

        Called by gateway nodes when they receive a platform message.

        Args:
            envelope: The message envelope; ``channel_id``, ``platform``,
                ``content`` and ``trace_id`` are read for logging only.

        Returns:
            The Redis Stream message ID.
        """
        logger.debug(
            "Publishing inbound payload envelope",
            extra={
                "channel_id": envelope.get("channel_id"),
                "platform": envelope.get("platform"),
                "content_len": len(envelope.get("content", "") or ""),
            },
        )
        raw = serialize_stream_payload(envelope)
        await self._check_stream_backlog(INBOUND_STREAM, WORKER_GROUP)
        msg_id = await self._xadd(INBOUND_STREAM, raw)
        logger.info(
            "Published inbound event",
            extra={
                "stream_msg_id": msg_id,
                "channel_id": envelope.get("channel_id"),
                "platform": envelope.get("platform"),
                # Computed defensively: the entry is already on the stream, so
                # building this log field must not be able to fail the call.
                "trace_id": self._trace_id_for_log(raw, envelope),
            },
        )
        return msg_id

    async def publish_outbound(
        self,
        platform: str,
        envelope: dict[str, Any],
    ) -> str:
        """Publish an outbound response envelope to the platform-specific stream.

        Called by worker nodes after LLM inference completes.

        Args:
            platform: Platform name; selects the outbound stream and the
                gateway group whose backlog is checked first.
            envelope: The response envelope; ``channel_id`` and ``type`` are
                read for logging only.

        Returns:
            The Redis Stream message ID.
        """
        logger.debug(
            "Publishing outbound payload envelope",
            extra={
                "channel_id": envelope.get("channel_id"),
                "platform": platform,
                "msg_type": envelope.get("type"),
            },
        )
        stream = f"{OUTBOUND_STREAM_PREFIX}:{platform}"
        group = f"{GATEWAY_GROUP_PREFIX}:{platform}"
        raw = serialize_stream_payload(envelope)
        await self._check_stream_backlog(stream, group)
        msg_id = await self._xadd(stream, raw)
        logger.info(
            "Published outbound event",
            extra={
                "stream_msg_id": msg_id,
                "channel_id": envelope.get("channel_id"),
                "platform": platform,
            },
        )
        return msg_id

    async def publish_tools_request(self, envelope: dict[str, Any]) -> str:
        """Publish a tool-execution request to the shared ``tools`` stream.

        Called by the inference tier's ``RemoteToolRegistry`` to delegate a
        non-pinned tool call. The ``tools`` service instances load-balance the
        stream via the ``sg:tools`` consumer group. The envelope must carry the
        ``reply_to`` stream name + ``correlation_id`` so the result can be
        routed back.

        Args:
            envelope: The request envelope; ``tool_name`` and
                ``correlation_id`` are read for logging only.

        Returns:
            The Redis Stream message ID.
        """
        raw = serialize_stream_payload(envelope)
        await self._check_stream_backlog(TOOLS_STREAM, TOOLS_GROUP)
        msg_id = await self._xadd(TOOLS_STREAM, raw)
        logger.debug(
            "Published tool-exec request",
            extra={
                "stream_msg_id": msg_id,
                "tool_name": envelope.get("tool_name"),
                "correlation_id": envelope.get("correlation_id"),
            },
        )
        return msg_id

    async def publish_tools_reply(
        self,
        reply_stream: str,
        envelope: dict[str, Any],
    ) -> str:
        """Publish a tool-execution result onto a worker's reply stream.

        Called by the ``tools`` service after running a delegated tool. The
        *reply_stream* is the ``reply_to`` value from the originating request
        (``sg:tools:reply:{worker_id}``); a demux reader on that worker matches
        ``correlation_id`` and resolves the waiting call. The reply may contain
        raw media bytes (``writeback.sent_files``) — msgpack framing preserves
        them. The reply stream is capped tighter than the request stream since
        results are consumed promptly. No backlog check is performed here.

        Args:
            reply_stream: Stream key to publish the result onto.
            envelope: The result envelope.

        Returns:
            The Redis Stream message ID.
        """
        raw = serialize_stream_payload(envelope)
        return await self._xadd(reply_stream, raw, maxlen=self._REPLY_STREAM_MAXLEN)

    # ── Diagnostics ───────────────────────────────────────────────

    async def get_stream_info(self, stream: str) -> dict[str, Any]:
        """Return ``XINFO STREAM`` metadata for a stream, or empty on error.

        Surfaces the raw Redis ``XINFO STREAM`` mapping (length, first/last
        entry ids, group count, and so on) so monitoring code can report on a
        stream's depth and shape. Any error -- most commonly the stream not
        existing yet -- is logged at debug level and reported as an empty dict
        so a probe never crashes the caller.

        Issues a single read-only ``XINFO STREAM`` against Redis.

        Args:
            stream: The Redis Stream name to introspect.

        Returns:
            The ``XINFO STREAM`` field mapping, or an empty dict if the stream
            is missing or the call fails.
        """
        try:
            return await self._redis.xinfo_stream(stream)
        except Exception:
            logger.debug("XINFO STREAM failed for %s", stream, exc_info=True)
            return {}

    async def get_pending_count(self, stream: str, group: str) -> int:
        """Return the consumer-group backlog (unacknowledged message count).

        Reports how many messages a consumer group has read but not yet
        ``XACK``-ed -- the pending entries list (PEL) size -- which is the
        primary signal that workers are falling behind. The result feeds the
        eviction-risk check in :meth:`_check_stream_backlog` and any external
        monitoring. Errors (including a missing stream or group) are logged at
        debug level and reported as ``0`` so a diagnostic call never disrupts
        publishing.

        Issues a single read-only ``XPENDING`` summary against Redis.

        Args:
            stream: The Redis Stream to inspect.
            group: The consumer group whose pending count is requested.

        Returns:
            The number of pending (read but unacknowledged) messages, or ``0``
            when the information is unavailable or not a mapping.
        """
        try:
            info = await self._redis.xpending(stream, group)
        except Exception:
            logger.debug(
                "XPENDING failed for %s/%s", stream, group, exc_info=True
            )
            return 0
        return info.get("pending", 0) if isinstance(info, dict) else 0

    async def _check_stream_backlog(self, stream: str, group: str) -> None:
        """Warn when a stream is full enough that ``MAXLEN`` trimming may drop unread data.

        Guards against silent data loss: because the publish methods cap
        streams with approximate ``MAXLEN`` trimming, a stream that is near
        capacity while a consumer group is lagging risks evicting entries the
        group has not yet read. This emits a ``warning`` log when the stream
        length exceeds 80 percent of ``stream_maxlen`` or more than 1000
        messages are pending, giving operators a chance to react before
        eviction happens.

        Combines :meth:`get_pending_count` with a read-only ``XLEN`` against
        Redis; it only logs and never raises -- any failure is downgraded to a
        debug line so a diagnostics hiccup cannot block the actual publish.

        Called just before the ``XADD`` in :meth:`publish_inbound`,
        :meth:`publish_outbound` and :meth:`publish_tools_request`.

        Args:
            stream: The Redis Stream about to be published to.
            group: The consumer group whose lag is weighed against the stream
                length.
        """
        try:
            pending = await self.get_pending_count(stream, group)
            length = await self._redis.xlen(stream)
            length_limit = int(self._stream_maxlen * self._BACKLOG_FILL_RATIO)
            if length > length_limit or pending > self._BACKLOG_PENDING_LIMIT:
                logger.warning(
                    "Stream backlog high — eviction risk",
                    extra={
                        "stream": stream,
                        "group": group,
                        "length": length,
                        "pending": pending,
                        "maxlen": self._stream_maxlen,
                    },
                )
        except Exception:
            logger.debug("Stream backlog check failed for %s", stream, exc_info=True)