kg_consolidation

Knowledge-graph consolidation, pruning, and relationship decay.

Replaces background_agents/memory_consolidation.py. Runs as a periodic background task to:

  • Merge near-duplicate entities (same category tier only).

  • Prune orphan entities with no relationships and low mention counts.

  • Decay relationship weights over time (tier-aware).

async kg_consolidation.transaction_safe_entity_merge(graph_client, target_node_id, source_node_id, merged_properties)

Atomically merge one graph entity into another inside a single transaction.

Folds the source node into the target node so that no half-merged state can ever be observed. It updates the target’s properties, re-routes every outgoing relationship from source onto target (preserving relationship type), deletes the now-redundant source node, and commits; if any step fails, the whole transaction is rolled back. The transactional boundary is what makes this safe to run against a live graph.

It opens a FalkorDB/Neo4j transaction via graph_client.new_transaction(), issues the property-update, relationship-reroute, and delete Cypher queries, then commits or rolls back, logging the outcome either way. This standalone transactional merge is exercised directly by tests/test_consolidation_pipeline.py; it is not currently wired into the live consolidation path (which uses _do_merge()).

Parameters:
  • graph_client (Any) – A FalkorDB/Neo4j graph client exposing new_transaction().

  • target_node_id (str) – uuid of the surviving node that absorbs the source.

  • source_node_id (str) – uuid of the node being merged away and deleted.

  • merged_properties (dict[str, Any]) – Property map to set on the target node.

Returns:

True if the merge committed, False if it was rolled back.

Return type:

bool
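
The commit-or-roll-back flow described for transaction_safe_entity_merge() can be sketched roughly as follows. This is a minimal illustration, not the module's code: FakeTransaction, FakeGraphClient, and merge_sketch are hypothetical names, the run/commit/rollback method names on the transaction object are assumptions, new_transaction() is assumed to be synchronous, and the query strings are placeholders rather than the Cypher the module issues.

```python
import asyncio
from typing import Any, Dict, List, Optional


class FakeTransaction:
    """Hypothetical stand-in for the object returned by new_transaction()."""

    def __init__(self, fail_on: Optional[str] = None) -> None:
        self.queries: List[str] = []
        self.state = "open"
        self._fail_on = fail_on

    async def run(self, query: str, **params: Any) -> None:
        # Simulate a failing step when the query contains the marker text.
        if self._fail_on and self._fail_on in query:
            raise RuntimeError("simulated failure on: " + self._fail_on)
        self.queries.append(query)

    async def commit(self) -> None:
        self.state = "committed"

    async def rollback(self) -> None:
        self.state = "rolled_back"


class FakeGraphClient:
    """Hypothetical client exposing only new_transaction()."""

    def __init__(self, fail_on: Optional[str] = None) -> None:
        self._fail_on = fail_on
        self.last_tx: Optional[FakeTransaction] = None

    def new_transaction(self) -> FakeTransaction:
        self.last_tx = FakeTransaction(self._fail_on)
        return self.last_tx


async def merge_sketch(
    graph_client: Any,
    target_node_id: str,
    source_node_id: str,
    merged_properties: Dict[str, Any],
) -> bool:
    """Run the three merge steps in one transaction; True on commit."""
    tx = graph_client.new_transaction()
    try:
        # Step 1: set the merged property map on the surviving node.
        await tx.run("UPDATE target properties (placeholder)",
                     target=target_node_id, props=merged_properties)
        # Step 2: move the source's outgoing relationships to the target.
        await tx.run("REROUTE outgoing relationships (placeholder)",
                     target=target_node_id, source=source_node_id)
        # Step 3: remove the now-redundant source node.
        await tx.run("DELETE source node (placeholder)", source=source_node_id)
        await tx.commit()
        return True
    except Exception:
        # Any failed step discards all earlier steps in the transaction.
        await tx.rollback()
        return False
```

With these fakes, a client built as FakeGraphClient(fail_on="DELETE") makes the third step raise, so merge_sketch returns False and the transaction ends in the rolled-back state instead of leaving the first two steps applied.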

async kg_consolidation.merge_entity_descriptions_queued(redis, graph_client, target_node_id, entity_name, existing_desc, new_desc)

Concatenate two entity descriptions, queueing oversized results for the LLM.

Joins existing_desc and new_desc with a separator and returns the combined string. When the result exceeds the 50,000-character limit, it additionally enqueues the entity for out-of-band LLM summarization so descriptions cannot grow without bound during repeated merges; the immediate return value is still the raw concatenation (the queued summary replaces it later, asynchronously).

On overflow it bumps observability counters and pushes a JSON payload onto the queue:entity:consolidation Redis list via RPUSH, which run_consolidation_daemon() later drains. It is called by _do_merge() while merging two entities, and is exercised directly by tests/test_consolidation_pipeline.py.

Parameters:
  • redis (Any) – Async redis-py client used to enqueue oversized descriptions.

  • graph_client (Any) – Graph client (accepted for symmetry; not used directly here).

  • target_node_id (str) – uuid of the entity whose description is being combined.

  • entity_name (str) – Human-readable entity name, used in logs and metrics.

  • existing_desc (str) – The entity’s current description.

  • new_desc (str) – The incoming description to fold in.

Returns:

The combined "existing | new" description string.

Return type:

str
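
The concatenate-then-queue behaviour of merge_entity_descriptions_queued() might look something like the sketch below. The 50,000-character limit, the " | " separator, the RPUSH call, and the queue:entity:consolidation key come from the description above; FakeRedis, merge_descriptions_sketch, and the payload field names are assumptions made for illustration, and the observability counters are omitted.

```python
import asyncio
import json
from typing import Dict, List

MAX_DESC_CHARS = 50_000  # limit stated in the documentation above
QUEUE_KEY = "queue:entity:consolidation"


class FakeRedis:
    """In-memory stand-in for an async redis-py client (RPUSH only)."""

    def __init__(self) -> None:
        self.lists: Dict[str, List[str]] = {}

    async def rpush(self, key: str, *values: str) -> int:
        self.lists.setdefault(key, []).extend(values)
        return len(self.lists[key])


async def merge_descriptions_sketch(
    redis: FakeRedis,
    target_node_id: str,
    entity_name: str,
    existing_desc: str,
    new_desc: str,
) -> str:
    """Return the raw concatenation; enqueue it for summarization if too long."""
    combined = existing_desc + " | " + new_desc
    if len(combined) > MAX_DESC_CHARS:
        # Payload field names are hypothetical; the real schema is not documented here.
        payload = json.dumps({
            "target_node_id": target_node_id,
            "entity_name": entity_name,
            "description": combined,
        })
        await redis.rpush(QUEUE_KEY, payload)
    # The caller always receives the unsummarized string immediately.
    return combined
```

The point of the design is that the caller never waits on the LLM: the oversized string is returned as-is and only a queue entry records that it needs shortening later.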

async kg_consolidation.run_consolidation_daemon(redis, graph_client, llm_client, is_active)

Long-running worker that drains the entity-consolidation queue via the LLM.

Runs an endless loop (while is_active returns true) that pops one entity at a time off the queue:entity:consolidation Redis list, asks the LLM to summarize its over-long description, and writes the summary back to the graph inside a transaction. This is the asynchronous back half of the size-capping scheme started by merge_entity_descriptions_queued(), keeping bloated descriptions out of the synchronous merge path.

Each iteration sleeps briefly to yield the event loop, pops a task with LPOP, calls llm_client.summarize_description (timed via observability), then commits the new description through graph_client.new_transaction(), incrementing success/error counters. Exceptions are caught per-iteration so one bad entity cannot kill the daemon. It is launched as a background task and is exercised by tests/test_consolidation_pipeline.py.

Parameters:
  • redis (Any) – Async redis-py client backing the consolidation queue.

  • graph_client (Any) – Graph client used to persist summarized descriptions.

  • llm_client (Any) – Client exposing summarize_description for the LLM call.

  • is_active (callable) – Zero-argument predicate; the loop runs while it returns truthy.

Return type:

None
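
As a rough illustration of the loop described for run_consolidation_daemon(), the sketch below pops one task per iteration, summarizes it, and writes the result back in a transaction, catching errors per iteration. Everything except LPOP, the queue key, summarize_description, and new_transaction() is assumed: the fake classes, the payload fields, the transaction method names, the argument passed to summarize_description, and the poll_interval parameter are hypothetical. Unlike the documented function, which returns None, this sketch returns its counters so the behaviour is easy to inspect.

```python
import asyncio
import json
from typing import Any, Callable, Dict, List, Optional

QUEUE_KEY = "queue:entity:consolidation"


class FakeRedis:
    """In-memory stand-in for an async redis-py client (LPOP only)."""

    def __init__(self, items: List[str]) -> None:
        self.items = list(items)

    async def lpop(self, key: str) -> Optional[str]:
        return self.items.pop(0) if self.items else None


class FakeTx:
    """Hypothetical transaction that buffers writes until commit."""

    def __init__(self, graph: "FakeGraph") -> None:
        self._graph = graph
        self._pending: Dict[str, str] = {}

    async def run(self, query: str, **params: Any) -> None:
        self._pending[params["uuid"]] = params["description"]

    async def commit(self) -> None:
        self._graph.descriptions.update(self._pending)

    async def rollback(self) -> None:
        self._pending.clear()


class FakeGraph:
    def __init__(self) -> None:
        self.descriptions: Dict[str, str] = {}

    def new_transaction(self) -> FakeTx:
        return FakeTx(self)


class FakeLLM:
    async def summarize_description(self, text: str) -> str:
        return text[:20]  # stand-in for a real summary


async def daemon_sketch(
    redis: Any,
    graph_client: Any,
    llm_client: Any,
    is_active: Callable[[], bool],
    poll_interval: float = 0.01,
) -> Dict[str, int]:
    stats = {"ok": 0, "errors": 0}
    while is_active():
        await asyncio.sleep(poll_interval)  # yield the event loop
        try:
            raw = await redis.lpop(QUEUE_KEY)
            if raw is None:
                continue  # queue is empty, poll again
            task = json.loads(raw)
            summary = await llm_client.summarize_description(task["description"])
            tx = graph_client.new_transaction()
            try:
                await tx.run("SET description (placeholder)",
                             uuid=task["target_node_id"], description=summary)
                await tx.commit()
            except Exception:
                await tx.rollback()
                raise
            stats["ok"] += 1
        except Exception:
            # One bad task must not stop the worker.
            stats["errors"] += 1
    return stats
```

Feeding this loop a malformed payload followed by a valid one shows the per-iteration isolation: the first task only increments the error counter, and the second is still summarized and committed.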

async kg_consolidation.consolidate_graph(kg_manager, openrouter)

Find and merge duplicate entities, prune orphans.

Only merges entities within the SAME category tier. Core entities are never pruned.

Note

The LLM-driven merge path is currently disabled via _CONSOLIDATION_ENABLED = False. Entity deduplication is handled by knowledge_anchoring.dedup_worker (Phase A + B). Orphan pruning and relationship decay still run normally.

Return type:

dict[str, Any]
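
The two rules stated for consolidate_graph() (merge candidates must share a category tier, and core entities are never pruned) can be expressed as small predicates. The sketch below is illustrative only: the entity dictionary layout, the "core" tier label, the function names, and the min_mentions threshold are all assumptions, since the documentation does not specify how tiers are stored or what counts as a low mention count.

```python
from itertools import combinations
from typing import Any, Dict, Iterator, List, Tuple

Entity = Dict[str, Any]


def same_tier_pairs(entities: List[Entity]) -> Iterator[Tuple[Entity, Entity]]:
    """Yield candidate pairs, skipping any pair that spans two tiers."""
    for a, b in combinations(entities, 2):
        if a["tier"] == b["tier"]:
            yield a, b


def is_prunable_orphan(entity: Entity, min_mentions: int = 2) -> bool:
    """True for a non-core entity with no relationships and few mentions.

    min_mentions is a hypothetical threshold chosen for this example.
    """
    return (
        entity["tier"] != "core"
        and entity["relationship_count"] == 0
        and entity["mention_count"] < min_mentions
    )
```

In this sketch a core entity with zero relationships and a single mention is still kept, because the tier check runs before the orphan conditions are considered.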

async kg_consolidation.decay_relationships(kg_manager, decay_factor=0.95)

Apply weight decay to all non-core relationships.

Core relationships (priority=100) never decay. Guild relationships (priority=80) decay at half rate. All others decay at the full rate.

Return type:

int
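
One way to read the tier-aware decay rule is sketched below. The priorities 100 and 80 and the default decay_factor of 0.95 come from the documentation; everything else is an assumption. In particular, "half rate" is interpreted here as losing half as much weight per pass (a factor of 0.975 when decay_factor is 0.95), and the integer return value is assumed to be the number of relationships whose weight changed. The function names and the relationship dictionary layout are hypothetical.

```python
from typing import Any, Dict, List

PRIORITY_CORE = 100   # never decays
PRIORITY_GUILD = 80   # decays at half rate


def effective_decay_factor(priority: int, decay_factor: float = 0.95) -> float:
    """Return the multiplier applied to a relationship weight per pass."""
    if priority == PRIORITY_CORE:
        return 1.0
    if priority == PRIORITY_GUILD:
        # Assumed meaning of "half rate": halve the per-pass loss.
        return 1.0 - (1.0 - decay_factor) / 2.0
    return decay_factor


def decay_sketch(relationships: List[Dict[str, Any]], decay_factor: float = 0.95) -> int:
    """Decay weights in place; return how many relationships were changed."""
    updated = 0
    for rel in relationships:
        factor = effective_decay_factor(rel["priority"], decay_factor)
        if factor == 1.0:
            continue  # core relationships are left untouched
        rel["weight"] *= factor
        updated += 1
    return updated
```

Under this reading, a guild relationship keeps more of its weight on each pass than an ordinary one, while a core relationship is skipped entirely and does not count toward the returned total.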
