"""Redis-backed service registry with TTL heartbeats.
Each running instance registers under
``sg:registry:service:{name}:{instance_id}`` with a short TTL
(:data:`SERVICE_TTL`) and periodically refreshes it via
:func:`heartbeat`, so the web dashboard and operators can see which
gateway / inference / agents instances are currently alive. Entries
expire automatically when an instance dies.
"""
import logging
import json
from typing import Dict, Any
# Module-level logger; the registry functions in this module report both
# successful operations and swallowed failures through it.
logger = logging.getLogger(__name__)
# Lifetime, in seconds, of every registry key. It is passed as ``ex=`` when a
# key is written by register_service() and as the new expiry when heartbeat()
# refreshes it, so heartbeats must arrive more often than this interval.
SERVICE_TTL = 15 # seconds
async def register_service(redis, service_name: str, instance_id: str, metadata: Dict[str, Any]):
    """Announce a live service instance in the Redis registry with a TTL.

    Writes a JSON-encoded metadata blob under
    ``sg:registry:service:{service_name}:{instance_id}`` and stamps it with the
    short :data:`SERVICE_TTL` so the entry self-expires if the instance dies
    without deregistering. This is the presence signal the web dashboard and
    operators read to see which gateway / inference / agents replicas are
    currently alive, and it must be kept fresh by :func:`heartbeat`.

    Registration is best-effort and never blocks boot: any failure -- whether
    the metadata cannot be JSON-encoded or the Redis ``set`` itself fails -- is
    logged as ``service_registration_failed`` and swallowed. On success a
    ``service_registered`` line is logged.

    Called by :meth:`core.service_base.StargazerService.boot` (phase 8) with
    metadata ``{"status": "starting"}``; also exercised by
    ``tests/core/migration/test_service_registry.py``.

    Args:
        redis: An async Redis client supporting ``set`` with an ``ex`` TTL.
        service_name: Logical service/tier name (e.g. ``"inference"``).
        instance_id: Unique id for this replica, used in the registry key.
        metadata: JSON-serializable dict describing the instance (e.g. its
            current status); stored JSON-encoded as the key's value.

    Returns:
        None. Failures are reported only through the log.
    """
    key = f"sg:registry:service:{service_name}:{instance_id}"
    try:
        # Encode inside the ``try``: metadata that json cannot serialize must be
        # treated like any other registration failure (logged and swallowed),
        # not propagated to the caller where it would abort service boot.
        value = json.dumps(metadata)
        await redis.set(key, value, ex=SERVICE_TTL)
    except Exception as e:
        logger.error(f"service_registration_failed name={service_name} id={instance_id} error={e}")
    else:
        logger.info(f"service_registered name={service_name} id={instance_id}")
async def heartbeat(redis, service_name: str, instance_id: str) -> bool:
    """Re-arm the TTL of an existing registration so the instance stays listed.

    Resets the expiry of ``sg:registry:service:{service_name}:{instance_id}``
    to :data:`SERVICE_TTL` with a single Redis ``expire`` call. The stored
    metadata is not rewritten. A healthy instance that keeps calling this on a
    timer shorter than the TTL remains visible in the registry; an instance
    that stops calling it lets the key lapse and disappear.

    Any exception raised while refreshing is logged as
    ``service_heartbeat_failed`` and swallowed, and the call reports ``False``.

    Per the module's own notes, no production caller invokes this yet; within
    the repo it is exercised only by
    ``tests/core/migration/test_service_registry.py``, and the periodic refresh
    loop is expected to be wired in by a service supervisor.

    Args:
        redis: An async Redis client supporting ``expire``.
        service_name: Logical service/tier name used in the registry key.
        instance_id: Unique id of the replica whose TTL is being refreshed.

    Returns:
        bool: The truthiness of the ``expire`` reply -- True when the key
        existed and its TTL was refreshed; False when it did not exist or the
        call failed.
    """
    registry_key = f"sg:registry:service:{service_name}:{instance_id}"
    # Default to "not refreshed"; only a truthy reply from Redis flips it.
    refreshed = False
    try:
        reply = await redis.expire(registry_key, SERVICE_TTL)
        refreshed = bool(reply)
    except Exception as e:
        logger.error(f"service_heartbeat_failed name={service_name} id={instance_id} error={e}")
        refreshed = False
    return refreshed
async def deregister_service(redis, service_name: str, instance_id: str):
    """Drop a service instance's registry entry during graceful shutdown.

    Issues a single Redis ``delete`` on
    ``sg:registry:service:{service_name}:{instance_id}`` so the instance
    vanishes from operator and dashboard views right away instead of lingering
    until :data:`SERVICE_TTL` runs out. It is the clean-exit counterpart to
    :func:`register_service`; the TTL remains the safety net for instances
    that never get here (crashes, kills).

    Teardown is never blocked: any exception is logged as
    ``service_deregistration_failed`` and swallowed. On success a
    ``service_deregistered`` line is logged.

    Called by :meth:`core.service_base.StargazerService.shutdown` before the
    subclass :meth:`~core.service_base.StargazerService.on_stop` runs; also
    exercised by ``tests/core/migration/test_service_registry.py``.

    Args:
        redis: An async Redis client supporting ``delete``.
        service_name: Logical service/tier name used in the registry key.
        instance_id: Unique id of the replica to remove.

    Returns:
        None. Failures are reported only through the log.
    """
    registry_key = f"sg:registry:service:{service_name}:{instance_id}"
    try:
        await redis.delete(registry_key)
        logger.info(f"service_deregistered name={service_name} id={instance_id}")
    except Exception as e:
        # Best-effort: the key will still expire on its own via the TTL.
        logger.error(f"service_deregistration_failed name={service_name} id={instance_id} error={e}")
    return None