core.resilience module

Redis resilience helpers — circuit breaking with write replay.

RedisCircuitBreaker wraps a Redis client. While Redis is unavailable, reads return safe fallbacks and write mutations (those in WRITE_COMMANDS) are queued, then replayed on recovery. This smooths over transient Redis outages without crashing the service.

class core.resilience.RedisCircuitBreaker(redis_client, failure_threshold=5, recovery_timeout=10.0)

Bases: object

Circuit breaker around a Redis client that buffers writes during outages.

Tracks consecutive failures and trips from CLOSED to OPEN once failure_threshold is reached. While OPEN, execute() returns neutral fallbacks for reads and queues write commands (those in WRITE_COMMANDS) onto an in-memory re-sync queue instead of touching the failing client.

After recovery_timeout it probes the connection in HALF-OPEN; a successful probe closes the breaker and triggers flush_re_sync_queue() to replay the buffered writes in order, so a transient Redis outage degrades gracefully and self-heals without losing mutations or crashing the service.

Wrapped by ResilientRedis to form the drop-in client surface; exercised directly by the adversarial network-partition tests (tests/adversarial/test_network_partition.py).

Parameters:
  • redis_client (Any)

  • failure_threshold (int)

  • recovery_timeout (float)
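
The CLOSED / OPEN / HALF-OPEN cycle described above can be sketched as a small state machine. This is a simplified illustration, not the module's implementation: MiniBreaker, allow_call, record_success, record_failure, and the injectable clock argument are all hypothetical names, and the exact comparison used for the recovery window (>= here) is an assumption.

```python
import time
from typing import Callable


class MiniBreaker:
    """Hypothetical, simplified model of the CLOSED/OPEN/HALF-OPEN cycle."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 10.0,
        clock: Callable[[], float] = time.monotonic,  # assumption: injectable for the example
    ) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self.state = "CLOSED"
        self.failures = 0
        self._last_state_change = clock()

    def _set_state(self, new_state: str) -> None:
        # Record when the transition happened so OPEN can time out later.
        self.state = new_state
        self._last_state_change = self._clock()

    def allow_call(self) -> bool:
        """Return True if a command may reach Redis right now."""
        if self.state == "OPEN":
            elapsed = self._clock() - self._last_state_change
            if elapsed >= self.recovery_timeout:
                self._set_state("HALF-OPEN")  # let one probe through
                return True
            return False
        return True

    def record_success(self) -> None:
        # Any success clears the consecutive-failure count;
        # a successful HALF-OPEN probe also closes the breaker.
        self.failures = 0
        if self.state == "HALF-OPEN":
            self._set_state("CLOSED")

    def record_failure(self) -> None:
        # A failed probe, or reaching the threshold, trips the breaker.
        self.failures += 1
        if self.state == "HALF-OPEN" or self.failures >= self.failure_threshold:
            self._set_state("OPEN")
```

Passing a fake clock makes the recovery window testable without sleeping, which is the main reason the sketch takes the clock as a parameter.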

__init__(redis_client, failure_threshold=5, recovery_timeout=10.0)

Initialize a circuit breaker around a Redis client.

Sets the breaker to the CLOSED (healthy) state with no recorded failures and an empty re-sync queue. The wrapped redis_client is stored for later use: ResilientRedis.__getattr__() resolves from it the methods that execute() runs while healthy, and flush_re_sync_queue() uses it to replay queued writes on recovery. No I/O is performed here; this only captures state.

This is called when constructing a RedisCircuitBreaker, which is then handed to ResilientRedis. In the repo it is exercised directly by the adversarial network-partition tests (tests/adversarial/test_network_partition.py); production code obtains the breaker via dependency injection rather than instantiating it here.

Parameters:
  • redis_client (Any) – The underlying async Redis client whose methods are guarded and, on failure, queued/replayed.

  • failure_threshold (int) – Number of consecutive failures that trips the breaker from CLOSED to OPEN.

  • recovery_timeout (float) – Seconds the breaker stays OPEN before a command is allowed through in HALF-OPEN to probe recovery.

property state: str

Return the current breaker state.

Exposes the internal _state machine value, one of "CLOSED" (healthy, commands pass through), "OPEN" (tripped, reads return fallbacks and writes are queued), or "HALF-OPEN" (probing recovery). Read-only; state transitions happen inside execute() and flush_re_sync_queue().

Read by the adversarial network-partition tests (tests/adversarial/test_network_partition.py) to assert the breaker tripped or recovered. No internal callers in this module read it.

Returns:

The current state string.

Return type:

str

property failures: int

Return the count of consecutive failures since the last success.

Reflects the internal _failures counter, which execute() increments on each failed command and resets to zero on success. When it reaches failure_threshold the breaker trips to OPEN.

Read by the adversarial network-partition tests (tests/adversarial/test_network_partition.py) to assert failure accounting. No internal callers in this module read it.

Returns:

The current consecutive-failure count.

Return type:

int
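
As an illustration of the two read-only properties, here is a minimal sketch of how a consecutive-failure counter and a state value might be exposed. CountingBreaker, on_failure, and on_success are hypothetical names; in the documented class the counting happens inside execute(), and this sketch omits the HALF-OPEN handling.

```python
class CountingBreaker:
    """Hypothetical sketch: read-only views over private breaker state."""

    def __init__(self, failure_threshold: int = 5) -> None:
        self._failure_threshold = failure_threshold
        self._state = "CLOSED"
        self._failures = 0

    @property
    def state(self) -> str:
        # No setter is defined, so callers cannot assign to .state.
        return self._state

    @property
    def failures(self) -> int:
        # Counts failures since the last success, not lifetime failures.
        return self._failures

    def on_failure(self) -> None:
        self._failures += 1
        if self._failures >= self._failure_threshold:
            self._state = "OPEN"

    def on_success(self) -> None:
        # A single success resets the consecutive count.
        self._failures = 0
```

Because the count is consecutive, two failures followed by a success leave failures at zero, and the breaker only trips after failure_threshold failures in a row.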

async execute(method_name, func, *args, **kwargs)

Run a guarded Redis command through the circuit breaker.

Drives the breaker’s state machine. While OPEN and still inside the recovery window, write commands (those in WRITE_COMMANDS) are appended to the in-memory re-sync queue and a fake "OK" is returned, while reads short-circuit to _get_read_fallback(). Once the recovery timeout elapses, the breaker moves to HALF-OPEN and lets the next command through to probe Redis.

On a successful call it resets the failure counter; if the breaker was HALF-OPEN, it also closes the breaker and schedules flush_re_sync_queue() as a background task.

On a failed call it queues the command for later replay if it is a write, increments the failure counter, and trips to OPEN if the breaker was HALF-OPEN or once failure_threshold consecutive failures are reached.

Interacts with the wrapped client only through the supplied func coroutine function; it schedules the queue flush with asyncio.create_task, appends to self._re_sync_queue, and emits log records on the stargazer.resilience logger. Called by the wrapped closure that ResilientRedis.__getattr__() returns, once for every method call made through a ResilientRedis, and exercised (via ResilientRedis) by tests/adversarial/test_network_partition.py.

Parameters:
  • method_name (str) – Name of the Redis command, used to classify it as a read or write and to pick a read fallback.

  • func (Callable) – The bound coroutine function to invoke when the breaker permits the call.

  • *args – Positional arguments forwarded to func (and stored for replay if the write is queued).

  • **kwargs – Keyword arguments forwarded to func (and stored for replay if the write is queued).

Returns:

The result of func on success; "OK" for a write queued while OPEN; or a neutral read fallback while OPEN.

Return type:

Any

Raises:

Exception – Re-raises whatever func raised after recording the failure and (if applicable) queuing the write and tripping the breaker.
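
The control flow of execute() can be sketched as follows. This is a reduced illustration under stated assumptions, not the actual source: SketchBreaker is a hypothetical class, the WRITE_COMMANDS set below is an assumed subset (the real set is not listed here), the read fallback is simplified to None in place of _get_read_fallback(), and the background flush on recovery is omitted.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable

# Assumed subset for illustration; the real WRITE_COMMANDS is not shown here.
WRITE_COMMANDS = {"set", "hset", "xadd", "delete"}


class SketchBreaker:
    """Hypothetical reduction of the execute() flow."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 10.0) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = "CLOSED"
        self.failures = 0
        self.queue: list = []  # entries are (method_name, args, kwargs)
        self._last_state_change = time.monotonic()

    def _trip(self) -> None:
        self.state = "OPEN"
        self._last_state_change = time.monotonic()

    async def execute(
        self, method_name: str, func: Callable[..., Awaitable[Any]], *args: Any, **kwargs: Any
    ) -> Any:
        is_write = method_name in WRITE_COMMANDS
        if self.state == "OPEN":
            if time.monotonic() - self._last_state_change < self.recovery_timeout:
                if is_write:
                    self.queue.append((method_name, args, kwargs))
                    return "OK"  # placeholder result for a buffered write
                return None  # stand-in for _get_read_fallback()
            self.state = "HALF-OPEN"  # window elapsed: this call is the probe
        try:
            result = await func(*args, **kwargs)
        except Exception:
            if is_write:
                self.queue.append((method_name, args, kwargs))
            self.failures += 1
            if self.state == "HALF-OPEN" or self.failures >= self.failure_threshold:
                self._trip()
            raise  # the caller still sees the original error
        self.failures = 0
        if self.state == "HALF-OPEN":
            self.state = "CLOSED"  # the documented version also schedules a flush here
        return result


async def demo() -> tuple:
    async def failing_set(key: str, value: str) -> str:
        raise ConnectionError("redis down")

    async def failing_get(key: str) -> str:
        raise ConnectionError("redis down")

    breaker = SketchBreaker(failure_threshold=2, recovery_timeout=60.0)
    for _ in range(2):
        try:
            await breaker.execute("set", failing_set, "k", "v")
        except ConnectionError:
            pass  # failures are re-raised until the breaker trips
    queued = await breaker.execute("set", failing_set, "k2", "v2")
    fallback = await breaker.execute("get", failing_get, "k")
    return breaker.state, queued, fallback, len(breaker.queue)
```

In demo(), the first two writes fail and are re-raised (each is also queued), which trips the breaker; the third write and the read never reach the failing function.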

async flush_re_sync_queue()

Replay queued write commands in FIFO order after Redis recovers.

The recovery half of the write-replay scheme: every mutation that execute() queued (while the breaker was OPEN, or after a write failed) is drained here and re-run against the wrapped Redis client so no buffered write is lost. It snapshots and clears self._re_sync_queue up front, then for each (method_name, args, kwargs) entry resolves the method with getattr on the underlying client and awaits it in order.

If any replayed command fails, that command is re-inserted at the front of the queue, the breaker is forced back to OPEN (resetting _last_state_change and pinning _failures at the threshold), and the flush stops so the remaining writes are retried on the next recovery rather than reordered or dropped. Progress and outcomes are logged on the stargazer.resilience logger.

Scheduled as a fire-and-forget asyncio task by execute() when a HALF-OPEN probe succeeds and the breaker closes; not called directly elsewhere in this module.

Return type:

None
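
The snapshot-and-replay loop can be sketched as a standalone function. This is an illustrative sketch, not the module's code: flush_queue and FlakyClient are hypothetical names, the breaker state changes and logging are left out, and putting the failed command plus every command after it back at the front of the queue is an assumption about how ordering is preserved.

```python
import asyncio
from collections import deque
from typing import Any


async def flush_queue(client: Any, queue: deque) -> bool:
    """Replay (method_name, args, kwargs) entries in FIFO order.

    Returns True if everything was replayed, False if a replay failed.
    """
    pending = deque(queue)  # snapshot the current entries
    queue.clear()           # writes queued during the flush land here
    while pending:
        method_name, args, kwargs = pending[0]
        try:
            await getattr(client, method_name)(*args, **kwargs)
        except Exception:
            # Assumption: restore the failed entry and all later ones at the
            # front, ahead of anything queued in the meantime.
            queue.extendleft(reversed(pending))
            return False
        pending.popleft()
    return True


class FlakyClient:
    """Hypothetical client that rejects one key until told otherwise."""

    def __init__(self) -> None:
        self.applied: list = []
        self.fail_on = "b"

    async def set(self, key: str, value: int) -> None:
        if key == self.fail_on:
            raise ConnectionError("still down")
        self.applied.append((key, value))


async def demo() -> tuple:
    client = FlakyClient()
    queue = deque(
        [("set", ("a", 1), {}), ("set", ("b", 2), {}), ("set", ("c", 3), {})]
    )
    first = await flush_queue(client, queue)
    remaining = [entry[1][0] for entry in queue]
    client.fail_on = None  # simulate Redis recovering
    second = await flush_queue(client, queue)
    return first, remaining, second, client.applied
```

Stopping at the first failure, rather than skipping it, is what keeps the writes in their original order across the two flush attempts in demo().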

class core.resilience.ResilientRedis(breaker)

Bases: object

Drop-in async Redis client that funnels every call through a circuit breaker.

A thin proxy holding a RedisCircuitBreaker: its __getattr__() transparently routes each method access (get, hset, xadd, …) through RedisCircuitBreaker.execute(), so callers use it exactly like a raw async Redis client while gaining the breaker’s fallback-and-replay protection. Constructed wherever a resilience-guarded Redis handle is wanted; in this repo it is instantiated and driven by the adversarial network-partition tests (tests/adversarial/test_network_partition.py).

Parameters:

breaker (RedisCircuitBreaker)

__init__(breaker)

Wrap a circuit breaker as a drop-in Redis client.

Stores the RedisCircuitBreaker so that all attribute access is later routed through it by __getattr__(). No I/O is performed.

Constructed wherever a resilience-guarded Redis handle is needed (the breaker plus this wrapper form the public surface); in the repo it is instantiated directly by tests/adversarial/test_network_partition.py and passed where a plain async Redis client is expected.

Parameters:

breaker (RedisCircuitBreaker) – The circuit breaker whose execute gates each call and whose wrapped client supplies the actual methods/attributes.

__getattr__(name)

Proxy attribute access to the wrapped Redis client via the breaker.

Looks name up on the breaker’s underlying Redis client. If the resolved attribute is callable, returns an async wrapped closure that funnels the call through RedisCircuitBreaker.execute() so it is guarded, queued, or falls back as appropriate; non-callable attributes are returned unchanged. Because this dunder only fires for attributes not found normally, self._breaker itself is resolved without recursion.

Invoked implicitly by Python whenever normal attribute lookup on the proxy fails, which covers every Redis method name, e.g. resilient_redis.get(...) / resilient_redis.hset(...) in tests/adversarial/test_network_partition.py and in any component handed a ResilientRedis in place of a raw client.

Parameters:

name (str) – The attribute or method name being accessed on the proxy.

Returns:

An async wrapper function (calling it returns a coroutine to await) for callable attributes, or the raw attribute value for non-callables.

Return type:

Any
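
The proxying behaviour of __getattr__() can be sketched like this. It is an illustration under assumptions rather than the real class: ProxySketch, PassThroughBreaker, and FakeRedis are hypothetical, and the attribute name redis_client used to reach the underlying client is a guess (the real attribute name is not documented here).

```python
import asyncio
from typing import Any


class PassThroughBreaker:
    """Hypothetical stand-in exposing only what the proxy needs."""

    def __init__(self, redis_client: Any) -> None:
        self.redis_client = redis_client  # attribute name is an assumption
        self.calls: list = []

    async def execute(self, method_name: str, func: Any, *args: Any, **kwargs: Any) -> Any:
        self.calls.append(method_name)  # record that the call was routed here
        return await func(*args, **kwargs)


class ProxySketch:
    def __init__(self, breaker: PassThroughBreaker) -> None:
        # Stored as a normal instance attribute, so looking up
        # self._breaker never reaches __getattr__ (no recursion).
        self._breaker = breaker

    def __getattr__(self, name: str) -> Any:
        attr = getattr(self._breaker.redis_client, name)
        if not callable(attr):
            return attr  # plain attributes pass through unchanged

        async def wrapped(*args: Any, **kwargs: Any) -> Any:
            return await self._breaker.execute(name, attr, *args, **kwargs)

        return wrapped


class FakeRedis:
    """Hypothetical in-memory client with two async methods."""

    encoding = "utf-8"

    def __init__(self) -> None:
        self.data: dict = {}

    async def set(self, key: str, value: str) -> str:
        self.data[key] = value
        return "OK"

    async def get(self, key: str) -> Any:
        return self.data.get(key)


async def demo() -> tuple:
    breaker = PassThroughBreaker(FakeRedis())
    proxy = ProxySketch(breaker)
    await proxy.set("k", "v")
    value = await proxy.get("k")
    return value, proxy.encoding, breaker.calls
```

Note that the breaker is only involved when the returned wrapper is called; merely reading proxy.get builds a closure, and reading a non-callable such as proxy.encoding bypasses the breaker entirely.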