# Source code for kg_consolidation

"""Knowledge-graph consolidation, pruning, and relationship decay.

Replaces ``background_agents/memory_consolidation.py``.  Runs as a
periodic background task to:

* Merge near-duplicate entities (same category tier only).
* Prune orphan entities with no relationships and low mention counts.
* Decay relationship weights over time (tier-aware).
"""

from __future__ import annotations

import re
import logging
from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
    from knowledge_graph import KnowledgeGraphManager
    from openrouter_client import OpenRouterClient

logger = logging.getLogger(__name__)

_SAFE_REL_TYPE = re.compile(r"^[A-Z0-9_]+$")


def _sanitize_rel_type(rel_type: str | None) -> str:
    """Return a safe relationship type for Cypher; fallback to RELATED_TO."""
    if not rel_type or not isinstance(rel_type, str):
        return "RELATED_TO"
    t = rel_type.strip().upper().replace(" ", "_")
    if _SAFE_REL_TYPE.match(t) and len(t) <= 64:
        return t
    return "RELATED_TO"


async def consolidate_graph(
    kg_manager: KnowledgeGraphManager,
    openrouter: OpenRouterClient,
) -> dict[str, Any]:
    """Find and merge duplicate entities, prune orphans.

    Only merges entities within the SAME category tier.
    Core entities are never pruned.
    """
    stats: dict[str, int] = {
        "merged": 0,
        "pruned_entities": 0,
        "pruned_rels": 0,
        "errors": 0,
    }

    # NOTE(review): imported at call time rather than at module top —
    # presumably to avoid an import cycle; confirm before hoisting.
    from knowledge_graph import ENTITY_LABELS, CATEGORY_PRIORITY

    # One merge pass per (label, category) pair; a failure in one pass
    # must not abort the remaining passes.
    for label in ENTITY_LABELS:
        for category in CATEGORY_PRIORITY:
            try:
                await _merge_similar_entities(
                    kg_manager,
                    openrouter,
                    label,
                    category,
                    stats,
                )
            except Exception:
                logger.warning(
                    "Merge pass failed for %s/%s",
                    label,
                    category,
                    exc_info=True,
                )
                stats["errors"] += 1

    # Best-effort cleanup passes; each failure is logged and counted.
    try:
        await _prune_orphans(kg_manager, stats)
    except Exception:
        logger.debug("Orphan pruning failed", exc_info=True)
        stats["errors"] += 1

    try:
        await _prune_weak_relationships(kg_manager, stats)
    except Exception:
        logger.debug("Weak relationship pruning failed", exc_info=True)
        stats["errors"] += 1

    # Only log when something actually changed.
    if any(
        stats[key] for key in ("merged", "pruned_entities", "pruned_rels")
    ):
        logger.info(
            "Consolidation: merged=%d, pruned_entities=%d, pruned_rels=%d",
            stats["merged"],
            stats["pruned_entities"],
            stats["pruned_rels"],
        )
    return stats
