"""Redis resilience helpers — circuit breaking with write replay.
:class:`RedisCircuitBreaker` wraps a Redis client so that, while Redis
is unavailable, reads return safe fallbacks and write mutations
(:data:`WRITE_COMMANDS`) are queued and replayed on recovery,
smoothing over transient Redis outages without crashing the service.
"""
import logging
import asyncio
import time
from typing import Any, Callable, Dict, List, Tuple
logger = logging.getLogger("stargazer.resilience")

# Redis method names that execute() classifies as mutations.  While the
# breaker is OPEN (or when one of these calls fails) the call is buffered on
# the re-sync queue for later replay; every other method name is treated as a
# read and answered with a neutral fallback while OPEN.
WRITE_COMMANDS = {"set", "delete", "hset", "sadd", "srem", "rpush", "ltrim", "xadd", "xack"}


class RedisCircuitBreaker:
    """Circuit breaker around a Redis client that buffers writes during outages.

    Tracks consecutive failures and trips from ``CLOSED`` to ``OPEN`` once
    ``failure_threshold`` is reached.  While ``OPEN``, :meth:`execute` returns
    neutral fallbacks for reads and appends write commands (those in
    :data:`WRITE_COMMANDS`) to an in-memory re-sync queue instead of calling
    the client.  After ``recovery_timeout`` seconds the next command is let
    through in ``HALF-OPEN``; if it succeeds the breaker closes and
    :meth:`flush_re_sync_queue` is scheduled to replay the buffered writes in
    FIFO order.
    """

    def __init__(self, redis_client: Any, failure_threshold: int = 5, recovery_timeout: float = 10.0):
        """Initialize the breaker in the ``CLOSED`` state; performs no I/O.

        Args:
            redis_client: The underlying async Redis client.  Its methods are
                looked up by name with ``getattr`` when queued writes are
                replayed.
            failure_threshold: Number of consecutive failures that trips the
                breaker from ``CLOSED`` to ``OPEN``.
            recovery_timeout: Seconds the breaker stays ``OPEN`` before a
                command is allowed through in ``HALF-OPEN``.
        """
        self._redis = redis_client
        self._failure_threshold = failure_threshold
        self._recovery_timeout = recovery_timeout
        self._state = "CLOSED"  # one of: CLOSED, OPEN, HALF-OPEN
        self._failures = 0
        self._last_state_change = 0.0
        # Buffered write mutations awaiting replay: (method_name, args, kwargs).
        self._re_sync_queue: List[Tuple[str, tuple, dict]] = []
        # Strong references to in-flight flush tasks.  The event loop only
        # keeps weak references to tasks, so an otherwise unreferenced task
        # may be garbage-collected before it finishes.
        self._flush_tasks = set()

    @property
    def state(self) -> str:
        """Return the current breaker state.

        Returns:
            str: ``"CLOSED"``, ``"OPEN"`` or ``"HALF-OPEN"``.
        """
        return self._state

    @property
    def failures(self) -> int:
        """Return the number of consecutive failures since the last success.

        Returns:
            int: The counter that :meth:`execute` increments on each failed
            call and resets to zero on success.
        """
        return self._failures

    def _get_read_fallback(self, method_name: str) -> Any:
        """Return a neutral value to hand back for a read while ``OPEN``.

        Args:
            method_name: The Redis method being shielded (e.g. ``"hgetall"``).

        Returns:
            Any: ``{}`` for ``hgetall``/``hgetall_bin``; ``[]`` for
            ``lrange``/``xread``/``xreadgroup``; ``False`` for ``sismember``;
            ``set()`` for ``smembers``; ``None`` for anything else.  A fresh
            container is created on every call.
        """
        if method_name in ("hgetall", "hgetall_bin"):
            return {}
        if method_name in ("lrange", "xread", "xreadgroup"):
            return []
        if method_name == "sismember":
            return False
        if method_name == "smembers":
            return set()
        return None

    async def execute(self, method_name: str, func: Callable, *args, **kwargs) -> Any:
        """Run a guarded Redis command through the circuit breaker.

        While ``OPEN`` and still inside the recovery window, write commands are
        appended to the re-sync queue and ``"OK"`` is returned, and any other
        command returns :meth:`_get_read_fallback`.  Once the recovery timeout
        has elapsed the breaker moves to ``HALF-OPEN`` and the call is let
        through.  A successful call resets the failure counter; from
        ``HALF-OPEN`` it also closes the breaker and schedules
        :meth:`flush_re_sync_queue` as a background task.  A failed call
        queues the command if it is a write, increments the failure counter,
        trips to ``OPEN`` when in ``HALF-OPEN`` or when ``failure_threshold``
        is reached, and re-raises.

        Args:
            method_name: Name of the Redis command; used to classify the call
                as a read or write and to pick a read fallback.
            func: Callable returning an awaitable; invoked as
                ``await func(*args, **kwargs)`` when the breaker permits.
            *args: Positional arguments forwarded to ``func`` (and stored for
                replay if the write is queued).
            **kwargs: Keyword arguments forwarded to ``func`` (and stored for
                replay if the write is queued).

        Returns:
            Any: The result of ``func`` on success; ``"OK"`` for a write queued
            while ``OPEN``; or a neutral read fallback while ``OPEN``.

        Raises:
            Exception: Whatever ``func`` raised, after the failure has been
                recorded.
        """
        if self._state == "OPEN":
            # NOTE(review): the window is measured with the wall clock, so a
            # system clock adjustment can shorten or lengthen it.
            if time.time() - self._last_state_change > self._recovery_timeout:
                self._state = "HALF-OPEN"
                logger.info("Circuit breaker entering HALF-OPEN state, testing connection...")
            elif method_name in WRITE_COMMANDS:
                # Buffer the mutation so it can be replayed after recovery.
                self._re_sync_queue.append((method_name, args, kwargs))
                logger.debug(
                    "Queueing write command %s during OPEN state (current queue size: %d)",
                    method_name,
                    len(self._re_sync_queue),
                )
                return "OK"
            else:
                return self._get_read_fallback(method_name)

        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            # A failed write is buffered for replay even though the error is
            # still surfaced to the caller.
            if method_name in WRITE_COMMANDS:
                self._re_sync_queue.append((method_name, args, kwargs))
                logger.debug(
                    "Queueing failed write command %s: %s (current queue size: %d)",
                    method_name,
                    e,
                    len(self._re_sync_queue),
                )
            self._failures += 1
            if self._state == "HALF-OPEN" or self._failures >= self._failure_threshold:
                self._state = "OPEN"
                self._last_state_change = time.time()
                logger.warning(
                    "Redis circuit breaker tripped to OPEN! Consecutive failures: %d. Error: %s",
                    self._failures,
                    e,
                )
            # Bare raise re-raises the active exception with its traceback intact.
            raise
        else:
            if self._state == "HALF-OPEN":
                logger.info("Redis operation succeeded in HALF-OPEN state. Closing circuit breaker.")
                self._state = "CLOSED"
                self._failures = 0
                # Replay buffered writes in the background.  Hold a reference
                # until the task completes so it cannot be collected mid-run.
                flush_task = asyncio.create_task(self.flush_re_sync_queue())
                self._flush_tasks.add(flush_task)
                flush_task.add_done_callback(self._flush_tasks.discard)
            elif self._state == "CLOSED":
                self._failures = 0  # reset on success
            return result

    async def flush_re_sync_queue(self) -> None:
        """Replay queued write commands in FIFO order.

        Snapshots and clears ``self._re_sync_queue``, then for each
        ``(method_name, args, kwargs)`` resolves the method with ``getattr`` on
        the wrapped client and awaits it.

        If a replayed command fails, that command *and every command after it
        in the snapshot* are put back at the front of the queue (ahead of any
        writes queued while the flush was running), the breaker is forced back
        to ``OPEN`` with ``_failures`` pinned at the threshold, and the flush
        stops.  The unreplayed writes are therefore retried, in their original
        order, by the next flush.
        """
        if not self._re_sync_queue:
            return
        logger.info("Flushing %d queued write operations...", len(self._re_sync_queue))
        pending = list(self._re_sync_queue)
        self._re_sync_queue.clear()
        replayed_count = 0
        for index, (method_name, args, kwargs) in enumerate(pending):
            try:
                func = getattr(self._redis, method_name)
                await func(*args, **kwargs)
            except Exception as e:
                logger.warning("Playback of %s failed: %s. Tripping circuit breaker back to OPEN.", method_name, e)
                # Restore the failed command and the not-yet-replayed tail.
                # Re-inserting only the failed command would silently drop
                # the rest of the snapshot.
                self._re_sync_queue[:0] = pending[index:]
                self._state = "OPEN"
                self._last_state_change = time.time()
                self._failures = self._failure_threshold
                break
            else:
                replayed_count += 1
                logger.debug("Successfully replayed %s", method_name)
        if replayed_count > 0 and self._state == "CLOSED":
            logger.info("Successfully completed playback of all %d queued write operations.", replayed_count)
