"""Service boot contract shared by every Stargazer microservice.

Defines :class:`StargazerService`, the abstract base that runs a fixed
boot sequence (Redis connect/ping, the subclass's ``on_start``,
service-registry registration, the control-ops daemon and
registry-heartbeat background tasks, and an HTTP health server via
:class:`~core.health_server.HealthServer`) and a matching graceful
shutdown. Every entry point (gateway / inference / agents /
consolidation / web) subclasses it and implements ``on_start``,
``run``, and ``on_stop``.
"""
import asyncio
import logging
import os
import random
import socket
import sys
from abc import ABC, abstractmethod
from typing import Optional
from core.health_server import HealthServer
from core.service_registry import (
SERVICE_TTL,
deregister_service,
heartbeat,
register_service,
)
# Module-level logger named after this module so its records can be filtered
# independently of the concrete service subclasses.
logger = logging.getLogger(__name__)
class StargazerService(ABC):
    """Abstract base class establishing the 9-phase boot contract.

    Every Stargazer microservice (gateway / inference / agents / consolidation /
    web) subclasses this to inherit a uniform startup and shutdown lifecycle, so
    boot ordering -- Redis ping gate, subclass init, service-registry
    registration, background control-ops / heartbeat tasks, health server -- is
    identical across the fleet and bugs in one tier cannot diverge from another.
    Subclasses fill in the three abstract hooks (:meth:`on_start`, :meth:`run`,
    :meth:`on_stop`) while :meth:`boot` and :meth:`shutdown` orchestrate the
    fixed sequence around them.

    Touches Redis (ping, registration via :func:`register_service`, periodic
    refresh via :func:`heartbeat`, deregistration via
    :func:`deregister_service`) and an HTTP health endpoint through
    :class:`~core.health_server.HealthServer`. It is never instantiated
    directly -- each ``*_main.py`` entry point defines a concrete subclass
    (``InferenceService``, ``AgentsService``, ``WebService``, ``GatewayService``,
    ``ConsolidationService``) that calls ``super().__init__`` and is launched
    from that module's ``main`` under ``asyncio.run``.
    """

    def __init__(
        self,
        service_name: str,
        instance_id: str,
        redis_client=None,
        redis_required: bool = True,
        use_health_server: bool = True,
    ):
        """Capture service identity and wire up the optional health server.

        Performs only the cheap, synchronous Phase 1 (local init) portion of the
        boot contract; the network-touching phases run later in :meth:`boot`.
        The supplied Redis client is stored for use by :meth:`boot` /
        :meth:`shutdown` (ping, service-registry registration, heartbeat,
        deregistration) and by subclasses. When ``use_health_server`` is true
        and a Redis client is present, a
        :class:`~core.health_server.HealthServer` is constructed (but not
        started here). It is bound to the port from the ``SG_HEALTH_PORT``
        environment variable, defaulting to ``9090``, and is started later in
        Phase 9 of :meth:`boot`. The port is only parsed when a health server is
        actually built. No callers construct ``StargazerService`` directly -- it
        is abstract and each microservice entry point (gateway / inference /
        agents / consolidation / web) subclasses it and reaches this via
        ``super().__init__``.

        Args:
            service_name: Logical service name used in boot logs and in the
                service registry (e.g. ``"inference"``).
            instance_id: Unique identifier distinguishing this process from other
                replicas of the same service.
            redis_client: An async Redis client, or ``None`` to run without Redis
                (in which case the ping gate, registry registration, control-ops
                daemon, registry heartbeat, and health server are all skipped).
            redis_required: When true and the Redis ping in :meth:`boot` fails,
                the process hard-fails via ``sys.exit(1)``. When false, the
                failure is logged and boot continues; the initial registry write
                is then best-effort.
            use_health_server: When true (and Redis is present), build a
                :class:`~core.health_server.HealthServer` to expose HTTP health.

        Raises:
            ValueError: If a health server is being built and ``SG_HEALTH_PORT``
                is not an integer.
        """
        self.service_name = service_name
        self.instance_id = instance_id
        self.redis = redis_client
        self.redis_required = redis_required
        self.health_server: Optional[HealthServer] = None
        if use_health_server and self.redis is not None:
            # Parse the port only when it is needed, so a malformed
            # SG_HEALTH_PORT cannot crash a service that has no health server.
            health_port = int(os.environ.get("SG_HEALTH_PORT", "9090"))
            self.health_server = HealthServer(redis=self.redis, port=health_port)
        # Fleet-wide control-ops daemon (restart / pull) and registry heartbeat;
        # started in boot() when Redis is configured, cancelled in shutdown().
        self._control_ops_task: Optional[asyncio.Task] = None
        self._heartbeat_task: Optional[asyncio.Task] = None

    async def boot(self):
        """Run the full 9-phase startup sequence and bring the service live.

        Drives the service from local init to a registered, health-serving
        process. When a Redis client is configured, it first sleeps a random
        jitter (0-2s) to desynchronize replica starts. It then pings Redis,
        hard-failing the process if the ping fails and ``redis_required`` is
        set. Next it runs the subclass's :meth:`on_start`, registers the
        instance in the service registry with status ``"starting"``, and
        launches the control-ops daemon and the registry-heartbeat loop as
        background tasks. Finally it starts the health server.

        The jitter and ping gate exist so a thundering herd of restarts does not
        stampede Redis and so a service never claims work while its backing
        store is unreachable. When ``redis_required`` is false, a failed
        initial registration is logged and boot continues; the heartbeat loop
        keeps retrying the registration in the background.

        Touches Redis (``ping`` plus a ``set`` of ``sg:registry:service:...``
        via :func:`register_service`), may bind the HTTP health port, and on a
        required-Redis ping failure calls ``sys.exit(1)``. Called by each
        microservice's ``main`` (in ``inference_main.py``, ``agents_main.py``,
        ``consolidation_main.py``, ``web_main.py``, ``gateway_main.py``)
        immediately before ``service.run()``, and exercised by
        ``tests/core/migration``.

        Raises:
            SystemExit: Via ``sys.exit(1)`` when ``redis_required`` is true and
                the Redis ping fails.
            Exception: Whatever :func:`register_service` raises, when
                ``redis_required`` is true and the initial registration fails.
        """
        logger.info(f"service_boot_started name={self.service_name} id={self.instance_id}")

        # Phase 1: Local init (done in __init__)

        # Phase 2: Redis connection with jitter and hard-fail gate
        if self.redis is not None:
            jitter = random.uniform(0, 2.0)
            logger.info(f"service_boot_jitter sleep={jitter:.2f}s")
            await asyncio.sleep(jitter)
            try:
                await self.redis.ping()
                logger.info("redis_connection_verified")
            except Exception as e:
                logger.error(f"redis_connection_failed error={e}")
                if self.redis_required:
                    logger.critical("hard_fail_gate_triggered redis is required but unavailable")
                    sys.exit(1)

        # Phase 3-7: Config, Tracing, Modules (simulated in on_start)
        await self.on_start()

        if self.redis is not None:
            # Phase 8: Registration
            try:
                await register_service(
                    self.redis, self.service_name, self.instance_id, {"status": "starting"}
                )
            except Exception as e:
                if self.redis_required:
                    raise
                # Optional Redis may be down (the ping above is allowed to fail
                # in this mode); keep booting and let the heartbeat loop retry.
                logger.warning(f"service_registration_failed error={e}")

            # Phase 8.5: Control-ops daemon + registry heartbeat (all tiers).
            # Imported lazily here; presumably to avoid an import cycle with
            # core.control_ops (NOTE(review): confirm).
            from core.control_ops import ControlOpsDaemon

            self._control_ops_task = asyncio.create_task(
                ControlOpsDaemon(self).run(),
                name=f"control-ops-{self.service_name}",
            )
            self._heartbeat_task = asyncio.create_task(
                self._registry_heartbeat_loop(),
                name=f"registry-heartbeat-{self.service_name}",
            )

        # Phase 9: Health Server
        if self.health_server is not None:
            await self.health_server.start()
        logger.info("service_boot_completed")

    async def _registry_heartbeat_loop(self) -> None:
        """Keep this instance fresh in the service registry.

        ``register_service`` writes a key with a short TTL; without a periodic
        refresh the instance vanishes after :data:`SERVICE_TTL` even while it
        is alive.

        The loop first tries to transition the status to ``"running"`` (with
        host/pid metadata). Until that full registration has succeeded, every
        tick retries it. Otherwise a TTL-only heartbeat would keep a stale
        ``"starting"`` record alive forever. After that, each tick uses the
        lightweight heartbeat. If the heartbeat reports that the key has
        vanished (e.g. due to a Redis restart or eviction) or raises, it falls
        back to a full re-registration. Runs until cancelled by
        :meth:`shutdown`.
        """
        interval = max(1.0, SERVICE_TTL / 3.0)
        metadata = {"status": "running", "host": socket.gethostname(), "pid": os.getpid()}

        # Transition status to "running" immediately when the heartbeat loop starts
        # (initial registration at boot was status "starting").
        try:
            await register_service(self.redis, self.service_name, self.instance_id, metadata)
            running_registered = True
        except Exception:
            # Best effort: retried with a full registration on the next tick.
            logger.debug("initial running registration failed", exc_info=True)
            running_registered = False

        while True:
            await asyncio.sleep(interval)
            try:
                if running_registered:
                    # TTL-only refresh; a falsy result means the key did not
                    # exist and must be rewritten with full metadata.
                    if await heartbeat(self.redis, self.service_name, self.instance_id):
                        continue
                await register_service(self.redis, self.service_name, self.instance_id, metadata)
                running_registered = True
            except asyncio.CancelledError:
                raise
            except Exception:
                logger.debug("registry heartbeat failed", exc_info=True)
                # Best-effort fallback to register_service
                try:
                    await register_service(self.redis, self.service_name, self.instance_id, metadata)
                    running_registered = True
                except Exception:
                    logger.debug("registry fallback registration failed", exc_info=True)

    async def shutdown(self):
        """Tear the service down gracefully in reverse boot order.

        Counterpart to :meth:`boot`. It first cancels the control-ops daemon and
        registry-heartbeat tasks, so the heartbeat cannot re-register the
        instance after deregistration. Next it stops the HTTP health server (so
        the instance immediately reports unready) and removes the instance from
        the service registry. Finally it runs the subclass's :meth:`on_stop`
        cleanup. Ordering matters -- deregistering and dropping health before
        :meth:`on_stop` ensures operators and load balancers stop routing to a
        process that is mid-teardown.

        The health-server stop and deregistration steps are best-effort: a
        failure in either is logged and does not prevent :meth:`on_stop` from
        releasing the subclass's resources.

        Touches Redis by deleting the ``sg:registry:service:...`` key via
        :func:`deregister_service` and closes the health port. Invoked from the
        SIGINT/SIGTERM signal handlers installed in each microservice's ``main``
        (e.g. ``asyncio.create_task(service.shutdown())`` in
        ``inference_main.py`` / ``gateway_main.py`` / ``web_main.py`` /
        ``consolidation_main.py`` / ``agents_main.py``) and asserted by
        ``tests/core/migration/test_service_base.py``.
        """
        logger.info("service_shutdown_started")

        # Stop the control-ops daemon + heartbeat first so they cannot act mid-teardown.
        for task in (self._control_ops_task, self._heartbeat_task):
            if task is None or task.done():
                continue
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
            except Exception:
                logger.debug("error awaiting cancelled background task", exc_info=True)

        if self.health_server is not None:
            try:
                await self.health_server.stop()
            except Exception:
                logger.exception("health_server_stop_failed")

        if self.redis is not None:
            try:
                await deregister_service(self.redis, self.service_name, self.instance_id)
            except Exception:
                # e.g. Redis unreachable during teardown; the key expires via TTL.
                logger.exception("service_deregistration_failed")

        await self.on_stop()
        logger.info("service_shutdown_completed")

    @abstractmethod
    async def on_start(self):
        """Subclass hook for tier-specific startup, run during boot.

        Abstract extension point invoked by :meth:`boot` (its phase 3-7). It
        runs after the Redis ping gate but before registry registration and
        the health server start. This gives each tier a place to stand up its
        own machinery -- loading tools, building platform adapters, wiring
        stream consumers, initializing observability. Has no behavior here; it
        must be overridden.

        Called by :meth:`boot`; the concrete implementations live in the
        ``*_main.py`` service classes (``InferenceService.on_start``,
        ``GatewayService.on_start``, etc.) and are exercised directly by the
        ``tests/core/migration`` suite.
        """

    @abstractmethod
    async def run(self):
        """Subclass hook for the main service loop, run after boot.

        Abstract extension point that holds the service's long-lived work --
        typically consuming a Redis stream or serving HTTP -- and is expected to
        block until shutdown is signalled. Each tier's ``main`` awaits this
        immediately after :meth:`boot` returns, so it defines what the process
        actually *does* for its lifetime. Has no behavior here; it must be
        overridden.

        Called by each microservice's ``main`` via ``await service.run()`` (in
        ``inference_main.py``, ``gateway_main.py``, ``web_main.py``,
        ``consolidation_main.py``, ``agents_main.py``), right after
        ``service.boot()``.
        """

    @abstractmethod
    async def on_stop(self):
        """Subclass hook for tier-specific cleanup, run during shutdown.

        Abstract extension point invoked by :meth:`shutdown` after the
        background tasks are cancelled, the health server is stopped, and the
        instance is deregistered (each best-effort). It gives each tier a
        place to release what :meth:`on_start` and :meth:`run` stood up
        (platform adapters, schedulers, consumers, sockets). Has no behavior
        here; it must be overridden.

        Called by :meth:`shutdown`; concrete implementations live in the
        ``*_main.py`` service classes (e.g. ``WebService.on_stop`` drives a
        graceful uvicorn exit, ``GatewayService.on_stop`` stops platform
        adapters) and are covered by ``tests/core/migration``.
        """