async def _merge_similar_entities(
    kg_manager: KnowledgeGraphManager,
    openrouter: OpenRouterClient,
    label: str,
    category: str,
    stats: dict[str, int],
) -> None:
    """Find entity pairs with high embedding similarity within the same
    category and ask the LLM whether to merge them.

    Candidate pairs come from a same-label/same-category/same-scope query
    (capped at 50 pairs per pass); pairs whose embedding cosine similarity
    is below 0.85 are skipped without an LLM call.
    """
    # a.name < b.name dedupes unordered pairs and excludes self-pairs.
    q = (
        f"MATCH (a:{label} {{category: $cat}}) "
        f"MATCH (b:{label} {{category: $cat}}) "
        f"WHERE a.name < b.name AND a.scope_id = b.scope_id "
        f"RETURN a.name, b.name, a.scope_id, "
        f"a.uuid, b.uuid "
        f"LIMIT 50"
    )
    try:
        result = await kg_manager._graph.query(
            q,
            params={"cat": category},
        )
    except Exception:
        # Label/category combination may simply not exist in this graph.
        return
    if not result.result_set:
        return

    pairs: list[tuple[str, str, str, str, str, dict, dict]] = []
    embed_texts: list[str] = []
    for row in result.result_set:
        name_a, name_b, scope_id = row[0], row[1], row[2]
        uuid_a, uuid_b = row[3], row[4]
        try:
            ent_a = await kg_manager.get_entity(
                name_a,
                entity_type=label,
            )
            ent_b = await kg_manager.get_entity(
                name_b,
                entity_type=label,
            )
            if not ent_a or not ent_b:
                continue
            desc_a = ent_a.get("description", "")
            desc_b = ent_b.get("description", "")
            # BUGFIX: append to both lists back-to-back with nothing that
            # can raise in between, so embed_texts[2*i:2*i+2] always
            # corresponds to pairs[i] even if a later row fails.
            embed_texts.extend(
                (f"{name_a}: {desc_a}", f"{name_b}: {desc_b}")
            )
            pairs.append((
                name_a,
                name_b,
                scope_id,
                uuid_a,
                uuid_b,
                ent_a,
                ent_b,
            ))
        except Exception:
            logger.debug(
                "Entity fetch failed for %s/%s",
                name_a,
                name_b,
                exc_info=True,
            )
            stats["errors"] += 1
    if not pairs:
        return

    # Embed all pair texts in one batch call.
    try:
        all_vecs = await kg_manager._embed_batch(embed_texts)
    except Exception:
        logger.debug(
            "Batch embedding for consolidation failed",
            exc_info=True,
        )
        return

    from utils.cosine import cosine_pair

    for i, (name_a, name_b, scope_id, uuid_a, uuid_b, ent_a, ent_b) in enumerate(pairs):
        try:
            vec_a = all_vecs[i * 2]
            vec_b = all_vecs[i * 2 + 1]
            sim = cosine_pair(vec_a, vec_b)
            # Cheap similarity gate before the expensive LLM call.
            if sim < 0.85:
                continue
            should_merge = await _ask_llm_merge(
                openrouter,
                name_a,
                ent_a.get("description", ""),
                name_b,
                ent_b.get("description", ""),
            )
            if not should_merge:
                continue
            await _do_merge(
                kg_manager,
                label,
                category,
                scope_id,
                name_a,
                ent_a,
                uuid_a,
                name_b,
                ent_b,
                uuid_b,
            )
            stats["merged"] += 1
        except Exception:
            logger.debug(
                "Merge check failed for %s/%s",
                name_a,
                name_b,
                exc_info=True,
            )
            stats["errors"] += 1


async def _ask_llm_merge(
    openrouter: OpenRouterClient,
    name_a: str,
    desc_a: str,
    name_b: str,
    desc_b: str,
) -> bool:
    """Ask the LLM whether two entities describe the same thing.

    Args:
        openrouter: Chat client used for the yes/no question.
        name_a: Name of the first entity.
        desc_a: Description of the first entity.
        name_b: Name of the second entity.
        desc_b: Description of the second entity.

    Returns:
        True only when the reply starts with "yes" (case-insensitive);
        any client error is treated as "do not merge".
    """
    # BUGFIX: name and description used to be concatenated with no
    # separator ("AliceA person"), which garbled the prompt.
    prompt = (
        "Should these two knowledge graph entities"
        " be merged into one?\n\n"
        f"Entity A: {name_a} - {desc_a}\n"
        f"Entity B: {name_b} - {desc_b}\n\n"
        f"Answer ONLY 'yes' or 'no'."
    )
    sys_msg = (
        "You help maintain a knowledge graph."
        " Answer only 'yes' or 'no'."
    )
    msgs = [
        {"role": "system", "content": sys_msg},
        {"role": "user", "content": prompt},
    ]
    try:
        raw = await openrouter.chat(msgs)
        return raw.strip().lower().startswith("yes")
    except Exception:
        # Fail closed: never merge on an uncertain answer.
        return False


