# Source code for core.service_base

"""Service boot contract shared by every Stargazer microservice.

Defines :class:`StargazerService`, the abstract base that runs a fixed
boot sequence (jittered Redis connect/ping, the subclass's ``on_start``,
service-registry registration, the control-ops daemon and registry
heartbeat background tasks, and an HTTP health server via
:class:`~core.health_server.HealthServer`) and a matching graceful
shutdown. Every entry point (gateway / inference / agents /
consolidation / web) subclasses it and implements the ``on_start``,
``run`` and ``on_stop`` hooks.
"""

import asyncio
import logging
import os
import random
import socket
import sys
from abc import ABC, abstractmethod
from typing import Optional

from core.health_server import HealthServer
from core.service_registry import (
    SERVICE_TTL,
    deregister_service,
    heartbeat,
    register_service,
)

logger = logging.getLogger(__name__)

class StargazerService(ABC):
    """Abstract base class establishing the 9-phase boot contract.

    Every Stargazer microservice (gateway / inference / agents /
    consolidation / web) subclasses this to inherit a uniform startup and
    shutdown lifecycle, so boot ordering -- Redis ping gate, subclass init,
    service-registry registration, background control-ops / heartbeat
    tasks, health server -- is identical across the fleet and bugs in one
    tier cannot diverge from another.

    Subclasses fill in the three abstract hooks (:meth:`on_start`,
    :meth:`run`, :meth:`on_stop`) while :meth:`boot` and :meth:`shutdown`
    orchestrate the fixed sequence around them.

    Touches Redis (ping, registration via :func:`register_service`,
    periodic refresh via :func:`heartbeat`, deregistration via
    :func:`deregister_service`) and an HTTP health endpoint through
    :class:`~core.health_server.HealthServer`. It is never instantiated
    directly -- each ``*_main.py`` entry point defines a concrete subclass
    (``InferenceService``, ``AgentsService``, ``WebService``,
    ``GatewayService``, ``ConsolidationService``) that calls
    ``super().__init__`` and is launched from that module's ``main`` under
    ``asyncio.run``.
    """

    def __init__(
        self,
        service_name: str,
        instance_id: str,
        redis_client=None,
        redis_required: bool = True,
        use_health_server: bool = True,
    ):
        """Capture service identity and wire up the optional health server.

        Performs only the cheap, synchronous Phase 1 (local init) portion of
        the boot contract; the network-touching phases run later in
        :meth:`boot`. The supplied Redis client is stored for use by
        :meth:`boot` / :meth:`shutdown` (ping, service-registry registration,
        heartbeat, deregistration) and by subclasses.

        When ``use_health_server`` is true and a Redis client is present, a
        :class:`~core.health_server.HealthServer` is constructed (but not
        started here) bound to the port from the ``SG_HEALTH_PORT``
        environment variable, defaulting to ``9090``; it is started later in
        :meth:`boot`'s Phase 9. The variable is only read in that case.

        Args:
            service_name: Logical service name used in boot logs and in the
                service registry (e.g. ``"inference"``).
            instance_id: Unique identifier distinguishing this process from
                other replicas of the same service.
            redis_client: An async Redis client, or ``None`` to run without
                Redis (in which case the registry, ping gate, background
                tasks, and health server are all skipped).
            redis_required: When true and the Redis ping in :meth:`boot`
                fails, the process hard-fails via ``sys.exit(1)``; when false
                Redis failures during boot are logged and boot continues.
            use_health_server: When true (and Redis is present), build a
                :class:`~core.health_server.HealthServer` to expose HTTP
                health.

        Raises:
            ValueError: If a health server is being built and
                ``SG_HEALTH_PORT`` is not an integer.
        """
        self.service_name = service_name
        self.instance_id = instance_id
        self.redis = redis_client
        self.redis_required = redis_required

        self.health_server: Optional[HealthServer] = None
        if use_health_server and self.redis:
            # Parse the port only when a health server is actually built, so
            # a malformed SG_HEALTH_PORT cannot crash a tier that serves none.
            health_port = int(os.environ.get("SG_HEALTH_PORT", "9090"))
            self.health_server = HealthServer(redis=self.redis, port=health_port)

        # Fleet-wide control-ops daemon (restart / pull) and registry
        # heartbeat; started in boot() whenever a Redis client is configured,
        # cancelled in shutdown().
        self._control_ops_task: Optional[asyncio.Task] = None
        self._heartbeat_task: Optional[asyncio.Task] = None

    async def boot(self):
        """Run the full 9-phase startup sequence and bring the service live.

        Drives the service from local init to a registered, health-serving
        process. When a Redis client is configured it:

        * sleeps a random jitter (0-2s) to desynchronize replica starts;
        * pings Redis, hard-failing the process if the ping fails and
          ``redis_required`` is set;
        * runs the subclass's :meth:`on_start`;
        * registers the instance in the service registry with status
          ``"starting"``. A registration failure is re-raised when
          ``redis_required`` is set. Otherwise it is logged, and the
          heartbeat loop keeps retrying;
        * starts the control-ops daemon and registry-heartbeat background
          tasks;
        * starts the health server, if one was built.

        Without a Redis client only :meth:`on_start` runs (plus the health
        server, which is never built without Redis). The jitter and ping gate
        exist so a thundering herd of restarts does not stampede Redis and so
        a service never claims work while its backing store is unreachable.

        Called by each microservice's ``main`` (in ``inference_main.py``,
        ``agents_main.py``, ``consolidation_main.py``, ``web_main.py``,
        ``gateway_main.py``) immediately before ``service.run()``, and
        exercised by ``tests/core/migration``.

        Raises:
            SystemExit: Via ``sys.exit(1)`` when ``redis_required`` is true
                and the Redis ping fails.
            Exception: Anything raised by :meth:`on_start`, or by
                :func:`register_service` when ``redis_required`` is true.
        """
        logger.info(f"service_boot_started name={self.service_name} id={self.instance_id}")

        # Phase 1: Local init (done in __init__)

        # Phase 2: Redis connection with jitter and hard-fail gate
        if self.redis:
            jitter = random.uniform(0, 2.0)
            logger.info(f"service_boot_jitter sleep={jitter:.2f}s")
            await asyncio.sleep(jitter)
            try:
                await self.redis.ping()
                logger.info("redis_connection_verified")
            except Exception as e:
                logger.error(f"redis_connection_failed error={e}")
                if self.redis_required:
                    logger.critical("hard_fail_gate_triggered redis is required but unavailable")
                    sys.exit(1)

        # Phase 3-7: Config, Tracing, Modules (simulated in on_start)
        await self.on_start()

        # Phase 8: Registration
        if self.redis:
            try:
                await register_service(
                    self.redis, self.service_name, self.instance_id, {"status": "starting"}
                )
            except Exception:
                if self.redis_required:
                    raise
                # Optional Redis: honor the "log and continue" contract. The
                # heartbeat loop started below re-attempts registration.
                logger.warning(
                    f"service_registration_failed name={self.service_name} id={self.instance_id}",
                    exc_info=True,
                )

        # Phase 8.5: Control-ops daemon + registry heartbeat (all tiers).
        if self.redis:
            from core.control_ops import ControlOpsDaemon

            self._control_ops_task = asyncio.create_task(
                ControlOpsDaemon(self).run(),
                name=f"control-ops-{self.service_name}",
            )
            self._heartbeat_task = asyncio.create_task(
                self._registry_heartbeat_loop(),
                name=f"registry-heartbeat-{self.service_name}",
            )

        # Phase 9: Health Server
        if self.health_server:
            await self.health_server.start()

        logger.info("service_boot_completed")

    async def _registry_heartbeat_loop(self) -> None:
        """Keep this instance fresh in the service registry.

        ``register_service`` writes a key with a short TTL; without a periodic
        refresh the instance vanishes after :data:`SERVICE_TTL` even while
        alive. This loop first re-registers with status ``"running"`` plus
        host/pid metadata, then every ``SERVICE_TTL / 3`` seconds (at least
        1s) sends a lightweight TTL-only :func:`heartbeat`. If the heartbeat
        reports the key is gone (e.g. Redis restart or eviction) or raises,
        it falls back to a full re-registration. Runs until cancelled.
        """
        interval = max(1.0, SERVICE_TTL / 3.0)
        metadata = {"status": "running", "host": socket.gethostname(), "pid": os.getpid()}

        # Boot registered the instance as "starting"; flip it to "running" now.
        try:
            await register_service(self.redis, self.service_name, self.instance_id, metadata)
        except Exception:
            # Best effort: the loop below retries on the next tick.
            logger.debug("initial running registration failed", exc_info=True)

        while True:
            try:
                await asyncio.sleep(interval)
                refreshed = await heartbeat(self.redis, self.service_name, self.instance_id)
                if not refreshed:
                    # Key no longer exists in Redis; restore it with a full registration.
                    await register_service(self.redis, self.service_name, self.instance_id, metadata)
            except asyncio.CancelledError:
                # Always propagate cancellation (shutdown() relies on it), even
                # on Pythons where CancelledError subclasses Exception.
                raise
            except Exception:
                # Best-effort fallback to register_service
                try:
                    await register_service(self.redis, self.service_name, self.instance_id, metadata)
                except Exception:
                    logger.debug("registry fallback registration failed", exc_info=True)

    async def shutdown(self):
        """Tear the service down gracefully in reverse boot order.

        Counterpart to :meth:`boot`. It first cancels the control-ops daemon
        and registry-heartbeat tasks so they cannot act mid-teardown. It then
        stops the HTTP health server so the instance immediately reports
        unready, removes the instance from the service registry, and finally
        runs the subclass's :meth:`on_stop` cleanup. Deregistering and
        dropping health before :meth:`on_stop` ensures operators and load
        balancers stop routing to a process that is mid-teardown.

        A failure while stopping the health server or deregistering (e.g.
        Redis unreachable) is logged and does not prevent :meth:`on_stop`
        from running. Exceptions from :meth:`on_stop` itself propagate.

        Invoked from the SIGINT/SIGTERM signal handlers installed in each
        microservice's ``main`` (e.g.
        ``asyncio.create_task(service.shutdown())``) and asserted by
        ``tests/core/migration/test_service_base.py``.
        """
        logger.info("service_shutdown_started")

        # Stop the control-ops daemon + heartbeat first so they cannot act mid-teardown.
        for task in (self._control_ops_task, self._heartbeat_task):
            if task is not None and not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass
                except Exception:
                    logger.debug("error awaiting cancelled background task", exc_info=True)

        # Isolate each remaining step so one failure cannot skip on_stop().
        if self.health_server:
            try:
                await self.health_server.stop()
            except Exception:
                logger.warning("health_server_stop_failed", exc_info=True)

        if self.redis:
            try:
                await deregister_service(self.redis, self.service_name, self.instance_id)
            except Exception:
                logger.warning(
                    f"service_deregistration_failed name={self.service_name} id={self.instance_id}",
                    exc_info=True,
                )

        await self.on_stop()
        logger.info("service_shutdown_completed")

    @abstractmethod
    async def on_start(self):
        """Subclass hook for tier-specific startup, run during boot.

        Abstract extension point invoked by :meth:`boot` (its phase 3-7)
        after the Redis ping gate but before registry registration, the
        background tasks, and the health server start. Each tier uses it to
        stand up its own machinery -- loading tools, building platform
        adapters, wiring stream consumers, initializing observability. Has
        no behavior here; it must be overridden.

        Called by :meth:`boot`; the concrete implementations live in the
        ``*_main.py`` service classes (``InferenceService.on_start``,
        ``GatewayService.on_start``, etc.) and are exercised directly by the
        ``tests/core/migration`` suite.
        """

    @abstractmethod
    async def run(self):
        """Subclass hook for the main service loop, run after boot.

        Abstract extension point that holds the service's long-lived work --
        typically consuming a Redis stream or serving HTTP -- and is expected
        to block until shutdown is signalled. Each tier's ``main`` awaits
        this immediately after :meth:`boot` returns, so it defines what the
        process actually *does* for its lifetime. Has no behavior here; it
        must be overridden.

        Called by each microservice's ``main`` via ``await service.run()``
        (in ``inference_main.py``, ``gateway_main.py``, ``web_main.py``,
        ``consolidation_main.py``, ``agents_main.py``), right after
        ``service.boot()``.
        """

    @abstractmethod
    async def on_stop(self):
        """Subclass hook for tier-specific cleanup, run during shutdown.

        Abstract extension point invoked by :meth:`shutdown` after the
        background tasks are cancelled, the health server is stopped, and
        the instance is deregistered. It gives each tier a place to release
        what :meth:`on_start` and :meth:`run` stood up (platform adapters,
        schedulers, consumers, sockets). Has no behavior here; it must be
        overridden.

        Called by :meth:`shutdown`; concrete implementations live in the
        ``*_main.py`` service classes (e.g. ``WebService.on_stop`` drives a
        graceful uvicorn exit, ``GatewayService.on_stop`` stops platform
        adapters) and are covered by ``tests/core/migration``.
        """