class ResilientRedis:
    """Drop-in async Redis client that funnels every call through a circuit breaker.

    A thin proxy holding a breaker object: :meth:`__getattr__` resolves each
    attribute on the breaker's wrapped client and, for callables, routes the
    call through the breaker's ``execute`` coroutine.  Callers therefore use
    it like the raw async client while getting the breaker's fallback and
    write-replay behaviour.
    """

    def __init__(self, breaker: "RedisCircuitBreaker"):
        """Store the breaker that gates every proxied call; performs no I/O.

        Args:
            breaker: Object exposing ``execute(method_name, func, *args,
                **kwargs)`` and a ``_redis`` attribute holding the wrapped
                client.
        """
        self._breaker = breaker

    def __getattr__(self, name: str) -> Any:
        """Proxy attribute access to the wrapped Redis client via the breaker.

        Python calls this only when normal attribute lookup fails.  ``name`` is
        looked up on the breaker's underlying client; a callable result is
        wrapped in an async closure that forwards the call through the
        breaker's ``execute``, and a non-callable result is returned unchanged.

        Args:
            name: The attribute or method name being accessed on the proxy.

        Returns:
            Any: An async wrapper function for callable attributes, or the raw
            attribute value for non-callables.

        Raises:
            AttributeError: If ``name`` is ``"_breaker"`` (the proxy has not
                been initialised yet), or if the wrapped client has no such
                attribute.
        """
        # If ``_breaker`` itself is missing (instance built by ``__new__``,
        # e.g. the blank object ``copy.copy`` creates and probes with
        # ``hasattr``), reading ``self._breaker`` below would re-enter this
        # method forever.  Fail with a plain AttributeError instead.
        if name == "_breaker":
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )

        attr = getattr(self._breaker._redis, name)
        if not callable(attr):
            return attr

        async def wrapped(*args, **kwargs):
            """Forward the call through the breaker's ``execute``.

            Args:
                *args: Positional arguments for the underlying Redis method.
                **kwargs: Keyword arguments for the underlying Redis method.

            Returns:
                Any: Whatever the breaker's ``execute`` returns.
            """
            return await self._breaker.execute(name, attr, *args, **kwargs)

        return wrapped