Source code for consolidation_main

"""Consolidation service entry point — knowledge-graph consolidation/decay.

Runs :class:`ConsolidationService`, a minimal
:class:`~core.service_base.StargazerService` whose sole job is to schedule
the two long-horizon knowledge-graph maintenance jobs on a
:class:`~background_tasks.BackgroundScheduler`:

* :func:`~background_tasks.kg_consolidation_task` —
  :func:`kg_consolidation.consolidate_graph` (merge/prune duplicate entities).
* :func:`~background_tasks.kg_decay_task` —
  :func:`kg_consolidation.decay_relationships` (decay relationship weights and
  remove weak edges).

These are split into their own service (separate from
:mod:`agents_main`) so this slow, LLM-heavy work can be scaled, scheduled,
and restarted independently. Launched standalone
(``python consolidation_main.py``) by
``scripts/systemd/stargazer-consolidation.service``.
"""

import asyncio
import logging
from typing import Any
from core.service_base import StargazerService
from config import Config
from message_cache import MessageCache
from knowledge_graph import KnowledgeGraphManager
from openrouter_client import OpenRouterClient
from background_tasks import BackgroundScheduler, kg_consolidation_task, kg_decay_task

# Module-level logger under the fixed name "consolidation_service" rather than
# ``__name__``, which would be "__main__" when this file is run as a script.
logger = logging.getLogger("consolidation_service")

class ConsolidationService(StargazerService):
    """Service that schedules the long-horizon knowledge-graph maintenance jobs.

    The service has no inbound work of its own: :meth:`on_start` builds the
    collaborators and registers two daily jobs on a
    :class:`~background_tasks.BackgroundScheduler`, :meth:`run` parks until
    the task is cancelled, and :meth:`on_stop` tears everything down.

    Attributes:
        cfg: Loaded application configuration.
        message_cache: Built in :meth:`on_start`; ``None`` until then.
        kg_manager: Built in :meth:`on_start`; ``None`` until then.
        openrouter: LLM client built in :meth:`on_start`; ``None`` until then.
        scheduler: Scheduler that owns the two background job loops.
    """

    # Both maintenance jobs run once a day; their first runs are staggered
    # (30 min and 60 min after start) so they do not fire together at boot.
    _JOB_INTERVAL_SECONDS = 86400
    _CONSOLIDATION_INITIAL_DELAY_SECONDS = 1800
    _DECAY_INITIAL_DELAY_SECONDS = 3600

    def __init__(self, config: Config, redis_client: Any, instance_id: str):
        """Construct the service with empty collaborator slots.

        Initializes the base :class:`~core.service_base.StargazerService`
        with ``service_name="consolidation"`` and ``redis_required=True``,
        stores the config, and creates a fresh
        :class:`~background_tasks.BackgroundScheduler`. The heavyweight
        collaborators are left as ``None`` and built later in
        :meth:`on_start`.

        Args:
            config: Loaded application configuration supplying Redis,
                OpenRouter, and embedding settings.
            redis_client: Async Redis client shared with the base service and
                the knowledge-graph manager.
            instance_id: Unique per-process identity passed to the base
                service.
        """
        super().__init__(
            service_name="consolidation",
            instance_id=instance_id,
            redis_client=redis_client,
            redis_required=True,
        )
        self.cfg = config
        self.message_cache: MessageCache | None = None
        self.kg_manager: KnowledgeGraphManager | None = None
        self.openrouter: OpenRouterClient | None = None
        self.scheduler = BackgroundScheduler()

    async def on_start(self) -> None:
        """Build collaborators and schedule the KG maintenance jobs.

        Constructs, in order, the :class:`OpenRouterClient`, the
        :class:`MessageCache` (whose Redis client is handed to
        :func:`observability.set_observability_redis`), and the
        :class:`KnowledgeGraphManager` on the shared ``self.redis``. It then
        registers :func:`~background_tasks.kg_consolidation_task` and
        :func:`~background_tasks.kg_decay_task` as daily jobs and starts the
        scheduler.

        Returns:
            None
        """
        redis_client = self.redis

        # Connection kwargs for the message cache depend on how Redis is
        # configured: sentinels take precedence over a plain URL.
        if self.cfg.redis_sentinels:
            ssl_kw = self.cfg.redis_ssl_kwargs()
        elif self.cfg.redis_url:
            ssl_kw = self.cfg.redis_connection_kwargs_for_url(self.cfg.redis_url)
        else:
            ssl_kw = {}

        self.openrouter = OpenRouterClient(
            api_key=self.cfg.openrouter_api_key,
            model=self.cfg.model,
            temperature=self.cfg.temperature,
            max_tokens=self.cfg.max_tokens,
            top_p=self.cfg.top_p,
            base_url=self.cfg.llm_base_url,
            gemini_api_key=self.cfg.gemini_api_key,
        )

        self.message_cache = MessageCache(
            redis_url=self.cfg.redis_url,
            openrouter_client=self.openrouter,
            embedding_model=self.cfg.embedding_model,
            ssl_kwargs=ssl_kw,
            redis_sentinels=self.cfg.redis_sentinels,
            redis_sentinel_master=self.cfg.redis_sentinel_master,
            resilience_kwargs=self.cfg.redis_resilience_kwargs(),
        )

        # Imported here rather than at module top, exactly as the original
        # does; the observability layer is pointed at the cache's Redis client.
        from observability import set_observability_redis

        set_observability_redis(self.message_cache.redis_client)

        self.kg_manager = KnowledgeGraphManager(
            redis_client=redis_client,
            openrouter=self.openrouter,
            embedding_model=self.cfg.embedding_model,
        )

        self.scheduler.register(
            "kg_consolidation",
            kg_consolidation_task,
            interval=self._JOB_INTERVAL_SECONDS,
            initial_delay=self._CONSOLIDATION_INITIAL_DELAY_SECONDS,
            args=(self.kg_manager, self.openrouter),
        )
        self.scheduler.register(
            "kg_decay",
            kg_decay_task,
            interval=self._JOB_INTERVAL_SECONDS,
            initial_delay=self._DECAY_INITIAL_DELAY_SECONDS,
            args=(self.kg_manager,),
        )

        await self.scheduler.start()
        logger.info("ConsolidationService started successfully.")

    async def run(self) -> None:
        """Block until cancelled so the scheduled jobs keep running.

        All real work happens in the scheduler loops started by
        :meth:`on_start`; this coroutine only waits on an
        :class:`asyncio.Event` that is never set.

        Returns:
            None
        """
        try:
            logger.info("ConsolidationService running... Press Ctrl+C to stop.")
            await asyncio.Event().wait()
        except asyncio.CancelledError:
            # Cancellation is the only way out of the wait above; it is
            # deliberately swallowed so run() returns quietly at shutdown.
            pass

    async def on_stop(self) -> None:
        """Stop the scheduler and close the LLM client on shutdown.

        The OpenRouter client is closed even if stopping the scheduler
        raises, so its connection pool is not leaked. The scheduler's
        exception still propagates to the caller afterwards.

        Returns:
            None
        """
        # NOTE(review): self.message_cache appears to own a separate Redis
        # client (built from cfg.redis_url in on_start) that is never closed
        # here -- confirm whether MessageCache exposes a close method.
        try:
            if self.scheduler:
                await self.scheduler.stop()
        finally:
            if self.openrouter:
                await self.openrouter.close()
        logger.info("ConsolidationService stopped gracefully.")
