# Source code for core.strangler_router

"""DEPRECATED (Phase T3, 2026-06-02) — retained for historical reference.

The strangler-fig router used to decide, per channel/guild, whether a
message was handled by the legacy monolith or the new microservices.
Now that the monolith is gone and all traffic routes over the event
bus, :class:`StranglerRouter` is no longer instantiated by any
service. Do **not** import or use it in new code; it is documented
only to explain the migration history.
"""

# DEPRECATED — Phase T3 (2026-06-02)
# The StranglerRouter was removed from GatewayService once the monolith was
# confirmed dead and full microservice routing was validated in production.
# This file is retained for historical reference only.
# Do NOT import or instantiate StranglerRouter in new code.

import asyncio
import logging
import random
from typing import Optional
from core.circuit_breaker import CircuitBreaker, CircuitBreakerOpenException

# Module-level logger; named after this module so routing/pub-sub events can be filtered.
logger: logging.Logger = logging.getLogger(__name__)

class StranglerRouter:
    """DEPRECATED strangler-fig router for monolith-vs-microservice traffic.

    During the migration this decided, per channel / guild / platform,
    whether an inbound message was handled by the legacy monolith or the
    new microservices. It used a cached rule hierarchy plus a percentage
    rollout knob, so traffic could be shifted gradually and safely.

    As of Phase T3 (2026-06-02) the monolith is gone and all traffic routes
    over the event bus. No live service constructs this class; it is
    retained only to document the migration and is exercised solely by
    ``tests/core/migration/test_strangler_router.py``.

    Reads its rules from the ``sg:strangler:routes`` Redis hash. Subscribes
    to the ``sg:pubsub:config`` channel to invalidate its in-memory cache
    when an operator script (``scripts/canary_rollout.py``,
    ``scripts/enable_shadow_mode.py``) republishes routes. Shields the
    cache-miss path with a :class:`~core.circuit_breaker.CircuitBreaker`, so
    a flaky Redis falls back to a safe default rather than failing routing.

    Do **not** import or instantiate it in new code.
    """

    # Redis hash holding every routing rule (rule key -> route name).
    _ROUTES_KEY = "sg:strangler:routes"
    # Pub/sub channel whose messages signal that the routes hash changed.
    _CONFIG_CHANNEL = "sg:pubsub:config"

    def __init__(self, redis):
        """Initialize the router with a Redis handle and an empty route cache.

        Only stores ``redis`` and builds the circuit breaker; no I/O happens
        here. The cache is populated lazily on the first :meth:`_fetch_route`
        call.

        Args:
            redis: An async Redis client (e.g. ``redis.asyncio.Redis``). It is
                used for ``hgetall`` of the routes hash and for the pub/sub
                subscription to config-change notifications.
        """
        self.redis = redis
        # Routing rules mirrored from the routes hash (str -> str); filled lazily.
        self._cache = {}
        # Protects against Redis being down during cache misses
        self._circuit_breaker = CircuitBreaker(failure_threshold=3, reset_timeout=30.0)
        # Handle of the background invalidation listener (see start/stop).
        self._listen_task: Optional[asyncio.Task] = None
        # Safe fallback when no rule matches, the breaker is open, or an error occurs.
        self._default_route = "monolith"

    async def start(self) -> None:
        """Launch the background cache-invalidation listener.

        Schedules :meth:`_listen_for_invalidations` as a fire-and-forget task
        and keeps its handle on ``self._listen_task`` so :meth:`stop` can
        cancel it. Returns immediately without awaiting the listener.

        If a listener is still running, this is a no-op. A second call
        therefore cannot spawn a duplicate subscription whose handle would
        be lost. A listener that has already finished (e.g. it exited after
        a pub/sub error) is replaced with a fresh one.
        """
        running = self._listen_task
        if running is not None and not running.done():
            return
        self._listen_task = asyncio.create_task(self._listen_for_invalidations())

    async def stop(self) -> None:
        """Cancel and join the background cache-invalidation listener.

        This is the idempotent teardown counterpart to :meth:`start`. Any
        running listener is cancelled and awaited, and the resulting
        :class:`asyncio.CancelledError` is swallowed so shutdown completes
        cleanly. The handle is cleared, so repeated calls are no-ops and a
        later :meth:`start` begins from a clean state.
        """
        task, self._listen_task = self._listen_task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def _listen_for_invalidations(self) -> None:
        """Subscribe to config pub/sub and flush the route cache on each update.

        This is the long-lived body of the task created by :meth:`start`. It
        subscribes to ``sg:pubsub:config``. For every published ``"message"``
        it logs ``strangler_router_cache_invalidated`` and clears
        ``self._cache``, so the next :meth:`_fetch_route` re-reads the routes
        hash.

        Any error other than cancellation is logged as
        ``strangler_pubsub_error`` and ends the listener. From then on, the
        cache is no longer invalidated until :meth:`start` is called again.
        The pub/sub connection is always released on exit, whether the exit
        is normal, an error, or a cancellation.

        Raises:
            asyncio.CancelledError: Re-raised when the task is cancelled, so
                :meth:`stop` can tear the listener down.
        """
        pubsub = None
        try:
            pubsub = self.redis.pubsub()
            await pubsub.subscribe(self._CONFIG_CHANNEL)
            async for message in pubsub.listen():
                # Skip subscribe/unsubscribe confirmations; only real publishes count.
                if message["type"] == "message":
                    logger.info("strangler_router_cache_invalidated")
                    self._cache.clear()
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.error("strangler_pubsub_error: %s", e)
        finally:
            if pubsub is not None:
                await self._close_pubsub(pubsub)

    @staticmethod
    async def _close_pubsub(pubsub) -> None:
        """Release a pub/sub connection on a best-effort basis.

        Prefers ``aclose()`` (newer redis-py) and falls back to ``reset()``
        (older redis-py). Cleanup errors are logged at debug level and
        swallowed, so they never mask the exception or cancellation that
        ended the listener.
        """
        close = getattr(pubsub, "aclose", None) or getattr(pubsub, "reset", None)
        if close is None:
            return
        try:
            await close()
        except Exception as e:
            logger.debug("strangler_pubsub_close_error: %s", e)

    async def get_route(
        self,
        channel: Optional[str] = None,
        guild: Optional[str] = None,
        platform: Optional[str] = None,
    ) -> str:
        """Resolve the route for a message, failing safe on any error.

        Public entry point. It delegates the decision to :meth:`_fetch_route`,
        running it through the instance's circuit breaker so a Redis outage
        during a cache refresh trips the breaker instead of stalling every
        message. If the breaker is open, or any other exception occurs, it
        logs and returns ``self._default_route`` (``"monolith"``). Routing
        therefore degrades rather than throws.

        Args:
            channel: Optional channel identifier; highest-priority override.
            guild: Optional guild/server identifier; second-priority override.
            platform: Optional platform name (e.g. ``"discord"``);
                third-priority override.

        Returns:
            str: The resolved route (typically ``"microservice"`` or
            ``"monolith"``), or ``self._default_route`` on an open breaker
            or an error.
        """
        try:
            return await self._circuit_breaker.call(
                self._fetch_route, channel, guild, platform
            )
        except CircuitBreakerOpenException:
            logger.warning("strangler_circuit_open_fallback_monolith")
            return self._default_route
        except Exception as e:
            logger.error("strangler_route_error error=%s", e)
            return self._default_route

    async def _fetch_route(
        self,
        channel: Optional[str] = None,
        guild: Optional[str] = None,
        platform: Optional[str] = None,
    ) -> str:
        """Resolve a route from the cached rules using the 5-level hierarchy.

        The levels are checked in priority order:

        1. ``route:channel:{channel}``
        2. ``route:guild:{guild}``
        3. ``route:platform:{platform}``
        4. The ``route:pct:microservice`` percentage rollout. A uniform draw
           scaled to [0, 100) below the threshold selects ``"microservice"``;
           a non-numeric threshold is ignored.
        5. ``route:default``, or ``self._default_route`` if that key is absent.

        On a cold (empty) cache, :meth:`_refresh_cache` runs first. Any Redis
        error it raises propagates to :meth:`get_route`'s circuit breaker.

        Args:
            channel: Optional channel identifier; highest-priority override.
            guild: Optional guild/server identifier; second-priority override.
            platform: Optional platform name; third-priority override.

        Returns:
            str: Whatever value the first matching rule holds.
        """
        # Lazily load rules on a cold cache. Note: an empty routes hash keeps
        # the cache empty, so every call then re-reads Redis.
        if not self._cache:
            await self._refresh_cache()

        # Explicit overrides, most specific first: channel > guild > platform.
        if channel and (route := self._cache.get(f"route:channel:{channel}")):
            return route
        if guild and (route := self._cache.get(f"route:guild:{guild}")):
            return route
        if platform and (route := self._cache.get(f"route:platform:{platform}")):
            return route

        # Percent-based rollout (stochastic routing based on pct threshold).
        pct = self._cache.get("route:pct:microservice")
        if pct is not None:
            try:
                pct_val = float(pct)
            except ValueError:
                # Misconfigured knob: ignore it rather than fail routing. Logged
                # at debug level because this path runs once per message.
                logger.debug("strangler_invalid_pct value=%r", pct)
            else:
                if random.random() * 100 < pct_val:
                    return "microservice"

        return self._cache.get("route:default", self._default_route)

    async def _refresh_cache(self) -> None:
        """Reload all routing rules from Redis into the in-memory cache.

        Reads the whole ``sg:strangler:routes`` hash with a single
        ``hgetall`` and merges every field/value into ``self._cache`` as
        plain ``str``. Existing entries are overwritten but not cleared
        first.

        Raises:
            Exception: Any error from ``redis.hgetall`` (e.g. a connection
                failure) propagates. The caller's circuit breaker counts it
                toward its failure threshold.
        """
        rules = await self.redis.hgetall(self._ROUTES_KEY)
        self._cache.update(
            (self._decode(field), self._decode(value))
            for field, value in rules.items()
        )

    @staticmethod
    def _decode(raw) -> str:
        """Return *raw* as text: UTF-8-decode ``bytes`` and ``str()`` anything else."""
        return raw.decode("utf-8") if isinstance(raw, bytes) else str(raw)