"""Generic async circuit breaker.
:class:`CircuitBreaker` trips to ``OPEN`` after ``failure_threshold``
consecutive failures, short-circuiting further calls with
:class:`CircuitBreakerOpenException` until ``reset_timeout`` elapses,
then probes recovery via a ``HALF_OPEN`` trial call. Used to protect
calls to fragile dependencies (e.g. Redis during ingress routing).
"""
import time
import logging
import asyncio
from typing import Callable, Any
logger = logging.getLogger(__name__)
class CircuitBreakerOpenException(Exception):
    """Signal that a call was rejected because the circuit breaker is ``OPEN``.

    Raised by :meth:`CircuitBreaker.call` instead of invoking the protected
    callable while the breaker is ``OPEN`` and its reset timeout has not yet
    elapsed. It carries no state beyond the standard exception message.
    """
class CircuitBreaker:
    """In-memory circuit breaker guarding calls to an async callable.

    The breaker is a three-state machine held entirely on the instance:

    * ``CLOSED`` -- calls run normally and consecutive failures are counted.
    * ``OPEN`` -- calls are rejected with
      :class:`CircuitBreakerOpenException` until ``reset_timeout`` seconds
      have passed since the last recorded failure.
    * ``HALF_OPEN`` -- the cooldown has elapsed and calls are let through as
      recovery probes; a success closes the breaker, a failure re-opens it.

    Attributes:
        failure_threshold (int): Consecutive ``CLOSED``-state failures that
            trip the breaker to ``OPEN``.
        reset_timeout (float): Cooldown in seconds before an ``OPEN`` breaker
            admits a probe.
        failures (int): Current count of consecutive failures.
        state (str): One of ``"CLOSED"``, ``"OPEN"`` or ``"HALF_OPEN"``.
        last_failure_time (float): :func:`time.time` stamp of the most recent
            recorded failure (``0.0`` until one occurs).
    """

    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 60.0) -> None:
        """Create a breaker in the ``CLOSED`` state with a zeroed failure count.

        Only stores configuration and counters on ``self``; no I/O is
        performed and no validation is applied to the arguments.

        Args:
            failure_threshold (int): Number of consecutive failures, while in
                the ``CLOSED`` state, that trips the breaker to ``OPEN``.
                Defaults to 3.
            reset_timeout (float): Seconds the breaker stays ``OPEN`` before
                a subsequent :meth:`call` is allowed through as a
                ``HALF_OPEN`` recovery probe. Defaults to 60.0.
        """
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.state = "CLOSED"
        self.last_failure_time = 0.0

    async def call(self, func: Callable[..., Any], *args, **kwargs) -> Any:
        """Await ``func(*args, **kwargs)`` subject to the current circuit state.

        While ``OPEN``, the elapsed time since :attr:`last_failure_time` is
        compared with :attr:`reset_timeout`. If the cooldown has not yet
        elapsed the call is rejected without invoking *func*; once it has
        (elapsed time equal to the timeout counts as elapsed) the breaker
        moves to ``HALF_OPEN``, logs ``circuit_breaker_half_open`` and lets
        the call proceed as a probe.

        A permitted call is awaited. If it raises an :class:`Exception`, the
        failure is recorded via :meth:`_record_failure` and the same
        exception propagates. If it returns, :meth:`_record_success` runs and
        the result is returned. Exceptions that do not derive from
        :class:`Exception` propagate without touching breaker state.

        No lock or in-flight tracking is used: every call that arrives while
        the state is ``HALF_OPEN`` is executed, not just the first probe.

        Args:
            func (Callable[..., Any]): Callable whose result is awaited. Its
                ``__name__`` is used only for log labels (falling back to
                ``"unknown_func"``).
            *args: Positional arguments forwarded verbatim to *func*.
            **kwargs: Keyword arguments forwarded verbatim to *func*.

        Returns:
            Any: Whatever awaiting *func* yields.

        Raises:
            CircuitBreakerOpenException: If the breaker is ``OPEN`` and the
                reset timeout has not yet elapsed; *func* is never invoked.
            Exception: Any exception raised by *func*, re-raised unchanged
                after the failure has been recorded.
        """
        func_name = getattr(func, "__name__", "unknown_func")

        if self.state == "OPEN":
            # NOTE(review): time.time() is the wall clock, so system clock
            # adjustments can stretch or shrink the cooldown. time.monotonic()
            # would avoid that but changes the meaning of the public
            # last_failure_time attribute, so it is left as is.
            elapsed = time.time() - self.last_failure_time
            if elapsed < self.reset_timeout:
                raise CircuitBreakerOpenException("Circuit breaker is OPEN")
            # Inclusive boundary: reaching reset_timeout exactly means the
            # cooldown is over (also makes reset_timeout=0 deterministic).
            self.state = "HALF_OPEN"
            logger.info(f"circuit_breaker_half_open func={func_name}")

        try:
            result = await func(*args, **kwargs)
        except Exception:
            self._record_failure(func_name)
            # Bare raise re-raises the active exception without rebinding it.
            raise

        self._record_success(func_name)
        return result

    def _record_failure(self, func_name: str) -> None:
        """Update breaker state after a protected call raised.

        Always stamps :attr:`last_failure_time` with :func:`time.time`, so
        the cooldown checked in :meth:`call` is measured from the most recent
        failure. A failure while ``HALF_OPEN`` re-opens the breaker
        immediately. A failure while ``CLOSED`` increments :attr:`failures`
        and trips the breaker to ``OPEN`` once the count reaches
        :attr:`failure_threshold`. A failure observed while already ``OPEN``
        only refreshes the timestamp. Each transition to ``OPEN`` emits a
        ``circuit_breaker_open`` warning on the module logger.

        Args:
            func_name (str): Name of the failing callable, used only in the
                log messages.
        """
        self.last_failure_time = time.time()

        if self.state == "HALF_OPEN":
            self.state = "OPEN"
            logger.warning(f"circuit_breaker_open func={func_name} (failed during HALF_OPEN)")
        elif self.state == "CLOSED":
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.state = "OPEN"
                logger.warning(f"circuit_breaker_open func={func_name} (failures={self.failures})")

    def _record_success(self, func_name: str) -> None:
        """Update breaker state after a protected call returned.

        A success while ``HALF_OPEN`` closes the breaker, resets
        :attr:`failures` to 0 and logs ``circuit_breaker_closed`` on the
        module logger. A success while ``CLOSED`` clears any accumulated
        :attr:`failures`, so only consecutive failures count toward the
        threshold. A success observed while ``OPEN`` changes nothing.

        Args:
            func_name (str): Name of the succeeding callable, used only in
                the log message.
        """
        if self.state == "HALF_OPEN":
            self.state = "CLOSED"
            self.failures = 0
            logger.info(f"circuit_breaker_closed func={func_name}")
        elif self.state == "CLOSED" and self.failures > 0:
            self.failures = 0