"""DEPRECATED (Phase T3, 2026-06-02) — retained for historical reference.
The strangler-fig router used to decide, per channel/guild, whether a
message was handled by the legacy monolith or the new microservices.
Now that the monolith is gone and all traffic routes over the event
bus, :class:`StranglerRouter` is no longer instantiated by any
service. Do **not** import or use it in new code; it is documented
only to explain the migration history.
"""
# DEPRECATED — Phase T3 (2026-06-02)
# The StranglerRouter was removed from GatewayService once the monolith was
# confirmed dead and full microservice routing was validated in production.
# This file is retained for historical reference only.
# Do NOT import or instantiate StranglerRouter in new code.
import asyncio
import logging
import random
from typing import Optional
from core.circuit_breaker import CircuitBreaker, CircuitBreakerOpenException
logger = logging.getLogger(__name__)
class StranglerRouter:
    """DEPRECATED strangler-fig router for monolith-vs-microservice traffic.

    During the migration this decided, per channel / guild / platform, whether an
    inbound message was handled by the legacy monolith or the new microservices,
    using a cached rule hierarchy plus a percentage rollout knob so traffic could
    be shifted gradually and safely. As of Phase T3 (2026-06-02) the monolith is
    gone and all traffic routes over the event bus, so no live service
    constructs this class -- it is retained only to document the migration and is
    exercised solely by ``tests/core/migration/test_strangler_router.py``.

    Reads its rules from the ``sg:strangler:routes`` Redis hash, subscribes to
    the ``sg:pubsub:config`` channel to invalidate its in-memory cache when an
    operator script (``scripts/canary_rollout.py``,
    ``scripts/enable_shadow_mode.py``) republishes routes, and shields the
    cache-miss path with a :class:`~core.circuit_breaker.CircuitBreaker` so a
    flaky Redis falls back to a safe default rather than failing routing. Do
    **not** import or instantiate it in new code.
    """

    def __init__(self, redis):
        """Initialize the router with a Redis handle and an empty route cache.

        Stores the passed ``redis`` handle, creates an empty ``_cache`` dict for
        routing rules pulled from Redis, a
        :class:`~core.circuit_breaker.CircuitBreaker` (``failure_threshold=3``,
        ``reset_timeout=30.0``) guarding the cache-miss path, an unset
        ``_listen_task`` handle for the background invalidation listener, and a
        ``_default_route`` of ``"monolith"`` used as the safe fallback.

        Performs no I/O; the cache is populated lazily on the first
        :meth:`_fetch_route` call.

        Args:
            redis: An async Redis client (e.g. ``redis.asyncio.Redis``) used for
                ``hgetall`` of the routes hash and for pub/sub subscription to
                config-change notifications.
        """
        self.redis = redis
        # Decoded routing rules (str -> str) mirrored from ``sg:strangler:routes``.
        self._cache = {}
        # Protects against Redis being down during cache misses
        self._circuit_breaker = CircuitBreaker(failure_threshold=3, reset_timeout=30.0)
        # Background invalidation listener task, created by start().
        self._listen_task = None
        # Safe fallback when no rule matches or routing fails.
        self._default_route = "monolith"

    async def start(self) -> None:
        """Launch the background cache-invalidation listener.

        Schedules :meth:`_listen_for_invalidations` with
        :func:`asyncio.create_task` and stores the handle on
        ``self._listen_task`` so :meth:`stop` can cancel it. Returns without
        awaiting the listener.

        If a listener task is already running, this is a no-op. A second task
        would otherwise replace the handle and orphan the first one, and
        :meth:`stop` could never cancel that first subscription. Once the
        previous task is done (e.g. after :meth:`stop`), a fresh one is created.
        """
        if self._listen_task is not None and not self._listen_task.done():
            return
        self._listen_task = asyncio.create_task(self._listen_for_invalidations())

    async def stop(self) -> None:
        """Cancel and join the background cache-invalidation listener.

        Teardown counterpart to :meth:`start`. If a listener task exists, it is
        cancelled and awaited. The resulting :class:`asyncio.CancelledError` is
        swallowed so shutdown completes cleanly. When no task was ever started,
        this is a no-op. The cancelled task handle is left on
        ``self._listen_task``.
        """
        if self._listen_task is not None:
            self._listen_task.cancel()
            try:
                await self._listen_task
            except asyncio.CancelledError:
                pass

    async def _listen_for_invalidations(self) -> None:
        """Subscribe to config pub/sub and flush the route cache on each update.

        Long-lived loop run by the task created in :meth:`start`. It opens a
        Redis pub/sub, subscribes to ``sg:pubsub:config``, and for every
        published ``"message"`` logs ``strangler_router_cache_invalidated`` and
        clears ``self._cache``. That forces the next :meth:`_fetch_route` to
        re-read the routes hash. Operator scripts rewrite
        ``sg:strangler:routes`` and then publish to ``sg:pubsub:config`` to
        trigger this.

        The pub/sub handle is always released in ``finally``, so cancellation
        or an error does not leak the subscription or its connection.

        NOTE: on any non-cancellation error the loop logs
        ``strangler_pubsub_error`` and exits. After that, the cache is no longer
        invalidated until :meth:`start` is called again.

        Raises:
            asyncio.CancelledError: Re-raised when the task is cancelled so the
                listener can be torn down by :meth:`stop`.
        """
        pubsub = None
        try:
            pubsub = self.redis.pubsub()
            await pubsub.subscribe("sg:pubsub:config")
            async for message in pubsub.listen():
                if message["type"] == "message":
                    logger.info("strangler_router_cache_invalidated")
                    self._cache.clear()
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.error("strangler_pubsub_error: %s", e)
        finally:
            if pubsub is not None:
                await self._close_pubsub(pubsub)

    @staticmethod
    async def _close_pubsub(pubsub) -> None:
        """Best-effort release of a pub/sub handle; never raises ``Exception``.

        Prefers ``aclose()`` (newer redis-py) and falls back to ``close()``. The
        result is awaited only if it is a coroutine, so clients with a
        synchronous close also work.
        """
        close = getattr(pubsub, "aclose", None) or getattr(pubsub, "close", None)
        if close is None:
            return
        try:
            result = close()
            if asyncio.iscoroutine(result):
                await result
        except Exception as e:
            logger.warning("strangler_pubsub_close_error: %s", e)

    async def get_route(
        self,
        channel: Optional[str] = None,
        guild: Optional[str] = None,
        platform: Optional[str] = None,
    ) -> str:
        """Resolve the route for a message, failing safe on any error.

        Delegates the decision to :meth:`_fetch_route`, run inside the
        instance's circuit breaker, so a Redis outage during a cache refresh
        trips the breaker instead of stalling every message. If the breaker is
        open or any other exception occurs, it logs the problem and returns
        ``self._default_route`` (``"monolith"``).

        Args:
            channel: Optional channel identifier; highest-priority override.
            guild: Optional guild/server identifier; second-priority override.
            platform: Optional platform name (e.g. ``"discord"``); third-priority
                override.

        Returns:
            str: The resolved route (typically ``"microservice"`` or
            ``"monolith"``), falling back to ``self._default_route`` on breaker
            open or error.
        """
        try:
            return await self._circuit_breaker.call(
                self._fetch_route, channel, guild, platform
            )
        except CircuitBreakerOpenException:
            logger.warning("strangler_circuit_open_fallback_monolith")
            return self._default_route
        except Exception as e:
            logger.error("strangler_route_error error=%s", e)
            return self._default_route

    async def _fetch_route(
        self,
        channel: Optional[str] = None,
        guild: Optional[str] = None,
        platform: Optional[str] = None,
    ) -> str:
        """Resolve a route from the cached rules using the 5-level hierarchy.

        On an empty cache it first calls :meth:`_refresh_cache`. It then checks
        the following, in priority order:

        1. ``route:channel:{channel}``
        2. ``route:guild:{guild}``
        3. ``route:platform:{platform}``
        4. ``route:pct:microservice``: percentage rollout. A uniform
           :func:`random.random` draw scaled to 0-100 below the threshold
           returns ``"microservice"``. A non-numeric threshold is ignored.
        5. ``route:default``, else ``self._default_route``.

        NOTE: if the routes hash is empty, the cache stays empty and every
        call re-reads Redis.

        Args:
            channel: Optional channel identifier; highest-priority override.
            guild: Optional guild/server identifier; second-priority override.
            platform: Optional platform name; third-priority override.

        Returns:
            str: The resolved route (whatever value the matching rule holds).
        """
        if not self._cache:
            await self._refresh_cache()

        # 5-level routing hierarchy: channel > guild > platform > pct > default
        if channel and (route := self._cache.get(f"route:channel:{channel}")):
            return route
        if guild and (route := self._cache.get(f"route:guild:{guild}")):
            return route
        if platform and (route := self._cache.get(f"route:platform:{platform}")):
            return route

        # Percent-based rollout (stochastic routing based on pct threshold)
        pct = self._cache.get("route:pct:microservice")
        if pct is not None:
            try:
                pct_val = float(pct)
            except ValueError:
                pct_val = None
            if pct_val is not None and random.random() * 100 < pct_val:
                return "microservice"

        return self._cache.get("route:default", self._default_route)

    async def _refresh_cache(self) -> None:
        """Reload all routing rules from Redis into the in-memory cache.

        Reads the whole ``sg:strangler:routes`` hash and merges every
        field/value into ``self._cache`` as ``str``. Existing entries are
        overwritten, not cleared first.

        Raises:
            Exception: Propagates any error raised by ``redis.hgetall`` (e.g.
                connection failures), which the circuit breaker in
                :meth:`get_route` counts toward its failure threshold.
        """
        rules = await self.redis.hgetall("sg:strangler:routes")
        self._cache.update(
            (self._to_str(field), self._to_str(value)) for field, value in rules.items()
        )

    @staticmethod
    def _to_str(raw) -> str:
        """Decode a Redis reply item: UTF-8 for ``bytes``, ``str()`` otherwise."""
        return raw.decode("utf-8") if isinstance(raw, bytes) else str(raw)