"""DEPRECATED (Phase T3, 2026-06-02) — retained for historical reference only.

During the monolith-to-microservices migration, the strangler-fig router
decided, per channel/guild/platform, whether a message was served by the legacy
monolith or routed over the event bus to the new services. The monolith has
since been retired and all traffic flows through the microservices, so no
service imports this module or instantiates its router any longer.
Do **not** import or use :class:`StranglerRouter` in new code; it exists
only to document the migration history. See :mod:`core.strangler_router` for
the equivalent ``core``-package copy.
"""
# DEPRECATED — Phase T3 (2026-06-02)
# The StranglerRouter was removed from GatewayService once the monolith was
# confirmed dead and full microservice routing was validated in production.
# This file is retained for historical reference only.
# Do NOT import or instantiate StranglerRouter in new code.
import asyncio
import inspect
import logging
import random
from typing import Optional

from core.circuit_breaker import CircuitBreaker, CircuitBreakerOpenException
# Module-level logger; StranglerRouter emits event-style messages on it
# (e.g. ``strangler_route_error``, ``strangler_pubsub_error``).
logger = logging.getLogger(__name__)
class StranglerRouter:
    """Deprecated strangler-fig router that chose monolith vs. microservice.

    During the monolith-to-microservices migration this resolved, per
    channel/guild/platform, whether a message was served by the legacy
    monolith or routed over the event bus to the new services, supporting a
    percentage-based rollout and a default fallback. Rules live in the
    ``sg:strangler:routes`` Redis hash and are cached in memory; a pub/sub
    listener on ``sg:pubsub:config`` flushes that cache when operator scripts
    rewrite the rules, and a :class:`~core.circuit_breaker.CircuitBreaker`
    guards the cache-miss Redis read so a flaky Redis degrades to the safe
    ``monolith`` default rather than raising.

    Retired in Phase T3 once the monolith was confirmed dead: no live service
    constructs or imports this class anymore, and the only remaining callers
    are the migration test suite (``tests/core/migration/test_strangler_router.py``).
    Do not use it in new code; see :mod:`core.strangler_router` for the
    package copy.
    """

    def __init__(self, redis):
        """Initialize the router with a Redis client and an empty route cache.

        Sets up the in-memory ``_cache`` dict that holds routing rules pulled
        from Redis, a :class:`~core.circuit_breaker.CircuitBreaker`
        (``failure_threshold=3``, ``reset_timeout=30.0``) that guards the
        cache-miss path against a flaky Redis, an unset ``_listen_task`` handle
        for the background invalidation listener, and a ``_default_route`` of
        ``"monolith"`` used as the safe fallback.

        This stores the passed ``redis`` handle but performs no I/O; the cache
        is populated lazily on the first :meth:`_fetch_route` call. As this
        module is deprecated (Phase T3), no live service constructs this class;
        the only callers are historical and the test suite.

        Args:
            redis: An async Redis client (e.g. ``redis.asyncio.Redis``) used
                for ``hgetall`` of the routes hash and for pub/sub
                subscription to config-change notifications.
        """
        self.redis = redis
        # Flat ``field -> value`` string copy of the ``sg:strangler:routes`` hash.
        self._cache = {}
        # Protects against Redis being down during cache misses
        self._circuit_breaker = CircuitBreaker(failure_threshold=3, reset_timeout=30.0)
        # Handle of the background invalidation listener; set by start().
        self._listen_task = None
        self._default_route = "monolith"

    async def start(self) -> None:
        """Launch the background cache-invalidation listener.

        Schedules :meth:`_listen_for_invalidations` as an ``asyncio`` task
        whose handle is stored on ``self._listen_task`` so :meth:`stop` can
        cancel it later. Returns immediately without awaiting the listener.

        Calling this while a listener is still running is a no-op, so repeated
        calls never orphan a task that :meth:`stop` could no longer cancel. If
        the previous listener has finished (e.g. it ended after a pub/sub
        error), a fresh one is started.

        Historically invoked by the owning ``GatewayService`` during startup,
        but that wiring was removed in Phase T3, so no live caller remains
        (only the test suite exercises it directly).
        """
        if self._listen_task is not None and not self._listen_task.done():
            return
        self._listen_task = asyncio.create_task(self._listen_for_invalidations())

    async def stop(self) -> None:
        """Cancel the background invalidation listener and await its teardown.

        If a listener task was started, requests cancellation and awaits it,
        swallowing the resulting :class:`asyncio.CancelledError` so shutdown
        completes cleanly, then leaves ``self._listen_task`` unset. A no-op
        when :meth:`start` was never called (or :meth:`stop` already ran).

        Historically called by ``GatewayService`` on shutdown; that wiring was
        removed in Phase T3, so only the test suite calls it now.
        """
        task, self._listen_task = self._listen_task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def _listen_for_invalidations(self) -> None:
        """Subscribe to config pub/sub and flush the route cache on any message.

        Runs as a long-lived background task: opens a Redis pub/sub connection,
        subscribes to the ``sg:pubsub:config`` channel, and for every published
        message clears ``self._cache`` so the next :meth:`_fetch_route` reloads
        fresh rules via :meth:`_refresh_cache`. This is how rollout changes made
        by operator scripts (``scripts/canary_rollout.py``,
        ``scripts/enable_shadow_mode.py``) take effect without a restart.

        Each notification emits a ``strangler_router_cache_invalidated`` log
        line. Re-raises :class:`asyncio.CancelledError` so :meth:`stop` can
        tear it down; any other exception is logged as
        ``strangler_pubsub_error`` and ends the loop. In every case the pub/sub
        connection is released on exit. Spawned only by :meth:`start`.
        """
        pubsub = None
        try:
            pubsub = self.redis.pubsub()
            await pubsub.subscribe("sg:pubsub:config")
            async for message in pubsub.listen():
                if message["type"] == "message":
                    logger.info("strangler_router_cache_invalidated")
                    self._cache.clear()
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.error("strangler_pubsub_error: %s", e)
        finally:
            # Without this the subscribed connection leaks on cancel/error.
            if pubsub is not None:
                await self._close_pubsub(pubsub)

    @staticmethod
    async def _close_pubsub(pubsub) -> None:
        """Best-effort release of a pub/sub object; never raises ``Exception``."""
        # redis-py >= 5.0.1 names the close coroutine ``aclose()``; older
        # releases expose ``close()``. Await the result only if it is awaitable.
        closer = getattr(pubsub, "aclose", None) or getattr(pubsub, "close", None)
        if closer is None:
            return
        try:
            result = closer()
            if inspect.isawaitable(result):
                await result
        except Exception as e:
            logger.warning("strangler_pubsub_close_error: %s", e)

    async def get_route(
        self,
        channel: Optional[str] = None,
        guild: Optional[str] = None,
        platform: Optional[str] = None,
    ) -> str:
        """Resolve the routing target for a message, degrading safely on failure.

        Public entry point for routing decisions: delegates to
        :meth:`_fetch_route` through the circuit breaker so a struggling Redis
        trips the breaker instead of blocking, and converts every failure mode
        into the safe ``self._default_route`` (``"monolith"``). A tripped breaker
        logs ``strangler_circuit_open_fallback_monolith`` and any other exception
        logs ``strangler_route_error`` before falling back. As the module is
        deprecated (Phase T3), no live service calls this; only the migration
        test suite exercises it.

        Args:
            channel: Optional channel identifier (highest-priority rule).
            guild: Optional guild/server identifier (second priority).
            platform: Optional platform name such as ``"discord"`` (third
                priority).

        Returns:
            str: The resolved route — typically ``"monolith"``,
            ``"microservice"``, or ``"shadow"`` — falling back to the default
            route on any error.
        """
        try:
            return await self._circuit_breaker.call(
                self._fetch_route, channel, guild, platform
            )
        except CircuitBreakerOpenException:
            logger.warning("strangler_circuit_open_fallback_monolith")
            return self._default_route
        except Exception as e:
            logger.error("strangler_route_error error=%s", e)
            return self._default_route

    async def _fetch_route(
        self,
        channel: Optional[str] = None,
        guild: Optional[str] = None,
        platform: Optional[str] = None,
    ) -> str:
        """Resolve a routing target from the cached rules using a 5-level hierarchy.

        Lazily populates the cache when it is empty, then evaluates rules in
        priority order: a channel-specific rule wins over a guild rule, which
        wins over a platform rule, which wins over a percentage-based rollout,
        which falls back to the default route. The percent rule
        (``route:pct:microservice``) performs stochastic routing: a uniform
        random draw in ``[0, 100)`` below the configured percentage returns
        ``"microservice"``; a non-numeric percent value is ignored.

        Apart from the cold-cache :meth:`_refresh_cache` call, this reads only
        the in-memory cache, looking up ``route:channel:{channel}``,
        ``route:guild:{guild}``, ``route:platform:{platform}``,
        ``route:pct:microservice`` and ``route:default``. It is invoked through
        the circuit breaker by :meth:`get_route`, never directly by callers.

        Args:
            channel: Optional channel identifier; its rule has highest priority.
            guild: Optional guild/server identifier; second priority.
            platform: Optional platform name (e.g. ``"discord"``); third priority.

        Returns:
            str: The resolved route — a rule value (typically ``"monolith"``,
            ``"microservice"`` or ``"shadow"``), ``"microservice"`` from the
            percentage rollout, or the cached ``route:default`` (falling back to
            ``self._default_route``).
        """
        # Populate the cache when it is cold or was just invalidated.
        # NOTE: an empty Redis hash leaves the cache empty, so every call then
        # re-reads Redis until at least one rule exists.
        if not self._cache:
            await self._refresh_cache()

        # 5-level routing hierarchy: channel > guild > platform > pct > default
        if channel and (route := self._cache.get(f"route:channel:{channel}")):
            return route
        if guild and (route := self._cache.get(f"route:guild:{guild}")):
            return route
        if platform and (route := self._cache.get(f"route:platform:{platform}")):
            return route

        # Percent-based rollout (stochastic routing based on pct threshold)
        pct = self._cache.get("route:pct:microservice")
        if pct is not None:
            try:
                pct_val = float(pct)
            except ValueError:
                pct_val = None
            if pct_val is not None and random.random() * 100 < pct_val:
                return "microservice"

        return self._cache.get("route:default", self._default_route)

    @staticmethod
    def _as_text(value) -> str:
        """Return *value* as ``str``, UTF-8-decoding ``bytes`` (may raise ``UnicodeDecodeError``)."""
        return value.decode("utf-8") if isinstance(value, bytes) else str(value)

    async def _refresh_cache(self) -> None:
        """Reload all routing rules from Redis into the in-memory cache.

        Reads the entire ``sg:strangler:routes`` hash with a single
        ``hgetall`` (the only hash read this class performs), decodes
        ``bytes`` keys and values to ``str`` (coercing other types via
        ``str``), and merges the result into ``self._cache``. Existing entries
        are overwritten/merged rather than cleared first, and the dict object
        itself is kept so the invalidation listener keeps clearing the same one.

        All rules are decoded before the cache is touched. If any entry fails
        to decode, the error propagates and the cache is left unchanged. A
        partially filled cache would otherwise look "warm" to
        :meth:`_fetch_route` and never be refreshed. Called by
        :meth:`_fetch_route` on a cold cache. Because that call runs inside the
        circuit breaker, a Redis failure here counts toward tripping it.

        NOTE(review): an invalidation arriving while ``hgetall`` is in flight
        can be overtaken by this merge, re-populating rules read before the
        change. Left as-is in this deprecated module.
        """
        rules = await self.redis.hgetall("sg:strangler:routes")
        decoded = {self._as_text(k): self._as_text(v) for k, v in rules.items()}
        self._cache.update(decoded)