# Source code for core.service_registry

"""Redis-backed service registry with TTL heartbeats.

Each running instance registers under
``sg:registry:service:{name}:{instance_id}`` with a short TTL
(:data:`SERVICE_TTL`) and periodically refreshes it via
:func:`heartbeat`, so the web dashboard and operators can see which
gateway / inference / agents instances are currently alive. Entries
expire automatically when an instance dies.
"""

import logging
import json
from typing import Dict, Any

logger = logging.getLogger(name=__name__)

# Lifetime of a registry entry, in seconds. Every key is written/refreshed with
# this expiry, so an instance whose heartbeats stop drops out of the registry
# on its own once this many seconds pass.
SERVICE_TTL: int = 15

async def register_service(redis, service_name: str, instance_id: str, metadata: Dict[str, Any]) -> None:
    """Announce a live service instance in the Redis registry with a TTL.

    Writes a JSON-encoded metadata blob under
    ``sg:registry:service:{service_name}:{instance_id}`` and stamps it with
    the short :data:`SERVICE_TTL` so the entry self-expires if the instance
    dies without deregistering. This is the presence signal the web
    dashboard and operators read to see which gateway / inference / agents
    replicas are currently alive, and it must be kept fresh by
    :func:`heartbeat`.

    Registration is best-effort and never blocks boot. Both the JSON encoding
    of ``metadata`` and the single Redis ``set`` (with ``ex=SERVICE_TTL``) run
    inside the guarded section. A non-serializable metadata value or a Redis
    error is therefore logged as ``service_registration_failed`` and
    swallowed instead of propagating.

    Called by :meth:`core.service_base.StargazerService.boot` (phase 8) with
    metadata ``{"status": "starting"}``; also exercised by
    ``tests/core/migration/test_service_registry.py``.

    Args:
        redis: An async Redis client supporting ``set`` with an ``ex`` TTL.
        service_name: Logical service/tier name (e.g. ``"inference"``).
        instance_id: Unique id for this replica, used in the registry key.
        metadata: JSON-serializable dict describing the instance (e.g. its
            current status); stored JSON-encoded as the key's value.
    """
    key = f"sg:registry:service:{service_name}:{instance_id}"
    try:
        # Encode inside the try: json.dumps raises TypeError for values it
        # cannot serialize (and ValueError for circular references). Those
        # must not escape the best-effort contract and abort service boot.
        value = json.dumps(metadata)
        await redis.set(key, value, ex=SERVICE_TTL)
    except Exception as e:
        logger.error(
            "service_registration_failed name=%s id=%s error=%s",
            service_name,
            instance_id,
            e,
        )
    else:
        logger.info("service_registered name=%s id=%s", service_name, instance_id)
async def heartbeat(redis, service_name: str, instance_id: str) -> bool:
    """Re-arm the TTL of an existing registration so the instance stays listed.

    Resets the expiry of ``sg:registry:service:{service_name}:{instance_id}``
    to :data:`SERVICE_TTL` without touching the stored metadata. A healthy
    instance that keeps calling this remains visible between full
    re-registrations. An instance that stops calling it lets the key lapse
    and disappear. Invoke it on a period shorter than the TTL.

    Performs one Redis ``expire``. Any exception is logged as
    ``service_heartbeat_failed`` and swallowed.

    Per the original notes, no production code calls this yet; within the
    repo it is only exercised by
    ``tests/core/migration/test_service_registry.py``. A service supervisor
    is expected to drive it from a periodic loop.

    Args:
        redis: An async Redis client supporting ``expire``.
        service_name: Logical service/tier name used in the registry key.
        instance_id: Unique id of the replica whose TTL is being refreshed.

    Returns:
        bool: True when ``expire`` reported a truthy result (the key existed
        and was refreshed); False when it did not, or when the call raised.
    """
    registry_key = f"sg:registry:service:{service_name}:{instance_id}"
    try:
        # Truthiness of the client's reply decides success; the conversion
        # stays inside the try so any failure maps to False.
        return bool(await redis.expire(registry_key, SERVICE_TTL))
    except Exception as exc:
        logger.error(f"service_heartbeat_failed name={service_name} id={instance_id} error={exc}")
        return False
async def deregister_service(redis, service_name: str, instance_id: str):
    """Drop a service instance from the registry during a graceful shutdown.

    Deletes ``sg:registry:service:{service_name}:{instance_id}``, so the
    replica vanishes from operator and dashboard views right away. It does
    not linger until :data:`SERVICE_TTL` runs out. This is the clean-exit
    counterpart to :func:`register_service`. The TTL only matters for
    instances that never get here (crashes, kills).

    Performs one Redis ``delete``. Any exception is logged as
    ``service_deregistration_failed`` and swallowed, so teardown is never
    blocked.

    Called by :meth:`core.service_base.StargazerService.shutdown` before the
    subclass :meth:`~core.service_base.StargazerService.on_stop` hook runs;
    also exercised by ``tests/core/migration/test_service_registry.py``.

    Args:
        redis: An async Redis client supporting ``delete``.
        service_name: Logical service/tier name used in the registry key.
        instance_id: Unique id of the replica to remove.
    """
    registry_key = f"sg:registry:service:{service_name}:{instance_id}"
    try:
        await redis.delete(registry_key)
        logger.info(f"service_deregistered name={service_name} id={instance_id}")
    except Exception as exc:
        logger.error(f"service_deregistration_failed name={service_name} id={instance_id} error={exc}")