async def _do_merge(
    kg_manager: KnowledgeGraphManager,
    label: str,
    category: str,
    scope_id: str,
    name_a: str,
    ent_a: dict,
    uuid_a: str,
    name_b: str,
    ent_b: dict,
    uuid_b: str,
) -> None:
    """Merge entity_b into entity_a.

    Combine descriptions, redirect B's relationships onto A (preserving
    relationship types and properties), delete B, then re-embed A from
    the merged description.
    """
    import time

    desc_a = ent_a.get("description", "")
    desc_b = ent_b.get("description", "")
    # BUGFIX: skip empty descriptions so the merge never yields a stray
    # "; foo" / "foo; " (old code always joined both sides).
    merged_desc = "; ".join(d for d in (desc_a, desc_b) if d)
    # `or 1` guards against NULL/0 mention counts coming back from the DB.
    mentions_a = ent_a.get("mention_count", 1) or 1
    mentions_b = ent_b.get("mention_count", 1) or 1
    now = time.time()

    q_update = (
        "MATCH (a {uuid: $uuid_a}) "
        "SET a.description = $desc, "
        " a.mention_count = $mc, a.updated_at = $now "
        "RETURN a.name"
    )
    await kg_manager._graph.query(q_update, params={
        "uuid_a": uuid_a,
        "desc": merged_desc[:500],  # cap stored description length
        "mc": mentions_a + mentions_b,
        "now": now,
    })

    # Redirect outgoing: preserve original relationship types
    q_get_out = (
        "MATCH (b {uuid: $uuid_b})-[r]->(t) "
        "RETURN type(r) AS rel_type, t.uuid AS tgt_uuid, "
        "r.weight AS weight, r.description AS description, "
        "r.category AS category, r.priority AS priority, "
        "r.scope_id AS scope_id, r.created_at AS created_at, "
        "r.updated_at AS updated_at"
    )
    try:
        out_result = await kg_manager._graph.query(
            q_get_out,
            params={"uuid_b": uuid_b},
        )
        for row in (out_result.result_set or []):
            # Sanitized type is interpolated into the query text, so it
            # must be a safe identifier (parameters can't be rel types).
            rel_type = _sanitize_rel_type(row[0])
            tgt_uuid = row[1]
            weight = row[2] or 0.5
            desc = row[3] or ""
            cat = row[4] or "general"
            pri = row[5] or 40
            sid = row[6] or "_"
            created = row[7] or now
            updated = row[8] or now
            # Skip creation when A already has any edge to the target.
            q_create = (
                f"MATCH (a {{uuid: $uuid_a}}) MATCH (t {{uuid: $tgt_uuid}}) "
                f"WHERE NOT (a)-[]->(t) "
                f"CREATE (a)-[r2:{rel_type} "
                f"{{weight: $w, description: $desc, category: $cat, "
                f"priority: $pri, scope_id: $sid, src_uuid: $uuid_a, "
                f"tgt_uuid: $tgt_uuid, created_at: $created, updated_at: $updated}}]->(t)"
            )
            await kg_manager._graph.query(
                q_create,
                params={
                    "uuid_a": uuid_a,
                    "tgt_uuid": tgt_uuid,
                    "w": weight,
                    "desc": desc,
                    "cat": cat,
                    "pri": pri,
                    "sid": sid,
                    "created": created,
                    "updated": updated,
                },
            )
    except Exception:
        logger.warning(
            "Redirect outgoing rels failed",
            exc_info=True,
        )

    # Redirect incoming: preserve original relationship types
    q_get_in = (
        "MATCH (s)-[r]->(b {uuid: $uuid_b}) "
        "RETURN type(r) AS rel_type, s.uuid AS src_uuid, "
        "r.weight AS weight, r.description AS description, "
        "r.category AS category, r.priority AS priority, "
        "r.scope_id AS scope_id, r.created_at AS created_at, "
        "r.updated_at AS updated_at"
    )
    try:
        in_result = await kg_manager._graph.query(
            q_get_in,
            params={"uuid_b": uuid_b},
        )
        for row in (in_result.result_set or []):
            rel_type = _sanitize_rel_type(row[0])
            src_uuid = row[1]
            weight = row[2] or 0.5
            desc = row[3] or ""
            cat = row[4] or "general"
            pri = row[5] or 40
            sid = row[6] or "_"
            created = row[7] or now
            updated = row[8] or now
            q_create = (
                f"MATCH (s {{uuid: $src_uuid}}) MATCH (a {{uuid: $uuid_a}}) "
                f"WHERE NOT (s)-[]->(a) "
                f"CREATE (s)-[r2:{rel_type} "
                f"{{weight: $w, description: $desc, category: $cat, "
                f"priority: $pri, scope_id: $sid, src_uuid: $src_uuid, "
                f"tgt_uuid: $uuid_a, created_at: $created, updated_at: $updated}}]->(a)"
            )
            await kg_manager._graph.query(
                q_create,
                params={
                    "src_uuid": src_uuid,
                    "uuid_a": uuid_a,
                    "w": weight,
                    "desc": desc,
                    "cat": cat,
                    "pri": pri,
                    "sid": sid,
                    "created": created,
                    "updated": updated,
                },
            )
    except Exception:
        logger.warning(
            "Redirect incoming rels failed",
            exc_info=True,
        )

    # Remove B together with any remaining edges.
    q_delete = (
        "MATCH (b {uuid: $uuid_b}) "
        "DETACH DELETE b"
    )
    await kg_manager._graph.query(q_delete, params={
        "uuid_b": uuid_b,
    })

    # Refresh A's embedding to reflect the merged description.
    embed_text = f"{name_a}: {merged_desc[:500]}"
    vec = await kg_manager._embed(embed_text)
    q_re_embed = (
        "MATCH (a {uuid: $uuid_a}) "
        "SET a.embedding = vecf32($vec) "
        "RETURN a.name"
    )
    await kg_manager._graph.query(q_re_embed, params={
        "uuid_a": uuid_a,
        "vec": vec,
    })