async def main() -> None:
    """Bootstrap and run the consolidation service as a standalone process.

    Configures logging, loads the application config, builds the async Redis
    client with ``decode_responses=False``, and constructs a
    :class:`ConsolidationService` with a random instance id. It then installs
    SIGINT/SIGTERM handlers that schedule ``service.shutdown()``, and awaits
    ``service.boot()`` followed by ``service.run()``. The Redis client is
    always closed in the ``finally`` block.

    Returns:
        None
    """
    import signal
    import uuid

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )

    cfg = Config.load()
    instance_id = f"consolidation-{uuid.uuid4().hex[:8]}"
    redis_client = cfg.build_async_redis_client(decode_responses=False)
    service = ConsolidationService(cfg, redis_client, instance_id)

    # The event loop keeps only weak references to tasks, so a task whose
    # result is discarded can be garbage-collected before it finishes. Hold a
    # strong reference to each shutdown task until it completes.
    shutdown_tasks: set[asyncio.Task] = set()

    def _request_shutdown() -> None:
        task = asyncio.create_task(service.shutdown())
        shutdown_tasks.add(task)
        task.add_done_callback(shutdown_tasks.discard)

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, _request_shutdown)

    try:
        await service.boot()
        await service.run()
    finally:
        if redis_client:
            await redis_client.aclose()
# Script entry point: when executed directly, run main() via asyncio.run().
if __name__ == "__main__": asyncio.run(main())