consolidation_main
Consolidation service entry point — knowledge-graph consolidation/decay.
Runs ConsolidationService, a minimal
StargazerService whose sole job is to schedule
the two long-horizon knowledge-graph maintenance jobs on a
BackgroundScheduler:
kg_consolidation_task()—kg_consolidation.consolidate_graph()(merge/prune duplicate entities).kg_decay_task()—kg_consolidation.decay_relationships()(decay relationship weights and remove weak edges).
These are split into their own service (separate from
agents_main) so this slow, LLM-heavy work can be scaled, scheduled,
and restarted independently. Launched standalone
(python consolidation_main.py) by
scripts/systemd/stargazer-consolidation.service.
- class consolidation_main.ConsolidationService(config, redis_client, instance_id)[source]
Bases:
StargazerService- __init__(config, redis_client, instance_id)[source]
Construct the consolidation service and its empty collaborator slots.
Initializes the base
StargazerServicewithservice_name="consolidation"andredis_required=True(so the boot sequence hard-fails if Redis is unreachable), stashes the loaded config, and creates a freshBackgroundScheduler. The heavyweight collaborators (message_cache,kg_manager,openrouter) are left asNonehere and built later inon_start().Interactions calls
super().__init__onStargazerService, which storesredis_clientonself.redisand, because Redis is present, spins up aHealthServer; it does no Redis I/O itself.Called by
main()in this module when launching the standalone service, and by the migration tests (tests/core/migration/test_consolidation_service.py,test_observability_init.py) which instantiate it directly.- Parameters:
config (
Config) – Loaded application configuration supplying Redis, OpenRouter, and embedding settings.redis_client (
Any) – Async Redis client (built withdecode_responses=False) shared with the base service and the knowledge-graph manager.instance_id (
str) – Unique per-process identity used for service registration and health reporting.
- async on_start()[source]
Build collaborators and schedule the KG maintenance jobs.
Implements the subclass startup hook invoked during phase 3-7 of the base boot sequence. It constructs the
OpenRouterClient(the LLM used for entity merging), aMessageCache(whose Redis client is then wired into the observability layer), and aKnowledgeGraphManager, then registers and starts the two daily background jobs. Redis SSL/connection kwargs are derived from the config depending on whether sentinels or a plain URL are configured.Interactions instantiates
OpenRouterClientandMessageCachefrom config values; callsobservability.set_observability_redis()with the message cache’s Redis client so debug events have a sink; buildsKnowledgeGraphManageron the sharedself.redis; registerskg_consolidation_task()(interval 86400s, initial delay 1800s, given the KG manager and OpenRouter client) andkg_decay_task()(interval 86400s, initial delay 3600s, given the KG manager) onself.scheduler; then callsself.scheduler.start()to spawn those loops as asyncio tasks. The KG tasks ultimately drivekg_consolidation.consolidate_graph()/kg_consolidation.decay_relationships(), which read and mutate the FalkorDB knowledge graph.Called by
boot()(phase 3-7), which is invoked frommain(); exercised directly by the migration tests intests/core/migration/.- Return type:
- Returns:
None
- async run()[source]
Block forever so the scheduled jobs keep running.
Implements the abstract main loop of the base service. Unlike the stream-consuming services, this one has no inbound work of its own once
on_start()has launched the scheduler, so it simply awaits a never-setasyncio.Eventuntil the process is signalled, swallowing theasyncio.CancelledErrorraised at shutdown.Interactions performs no Redis, LLM, or KG I/O directly; the actual work happens in the scheduler loops started by
on_start().Called by
main()immediately afterboot()succeeds; the migration tests assert it is invoked once.- Return type:
- Returns:
None
- async on_stop()[source]
Cancel scheduled jobs and close the LLM client on shutdown.
Implements the subclass cleanup hook run during graceful shutdown. Stops the background scheduler (cancelling both KG loops and awaiting them) and closes the OpenRouter HTTP client to release its connection pool.
Interactions calls
stop()onself.schedulerandOpenRouterClient.close()onself.openrouterwhen each is present.Called by
shutdown()(after it deregisters the service from Redis), which is triggered by the SIGINT/SIGTERM handlers installed inmain().- Return type:
- Returns:
None
- async consolidation_main.main()[source]
Bootstrap and run the consolidation service as a standalone process.
Configures logging, loads the application config, builds the byte-mode async Redis client, constructs a
ConsolidationServicewith a random instance id, installs SIGINT/SIGTERM handlers that trigger a graceful shutdown, and then boots and runs the service until cancelled. The Redis client is always closed in thefinallyblock.Interactions calls
Config.load(),Config.build_async_redis_client()(withdecode_responses=False), registers signal handlers that scheduleservice.shutdown(), and awaitsservice.boot()thenservice.run(); on exit it callsredis_client.aclose(). The boot/shutdown calls in turn perform Redis service registration/deregistration and theConsolidationService.on_start()/ConsolidationService.on_stop()lifecycle.Called as the module entry point under
if __name__ == "__main__"(run byscripts/systemd/stargazer-consolidation.serviceviapython consolidation_main.py) and directly by the entrypoint migration testtests/core/migration/test_service_entrypoints.py.- Return type:
- Returns:
None