async def _prune_orphans(
    kg_manager: KnowledgeGraphManager,
    stats: dict[str, int],
) -> None:
    """Remove orphan entities with no relationships, low mention count, and old.

    Core/basic-tier and pinned entities are always kept.
    """
    import time

    cutoff = time.time() - 30 * 86400  # 30 days
    q = (
        "MATCH (e) "
        "WHERE e.category <> 'core' "
        " AND e.category <> 'basic' "
        " AND (e.pinned IS NULL OR e.pinned = false) "
        " AND NOT (e)-[]-() "
        " AND e.mention_count <= 1 "
        " AND e.updated_at < $cutoff "
        "DETACH DELETE e "
        "RETURN count(e) AS pruned"
    )
    try:
        result = await kg_manager._graph.query(
            q,
            params={"cutoff": cutoff},
        )
        if result.result_set:
            stats["pruned_entities"] += (
                result.result_set[0][0]
            )
    except Exception:
        logger.debug(
            "Orphan prune query failed",
            exc_info=True,
        )


async def _prune_weak_relationships(
    kg_manager: KnowledgeGraphManager,
    stats: dict[str, int],
) -> None:
    """Remove relationships with very low weight (except core)."""
    # priority < 100 keeps core relationships immune from pruning.
    q = (
        "MATCH ()-[r]->() "
        "WHERE r.weight < 0.05 AND r.priority < 100 "
        "DELETE r "
        "RETURN count(r) AS pruned"
    )
    try:
        result = await kg_manager._graph.query(q)
        if result.result_set:
            stats["pruned_rels"] += result.result_set[0][0]
    except Exception:
        logger.debug(
            "Weak relationship prune failed",
            exc_info=True,
        )


# -------------------------------------------------------------------
# Relationship decay
# -------------------------------------------------------------------
async def decay_relationships(
    kg_manager: KnowledgeGraphManager,
    decay_factor: float = 0.95,
) -> int:
    """Apply weight decay to all non-core relationships.

    Core (priority=100) never decay. Guild (priority=80) decay at half
    rate. All others decay at the full rate.

    Returns the number of weak relationships removed by the cleanup pass
    (0 when nothing was removed or any query failed).
    """
    import math

    # sqrt(f) applied every cycle equals f applied every second cycle,
    # i.e. half the decay rate for the guild tier.
    guild_factor = math.sqrt(decay_factor)

    decay_passes = (
        # Full-rate decay for everything below the guild tier.
        (
            "MATCH ()-[r]->() "
            "WHERE r.priority < 80 "
            "SET r.weight = r.weight * $factor "
            "RETURN count(r)",
            decay_factor,
        ),
        # Half-rate decay for the guild tier (80 <= priority < 100).
        (
            "MATCH ()-[r]->() "
            "WHERE r.priority >= 80 AND r.priority < 100 "
            "SET r.weight = r.weight * $factor "
            "RETURN count(r)",
            guild_factor,
        ),
    )
    q_cleanup = (
        "MATCH ()-[r]->() "
        "WHERE r.weight < 0.05 AND r.priority < 100 "
        "DELETE r "
        "RETURN count(r)"
    )

    removed = 0
    try:
        for query, factor in decay_passes:
            await kg_manager._graph.query(
                query,
                params={"factor": factor},
            )
        cleanup_result = await kg_manager._graph.query(q_cleanup)
        if cleanup_result.result_set:
            removed = cleanup_result.result_set[0][0]
        if removed:
            logger.info(
                "Relationship decay: removed "
                "%d weak relationships",
                removed,
            )
    except Exception:
        logger.debug(
            "Relationship decay failed",
            exc_info=True,
        )
    return removed