# Source code for strangler_router

"""DEPRECATED (Phase T3, 2026-06-02) — retained for historical reference only.

During the monolith-to-microservices migration the strangler-fig router
decided, per channel/guild, whether a message was served by the legacy
monolith or routed over the event bus to the new services. The monolith has
since been retired and all traffic flows through the microservices, so this
module is no longer imported or instantiated by any service.

Do **not** import or use :class:`StranglerRouter` in new code; it exists
only to document the migration history. See :mod:`core.strangler_router` for
the equivalent ``core``-package copy.
"""

# DEPRECATED — Phase T3 (2026-06-02)
# The StranglerRouter was removed from GatewayService once the monolith was
# confirmed dead and full microservice routing was validated in production.
# This file is retained for historical reference only.
# Do NOT import or instantiate StranglerRouter in new code.

import asyncio
import inspect
import logging
import random
from typing import Optional

from core.circuit_breaker import CircuitBreaker, CircuitBreakerOpenException

logger = logging.getLogger(__name__)

class StranglerRouter:
    """Deprecated strangler-fig router that chose monolith vs. microservice.

    During the monolith-to-microservices migration this resolved, per
    channel/guild/platform, whether a message was served by the legacy
    monolith or routed over the event bus to the new services, supporting a
    percentage-based rollout and a default fallback.

    Rules live in the ``sg:strangler:routes`` Redis hash and are cached in
    memory; a pub/sub listener on ``sg:pubsub:config`` flushes that cache when
    operator scripts rewrite the rules, and a
    :class:`~core.circuit_breaker.CircuitBreaker` guards the cache-miss Redis
    read so a flaky Redis degrades to the safe ``monolith`` default rather
    than raising.

    Retired in Phase T3 once the monolith was confirmed dead: no live service
    constructs or imports this class anymore, and the only remaining callers
    are the migration test suite
    (``tests/core/migration/test_strangler_router.py``). Do not use it in new
    code; see :mod:`core.strangler_router` for the package copy.
    """

    def __init__(self, redis):
        """Initialize the router with a Redis client and an empty route cache.

        Performs no I/O: the cache is populated lazily on the first
        :meth:`_fetch_route` call, and the invalidation listener only runs
        once :meth:`start` is awaited.

        Args:
            redis: An async Redis client (e.g. ``redis.asyncio.Redis``) used
                for ``hgetall`` of the routes hash and for pub/sub
                subscription to config-change notifications.
        """
        self.redis = redis
        # Routing rules mirrored from the ``sg:strangler:routes`` hash (str -> str).
        self._cache = {}
        # Protects against Redis being down during cache misses
        self._circuit_breaker = CircuitBreaker(failure_threshold=3, reset_timeout=30.0)
        # Handle of the background invalidation listener; managed by start()/stop().
        self._listen_task = None
        # Safe fallback used when no rule matches or routing fails.
        self._default_route = "monolith"

    async def start(self):
        """Launch the background cache-invalidation listener.

        Schedules :meth:`_listen_for_invalidations` as an ``asyncio`` task and
        stores its handle on ``self._listen_task`` so :meth:`stop` can cancel
        it. Returns immediately without awaiting the listener.

        Idempotent while a listener is still running. Overwriting a live
        handle would orphan that task: it would keep an extra pub/sub
        subscription open that :meth:`stop` could no longer cancel. If the
        previous listener has already finished (e.g. it exited after a pub/sub
        error), a fresh one is started.
        """
        if self._listen_task is not None and not self._listen_task.done():
            return
        self._listen_task = asyncio.create_task(self._listen_for_invalidations())

    async def stop(self):
        """Cancel the background invalidation listener and await its teardown.

        Cancels the task created by :meth:`start` and awaits it. The resulting
        :class:`asyncio.CancelledError` is swallowed so shutdown completes
        cleanly. The handle is cleared so the router can be started again. A
        no-op when :meth:`start` was never called or the router is already
        stopped.
        """
        task, self._listen_task = self._listen_task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def _listen_for_invalidations(self):
        """Subscribe to config pub/sub and flush the route cache on any message.

        Runs as a long-lived background task. It opens a Redis pub/sub
        connection and subscribes to ``sg:pubsub:config``. For every published
        message it clears ``self._cache``, so the next :meth:`_fetch_route`
        reloads fresh rules via :meth:`_refresh_cache`. This is how rollout
        changes made by operator scripts (``scripts/canary_rollout.py``,
        ``scripts/enable_shadow_mode.py``) take effect without a restart.

        Re-raises :class:`asyncio.CancelledError` so :meth:`stop` can tear it
        down. Any other exception is logged as ``strangler_pubsub_error`` and
        ends the loop. After that the cache is no longer invalidated until
        :meth:`start` is called again. In every case the pub/sub connection is
        released on exit so it does not leak.
        """
        pubsub = None
        try:
            pubsub = self.redis.pubsub()
            await pubsub.subscribe("sg:pubsub:config")
            async for message in pubsub.listen():
                # Only real publications invalidate; subscribe confirmations
                # and other control frames carry a different ``type``.
                if message["type"] == "message":
                    logger.info("strangler_router_cache_invalidated")
                    self._cache.clear()
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.error("strangler_pubsub_error: %s", e)
        finally:
            if pubsub is not None:
                await self._close_pubsub(pubsub)

    @staticmethod
    async def _close_pubsub(pubsub) -> None:
        """Best-effort release of a pub/sub connection; never raises ``Exception``.

        Prefers ``aclose()`` and falls back to ``close()``; the result is
        awaited only when it is awaitable. Cleanup failures are logged rather
        than raised so they cannot mask the exception (or cancellation) that
        ended the listener.
        """
        closer = getattr(pubsub, "aclose", None) or getattr(pubsub, "close", None)
        if closer is None:
            return
        try:
            result = closer()
            if inspect.isawaitable(result):
                await result
        except Exception as e:
            logger.warning("strangler_pubsub_close_error: %s", e)

    async def get_route(
        self,
        channel: Optional[str] = None,
        guild: Optional[str] = None,
        platform: Optional[str] = None,
    ) -> str:
        """Resolve the routing target for a message, degrading safely on failure.

        Public entry point for routing decisions. It delegates to
        :meth:`_fetch_route` through the circuit breaker, so a struggling Redis
        trips the breaker instead of blocking. Every failure mode becomes the
        safe ``self._default_route`` (``"monolith"``): a tripped breaker logs
        ``strangler_circuit_open_fallback_monolith``, and any other exception
        logs ``strangler_route_error`` before falling back.

        Args:
            channel: Optional channel identifier (highest-priority rule).
            guild: Optional guild/server identifier (second priority).
            platform: Optional platform name such as ``"discord"`` (third priority).

        Returns:
            str: The resolved route (typically ``"monolith"``, ``"microservice"``
            or ``"shadow"``), or the default route on any error.
        """
        try:
            return await self._circuit_breaker.call(
                self._fetch_route, channel, guild, platform
            )
        except CircuitBreakerOpenException:
            logger.warning("strangler_circuit_open_fallback_monolith")
            return self._default_route
        except Exception as e:
            logger.error("strangler_route_error error=%s", e)
            return self._default_route

    async def _fetch_route(
        self,
        channel: Optional[str] = None,
        guild: Optional[str] = None,
        platform: Optional[str] = None,
    ) -> str:
        """Resolve a routing target from the cached rules using a 5-level hierarchy.

        Lazily populates the cache when it is empty, then evaluates rules in
        priority order: channel > guild > platform > percentage rollout >
        default. Keys looked up are ``route:channel:{channel}``,
        ``route:guild:{guild}``, ``route:platform:{platform}``,
        ``route:pct:microservice`` and ``route:default``. Apart from the
        optional refresh, only the in-memory cache is read.

        The percent rule routes stochastically: a uniform draw in ``[0, 100)``
        below the configured percentage returns ``"microservice"``. A
        non-numeric percent value is ignored rather than raised.

        Invoked through the circuit breaker by :meth:`get_route`, so any
        exception raised here (e.g. from the Redis read) counts as a breaker
        failure.

        Args:
            channel: Optional channel identifier; its rule has highest priority.
            guild: Optional guild/server identifier; second priority.
            platform: Optional platform name (e.g. ``"discord"``); third priority.

        Returns:
            str: The matching rule value, ``"microservice"`` from the percentage
            rollout, or ``route:default`` (falling back to ``self._default_route``).
        """
        # An empty cache means either a cold start or a fresh invalidation.
        if not self._cache:
            await self._refresh_cache()

        # 5-level routing hierarchy: channel > guild > platform > pct > default
        if channel and (route := self._cache.get(f"route:channel:{channel}")):
            return route
        if guild and (route := self._cache.get(f"route:guild:{guild}")):
            return route
        if platform and (route := self._cache.get(f"route:platform:{platform}")):
            return route

        # Percent-based rollout (stochastic routing based on pct threshold)
        pct = self._cache.get("route:pct:microservice")
        if pct is not None:
            try:
                pct_val = float(pct)
            except ValueError:
                pass  # malformed percentage: skip the rollout rule
            else:
                if random.random() * 100 < pct_val:
                    return "microservice"

        return self._cache.get("route:default", self._default_route)

    async def _refresh_cache(self):
        """Reload all routing rules from Redis into the in-memory cache.

        Issues a single ``hgetall("sg:strangler:routes")``, which is the only
        Redis read this class performs. Every field/value is copied into
        ``self._cache`` as ``str``, so lookups in :meth:`_fetch_route` can use
        plain string keys. Existing entries are overwritten/merged rather than
        cleared first.

        If the hash is empty the cache stays empty. Every subsequent
        :meth:`_fetch_route` then re-reads Redis until rules appear.
        """
        rules = await self.redis.hgetall("sg:strangler:routes")
        for raw_key, raw_val in rules.items():
            self._cache[self._to_str(raw_key)] = self._to_str(raw_val)

    @staticmethod
    def _to_str(value) -> str:
        """Decode ``bytes`` as UTF-8; coerce any other value with ``str``."""
        return value.decode("utf-8") if isinstance(value, bytes) else str(value)