consolidation_main

Consolidation service entry point — knowledge-graph consolidation/decay.

Runs ConsolidationService, a minimal StargazerService whose sole job is to schedule the two long-horizon knowledge-graph maintenance jobs on a BackgroundScheduler:

These are split into their own service (separate from agents_main) so this slow, LLM-heavy work can be scaled, scheduled, and restarted independently. Launched standalone (python consolidation_main.py) by scripts/systemd/stargazer-consolidation.service.

class consolidation_main.ConsolidationService(config, redis_client, instance_id)[source]

Bases: StargazerService

Parameters:
__init__(config, redis_client, instance_id)[source]

Construct the consolidation service and its empty collaborator slots.

Initializes the base StargazerService with service_name="consolidation" and redis_required=True (so the boot sequence hard-fails if Redis is unreachable), stashes the loaded config, and creates a fresh BackgroundScheduler. The heavyweight collaborators (message_cache, kg_manager, openrouter) are left as None here and built later in on_start().

Interactions calls super().__init__ on StargazerService, which stores redis_client on self.redis and, because Redis is present, spins up a HealthServer; it does no Redis I/O itself.

Called by main() in this module when launching the standalone service, and by the migration tests (tests/core/migration/test_consolidation_service.py, test_observability_init.py) which instantiate it directly.

Parameters:
  • config (Config) – Loaded application configuration supplying Redis, OpenRouter, and embedding settings.

  • redis_client (Any) – Async Redis client (built with decode_responses=False) shared with the base service and the knowledge-graph manager.

  • instance_id (str) – Unique per-process identity used for service registration and health reporting.

async on_start()[source]

Build collaborators and schedule the KG maintenance jobs.

Implements the subclass startup hook invoked during phase 3-7 of the base boot sequence. It constructs the OpenRouterClient (the LLM used for entity merging), a MessageCache (whose Redis client is then wired into the observability layer), and a KnowledgeGraphManager, then registers and starts the two daily background jobs. Redis SSL/connection kwargs are derived from the config depending on whether sentinels or a plain URL are configured.

Interactions instantiates OpenRouterClient and MessageCache from config values; calls observability.set_observability_redis() with the message cache’s Redis client so debug events have a sink; builds KnowledgeGraphManager on the shared self.redis; registers kg_consolidation_task() (interval 86400s, initial delay 1800s, given the KG manager and OpenRouter client) and kg_decay_task() (interval 86400s, initial delay 3600s, given the KG manager) on self.scheduler; then calls self.scheduler.start() to spawn those loops as asyncio tasks. The KG tasks ultimately drive kg_consolidation.consolidate_graph() / kg_consolidation.decay_relationships(), which read and mutate the FalkorDB knowledge graph.

Called by boot() (phase 3-7), which is invoked from main(); exercised directly by the migration tests in tests/core/migration/.

Return type:

None

Returns:

None

async run()[source]

Block forever so the scheduled jobs keep running.

Implements the abstract main loop of the base service. Unlike the stream-consuming services, this one has no inbound work of its own once on_start() has launched the scheduler, so it simply awaits a never-set asyncio.Event until the process is signalled, swallowing the asyncio.CancelledError raised at shutdown.

Interactions performs no Redis, LLM, or KG I/O directly; the actual work happens in the scheduler loops started by on_start().

Called by main() immediately after boot() succeeds; the migration tests assert it is invoked once.

Return type:

None

Returns:

None

async on_stop()[source]

Cancel scheduled jobs and close the LLM client on shutdown.

Implements the subclass cleanup hook run during graceful shutdown. Stops the background scheduler (cancelling both KG loops and awaiting them) and closes the OpenRouter HTTP client to release its connection pool.

Interactions calls stop() on self.scheduler and OpenRouterClient.close() on self.openrouter when each is present.

Called by shutdown() (after it deregisters the service from Redis), which is triggered by the SIGINT/SIGTERM handlers installed in main().

Return type:

None

Returns:

None

async consolidation_main.main()[source]

Bootstrap and run the consolidation service as a standalone process.

Configures logging, loads the application config, builds the byte-mode async Redis client, constructs a ConsolidationService with a random instance id, installs SIGINT/SIGTERM handlers that trigger a graceful shutdown, and then boots and runs the service until cancelled. The Redis client is always closed in the finally block.

Interactions calls Config.load(), Config.build_async_redis_client() (with decode_responses=False), registers signal handlers that schedule service.shutdown(), and awaits service.boot() then service.run(); on exit it calls redis_client.aclose(). The boot/shutdown calls in turn perform Redis service registration/deregistration and the ConsolidationService.on_start() / ConsolidationService.on_stop() lifecycle.

Called as the module entry point under if __name__ == "__main__" (run by scripts/systemd/stargazer-consolidation.service via python consolidation_main.py) and directly by the entrypoint migration test tests/core/migration/test_service_entrypoints.py.

Return type:

None

Returns:

None