# Source code for core.circuit_breaker

"""Generic async circuit breaker.

:class:`CircuitBreaker` trips to ``OPEN`` after ``failure_threshold``
consecutive failures, short-circuiting further calls with
:class:`CircuitBreakerOpenException` until ``reset_timeout`` elapses,
then probes recovery via a ``HALF_OPEN`` trial call. Used to protect
calls to fragile dependencies (e.g. Redis during ingress routing).
"""

import time
import logging
import asyncio
from typing import Callable, Any

logger = logging.getLogger(__name__)


class CircuitBreakerOpenException(Exception):
    """Raised by :meth:`CircuitBreaker.call` when the breaker refuses a call.

    This happens while the breaker is ``OPEN`` and its reset timeout has not
    yet elapsed, or while it is ``HALF_OPEN`` and a recovery probe is already
    in flight. In both cases the protected callable is never invoked.
    """


class CircuitBreaker:
    """In-memory async circuit breaker with ``CLOSED``/``OPEN``/``HALF_OPEN`` states.

    ``failure_threshold`` consecutive failures in ``CLOSED`` trip the breaker
    to ``OPEN``. Calls are then short-circuited with
    :class:`CircuitBreakerOpenException` until ``reset_timeout`` seconds have
    passed since the last failure. After that, a single ``HALF_OPEN`` probe
    call decides whether to close the breaker again (success) or re-open it
    (failure).
    """

    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 60.0):
        """Initialize a closed circuit breaker with failure/reset tuning.

        The breaker starts ``CLOSED`` with a zeroed failure counter and
        ``last_failure_time`` of ``0.0``, so the first protected call always
        executes. This initializer performs no I/O; it only stores
        configuration and counters on ``self``. State transitions happen
        later in :meth:`call`, :meth:`_record_failure` and
        :meth:`_record_success`.

        Per the original documentation, it is constructed by
        ``core.strangler_router`` (and the top-level ``strangler_router``
        shim) and by ``tests/core/migration/test_circuit_breaker.py``.

        Args:
            failure_threshold (int): Number of consecutive failures, while
                ``CLOSED``, that trips the breaker to ``OPEN``. Defaults to 3.
            reset_timeout (float): Seconds the breaker stays ``OPEN`` before
                a subsequent :meth:`call` is allowed through as a
                ``HALF_OPEN`` recovery probe. Defaults to 60.0.
        """
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.state = "CLOSED"
        self.last_failure_time = 0.0
        # True while a HALF_OPEN recovery probe is awaiting its callable, so
        # concurrent callers are short-circuited instead of all hitting a
        # dependency that may still be down.
        self._probe_in_flight = False

    async def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """Invoke an async callable through the breaker, enforcing the circuit state.

        When the breaker is ``OPEN``, it checks whether more than
        :attr:`reset_timeout` seconds have passed since
        :attr:`last_failure_time`.

        - If so, it moves to ``HALF_OPEN`` (logging
          ``circuit_breaker_half_open``) and lets this call through as the
          recovery probe.
        - Otherwise it raises :class:`CircuitBreakerOpenException` without
          awaiting *func*.

        While ``HALF_OPEN``, only one probe runs at a time. A caller arriving
        while a probe is still awaiting its callable is rejected with
        :class:`CircuitBreakerOpenException`. If no probe is in flight (for
        example, the previous one was cancelled before recording an outcome),
        the caller becomes the new probe.

        A permitted call awaits ``func(*args, **kwargs)``:

        - An :class:`Exception` it raises is passed to
          :meth:`_record_failure` and then re-raised unchanged.
        - A normal return is passed to :meth:`_record_success` before the
          result is returned.
        - ``BaseException`` subclasses that are not ``Exception`` (such as
          ``asyncio.CancelledError``) propagate without updating the failure
          state.

        Elapsed time is measured with :func:`time.time`, and log lines go to
        the module logger (``logging.getLogger(__name__)``). No I/O is
        performed beyond whatever *func* does.

        Args:
            func (Callable[..., Any]): Async callable to run under breaker
                protection. Its ``__name__`` is used only for log labels
                (falling back to ``"unknown_func"``).
            *args: Positional arguments forwarded verbatim to *func*.
            **kwargs: Keyword arguments forwarded verbatim to *func*.

        Returns:
            Any: Whatever *func* returns when the call is permitted and
            succeeds.

        Raises:
            CircuitBreakerOpenException: If the breaker is ``OPEN`` and the
                reset timeout has not elapsed, or if it is ``HALF_OPEN`` with
                a probe already in flight. *func* is not invoked.
            Exception: Any exception raised by *func*, re-raised unchanged
                after the failure is recorded.
        """
        func_name = getattr(func, "__name__", "unknown_func")
        is_probe = False

        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = "HALF_OPEN"
                logger.info("circuit_breaker_half_open func=%s", func_name)
                is_probe = True
            else:
                raise CircuitBreakerOpenException("Circuit breaker is OPEN")
        elif self.state == "HALF_OPEN":
            if self._probe_in_flight:
                # Another task is already testing recovery; don't stampede the
                # dependency before that probe reports back.
                raise CircuitBreakerOpenException(
                    "Circuit breaker is HALF_OPEN (recovery probe in flight)"
                )
            is_probe = True

        if is_probe:
            # Set synchronously, before the first await, so no other task can
            # observe HALF_OPEN without also seeing the in-flight probe.
            self._probe_in_flight = True
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self._record_failure(func_name)
            raise
        finally:
            # Always release the probe slot, including on cancellation, so a
            # dropped probe cannot leave the breaker stuck rejecting calls.
            if is_probe:
                self._probe_in_flight = False
        self._record_success(func_name)
        return result

    def _record_failure(self, func_name: str) -> None:
        """Update breaker state after a protected call raised.

        Stamps :attr:`last_failure_time` with :func:`time.time`, so the
        ``OPEN``-to-``HALF_OPEN`` cooldown in :meth:`call` is measured from
        this failure.

        - A failure during a ``HALF_OPEN`` probe re-opens the breaker
          immediately.
        - A failure while ``CLOSED`` increments :attr:`failures`. Once that
          reaches :attr:`failure_threshold`, the breaker trips to ``OPEN``.
        - A failure observed while already ``OPEN`` (e.g. a call that started
          before the trip) only refreshes the timestamp.

        Both transitions are logged as warnings on the module logger.

        Args:
            func_name (str): Name of the failing callable, used only in log
                messages.
        """
        self.last_failure_time = time.time()
        if self.state == "HALF_OPEN":
            self.state = "OPEN"
            logger.warning(
                "circuit_breaker_open func=%s (failed during HALF_OPEN)", func_name
            )
        elif self.state == "CLOSED":
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.state = "OPEN"
                logger.warning(
                    "circuit_breaker_open func=%s (failures=%s)",
                    func_name,
                    self.failures,
                )

    def _record_success(self, func_name: str) -> None:
        """Update breaker state after a protected call returned successfully.

        - A success during a ``HALF_OPEN`` probe closes the breaker: the
          state returns to ``CLOSED``, :attr:`failures` is reset to 0, and
          ``circuit_breaker_closed`` is logged.
        - A success while ``CLOSED`` clears :attr:`failures`, so transient
          blips do not accumulate toward the threshold.
        - A success observed while ``OPEN`` changes nothing.

        Args:
            func_name (str): Name of the succeeding callable, used only in the
                log message.
        """
        if self.state == "HALF_OPEN":
            self.state = "CLOSED"
            self.failures = 0
            logger.info("circuit_breaker_closed func=%s", func_name)
        elif self.state == "CLOSED":
            self.failures = 0