Source code for core.health_server

"""Liveness / readiness HTTP probes for orchestrators.

:class:`HealthServer` exposes ``/healthz`` (liveness) and ``/readyz``
(readiness) on a local aiohttp server, backed by a cached Redis-ping
loop that flips to unhealthy only after several consecutive failures,
so probe traffic never hammers Redis. Started automatically by
:class:`~core.service_base.StargazerService`.
"""

import asyncio
import logging
from typing import Optional

from aiohttp import web

# Module-level logger, named after this module via ``__name__``; used by
# HealthServer for its start/stop and Redis failure/recovery messages.
logger = logging.getLogger(__name__)

class HealthServer:
    """Local HTTP liveness/readiness probe server backed by cached Redis health.

    Runs a tiny aiohttp server bound to ``127.0.0.1`` that answers
    ``/healthz`` (liveness) and ``/readyz`` (readiness) for an orchestrator
    such as Kubernetes or a process supervisor. Rather than pinging Redis on
    every probe request, a background loop pings periodically and caches a
    single ``_is_healthy`` flag that only flips to unhealthy after several
    consecutive failures, so health checks stay cheap and resistant to
    transient blips while still reflecting a genuine Redis outage.

    Each ping is bounded by ``ping_timeout`` so that a hung Redis connection
    is counted as a failure instead of stalling the loop and freezing the
    cached flag at its last value.

    One instance is created per service by
    :class:`core.service_base.StargazerService` when health probing is
    enabled, which calls :meth:`start` during service startup and
    :meth:`stop` on shutdown; the probe handlers and ping logic are also
    exercised by ``tests/core/migration/test_health_server.py``.
    """

    def __init__(
        self,
        redis,
        port: int = 9090,
        ping_interval: float = 5.0,
        failure_threshold: int = 3,
        ping_timeout: Optional[float] = None,
    ):
        """Initialize the health server's state and aiohttp routing.

        Stores the Redis client and probe-tuning parameters, seeds the cached
        health state to healthy, and builds the aiohttp
        :class:`~aiohttp.web.Application` with ``GET /healthz`` ->
        :meth:`healthz` and ``GET /readyz`` -> :meth:`readyz` routes plus an
        :class:`~aiohttp.web.AppRunner`. No socket is bound and no background
        task is spawned here; that happens in :meth:`start`. The Redis client
        is retained for the later ping loop but is not contacted yet.

        Args:
            redis: An async Redis client (``redis.asyncio``) whose ``ping()``
                is polled by the background loop to derive the cached health
                state.
            port: TCP port to bind the probe server on ``127.0.0.1``.
                Defaults to ``9090``.
            ping_interval: Seconds to sleep between Redis pings in the
                background loop. Defaults to ``5.0``.
            failure_threshold: Number of consecutive failed pings required
                before the cached state flips to unhealthy. Defaults to ``3``.
            ping_timeout: Maximum seconds to wait for a single ``ping()``
                before counting it as a failure. ``None`` (the default) uses
                ``ping_interval`` when that is positive, otherwise no timeout
                is applied. Should be a positive number when given.
        """
        self.redis = redis
        self.port = port
        self.ping_interval = ping_interval
        self.failure_threshold = failure_threshold

        # Default the per-ping timeout to the polling interval; a
        # non-positive interval would make every ping time out immediately,
        # so in that case leave the ping unbounded.
        if ping_timeout is None and ping_interval > 0:
            ping_timeout = ping_interval
        self.ping_timeout = ping_timeout

        # Cached health state read by the probe handlers.
        self._is_healthy = True
        self._consecutive_failures = 0
        self._ping_task = None

        self._app = web.Application()
        self._app.router.add_get('/healthz', self.healthz)
        self._app.router.add_get('/readyz', self.readyz)
        self._runner = web.AppRunner(self._app)
        self._site = None

    async def healthz(self, request):
        """Answer the ``GET /healthz`` liveness probe from cached health state.

        Reads only the cached ``_is_healthy`` flag maintained by the
        background ping loop; this handler never contacts Redis itself.

        Args:
            request: The incoming aiohttp request (unused; present to satisfy
                the handler signature).

        Returns:
            A 200 ``aiohttp.web.Response`` with body ``OK`` when healthy,
            otherwise a 503 with body ``Service Unavailable``.
        """
        if self._is_healthy:
            return web.Response(text="OK", status=200)
        return web.Response(text="Service Unavailable", status=503)

    async def readyz(self, request):
        """Answer the ``GET /readyz`` readiness probe from cached health state.

        Readiness shares the same cached Redis-health flag as liveness; like
        :meth:`healthz` this handler never contacts Redis itself.

        Args:
            request: The incoming aiohttp request (unused; present to satisfy
                the handler signature).

        Returns:
            A 200 ``aiohttp.web.Response`` with body ``Ready`` when healthy,
            otherwise a 503 with body ``Service Unavailable``.
        """
        if self._is_healthy:
            return web.Response(text="Ready", status=200)
        return web.Response(text="Service Unavailable", status=503)

    async def start(self):
        """Bind the probe server to localhost and launch the Redis ping loop.

        Sets up the aiohttp runner, binds a :class:`~aiohttp.web.TCPSite` to
        ``127.0.0.1`` on the configured port, logs ``health_server_started``,
        and spawns :meth:`_ping_redis_loop` as a background ``asyncio`` task
        stored on ``self._ping_task``.
        """
        await self._runner.setup()
        # Bind specifically to 127.0.0.1 (non-routable from outside)
        self._site = web.TCPSite(self._runner, '127.0.0.1', self.port)
        await self._site.start()
        logger.info(f"health_server_started host=127.0.0.1 port={self.port}")
        self._ping_task = asyncio.create_task(self._ping_redis_loop())

    async def stop(self):
        """Cancel the ping loop and tear down the aiohttp probe server.

        Cancels the background ping task (if any) and awaits it, absorbing
        the expected :class:`asyncio.CancelledError`, then cleans up the
        aiohttp runner and logs ``health_server_stopped``. The task reference
        is cleared first, so a repeated call does not re-await a finished
        task.
        """
        task, self._ping_task = self._ping_task, None
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        if self._runner:
            await self._runner.cleanup()
        logger.info("health_server_stopped")

    async def _ping_redis_loop(self):
        """Run the background loop that refreshes cached health from Redis.

        Alternates :meth:`_check_health_once` with an ``asyncio.sleep`` of
        ``ping_interval`` seconds until cancelled. Any other unexpected
        exception is logged (``health_server_ping_loop_crashed``) and
        swallowed, which ends the loop.

        Raises:
            asyncio.CancelledError: Re-raised when the task is cancelled so
                that the caller awaiting the task observes the cancellation.
        """
        try:
            while True:
                await self._check_health_once()
                await asyncio.sleep(self.ping_interval)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            # NOTE(review): after this the cached flag is no longer refreshed
            # and keeps whatever value it last had.
            logger.error(f"health_server_ping_loop_crashed error={e}")

    async def _check_health_once(self):
        """Perform a single bounded Redis ping and update the cached state.

        On success the consecutive-failure counter is reset and, if the
        cached state was unhealthy, it flips back to healthy (logging
        ``health_server_redis_recovered``). On failure, including a ping
        that exceeds ``ping_timeout``, the counter is incremented; once it
        reaches ``failure_threshold`` the cached state becomes unhealthy
        (logging ``health_server_redis_failed`` only on the transition).

        Exceptions derived from :class:`Exception` are caught and never
        propagate; task cancellation is not intercepted.
        """
        try:
            # Bound the ping: without a timeout a hung connection would block
            # the loop forever and leave the cached flag stuck.
            await asyncio.wait_for(self.redis.ping(), timeout=self.ping_timeout)
            self._consecutive_failures = 0
            if not self._is_healthy:
                logger.info("health_server_redis_recovered")
            self._is_healthy = True
        except Exception as e:
            self._consecutive_failures += 1
            if self._consecutive_failures >= self.failure_threshold:
                if self._is_healthy:
                    # Timeouts stringify to "", so fall back to the repr to
                    # keep the log line informative.
                    detail = str(e) or repr(e)
                    logger.warning(
                        f"health_server_redis_failed threshold={self.failure_threshold} error={detail}"
                    )
                self._is_healthy = False