kg_consolidation
Knowledge-graph consolidation, pruning, and relationship decay.
Replaces background_agents/memory_consolidation.py. Runs as a
periodic background task to:
Merge near-duplicate entities (same category tier only).
Prune orphan entities with no relationships and low mention counts.
Decay relationship weights over time (tier-aware).
- async kg_consolidation.transaction_safe_entity_merge(graph_client, target_node_id, source_node_id, merged_properties)[source]
Atomically merge one graph entity into another inside a single transaction.
Folds the source node into the target node so that no half-merged state can ever be observed: it updates the target’s properties, re-routes every outgoing relationship from source onto target (preserving relationship type), deletes the now-redundant source node, and commits — rolling the whole thing back if any step fails. The transactional boundary is what makes this safe to run against a live graph.
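The all-or-nothing behaviour described above can be sketched as follows. This is a minimal illustration, not the module's implementation: `FakeTransaction`, its `run`/`commit`/`rollback` methods, and the step names are hypothetical stand-ins for the real graph transaction and its Cypher queries, which are not reproduced here.

```python
import asyncio
from typing import Any, Optional


class FakeTransaction:
    """Hypothetical stand-in for a graph transaction; it only records steps."""

    def __init__(self, fail_on: Optional[str] = None) -> None:
        self.steps: list[str] = []
        self.state = "open"
        self._fail_on = fail_on

    async def run(self, step: str, params: dict[str, Any]) -> None:
        # A real client would execute a Cypher query here.
        if step == self._fail_on:
            raise RuntimeError(f"simulated failure in {step}")
        self.steps.append(step)

    async def commit(self) -> None:
        self.state = "committed"

    async def rollback(self) -> None:
        self.steps.clear()  # nothing from a failed merge stays visible
        self.state = "rolled_back"


async def merge_in_transaction(
    tx: FakeTransaction,
    target_id: str,
    source_id: str,
    merged_properties: dict[str, Any],
) -> bool:
    """Run the three merge steps; commit only if every step succeeds."""
    plan = [
        ("update_properties", {"target": target_id, "props": merged_properties}),
        ("reroute_relationships", {"target": target_id, "source": source_id}),
        ("delete_source", {"source": source_id}),
    ]
    try:
        for step, params in plan:
            await tx.run(step, params)
        await tx.commit()
        return True
    except Exception:
        await tx.rollback()
        return False
```

Passing `fail_on="delete_source"` to the fake makes the last step raise, which exercises the rollback branch and yields `False`.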
It opens a FalkorDB/Neo4j transaction via graph_client.new_transaction(), issues the property-update, relationship-reroute, and delete Cypher queries, then commits or rolls back, logging the outcome either way. This standalone transactional merge is exercised directly by tests/test_consolidation_pipeline.py; it is not currently wired into the live consolidation path (which uses _do_merge()).
- Parameters:
graph_client (Any) – A FalkorDB/Neo4j graph client exposing new_transaction().
target_node_id (str) – uuid of the surviving node that absorbs the source.
source_node_id (str) – uuid of the node being merged away and deleted.
merged_properties (dict[str, Any]) – Property map to set on the target node.
- Returns:
True if the merge committed, False if it was rolled back.
- Return type:
- async kg_consolidation.merge_entity_descriptions_queued(redis, graph_client, target_node_id, entity_name, existing_desc, new_desc)[source]
Concatenate two entity descriptions, queueing oversized results for the LLM.
Joins existing_desc and new_desc with a separator and returns the combined string. When the result exceeds the 50,000-character limit, it additionally enqueues the entity for out-of-band LLM summarization so descriptions cannot grow without bound during repeated merges; the immediate return value is still the raw concatenation (the queued summary replaces it later, asynchronously).
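A minimal sketch of this concatenate-then-queue behaviour, under stated assumptions: the ` | ` separator and the `queue:entity:consolidation` key come from this page, while `FakeRedis`, the helper name `combine_descriptions`, and the payload field names are hypothetical. The observability counters are omitted.

```python
import asyncio
import json

MAX_DESC_CHARS = 50_000                    # limit stated in the description above
QUEUE_KEY = "queue:entity:consolidation"   # Redis list named on this page


class FakeRedis:
    """Hypothetical in-memory stand-in for an async redis-py client."""

    def __init__(self) -> None:
        self.lists: dict[str, list[str]] = {}

    async def rpush(self, key: str, *values: str) -> int:
        self.lists.setdefault(key, []).extend(values)
        return len(self.lists[key])


async def combine_descriptions(
    redis: FakeRedis,
    target_node_id: str,
    entity_name: str,
    existing_desc: str,
    new_desc: str,
) -> str:
    """Return the raw concatenation; enqueue it for summarization if too long."""
    combined = f"{existing_desc} | {new_desc}"
    if len(combined) > MAX_DESC_CHARS:
        # Payload field names are assumptions made for this sketch.
        payload = json.dumps(
            {"uuid": target_node_id, "name": entity_name, "description": combined}
        )
        await redis.rpush(QUEUE_KEY, payload)
    # The caller always receives the unsummarized string immediately.
    return combined
```

Returning the raw string even on overflow keeps the merge path free of LLM latency; the summary arrives later through the queue.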
On overflow it bumps observability counters and pushes a JSON payload onto the queue:entity:consolidation Redis list via RPUSH, which run_consolidation_daemon() later drains. It is called by _do_merge() while merging two entities, and is exercised directly by tests/test_consolidation_pipeline.py.
- Parameters:
redis (Any) – Async redis-py client used to enqueue oversized descriptions.
graph_client (Any) – Graph client (accepted for symmetry; not used directly here).
target_node_id (str) – uuid of the entity whose description is being combined.
entity_name (str) – Human-readable entity name, used in logs and metrics.
existing_desc (str) – The entity’s current description.
new_desc (str) – The incoming description to fold in.
- Returns:
The combined "existing | new" description string.
- Return type:
- async kg_consolidation.run_consolidation_daemon(redis, graph_client, llm_client, is_active)[source]
Long-running worker that drains the entity-consolidation queue via the LLM.
Runs a loop, for as long as is_active() returns a truthy value, that pops one entity at a time off the queue:entity:consolidation Redis list, asks the LLM to summarize its over-long description, and writes the summary back to the graph inside a transaction. This is the asynchronous back half of the size-capping scheme started by merge_entity_descriptions_queued(), keeping bloated descriptions out of the synchronous merge path.
Each iteration sleeps briefly to yield the event loop, pops a task with LPOP, calls llm_client.summarize_description (timed via observability), then commits the new description through graph_client.new_transaction(), incrementing success/error counters. Exceptions are caught per-iteration so one bad entity cannot kill the daemon. It is launched as a background task and is exercised by tests/test_consolidation_pipeline.py.
- Parameters:
redis (Any) – Async redis-py client backing the consolidation queue.
graph_client (Any) – Graph client used to persist summarized descriptions.
llm_client (Any) – Client exposing summarize_description for the LLM call.
is_active (callable) – Zero-argument predicate; the loop runs while it returns truthy.
- Return type:
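The worker loop described in this entry can be sketched roughly as below. The fakes, the `save` callback (standing in for the transactional graph write), the payload fields, the single-argument `summarize_description` call, and the returned counters are all assumptions made for illustration; only the `LPOP` drain, the brief sleep, and the per-iteration exception handling come from the description.

```python
import asyncio
import json

QUEUE_KEY = "queue:entity:consolidation"


class FakeRedis:
    """Hypothetical in-memory queue exposing only lpop."""

    def __init__(self, items):
        self.items = list(items)

    async def lpop(self, key):
        return self.items.pop(0) if self.items else None


class FakeLLM:
    """Hypothetical LLM client; 'summarizes' by truncating."""

    async def summarize_description(self, text):
        if "boom" in text:
            raise ValueError("simulated LLM failure")
        return text[:20]


async def drain_queue(redis, llm_client, save, is_active, poll_interval=0.01):
    """Pop, summarize, and persist one task per iteration while is_active()."""
    stats = {"ok": 0, "error": 0}
    while is_active():
        await asyncio.sleep(poll_interval)  # yield the event loop
        raw = await redis.lpop(QUEUE_KEY)
        if raw is None:
            continue  # queue empty; poll again
        try:
            task = json.loads(raw)
            summary = await llm_client.summarize_description(task["description"])
            await save(task["uuid"], summary)
            stats["ok"] += 1
        except Exception:
            # One bad entity must not stop the worker.
            stats["error"] += 1
    return stats


def run_demo():
    tasks = [
        json.dumps({"uuid": "a", "description": "x" * 100}),
        json.dumps({"uuid": "b", "description": "boom"}),
        json.dumps({"uuid": "c", "description": "y" * 100}),
    ]
    redis = FakeRedis(tasks)
    saved = {}

    async def save(uuid, summary):
        saved[uuid] = summary

    # For the demo, the predicate stops the loop once the queue is empty.
    stats = asyncio.run(
        drain_queue(redis, FakeLLM(), save, lambda: bool(redis.items))
    )
    return stats, saved
```

In the demo the second task makes the fake LLM raise; the loop counts the error and carries on with the third task.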
- async kg_consolidation.consolidate_graph(kg_manager, openrouter)[source]
Find and merge duplicate entities, prune orphans.
Only merges entities within the SAME category tier. Core entities are never pruned.
Note
The LLM-driven merge path is currently disabled via _CONSOLIDATION_ENABLED = False. Entity deduplication is handled by knowledge_anchoring.dedup_worker (Phase A + B). Orphan pruning and relationship decay still run normally.
- Return type:
- Parameters:
kg_manager (KnowledgeGraphManager)
openrouter (OpenRouterClient)
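The two guard rules stated for this function (merge only within one category tier; never prune core entities) can be sketched as simple predicates. The `Entity` fields, the `"core"` category label, and the `MIN_MENTIONS` threshold are hypothetical; this page does not give the real values or data model.

```python
from dataclasses import dataclass

MIN_MENTIONS = 2  # hypothetical cutoff for a "low" mention count


@dataclass
class Entity:
    name: str
    category: str            # tier label; "core" is an assumed spelling
    relationship_count: int
    mention_count: int


def can_merge(a: Entity, b: Entity) -> bool:
    """Near-duplicates are merge candidates only within the same tier."""
    return a.category == b.category


def is_prunable(entity: Entity, min_mentions: int = MIN_MENTIONS) -> bool:
    """An orphan has no relationships and few mentions; core is exempt."""
    if entity.category == "core":
        return False
    return entity.relationship_count == 0 and entity.mention_count < min_mentions
```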
- async kg_consolidation.decay_relationships(kg_manager, decay_factor=0.95)[source]
Apply weight decay to all non-core relationships.
Core relationships (priority=100) never decay. Guild relationships (priority=80) decay at half the rate. All others decay at the full rate.
- Return type:
- Parameters:
kg_manager (KnowledgeGraphManager)
decay_factor (float)
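One way to read the tier-aware rule is sketched below. It assumes that "half rate" means losing half as much weight per pass as the full rate does (so 0.95 becomes 0.975), and that tiers are matched on exact priority values; neither detail is confirmed by this page, and the helper name is hypothetical.

```python
CORE_PRIORITY = 100
GUILD_PRIORITY = 80


def decayed_weight(weight: float, priority: int, decay_factor: float = 0.95) -> float:
    """Return the weight after one decay pass for the given tier priority."""
    if priority == CORE_PRIORITY:
        return weight  # core never decays
    if priority == GUILD_PRIORITY:
        # Assumed interpretation: lose half of what the full rate would remove.
        half_rate_factor = 1.0 - (1.0 - decay_factor) / 2.0
        return weight * half_rate_factor
    return weight * decay_factor
```

An equally plausible reading of "half rate" is the square root of `decay_factor`, so that two guild passes equal one full pass; the two differ only slightly for factors near 1.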