"""Redis Streams event bus for the Stargazer distributed architecture.
Defines :class:`RedisEventBus`, the thin publish/provision layer that ties the
five microservices (gateway / inference / agents / consolidation / web)
together over Redis Streams. Gateways publish inbound message envelopes onto a
single shared worker stream, inference/agents workers publish responses onto
per-platform outbound streams, and each side reads with its own consumer
group; this module owns the stream and group names and the ``XGROUP`` /
``XADD`` calls that create and feed them.
Stream and group names are resolved once at import time from environment
variables (``SG_INBOUND_STREAM``, ``SG_OUTBOUND_STREAM_PREFIX``,
``SG_WORKER_GROUP``, ``SG_GATEWAY_GROUP_PREFIX``, and, for the dedicated
tool-execution service, ``SG_TOOLS_STREAM``, ``SG_TOOLS_GROUP`` and
``SG_TOOLS_REPLY_STREAM_PREFIX``) so every service agrees on the topology
without hard-coding it. Envelopes are msgpack-framed by
``core.serialization`` before they hit the wire. The bus is constructed once
per service by the ``gateway_main`` / ``inference_main`` / ``agents_main``
entrypoints.
"""
import asyncio
import logging
import os
import time
from typing import Any, Optional
from redis.asyncio import Redis
from core.serialization import serialize_stream_payload, deserialize_stream_payload
logger = logging.getLogger("stargazer.event_bus")


def _env(name: str, default: str) -> str:
    """Return environment variable *name*, or *default* when it is unset or blank.

    ``os.environ.get(name, default)`` only falls back when the variable is
    missing entirely. A variable that is exported but empty (for example
    ``SG_INBOUND_STREAM=`` in a compose file) would otherwise produce an
    empty stream name, or a prefix that yields keys such as ``":discord"``.
    Non-blank values are returned unchanged.

    Args:
        name: Environment variable to read.
        default: Value used when the variable is unset, empty, or whitespace.

    Returns:
        The configured value, or *default*.
    """
    value = os.environ.get(name, "")
    return value if value.strip() else default


