core.resilience module
Redis resilience helpers — circuit breaking with write replay.
RedisCircuitBreaker wraps a Redis client so that, while Redis
is unavailable, reads return safe fallbacks and write mutations
(WRITE_COMMANDS) are queued and replayed on recovery,
smoothing over transient Redis outages without crashing the service.
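Below is a minimal sketch of the read/write split just described. It assumes a hypothetical WRITE_COMMANDS membership and a hypothetical fallback table. This documentation does not list the real contents of WRITE_COMMANDS or the mapping behind _get_read_fallback(), so the command names and fallback values here are illustrative only.

```python
from collections.abc import Callable
from typing import Any

# Hypothetical membership: the real WRITE_COMMANDS in core.resilience may differ.
WRITE_COMMANDS: frozenset[str] = frozenset(
    {"set", "delete", "expire", "incr", "hset", "hdel", "lpush", "rpush", "xadd"}
)

# Hypothetical neutral read fallbacks. Factories (not shared literals) are used
# so a caller mutating a returned {} or [] cannot corrupt later fallbacks.
_READ_FALLBACKS: dict[str, Callable[[], Any]] = {
    "get": lambda: None,   # behaves like a missing key
    "hget": lambda: None,
    "hgetall": dict,       # empty hash
    "lrange": list,        # empty list
    "smembers": set,       # empty set
    "exists": lambda: 0,   # "no keys exist"
}


def is_write(method_name: str) -> bool:
    """Return True if the command mutates Redis and should be queued while OPEN."""
    return method_name.lower() in WRITE_COMMANDS


def read_fallback(method_name: str) -> Any:
    """Return a fresh neutral value for a read issued while the breaker is OPEN."""
    return _READ_FALLBACKS.get(method_name.lower(), lambda: None)()
```

Unknown commands fall back to None in this sketch. That is a conservative default; a production table would need an entry for every read the service actually issues.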
- class core.resilience.RedisCircuitBreaker(redis_client, failure_threshold=5, recovery_timeout=10.0)
Bases: object
Circuit breaker around a Redis client that buffers writes during outages.
Tracks consecutive failures and trips from CLOSED to OPEN once failure_threshold is reached. After that, execute() returns neutral fallbacks for reads and queues write commands (those in WRITE_COMMANDS) onto an in-memory re-sync queue instead of touching the failing client. After recovery_timeout it probes the connection in HALF-OPEN. A successful probe closes the breaker and triggers flush_re_sync_queue() to replay the buffered writes in order, so a transient Redis outage degrades gracefully and self-heals without losing mutations or crashing the service. Wrapped by ResilientRedis to form the drop-in client surface; exercised directly by the adversarial network-partition tests (tests/adversarial/test_network_partition.py).
- __init__(redis_client, failure_threshold=5, recovery_timeout=10.0)
Initialize a circuit breaker around a Redis client.
Sets the breaker to the CLOSED (healthy) state with no recorded failures and an empty re-sync queue. The wrapped redis_client is stored for later use by both execute() (to run commands while healthy) and flush_re_sync_queue() (to replay queued writes on recovery). No I/O is performed here; this only captures state.
This is called when constructing a RedisCircuitBreaker, which is then handed to ResilientRedis. In the repo it is exercised directly by the adversarial network-partition tests (tests/adversarial/test_network_partition.py); production code receives an already-constructed breaker through dependency injection instead of building one inline.
- Parameters:
redis_client (Any) – The underlying async Redis client whose methods are guarded and whose failed writes are queued and replayed.
failure_threshold (int) – Number of consecutive failures that trips the breaker from CLOSED to OPEN.
recovery_timeout (float) – Seconds the breaker stays OPEN before a command is allowed through in HALF-OPEN to probe recovery.
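The state transitions described for this class can be modelled without any I/O. The following is a minimal synchronous sketch, not the module's code. The class name BreakerStateSketch, the split into allow_request / record_success / record_failure, and the injectable clock are all assumptions. They make it possible to step through the CLOSED → OPEN → HALF-OPEN cycle deterministically.

```python
import time
from collections.abc import Callable


class BreakerStateSketch:
    """Hypothetical, I/O-free model of the CLOSED / OPEN / HALF-OPEN cycle."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 10.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self.state = "CLOSED"
        self.failures = 0
        self._last_state_change = clock()

    def _set_state(self, new_state: str) -> None:
        self.state = new_state
        self._last_state_change = self._clock()

    def allow_request(self) -> bool:
        """Return True if the next command may actually reach Redis."""
        if self.state == "OPEN":
            if self._clock() - self._last_state_change < self.recovery_timeout:
                return False  # still cooling down: caller uses fallback/queue
            self._set_state("HALF-OPEN")  # window elapsed: let one probe through
        return True

    def record_success(self) -> None:
        self.failures = 0  # failures are counted consecutively
        if self.state == "HALF-OPEN":
            self._set_state("CLOSED")

    def record_failure(self) -> None:
        self.failures += 1
        if self.state == "HALF-OPEN" or self.failures >= self.failure_threshold:
            self._set_state("OPEN")
```

The clock defaults to time.monotonic, which is unaffected by wall-clock adjustments. Injecting it lets a test advance time by hand instead of sleeping through recovery_timeout.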
- property state: str
Return the current breaker state.
Exposes the internal _state machine value: "CLOSED" (healthy, commands pass through), "OPEN" (tripped, reads return fallbacks and writes are queued), or "HALF-OPEN" (probing recovery). Read-only; state transitions happen inside execute() and flush_re_sync_queue().
Read by the adversarial network-partition tests (tests/adversarial/test_network_partition.py) to assert that the breaker tripped or recovered. No internal callers in this module read it.
- Returns:
The current state string.
- Return type:
str
- property failures: int
Return the count of consecutive failures since the last success.
Reflects the internal _failures counter. execute() increments it on each failed command and resets it to zero on success, and flush_re_sync_queue() pins it at failure_threshold when a replay fails. When it reaches failure_threshold, the breaker trips to OPEN.
Read by the adversarial network-partition tests (tests/adversarial/test_network_partition.py) to assert failure accounting. No internal callers in this module read it.
- Returns:
The current consecutive-failure count.
- Return type:
int
- async execute(method_name, func, *args, **kwargs)
Run a guarded Redis command through the circuit breaker.
Drives the breaker’s state machine. While the breaker is OPEN and still inside the recovery window, write commands (those in WRITE_COMMANDS) are appended to the in-memory re-sync queue and a fake "OK" is returned, and reads short-circuit to _get_read_fallback(). Once the recovery timeout elapses, the breaker moves to HALF-OPEN and lets the next command through to probe Redis. On a successful call it resets the failure counter; if the breaker was HALF-OPEN, it also closes the breaker and schedules flush_re_sync_queue() as a background task. On a failed call it queues the command for later replay if it is a write, increments the failure counter, and trips to OPEN if it was in HALF-OPEN or once failure_threshold consecutive failures are reached.
Interacts with the wrapped client only through the supplied func coroutine function. It spawns an asyncio.create_task for queue flushing, appends to self._re_sync_queue, and emits log records on the stargazer.resilience logger. Called by the wrapped closure that ResilientRedis.__getattr__() returns, i.e. on every call to a callable attribute of a ResilientRedis, and exercised through ResilientRedis by tests/adversarial/test_network_partition.
- Parameters:
method_name (str) – Name of the Redis command, used to classify it as a read or write and to pick a read fallback.
func (Callable) – The bound coroutine function to invoke when the breaker permits the call.
*args – Positional arguments forwarded to func (and stored for replay if the write is queued).
**kwargs – Keyword arguments forwarded to func (and stored for replay if the write is queued).
- Returns:
The result of func on success; "OK" for a write queued while OPEN; or a neutral read fallback while OPEN.
- Return type:
- Raises:
Exception – Re-raises whatever func raised, after recording the failure and, if applicable, queuing the write and tripping the breaker.
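Here is a condensed, self-contained sketch of the control flow described above, under several assumptions. ExecuteSketch and its public attribute names are hypothetical. The read fallback is simplified to None, and flush() is a no-op placeholder for flush_re_sync_queue(). The set of background tasks follows the asyncio documentation's advice to keep a reference to tasks created with asyncio.create_task; the documentation does not say whether the real module does this.

```python
import asyncio
import logging
import time
from collections.abc import Awaitable, Callable
from typing import Any

logger = logging.getLogger("stargazer.resilience")

# Hypothetical subset; the module's real WRITE_COMMANDS may differ.
WRITE_COMMANDS = frozenset({"set", "delete", "hset", "xadd"})


class ExecuteSketch:
    """Hypothetical model of RedisCircuitBreaker.execute(), not its actual code."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 10.0) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = "CLOSED"
        self.failures = 0
        self.last_state_change = time.monotonic()
        self.re_sync_queue: list[tuple[str, tuple, dict]] = []
        self._background: set[asyncio.Task] = set()

    async def flush(self) -> None:
        """No-op placeholder standing in for flush_re_sync_queue()."""

    async def execute(self, method_name: str, func: Callable[..., Awaitable[Any]],
                      *args: Any, **kwargs: Any) -> Any:
        is_write = method_name in WRITE_COMMANDS
        if self.state == "OPEN":
            if time.monotonic() - self.last_state_change < self.recovery_timeout:
                if is_write:
                    # Buffer the mutation and pretend it succeeded.
                    self.re_sync_queue.append((method_name, args, kwargs))
                    return "OK"
                return None  # simplified stand-in for _get_read_fallback()
            self.state = "HALF-OPEN"  # window elapsed: this call is the probe
        try:
            result = await func(*args, **kwargs)
        except Exception:
            if is_write:
                self.re_sync_queue.append((method_name, args, kwargs))
            self.failures += 1
            if self.state == "HALF-OPEN" or self.failures >= self.failure_threshold:
                self.state = "OPEN"
                self.last_state_change = time.monotonic()
                logger.warning("breaker OPEN after %d failures", self.failures)
            raise
        self.failures = 0
        if self.state == "HALF-OPEN":
            self.state = "CLOSED"
            # Keep a strong reference so the fire-and-forget task is not
            # garbage-collected before it finishes.
            task = asyncio.create_task(self.flush())
            self._background.add(task)
            task.add_done_callback(self._background.discard)
        return result
```

Because a failed call re-raises after updating the counters, callers still see the original exception while the breaker state changes underneath them.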
- async flush_re_sync_queue()
Replay queued write commands in FIFO order after Redis recovers.
The recovery half of the write-replay scheme: every mutation queued while the breaker was OPEN (in execute()) is drained here and re-run against the wrapped Redis client, so that no buffered write is lost. It snapshots and clears self._re_sync_queue up front. Then, for each (method_name, args, kwargs) entry, it resolves the method with getattr on the underlying client and awaits it in order.
If any replayed command fails, that command is re-inserted at the front of the queue and the breaker is forced back to OPEN (resetting _last_state_change and pinning _failures at the threshold). The flush then stops, so the remaining writes are retried on the next recovery instead of being reordered or dropped. Progress and outcomes are logged on the stargazer.resilience logger.
Scheduled as a fire-and-forget asyncio task by execute() when a HALF-OPEN probe succeeds and the breaker closes; not called directly elsewhere in this module.
- Return type:
None
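Here is a minimal sketch of the replay loop. FlushSketch is a hypothetical holder with public re_sync_queue, state, failures, and last_state_change attributes in place of the module's private ones. One assumption goes beyond the text above: the queue is cleared up front, so the "retried on the next recovery" guarantee requires the not-yet-replayed commands to be re-queued as well. On a failed replay, this sketch therefore puts back the failed command and everything after it, ahead of anything queued in the meantime.

```python
import logging
import time
from typing import Any

logger = logging.getLogger("stargazer.resilience")


class FlushSketch:
    """Hypothetical model of flush_re_sync_queue(); attribute names are illustrative."""

    def __init__(self, redis_client: Any, failure_threshold: int = 5) -> None:
        self.redis_client = redis_client
        self.failure_threshold = failure_threshold
        self.state = "CLOSED"
        self.failures = 0
        self.last_state_change = time.monotonic()
        self.re_sync_queue: list[tuple[str, tuple, dict]] = []

    async def flush_re_sync_queue(self) -> None:
        # Snapshot and clear up front: writes queued while the replay is
        # awaiting Redis go to the live queue, not into this snapshot.
        pending, self.re_sync_queue = self.re_sync_queue, []
        for i, (method_name, args, kwargs) in enumerate(pending):
            method = getattr(self.redis_client, method_name)
            try:
                await method(*args, **kwargs)
            except Exception:
                # Assumption: re-queue the failed command and every command
                # after it, ahead of anything queued meanwhile, so FIFO order
                # is preserved and nothing is dropped.
                self.re_sync_queue[:0] = pending[i:]
                self.state = "OPEN"
                self.last_state_change = time.monotonic()
                self.failures = self.failure_threshold
                logger.warning("replay of %s failed; breaker re-opened", method_name)
                return
        logger.info("replayed %d queued writes", len(pending))
```

Slice assignment at index 0 (`self.re_sync_queue[:0] = ...`) prepends in one step and keeps the relative order of the re-queued entries.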
- class core.resilience.ResilientRedis(breaker)
Bases: object
Drop-in async Redis client that funnels every call through a circuit breaker.
A thin proxy holding a RedisCircuitBreaker. Its __getattr__() transparently routes each method access (get, hset, xadd, …) through RedisCircuitBreaker.execute(), so callers use it exactly like a raw async Redis client while gaining the breaker’s fallback-and-replay protection. Constructed wherever a resilience-guarded Redis handle is wanted; in this repo it is instantiated and driven by the adversarial network-partition tests (tests/adversarial/test_network_partition.py).
- Parameters:
breaker (RedisCircuitBreaker)
- __init__(breaker)
Wrap a circuit breaker as a drop-in Redis client.
Stores the RedisCircuitBreaker so that any attribute not found on the wrapper itself is later routed through it by __getattr__(). No I/O is performed.
Constructed wherever a resilience-guarded Redis handle is needed (the breaker plus this wrapper form the public surface); in the repo it is instantiated directly by tests/adversarial/test_network_partition.py and passed where a plain async Redis client is expected.
- Parameters:
breaker (RedisCircuitBreaker) – The circuit breaker whose execute gates each call and whose wrapped client supplies the actual methods and attributes.
- __getattr__(name)
Proxy attribute access to the wrapped Redis client via the breaker.
Looks name up on the breaker’s underlying Redis client. If the resolved attribute is callable, it returns an async wrapped closure that funnels the call through RedisCircuitBreaker.execute(), so the call is guarded, queued, or falls back as appropriate; non-callable attributes are returned unchanged. Because this dunder only fires for attributes not found by normal lookup, self._breaker itself is resolved without recursion.
Invoked implicitly by Python on every resilient_redis.<name> access that normal lookup does not satisfy, e.g. resilient_redis.get(...) / resilient_redis.hset(...) in tests/adversarial/test_network_partition.py and in any component handed a ResilientRedis in place of a raw client.
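Here is a minimal sketch of the proxy pattern described above. ProxySketch stands in for ResilientRedis. The breaker attribute name redis_client is a hypothetical placeholder, because the breaker's private attribute for the wrapped client is not documented here. The explicit _breaker guard is an addition. If an instance is ever created without __init__ running (for example by copy.copy or unpickling, which probe attributes such as __setstate__ before restoring the instance dict), the guard makes lookups fail with AttributeError instead of recursing.

```python
import functools
from typing import Any


class ProxySketch:
    """Hypothetical stand-in for ResilientRedis: routes calls via breaker.execute()."""

    def __init__(self, breaker: Any) -> None:
        self._breaker = breaker

    def __getattr__(self, name: str) -> Any:
        # Only called when normal lookup fails. After __init__, self._breaker
        # lives in the instance dict, so reading it never re-enters here.
        if name == "_breaker":
            # Not set (instance built without __init__): fail cleanly
            # instead of recursing forever.
            raise AttributeError(name)
        # Hypothetical attribute name for the breaker's wrapped client.
        attr = getattr(self._breaker.redis_client, name)
        if not callable(attr):
            return attr  # plain attributes pass through unguarded

        @functools.wraps(attr)
        async def wrapped(*args: Any, **kwargs: Any) -> Any:
            return await self._breaker.execute(name, attr, *args, **kwargs)

        return wrapped
```

functools.wraps copies the wrapped method's __name__ and __doc__ onto the closure, which keeps tracebacks and introspection readable for callers who treat the proxy as a raw client.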