"""Consolidation service entry point — knowledge-graph consolidation/decay.
Runs :class:`ConsolidationService`, a minimal
:class:`~core.service_base.StargazerService` whose sole job is to schedule
the two long-horizon knowledge-graph maintenance jobs on a
:class:`~background_tasks.BackgroundScheduler`:
* :func:`~background_tasks.kg_consolidation_task` —
:func:`kg_consolidation.consolidate_graph` (merge/prune duplicate entities).
* :func:`~background_tasks.kg_decay_task` —
:func:`kg_consolidation.decay_relationships` (decay relationship weights and
remove weak edges).
These are split into their own service (separate from
:mod:`agents_main`) so this slow, LLM-heavy work can be scaled, scheduled,
and restarted independently. Launched standalone
(``python consolidation_main.py``) by
``scripts/systemd/stargazer-consolidation.service``.
"""
import asyncio
import logging
from typing import Any
from core.service_base import StargazerService
from config import Config
from message_cache import MessageCache
from knowledge_graph import KnowledgeGraphManager
from openrouter_client import OpenRouterClient
from background_tasks import BackgroundScheduler, kg_consolidation_task, kg_decay_task
logger = logging.getLogger("consolidation_service")
[docs]
class ConsolidationService(StargazerService):
[docs]
def __init__(self, config: Config, redis_client: Any, instance_id: str):
"""Construct the consolidation service and its empty collaborator slots.
Initializes the base :class:`~core.service_base.StargazerService` with
``service_name="consolidation"`` and ``redis_required=True`` (so the boot
sequence hard-fails if Redis is unreachable), stashes the loaded config,
and creates a fresh :class:`~background_tasks.BackgroundScheduler`. The
heavyweight collaborators (``message_cache``, ``kg_manager``,
``openrouter``) are left as ``None`` here and built later in
:meth:`on_start`.
Interactions calls ``super().__init__`` on
:class:`~core.service_base.StargazerService`, which stores ``redis_client``
on ``self.redis`` and, because Redis is present, spins up a
:class:`HealthServer`; it does no Redis I/O itself.
Called by :func:`main` in this module when launching the standalone
service, and by the migration tests
(``tests/core/migration/test_consolidation_service.py``,
``test_observability_init.py``) which instantiate it directly.
Args:
config (Config): Loaded application configuration supplying Redis,
OpenRouter, and embedding settings.
redis_client (Any): Async Redis client (built with
``decode_responses=False``) shared with the base service and the
knowledge-graph manager.
instance_id (str): Unique per-process identity used for service
registration and health reporting.
"""
super().__init__(
service_name="consolidation",
instance_id=instance_id,
redis_client=redis_client,
redis_required=True
)
self.cfg = config
self.message_cache = None
self.kg_manager = None
self.openrouter = None
self.scheduler = BackgroundScheduler()
[docs]
async def on_start(self) -> None:
"""Build collaborators and schedule the KG maintenance jobs.
Implements the subclass startup hook invoked during phase 3-7 of the base
boot sequence. It constructs the :class:`OpenRouterClient` (the LLM used
for entity merging), a :class:`MessageCache` (whose Redis client is then
wired into the observability layer), and a
:class:`KnowledgeGraphManager`, then registers and starts the two daily
background jobs. Redis SSL/connection kwargs are derived from the config
depending on whether sentinels or a plain URL are configured.
Interactions instantiates ``OpenRouterClient`` and ``MessageCache`` from
config values; calls
:func:`observability.set_observability_redis` with the message cache's
Redis client so debug events have a sink; builds ``KnowledgeGraphManager``
on the shared ``self.redis``; registers
:func:`~background_tasks.kg_consolidation_task` (interval 86400s, initial
delay 1800s, given the KG manager and OpenRouter client) and
:func:`~background_tasks.kg_decay_task` (interval 86400s, initial delay
3600s, given the KG manager) on ``self.scheduler``; then calls
``self.scheduler.start()`` to spawn those loops as asyncio tasks. The KG
tasks ultimately drive :func:`kg_consolidation.consolidate_graph` /
:func:`kg_consolidation.decay_relationships`, which read and mutate the
FalkorDB knowledge graph.
Called by :meth:`~core.service_base.StargazerService.boot` (phase 3-7),
which is invoked from :func:`main`; exercised directly by the migration
tests in ``tests/core/migration/``.
Returns:
None
"""
redis_client = self.redis
if self.cfg.redis_sentinels:
ssl_kw = self.cfg.redis_ssl_kwargs()
elif self.cfg.redis_url:
ssl_kw = self.cfg.redis_connection_kwargs_for_url(self.cfg.redis_url)
else:
ssl_kw = {}
self.openrouter = OpenRouterClient(
api_key=self.cfg.openrouter_api_key,
model=self.cfg.model,
temperature=self.cfg.temperature,
max_tokens=self.cfg.max_tokens,
top_p=self.cfg.top_p,
base_url=self.cfg.llm_base_url,
gemini_api_key=self.cfg.gemini_api_key,
)
self.message_cache = MessageCache(
redis_url=self.cfg.redis_url,
openrouter_client=self.openrouter,
embedding_model=self.cfg.embedding_model,
ssl_kwargs=ssl_kw,
redis_sentinels=self.cfg.redis_sentinels,
redis_sentinel_master=self.cfg.redis_sentinel_master,
resilience_kwargs=self.cfg.redis_resilience_kwargs(),
)
from observability import set_observability_redis
set_observability_redis(self.message_cache.redis_client)
self.kg_manager = KnowledgeGraphManager(
redis_client=redis_client,
openrouter=self.openrouter,
embedding_model=self.cfg.embedding_model,
)
self.scheduler.register(
"kg_consolidation",
kg_consolidation_task,
interval=86400,
initial_delay=1800,
args=(self.kg_manager, self.openrouter),
)
self.scheduler.register(
"kg_decay",
kg_decay_task,
interval=86400,
initial_delay=3600,
args=(self.kg_manager,),
)
await self.scheduler.start()
logger.info("ConsolidationService started successfully.")
[docs]
async def run(self) -> None:
"""Block forever so the scheduled jobs keep running.
Implements the abstract main loop of the base service. Unlike the
stream-consuming services, this one has no inbound work of its own once
:meth:`on_start` has launched the scheduler, so it simply awaits a never-set
:class:`asyncio.Event` until the process is signalled, swallowing the
:class:`asyncio.CancelledError` raised at shutdown.
Interactions performs no Redis, LLM, or KG I/O directly; the actual work
happens in the scheduler loops started by :meth:`on_start`.
Called by :func:`main` immediately after :meth:`~core.service_base.StargazerService.boot`
succeeds; the migration tests assert it is invoked once.
Returns:
None
"""
try:
logger.info("ConsolidationService running... Press Ctrl+C to stop.")
await asyncio.Event().wait()
except asyncio.CancelledError:
pass
[docs]
async def on_stop(self) -> None:
"""Cancel scheduled jobs and close the LLM client on shutdown.
Implements the subclass cleanup hook run during graceful shutdown. Stops
the background scheduler (cancelling both KG loops and awaiting them) and
closes the OpenRouter HTTP client to release its connection pool.
Interactions calls :meth:`~background_tasks.BackgroundScheduler.stop` on
``self.scheduler`` and :meth:`OpenRouterClient.close` on
``self.openrouter`` when each is present.
Called by :meth:`~core.service_base.StargazerService.shutdown` (after it
deregisters the service from Redis), which is triggered by the SIGINT/SIGTERM
handlers installed in :func:`main`.
Returns:
None
"""
if self.scheduler:
await self.scheduler.stop()
if self.openrouter:
await self.openrouter.close()
logger.info("ConsolidationService stopped gracefully.")
[docs]
async def main() -> None:
"""Bootstrap and run the consolidation service as a standalone process.
Configures logging, loads the application config, builds the byte-mode async
Redis client, constructs a :class:`ConsolidationService` with a random
instance id, installs SIGINT/SIGTERM handlers that trigger a graceful
shutdown, and then boots and runs the service until cancelled. The Redis
client is always closed in the ``finally`` block.
Interactions calls :meth:`Config.load`, :meth:`Config.build_async_redis_client`
(with ``decode_responses=False``), registers signal handlers that schedule
``service.shutdown()``, and awaits ``service.boot()`` then ``service.run()``;
on exit it calls ``redis_client.aclose()``. The boot/shutdown calls in turn
perform Redis service registration/deregistration and the
:meth:`ConsolidationService.on_start` / :meth:`ConsolidationService.on_stop`
lifecycle.
Called as the module entry point under ``if __name__ == "__main__"`` (run by
``scripts/systemd/stargazer-consolidation.service`` via
``python consolidation_main.py``) and directly by the entrypoint migration
test ``tests/core/migration/test_service_entrypoints.py``.
Returns:
None
"""
import uuid
import signal
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
cfg = Config.load()
instance_id = f"consolidation-{uuid.uuid4().hex[:8]}"
redis_client = cfg.build_async_redis_client(decode_responses=False)
service = ConsolidationService(cfg, redis_client, instance_id)
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, lambda: asyncio.create_task(service.shutdown()))
try:
await service.boot()
await service.run()
finally:
if redis_client:
await redis_client.aclose()
if __name__ == "__main__":
asyncio.run(main())