# Source code for core.resilience

"""Redis resilience helpers — circuit breaking with write replay.

:class:`RedisCircuitBreaker` wraps a Redis client so that, while Redis
is unavailable, reads return safe fallbacks and write mutations
(:data:`WRITE_COMMANDS`) are queued and replayed on recovery,
smoothing over transient Redis outages without crashing the service.
"""

import logging
import asyncio
import time
from typing import Any, Callable, Dict, List, Tuple

# Module logger shared by the circuit breaker and its Redis proxy.
logger = logging.getLogger("stargazer.resilience")

# Redis method names that RedisCircuitBreaker treats as write mutations:
# while the breaker is OPEN (or when such a call fails) the call is queued
# for replay instead of being answered with a read fallback.
# NOTE(review): mutating commands that are not listed here (e.g. "hdel",
# "lpush", "expire", "incr") are handled like reads while the breaker is
# OPEN, so they are not queued for replay - extend this set if callers
# issue them.
WRITE_COMMANDS = {
    # strings / generic keys
    "set",
    "delete",
    # hashes
    "hset",
    # sets
    "sadd",
    "srem",
    # lists
    "rpush",
    "ltrim",
    # streams
    "xadd",
    "xack",
}

class RedisCircuitBreaker:
    """Circuit breaker around an async Redis client that buffers writes during outages.

    Consecutive command failures are counted. Once ``failure_threshold`` is
    reached, the breaker trips from ``CLOSED`` to ``OPEN``.

    While ``OPEN`` and still inside ``recovery_timeout``, :meth:`execute`
    does not call Redis. Write commands (names in :data:`WRITE_COMMANDS`)
    are appended to an in-memory re-sync queue and answered with ``"OK"``.
    Every other command gets a neutral value from :meth:`_get_read_fallback`.

    After the timeout, the next command is let through in ``HALF-OPEN`` as
    a probe. If it succeeds, the breaker closes and
    :meth:`flush_re_sync_queue` is started as a background task to replay
    the queued writes in FIFO order.

    The re-sync queue is a plain in-process list with no size limit, so
    queued writes are lost if the process exits before they are replayed.
    """

    def __init__(self, redis_client: Any, failure_threshold: int = 5, recovery_timeout: float = 10.0):
        """Create a breaker in the ``CLOSED`` state around ``redis_client``.

        No I/O is performed. This only records the configuration and the
        initial state: zero failures and an empty re-sync queue.

        Args:
            redis_client: The underlying async Redis client.
                :meth:`flush_re_sync_queue` resolves queued method names on it
                with ``getattr``. :meth:`execute` only calls the ``func`` it
                is handed.
            failure_threshold: Number of consecutive failures that trips the
                breaker from ``CLOSED`` to ``OPEN``.
            recovery_timeout: Seconds the breaker stays ``OPEN`` before the
                next command is let through as a ``HALF-OPEN`` probe.
        """
        self._redis = redis_client
        self._failure_threshold = failure_threshold
        self._recovery_timeout = recovery_timeout
        self._state = "CLOSED"  # one of "CLOSED", "OPEN", "HALF-OPEN"
        self._failures = 0
        # time.time() timestamp of the most recent trip to OPEN.
        self._last_state_change = 0.0
        # Queued write mutations awaiting replay: (method_name, args, kwargs).
        self._re_sync_queue: List[Tuple[str, tuple, dict]] = []
        # Strong references to in-flight flush tasks. The event loop keeps
        # only weak references to tasks, so an unreferenced task can be
        # garbage-collected before it finishes.
        self._background_tasks = set()

    @property
    def state(self) -> str:
        """Return the current breaker state.

        Returns:
            str: ``"CLOSED"`` (commands pass through), ``"OPEN"`` (reads get
            fallbacks and writes are queued), or ``"HALF-OPEN"`` (the next
            command probes Redis). The state is changed only by
            :meth:`execute` and :meth:`flush_re_sync_queue`.
        """
        return self._state

    @property
    def failures(self) -> int:
        """Return the consecutive-failure counter.

        :meth:`execute` increments it on each failed command and resets it
        to zero on a success while ``CLOSED`` or ``HALF-OPEN``. A failed
        replay in :meth:`flush_re_sync_queue` pins it at
        ``failure_threshold``.

        Returns:
            int: The current consecutive-failure count.
        """
        return self._failures

    def _get_read_fallback(self, method_name: str) -> Any:
        """Return a neutral stand-in value for a command short-circuited while ``OPEN``.

        This is a pure function. It returns an empty ``dict`` for hash reads,
        an empty ``list`` for range/stream reads, ``False`` for
        ``sismember``, an empty ``set`` for ``smembers``, and ``None`` for
        anything else.

        Args:
            method_name: The Redis method name being short-circuited.

        Returns:
            Any: An empty/neutral value appropriate to ``method_name``.
        """
        if method_name in ("hgetall", "hgetall_bin"):
            return {}
        if method_name in ("lrange", "xread", "xreadgroup"):
            return []
        if method_name in ("sismember",):
            return False
        if method_name in ("smembers",):
            return set()
        return None

    def _schedule_flush(self) -> None:
        """Start :meth:`flush_re_sync_queue` as a background task, keeping a reference until it finishes."""
        task = asyncio.create_task(self.flush_re_sync_queue())
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def execute(self, method_name: str, func: Callable, *args, **kwargs) -> Any:
        """Run one Redis command through the breaker's state machine.

        While ``OPEN`` and inside the recovery window, ``func`` is not
        called. Write commands are queued for replay and ``"OK"`` is
        returned; other commands return :meth:`_get_read_fallback`. Once the
        window has elapsed, the breaker moves to ``HALF-OPEN`` and this call
        goes through as a probe.

        On success, the failure counter is reset. From ``HALF-OPEN``, the
        breaker also closes and a background replay of the re-sync queue is
        started.

        On failure, the following happens:

        * A write command is queued for replay.
        * The failure counter is incremented.
        * The breaker trips to ``OPEN`` if it was probing in ``HALF-OPEN``
          or the threshold has been reached.
        * The exception is re-raised.

        NOTE(review): a failed write is both queued AND re-raised, so a
        caller that retries on error may end up applying it twice once the
        queue is replayed. Any exception type counts as an outage. A write
        that fails for a non-connectivity reason will keep failing on replay
        and re-trip the breaker each time.

        Args:
            method_name: Redis command name. It is used to classify the call
                as a write (:data:`WRITE_COMMANDS`) or a read, and to pick a
                read fallback.
            func: Coroutine function invoked when the breaker lets the call
                through.
            *args: Positional arguments for ``func`` (stored for replay if
                the write is queued).
            **kwargs: Keyword arguments for ``func`` (stored for replay if
                the write is queued).

        Returns:
            Any: The result of ``func`` on success, ``"OK"`` for a write
            queued while ``OPEN``, or a neutral read fallback while ``OPEN``.

        Raises:
            Exception: Whatever ``func`` raised, after the failure has been
                recorded.
        """
        if self._state == "OPEN":
            if time.time() - self._last_state_change > self._recovery_timeout:
                self._state = "HALF-OPEN"
                logger.info("Circuit breaker entering HALF-OPEN state, testing connection...")
            elif method_name in WRITE_COMMANDS:
                # Remember the write for replay and pretend it succeeded.
                self._re_sync_queue.append((method_name, args, kwargs))
                logger.debug(
                    "Queueing write command %s during OPEN state (current queue size: %d)",
                    method_name,
                    len(self._re_sync_queue),
                )
                return "OK"
            else:
                return self._get_read_fallback(method_name)

        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            if method_name in WRITE_COMMANDS:
                self._re_sync_queue.append((method_name, args, kwargs))
                logger.debug(
                    "Queueing failed write command %s: %s (current queue size: %d)",
                    method_name,
                    e,
                    len(self._re_sync_queue),
                )
            self._failures += 1
            if self._state == "HALF-OPEN" or self._failures >= self._failure_threshold:
                self._state = "OPEN"
                self._last_state_change = time.time()
                logger.warning(
                    "Redis circuit breaker tripped to OPEN! Consecutive failures: %d. Error: %s",
                    self._failures,
                    e,
                )
            raise

        if self._state == "HALF-OPEN":
            logger.info("Redis operation succeeded in HALF-OPEN state. Closing circuit breaker.")
            self._state = "CLOSED"
            self._failures = 0
            self._schedule_flush()
        elif self._state == "CLOSED":
            self._failures = 0  # reset on success
        # If another call re-tripped the breaker to OPEN while we awaited,
        # leave the state alone.
        return result

    async def flush_re_sync_queue(self) -> None:
        """Replay queued write commands against the client in FIFO order.

        The queue is snapshotted and cleared up front. Each
        ``(method_name, args, kwargs)`` is resolved with ``getattr`` on the
        underlying client (bypassing the breaker) and awaited in order.

        If a replay fails:

        * That write and every write after it in the snapshot are put back
          at the head of the queue in their original order, ahead of any
          writes queued while this flush was running.
        * The breaker is forced back to ``OPEN``, with ``failures`` pinned
          at the threshold.
        * The flush stops, so nothing is dropped or reordered.

        NOTE(review): this runs as a background task after the breaker has
        closed. Live writes issued through :meth:`execute` meanwhile go
        straight to Redis and can interleave with (or be overwritten by)
        replayed older writes.
        """
        if not self._re_sync_queue:
            return

        logger.info("Flushing %d queued write operations...", len(self._re_sync_queue))
        pending = list(self._re_sync_queue)
        self._re_sync_queue.clear()

        replayed_count = 0
        for index, (method_name, args, kwargs) in enumerate(pending):
            try:
                func = getattr(self._redis, method_name)
                await func(*args, **kwargs)
            except Exception as e:
                logger.warning("Playback of %s failed: %s. Tripping circuit breaker back to OPEN.", method_name, e)
                # Re-queue the failed write and everything not yet replayed.
                self._re_sync_queue[:0] = pending[index:]
                self._state = "OPEN"
                self._last_state_change = time.time()
                self._failures = self._failure_threshold
                break
            replayed_count += 1
            logger.debug("Successfully replayed %s", method_name)

        if replayed_count > 0 and self._state == "CLOSED":
            logger.info("Successfully completed playback of all %d queued write operations.", replayed_count)