# Stream names
INBOUND_STREAM = _env("SG_INBOUND_STREAM", "sg:stream:inbound")
OUTBOUND_STREAM_PREFIX = _env("SG_OUTBOUND_STREAM_PREFIX", "sg:stream:outbound")  # sg:stream:outbound:discord, etc.
# Consumer group names
WORKER_GROUP = _env("SG_WORKER_GROUP", "sg:workers")
GATEWAY_GROUP_PREFIX = _env("SG_GATEWAY_GROUP_PREFIX", "sg:gateway")  # sg:gateway:discord, etc.
# Dedicated tool-execution service: inference workers publish tool-exec requests
# onto a single shared stream that ``tools`` instances load-balance via one
# consumer group; each request names a per-worker reply stream the tools service
# XADDs the result onto (a demux reader on the worker resolves it). Envelopes are
# msgpack-framed like every other stream, so the reply can carry raw media bytes
# (``serialize_stream_payload`` uses ``use_bin_type=True``).
TOOLS_STREAM = _env("SG_TOOLS_STREAM", "sg:stream:tools")
TOOLS_GROUP = _env("SG_TOOLS_GROUP", "sg:tools")
TOOLS_REPLY_STREAM_PREFIX = _env("SG_TOOLS_REPLY_STREAM_PREFIX", "sg:tools:reply")  # sg:tools:reply:{worker_id}
class RedisEventBus:
    """Manages Redis Streams for inbound/outbound message routing.

    Responsibilities:
    - Publishing inbound message envelopes (gateway → worker)
    - Publishing outbound response envelopes (worker → gateway)
    - Creating the consumer groups this node's role reads from
    - Providing stream metadata (``XINFO STREAM``, pending counts) and a
      throttled backlog warning ahead of publishes
    """

    def __init__(
        self,
        redis: Redis,
        node_role: str,
        node_id: str,
        *,
        stream_maxlen: int = 100_000,
        backlog_check_interval: float = 10.0,
    ) -> None:
        """Initialize the event bus with its Redis client and node identity.

        Stores the shared :class:`redis.asyncio.Redis` client used for every
        subsequent ``XGROUP``/``XADD``/``XINFO``/``XPENDING``/``XLEN`` call,
        plus the node's role and id. The role decides, in
        :meth:`ensure_streams`, which consumer groups this node provisions.
        The stream length cap is floored at 1000. This guards against a
        misconfiguration that would make the approximate ``MAXLEN`` trimming
        in :meth:`publish_inbound` / :meth:`publish_outbound` evict almost
        everything.

        This is a pure in-memory initializer: it opens no connections and
        issues no Redis commands. It is constructed once per service by the
        service entrypoints (``gateway_main.py``, ``inference_main.py``,
        ``agents_main.py``) and by ``tests/core/test_event_bus.py``.

        Args:
            redis: Shared async Redis client backing every stream operation
                performed by this bus.
            node_role: ``"gateway"``, ``"worker"``, ``"standalone"`` or
                ``"tools"``. It determines which consumer groups
                :meth:`ensure_streams` creates; any other value creates none.
            node_id: Identifier for this process, used only in diagnostic log
                ``extra`` fields.
            stream_maxlen: Approximate maximum number of entries to retain
                per stream. Values below 1000 are clamped up to 1000.
                Defaults to 100_000.
            backlog_check_interval: Minimum number of seconds between two
                backlog probes (``XPENDING`` + ``XLEN``) for the same stream.
                ``0`` probes before every publish. Negative values are
                treated as ``0``. Defaults to 10 seconds.
        """
        self._redis = redis
        self._node_role = node_role
        self._node_id = node_id
        self._stream_maxlen = max(1000, int(stream_maxlen))
        self._backlog_check_interval = max(0.0, float(backlog_check_interval))
        # stream name -> time.monotonic() timestamp of the last backlog probe.
        self._last_backlog_check: dict[str, float] = {}

    # ── Initialization ────────────────────────────────────────────

    async def ensure_streams(self, platforms: list[str] | None = None) -> None:
        """Create the consumer groups this node reads from.

        Called once at startup. Which groups are created depends on the role:

        - ``worker`` / ``standalone``: the worker group on the inbound stream.
        - ``gateway`` / ``standalone``: one gateway group per platform on the
          matching outbound stream.
        - ``tools``: the tools group on the tools stream.

        The command is ``XGROUP CREATE <stream> <group> $ MKSTREAM``, so a
        missing stream is created. Because the start id is ``$``, a newly
        created group only receives entries added after its creation.

        A ``BUSYGROUP`` reply means the group already exists, which is the
        normal case on restart; it is logged at debug level. Any other
        failure is logged as a warning with its traceback, and the remaining
        groups are still attempted.

        Args:
            platforms: Platform names whose outbound streams a gateway should
                consume. ``None`` selects ``("discord", "matrix", "webchat")``;
                an empty list creates no outbound groups.
        """
        groups_to_create: list[tuple[str, str]] = []
        if self._node_role in ("worker", "standalone"):
            groups_to_create.append((INBOUND_STREAM, WORKER_GROUP))
        if self._node_role in ("gateway", "standalone"):
            resolved_platforms = platforms if platforms is not None else ("discord", "matrix", "webchat")
            for platform in resolved_platforms:
                stream = f"{OUTBOUND_STREAM_PREFIX}:{platform}"
                group = f"{GATEWAY_GROUP_PREFIX}:{platform}"
                groups_to_create.append((stream, group))
        if self._node_role == "tools":
            # Only the dedicated tools service consumes the tools stream;
            # standalone/dev runs tools in-process and needs no group.
            groups_to_create.append((TOOLS_STREAM, TOOLS_GROUP))

        logger.debug(
            "Initializing event bus streams check",
            extra={
                "node_role": self._node_role,
                "node_id": self._node_id,
                "streams_to_provision": len(groups_to_create),
            },
        )

        for stream, group in groups_to_create:
            try:
                await self._redis.xgroup_create(stream, group, id="$", mkstream=True)
            except Exception as exc:
                if "BUSYGROUP" in str(exc):
                    # Group already exists — this is expected on restart.
                    logger.debug("Consumer group already exists: %s/%s", stream, group)
                else:
                    # Anything else (connection loss, WRONGTYPE, ...) means this
                    # node may start without its group; make that visible.
                    logger.warning(
                        "Failed to create consumer group",
                        extra={"stream": stream, "group": group},
                        exc_info=True,
                    )
                continue
            logger.info(
                "Created consumer group",
                extra={"stream": stream, "group": group},
            )

    # ── Publishing ────────────────────────────────────────────────

    async def publish_inbound(self, envelope: dict[str, Any]) -> str:
        """Publish an inbound message envelope to the shared worker stream.

        Called by gateway nodes when they receive a platform message. The
        envelope is framed with ``serialize_stream_payload``. A (throttled)
        backlog probe runs first, then the frame is appended with approximate
        ``MAXLEN`` trimming.

        Args:
            envelope: The inbound message envelope. ``channel_id``,
                ``platform`` and ``content`` are read here for logging only.

        Returns:
            The Redis Stream message id as a ``str``.
        """
        logger.debug(
            "Publishing inbound payload envelope",
            extra={
                "channel_id": envelope.get("channel_id"),
                "platform": envelope.get("platform"),
                "content_len": len(envelope.get("content", "") or ""),
            },
        )
        raw = serialize_stream_payload(envelope)
        await self._check_stream_backlog(INBOUND_STREAM, WORKER_GROUP)
        res = await self._redis.xadd(
            INBOUND_STREAM, raw, maxlen=self._stream_maxlen, approximate=True,
        )
        # Everything below runs after the entry is already on the stream, so
        # it must not raise: the caller would treat the publish as failed and
        # a retry would duplicate the message.
        msg_id = self._to_text(res)
        # NOTE(review): assumes serialize_stream_payload returns a mapping that
        # may carry "trace_id" as bytes or str; confirm against core.serialization.
        trace_id = self._to_text(raw.get("trace_id", b""))
        logger.info(
            "Published inbound event",
            extra={
                "stream_msg_id": msg_id,
                "channel_id": envelope.get("channel_id"),
                "platform": envelope.get("platform"),
                "trace_id": trace_id,
            },
        )
        return msg_id

    async def publish_outbound(
        self,
        platform: str,
        envelope: dict[str, Any],
    ) -> str:
        """Publish an outbound response envelope to the platform-specific stream.

        Called by worker nodes after LLM inference completes. The target
        stream is ``{OUTBOUND_STREAM_PREFIX}:{platform}``. Backlog is weighed
        against the matching ``{GATEWAY_GROUP_PREFIX}:{platform}`` group.

        Args:
            platform: Platform name selecting the outbound stream.
            envelope: The response envelope. ``channel_id`` and ``type`` are
                read here for logging only.

        Returns:
            The Redis Stream message id as a ``str``.
        """
        logger.debug(
            "Publishing outbound payload envelope",
            extra={
                "channel_id": envelope.get("channel_id"),
                "platform": platform,
                "msg_type": envelope.get("type"),
            },
        )
        stream = f"{OUTBOUND_STREAM_PREFIX}:{platform}"
        group = f"{GATEWAY_GROUP_PREFIX}:{platform}"
        raw = serialize_stream_payload(envelope)
        await self._check_stream_backlog(stream, group)
        res = await self._redis.xadd(
            stream, raw, maxlen=self._stream_maxlen, approximate=True,
        )
        msg_id = self._to_text(res)
        logger.info(
            "Published outbound event",
            extra={
                "stream_msg_id": msg_id,
                "channel_id": envelope.get("channel_id"),
                "platform": platform,
            },
        )
        return msg_id

    @staticmethod
    def _to_text(value: Any) -> str:
        """Render a Redis reply value as text without ever raising.

        ``None`` becomes ``""``. ``bytes``/``bytearray`` are UTF-8 decoded
        with replacement characters. Anything else goes through ``str()``.
        """
        if value is None:
            return ""
        if isinstance(value, (bytes, bytearray)):
            return value.decode("utf-8", errors="replace")
        return str(value)

    # ── Diagnostics ───────────────────────────────────────────────

    async def get_stream_info(self, stream: str) -> dict[str, Any]:
        """Return ``XINFO STREAM`` metadata for a stream, or empty on error.

        Surfaces the raw Redis ``XINFO STREAM`` mapping (length, first/last
        entry ids, group count, and so on) for monitoring code. Any error
        (most commonly the stream not existing yet) is logged at debug level
        and reported as an empty dict, so a probe never crashes the caller.

        Args:
            stream: The Redis Stream name to introspect.

        Returns:
            The ``XINFO STREAM`` field mapping, or an empty dict if the stream
            is missing or the call fails.
        """
        try:
            return await self._redis.xinfo_stream(stream)
        except Exception:
            logger.debug("XINFO STREAM failed for %s", stream, exc_info=True)
            return {}

    async def get_pending_count(self, stream: str, group: str) -> int:
        """Return the consumer-group backlog (unacknowledged message count).

        Reports how many messages a consumer group has read but not yet
        ``XACK``-ed, i.e. the pending entries list (PEL) size. Accepts both
        the parsed redis-py summary (a dict with a ``"pending"`` key) and a
        raw ``XPENDING`` summary reply whose first element is the count.
        Errors, including a missing stream or group, and unrecognised reply
        shapes are reported as ``0`` so a diagnostic call never disrupts
        publishing.

        Args:
            stream: The Redis Stream to inspect.
            group: The consumer group whose pending count is requested.

        Returns:
            The number of pending (read but unacknowledged) messages, or ``0``
            when the information is unavailable.
        """
        try:
            info = await self._redis.xpending(stream, group)
        except Exception:
            return 0
        if isinstance(info, dict):
            count = info.get("pending", 0)
        elif isinstance(info, (list, tuple)) and info:
            count = info[0]
        else:
            return 0
        try:
            return int(count or 0)
        except (TypeError, ValueError):
            return 0

    async def _check_stream_backlog(self, stream: str, group: str) -> None:
        """Warn when a stream is full enough that ``MAXLEN`` trimming may drop unread data.

        Runs just before each publish. It emits a ``warning`` when the stream
        length exceeds 80 percent of ``stream_maxlen`` or more than 1000
        messages are pending.

        The probe (``XPENDING`` + ``XLEN``) runs at most once per
        ``backlog_check_interval`` seconds per stream, so it does not add two
        round trips to every publish. The timestamp is recorded before
        awaiting, so concurrent publishes in the same window skip the probe
        instead of all probing at once.

        NOTE: with approximate ``MAXLEN`` trimming, a long-running stream
        sits near ``stream_maxlen`` in steady state. The length condition on
        its own therefore does not prove unread data is at risk; throttling
        keeps it from logging on every publish.

        This method only logs and never raises. Any failure is downgraded to
        a debug line so a diagnostics hiccup cannot block the actual publish.

        Args:
            stream: The Redis Stream about to be published to.
            group: The consumer group whose lag is weighed against the stream
                length.
        """
        interval = self._backlog_check_interval
        if interval > 0:
            now = time.monotonic()
            last = self._last_backlog_check.get(stream)
            if last is not None and now - last < interval:
                return
            self._last_backlog_check[stream] = now
        try:
            pending = await self.get_pending_count(stream, group)
            length = await self._redis.xlen(stream)
            if length > int(self._stream_maxlen * 0.8) or pending > 1000:
                logger.warning(
                    "Stream backlog high — eviction risk",
                    extra={
                        "stream": stream,
                        "group": group,
                        "length": length,
                        "pending": pending,
                        "maxlen": self._stream_maxlen,
                    },
                )
        except Exception:
            logger.debug("Stream backlog check failed for %s", stream, exc_info=True)