"""Liveness / readiness HTTP probes for orchestrators.
:class:`HealthServer` exposes ``/healthz`` (liveness) and ``/readyz``
(readiness) on a local aiohttp server, backed by a cached Redis-ping
loop that flips to unhealthy only after several consecutive failures,
so probe traffic never hammers Redis. Started automatically by
:class:`~core.service_base.StargazerService`.
"""
import asyncio
import logging
from aiohttp import web
logger = logging.getLogger(__name__)
class HealthServer:
    """Local HTTP liveness/readiness probe server backed by cached Redis health.

    Runs a tiny aiohttp server bound to ``127.0.0.1`` that answers ``/healthz``
    (liveness) and ``/readyz`` (readiness) for an orchestrator such as
    Kubernetes or a process supervisor. Rather than pinging Redis on every probe
    request, a background loop pings periodically and caches a single
    ``_is_healthy`` flag that only flips to unhealthy after several consecutive
    failures, so health checks stay cheap and resistant to transient blips while
    still reflecting a genuine Redis outage.

    Each ping is bounded by ``ping_timeout``, so a Redis connection that hangs
    (rather than erroring) is still counted as a failure instead of freezing
    the cached state at "healthy".

    One instance is created per service by
    :class:`core.service_base.StargazerService` when health probing is enabled,
    which calls :meth:`start` during service startup and :meth:`stop` on
    shutdown; the probe handlers and ping logic are also exercised by
    ``tests/core/migration/test_health_server.py``.
    """

    def __init__(self, redis, port: int = 9090, ping_interval: float = 5.0,
                 failure_threshold: int = 3, ping_timeout=None):
        """Initialize the health server's state and aiohttp routing.

        Stores the Redis client and probe-tuning parameters, seeds the cached
        health state to healthy, and builds the aiohttp
        :class:`~aiohttp.web.Application` with ``GET /healthz`` ->
        :meth:`healthz` and ``GET /readyz`` -> :meth:`readyz` routes plus an
        :class:`~aiohttp.web.AppRunner`. No socket is bound and no background
        task is spawned here; that happens in :meth:`start`. The Redis client
        is retained for the later ping loop but is not contacted yet.

        Called by :class:`core.service_base.StargazerService.__init__`, which
        instantiates one ``HealthServer`` per service when
        ``use_health_server`` is set and a Redis client is present (port read
        from ``SG_HEALTH_PORT``); also constructed directly in the migration
        tests.

        Args:
            redis: An async Redis client (``redis.asyncio``) whose ``ping()``
                is polled by the background loop to derive the cached health
                state.
            port (int): TCP port to bind the probe server on ``127.0.0.1``.
                Defaults to ``9090``.
            ping_interval (float): Seconds to sleep between Redis pings in the
                background loop. Defaults to ``5.0``.
            failure_threshold (int): Number of consecutive failed pings
                required before the cached state flips to unhealthy. Defaults
                to ``3``.
            ping_timeout (float | None): Maximum seconds to wait for a single
                ``ping()`` before counting it as a failure. ``None`` (the
                default) uses ``ping_interval``; a value ``<= 0`` disables the
                deadline and awaits the ping without a time limit.
        """
        self.redis = redis
        self.port = port
        self.ping_interval = ping_interval
        self.failure_threshold = failure_threshold
        # Tie the per-ping deadline to the loop cadence by default so a hung
        # connection is detected on roughly the same timescale as an erroring one.
        self.ping_timeout = ping_interval if ping_timeout is None else ping_timeout
        self._is_healthy = True
        self._consecutive_failures = 0
        self._ping_task = None
        self._app = web.Application()
        self._app.router.add_get('/healthz', self.healthz)
        self._app.router.add_get('/readyz', self.readyz)
        self._runner = web.AppRunner(self._app)
        self._site = None

    async def healthz(self, request):
        """Answer the ``GET /healthz`` liveness probe from cached health state.

        Returns ``200 OK`` while the cached ``_is_healthy`` flag is set and
        ``503 Service Unavailable`` once it has flipped. The flag is maintained
        out-of-band by the background ping loop, so this handler does no I/O
        and never touches Redis itself, keeping probes fast and cheap.

        Registered as the ``GET /healthz`` route in :meth:`__init__`.

        Args:
            request: The incoming aiohttp request (unused; present to satisfy
                the handler signature).

        Returns:
            A 200 ``aiohttp.web.Response`` when healthy, otherwise a 503.
        """
        if self._is_healthy:
            return web.Response(text="OK", status=200)
        return web.Response(text="Service Unavailable", status=503)

    async def readyz(self, request):
        """Answer the ``GET /readyz`` readiness probe from cached health state.

        Returns ``200 Ready`` while the cached ``_is_healthy`` flag is set and
        ``503 Service Unavailable`` otherwise. Readiness shares the same cached
        Redis-health signal as liveness, so a service that cannot reach Redis
        is pulled out of rotation; like :meth:`healthz` this handler reads only
        the cached flag and performs no I/O.

        Registered as the ``GET /readyz`` route in :meth:`__init__`.

        Args:
            request: The incoming aiohttp request (unused; present to satisfy
                the handler signature).

        Returns:
            A 200 ``aiohttp.web.Response`` when ready, otherwise a 503.
        """
        if self._is_healthy:
            return web.Response(text="Ready", status=200)
        return web.Response(text="Service Unavailable", status=503)

    async def start(self):
        """Bind the probe server to localhost and launch the Redis ping loop.

        Sets up the aiohttp :class:`~aiohttp.web.AppRunner`, binds a
        :class:`~aiohttp.web.TCPSite` to ``127.0.0.1`` on the configured port
        (deliberately non-routable from outside the host so the probes are not
        publicly exposed), and spawns :meth:`_ping_redis_loop` as a background
        ``asyncio`` task stored on ``self._ping_task``. Logs
        ``health_server_started``.

        Called by :class:`core.service_base.StargazerService` during service
        startup, and by ``tests/core/migration/test_health_server.py``.
        """
        await self._runner.setup()
        # Bind specifically to 127.0.0.1 (non-routable from outside)
        self._site = web.TCPSite(self._runner, '127.0.0.1', self.port)
        await self._site.start()
        logger.info(f"health_server_started host=127.0.0.1 port={self.port}")
        self._ping_task = asyncio.create_task(self._ping_redis_loop())

    async def stop(self):
        """Cancel the ping loop and tear down the aiohttp probe server.

        Detaches and cancels the background ping task, awaiting it while
        absorbing the expected :class:`asyncio.CancelledError`. It then cleans
        up the aiohttp runner so the listening socket is released, and logs
        ``health_server_stopped``. The task reference is cleared before
        cancelling, so calling :meth:`stop` again does not re-await a dead task.
        The task and runner are guarded before use.

        Called by :class:`core.service_base.StargazerService` during service
        shutdown, and by ``tests/core/migration/test_health_server.py``.
        """
        task, self._ping_task = self._ping_task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        if self._runner:
            await self._runner.cleanup()
        logger.info("health_server_stopped")

    async def _ping_redis_loop(self):
        """Run the background loop that refreshes cached health from Redis.

        Repeatedly calls :meth:`_check_health_once` and then sleeps for
        ``ping_interval`` seconds until cancelled. Cancellation is re-raised so
        :meth:`stop` can await it. Any other unexpected exception is logged
        with its traceback (``health_server_ping_loop_crashed``) and ends the
        loop. In that case the cached state is marked unhealthy, because
        nothing will refresh it anymore and a stale "healthy" would hide the
        dead monitor.

        Raises:
            asyncio.CancelledError: Propagated when the task is cancelled (by
                :meth:`stop`).
        """
        try:
            while True:
                await self._check_health_once()
                await asyncio.sleep(self.ping_interval)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.exception(f"health_server_ping_loop_crashed error={e}")
            # Fail closed: the flag can no longer be refreshed.
            self._is_healthy = False

    async def _ping_with_timeout(self):
        """Await ``redis.ping()``, bounded by ``ping_timeout`` when it is positive."""
        timeout = self.ping_timeout
        if timeout is not None and timeout > 0:
            return await asyncio.wait_for(self.redis.ping(), timeout)
        return await self.redis.ping()

    async def _check_health_once(self):
        """Perform a single Redis ping and update the cached health state.

        On success it resets the consecutive-failure counter. If the state was
        unhealthy, it flips ``self._is_healthy`` back to ``True`` and logs
        ``health_server_redis_recovered``.

        On failure it increments ``self._consecutive_failures``. A ping that
        exceeds ``ping_timeout`` also counts as a failure. Only once the count
        reaches ``failure_threshold`` does it mark the service unhealthy,
        logging ``health_server_redis_failed`` once on the transition, so
        transient blips do not trip the probes.

        Exceptions from the ping are caught here and never propagate.
        """
        try:
            await self._ping_with_timeout()
        except Exception as e:
            self._consecutive_failures += 1
            if self._consecutive_failures >= self.failure_threshold and self._is_healthy:
                logger.warning(f"health_server_redis_failed threshold={self.failure_threshold} error={e}")
                self._is_healthy = False
        else:
            self._consecutive_failures = 0
            if not self._is_healthy:
                logger.info("health_server_redis_recovered")
                self._is_healthy = True