class ResilientRedis:
    """Async Redis client proxy that routes method calls through a circuit breaker.

    Attribute lookups not satisfied by the proxy itself are resolved on the
    breaker's wrapped client (``breaker._redis``). Callable attributes come
    back as ``async`` wrappers that await
    :meth:`RedisCircuitBreaker.execute`, so each call is passed through,
    queued, or answered with a fallback by the breaker. Non-callable
    attributes are returned unchanged.
    """

    def __init__(self, breaker: "RedisCircuitBreaker"):
        """Wrap ``breaker`` as a drop-in async Redis client.

        No I/O is performed.

        Args:
            breaker: The circuit breaker whose ``execute`` gates every call
                and whose ``_redis`` client supplies the actual attributes.
        """
        self._breaker = breaker

    def __getattr__(self, name: str) -> Any:
        """Resolve ``name`` on the wrapped Redis client, guarding callables with the breaker.

        Python calls this only when normal attribute lookup fails.

        Args:
            name: The attribute or method name accessed on the proxy.

        Returns:
            Any: For a callable attribute, an ``async`` function that
            forwards its arguments to :meth:`RedisCircuitBreaker.execute`.
            Otherwise, the raw attribute value.

        Raises:
            AttributeError: If the wrapped client has no such attribute, or
                if the proxy has no ``_breaker`` yet (e.g. an instance
                created via ``__new__`` by ``copy``/``pickle``).
        """
        # Without this guard, a missing ``_breaker`` would make the
        # ``self._breaker`` lookup below re-enter __getattr__ forever and
        # raise RecursionError instead of AttributeError.
        if name == "_breaker":
            raise AttributeError(name)

        attr = getattr(self._breaker._redis, name)
        if not callable(attr):
            return attr

        async def wrapped(*args, **kwargs):
            """Run the bound ``name``/``attr`` call through the breaker and return its result."""
            return await self._breaker.execute(name, attr, *args, **kwargs)

        return wrapped