"""Knowledge-graph consolidation, pruning, and relationship decay.
Replaces ``background_agents/memory_consolidation.py``. Runs as a
periodic background task to:
* Merge near-duplicate entities (same category tier only).
* Prune orphan entities with no relationships and low mention counts.
* Decay relationship weights over time (tier-aware).
"""
from __future__ import annotations
import re
import logging
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
from knowledge_graph import KnowledgeGraphManager
from openrouter_client import OpenRouterClient
logger = logging.getLogger(__name__)
_SAFE_REL_TYPE = re.compile(r"^[A-Z0-9_]+$")
def _sanitize_rel_type(rel_type: str | None) -> str:
"""Return a safe relationship type for Cypher; fallback to RELATED_TO."""
if not rel_type or not isinstance(rel_type, str):
return "RELATED_TO"
t = rel_type.strip().upper().replace(" ", "_")
if _SAFE_REL_TYPE.match(t) and len(t) <= 64:
return t
return "RELATED_TO"
# [docs]
async def consolidate_graph(
    kg_manager: KnowledgeGraphManager,
    openrouter: OpenRouterClient,
) -> dict[str, Any]:
    """Run one consolidation pass over the knowledge graph.

    Merges near-duplicate entities (within the same label and category
    tier only), then prunes orphan entities and weak relationships.
    Core entities are never pruned.

    Returns:
        dict[str, Any]: Counts for ``merged``, ``pruned_entities``,
        ``pruned_rels``, and ``errors``.
    """
    from knowledge_graph import CATEGORY_PRIORITY, ENTITY_LABELS

    stats: dict[str, int] = {
        "merged": 0,
        "pruned_entities": 0,
        "pruned_rels": 0,
        "errors": 0,
    }

    # Merge pass: one (label, category) combination at a time so a
    # failure in one tier does not abort the others.
    for label in ENTITY_LABELS:
        for category in CATEGORY_PRIORITY:
            try:
                await _merge_similar_entities(
                    kg_manager, openrouter, label, category, stats,
                )
            except Exception:
                logger.warning(
                    "Merge pass failed for %s/%s", label, category,
                    exc_info=True,
                )
                stats["errors"] += 1

    # Pruning passes are best-effort; failures are logged at debug level.
    for pruner, failure_msg in (
        (_prune_orphans, "Orphan pruning failed"),
        (_prune_weak_relationships, "Weak relationship pruning failed"),
    ):
        try:
            await pruner(kg_manager, stats)
        except Exception:
            logger.debug(failure_msg, exc_info=True)
            stats["errors"] += 1

    # Only emit the summary line when something actually changed.
    if stats["merged"] or stats["pruned_entities"] or stats["pruned_rels"]:
        logger.info(
            "Consolidation: merged=%d, pruned_entities=%d, pruned_rels=%d",
            stats["merged"], stats["pruned_entities"], stats["pruned_rels"],
        )
    return stats
async def _merge_similar_entities(
    kg_manager: KnowledgeGraphManager,
    openrouter: OpenRouterClient,
    label: str,
    category: str,
    stats: dict[str, int],
) -> None:
    """Find entity pairs with high embedding similarity within the same
    category and ask the LLM whether to merge them.

    Mutates ``stats`` in place: increments ``merged`` per successful
    merge and ``errors`` per failed fetch/merge attempt.
    """
    # Candidate pairs: same label, same category, same scope.
    # ``a.name < b.name`` de-duplicates (A,B)/(B,A) orderings; the pass
    # is capped at 50 pairs to bound per-run work.
    q = (
        f"MATCH (a:{label} {{category: $cat}}) "
        f"MATCH (b:{label} {{category: $cat}}) "
        f"WHERE a.name < b.name AND a.scope_id = b.scope_id "
        f"RETURN a.name, b.name, a.scope_id, "
        f"a.uuid, b.uuid "
        f"LIMIT 50"
    )
    try:
        result = await kg_manager._graph.query(
            q, params={"cat": category},
        )
    except Exception:
        # Best-effort: a failed candidate query silently skips this tier.
        return
    if not result.result_set:
        return
    # ``pairs`` and ``embed_texts`` are kept index-aligned: pair i maps
    # to embed_texts[2*i] (entity A) and embed_texts[2*i + 1] (entity B).
    pairs: list[tuple[str, str, str, str, str, dict, dict]] = []
    embed_texts: list[str] = []
    for row in result.result_set:
        name_a, name_b, scope_id = row[0], row[1], row[2]
        uuid_a, uuid_b = row[3], row[4]
        try:
            # NOTE(review): lookup is by name + label only; presumably
            # get_entity resolves scope internally — verify, since the
            # candidate query restricted pairs to a shared scope_id.
            ent_a = await kg_manager.get_entity(
                name_a, entity_type=label,
            )
            ent_b = await kg_manager.get_entity(
                name_b, entity_type=label,
            )
            if not ent_a or not ent_b:
                continue
            pairs.append((
                name_a, name_b, scope_id, uuid_a, uuid_b,
                ent_a, ent_b,
            ))
            desc_a = ent_a.get('description', '')
            desc_b = ent_b.get('description', '')
            embed_texts.append(f"{name_a}: {desc_a}")
            embed_texts.append(f"{name_b}: {desc_b}")
        except Exception:
            logger.debug(
                "Entity fetch failed for %s/%s",
                name_a, name_b, exc_info=True,
            )
            stats["errors"] += 1
    if not pairs:
        return
    # One batched embedding call for all pair texts; if it fails, the
    # whole merge pass for this tier is abandoned.
    try:
        all_vecs = await kg_manager._embed_batch(embed_texts)
    except Exception:
        logger.debug(
            "Batch embedding for consolidation failed", exc_info=True,
        )
        return
    from utils.cosine import cosine_pair
    for i, (name_a, name_b, scope_id, uuid_a, uuid_b, ent_a, ent_b) in enumerate(pairs):
        try:
            # Recover this pair's vectors via the 2*i / 2*i+1 alignment.
            vec_a = all_vecs[i * 2]
            vec_b = all_vecs[i * 2 + 1]
            sim = cosine_pair(vec_a, vec_b)
            # Cheap similarity gate first; only near-duplicates
            # (cosine >= 0.85) are escalated to the LLM.
            if sim < 0.85:
                continue
            should_merge = await _ask_llm_merge(
                openrouter,
                name_a, ent_a.get("description", ""),
                name_b, ent_b.get("description", ""),
            )
            if not should_merge:
                continue
            # NOTE(review): an entity merged (deleted) by an earlier pair
            # may reappear in a later pair; that _do_merge then fails and
            # is counted under ``errors`` below.
            await _do_merge(
                kg_manager, label, category,
                scope_id,
                name_a, ent_a, uuid_a,
                name_b, ent_b, uuid_b,
            )
            stats["merged"] += 1
        except Exception:
            logger.debug(
                "Merge check failed for %s/%s",
                name_a, name_b, exc_info=True,
            )
            stats["errors"] += 1
async def _ask_llm_merge(
openrouter: OpenRouterClient,
name_a: str, desc_a: str,
name_b: str, desc_b: str,
) -> bool:
"""Internal helper: ask llm merge.
Args:
openrouter (OpenRouterClient): The openrouter value.
name_a (str): The name a value.
desc_a (str): The desc a value.
name_b (str): The name b value.
desc_b (str): The desc b value.
Returns:
bool: True on success, False otherwise.
"""
prompt = (
"Should these two knowledge graph entities"
" be merged into one?\n\n"
f"Entity A: {name_a} — {desc_a}\n"
f"Entity B: {name_b} — {desc_b}\n\n"
f"Answer ONLY 'yes' or 'no'."
)
sys_msg = (
"You help maintain a knowledge graph."
" Answer only 'yes' or 'no'."
)
msgs = [
{"role": "system", "content": sys_msg},
{"role": "user", "content": prompt},
]
try:
raw = await openrouter.chat(msgs)
return raw.strip().lower().startswith("yes")
except Exception:
return False
async def _do_merge(
    kg_manager: KnowledgeGraphManager,
    label: str,
    category: str,
    scope_id: str,
    name_a: str,
    ent_a: dict,
    uuid_a: str,
    name_b: str,
    ent_b: dict,
    uuid_b: str,
) -> None:
    """Merge entity_b into entity_a.
    Combine descriptions, redirect relationships,
    delete B.

    Steps: (1) update A's description/mention count, (2) copy B's
    outgoing relationships onto A, (3) copy B's incoming relationships
    onto A, (4) delete B, (5) recompute A's embedding.

    Note: ``label``, ``category``, ``scope_id`` and ``name_b`` are not
    read by this body; both entities are addressed purely by uuid.
    """
    # The merged entity keeps A's name; descriptions are joined with
    # "; " and mention counts summed (falsy/missing counts default to 1).
    desc_a = ent_a.get('description', '')
    desc_b = ent_b.get('description', '')
    merged_desc = f"{desc_a}; {desc_b}"
    mentions_a = ent_a.get("mention_count", 1) or 1
    mentions_b = ent_b.get("mention_count", 1) or 1
    import time
    now = time.time()
    # Step 1: update A in place (description capped at 500 chars).
    q_update = (
        "MATCH (a {uuid: $uuid_a}) "
        "SET a.description = $desc, "
        " a.mention_count = $mc, a.updated_at = $now "
        "RETURN a.name"
    )
    await kg_manager._graph.query(q_update, params={
        "uuid_a": uuid_a,
        "desc": merged_desc[:500],
        "mc": mentions_a + mentions_b, "now": now,
    })
    # Redirect outgoing: preserve original relationship types
    q_get_out = (
        "MATCH (b {uuid: $uuid_b})-[r]->(t) "
        "RETURN type(r) AS rel_type, t.uuid AS tgt_uuid, "
        "r.weight AS weight, r.description AS description, "
        "r.category AS category, r.priority AS priority, "
        "r.scope_id AS scope_id, r.created_at AS created_at, "
        "r.updated_at AS updated_at"
    )
    try:
        out_result = await kg_manager._graph.query(
            q_get_out, params={"uuid_b": uuid_b},
        )
        for row in (out_result.result_set or []):
            # Relationship type is interpolated into the Cypher text,
            # so it must be sanitized; missing properties get defaults.
            rel_type = _sanitize_rel_type(row[0])
            tgt_uuid = row[1]
            weight = row[2] or 0.5
            desc = row[3] or ""
            cat = row[4] or "general"
            pri = row[5] or 40
            sid = row[6] or "_"
            created = row[7] or now
            updated = row[8] or now
            # Skip creation when A already has ANY outgoing edge to this
            # target (regardless of type) to avoid duplicate edges.
            q_create = (
                f"MATCH (a {{uuid: $uuid_a}}) MATCH (t {{uuid: $tgt_uuid}}) "
                f"WHERE NOT (a)-[]->(t) "
                f"CREATE (a)-[r2:{rel_type} "
                f"{{weight: $w, description: $desc, category: $cat, "
                f"priority: $pri, scope_id: $sid, src_uuid: $uuid_a, "
                f"tgt_uuid: $tgt_uuid, created_at: $created, updated_at: $updated}}]->(t)"
            )
            await kg_manager._graph.query(
                q_create,
                params={
                    "uuid_a": uuid_a, "tgt_uuid": tgt_uuid,
                    "w": weight, "desc": desc, "cat": cat,
                    "pri": pri, "sid": sid, "created": created, "updated": updated,
                },
            )
    except Exception:
        # Best-effort: a failed redirect still allows the merge to
        # proceed (B is deleted below either way).
        logger.warning(
            "Redirect outgoing rels failed", exc_info=True,
        )
    # Redirect incoming: preserve original relationship types
    q_get_in = (
        "MATCH (s)-[r]->(b {uuid: $uuid_b}) "
        "RETURN type(r) AS rel_type, s.uuid AS src_uuid, "
        "r.weight AS weight, r.description AS description, "
        "r.category AS category, r.priority AS priority, "
        "r.scope_id AS scope_id, r.created_at AS created_at, "
        "r.updated_at AS updated_at"
    )
    try:
        in_result = await kg_manager._graph.query(
            q_get_in, params={"uuid_b": uuid_b},
        )
        for row in (in_result.result_set or []):
            rel_type = _sanitize_rel_type(row[0])
            src_uuid = row[1]
            weight = row[2] or 0.5
            desc = row[3] or ""
            cat = row[4] or "general"
            pri = row[5] or 40
            sid = row[6] or "_"
            created = row[7] or now
            updated = row[8] or now
            # Mirror of the outgoing case: skip if the source already
            # has any edge to A.
            q_create = (
                f"MATCH (s {{uuid: $src_uuid}}) MATCH (a {{uuid: $uuid_a}}) "
                f"WHERE NOT (s)-[]->(a) "
                f"CREATE (s)-[r2:{rel_type} "
                f"{{weight: $w, description: $desc, category: $cat, "
                f"priority: $pri, scope_id: $sid, src_uuid: $src_uuid, "
                f"tgt_uuid: $uuid_a, created_at: $created, updated_at: $updated}}]->(a)"
            )
            await kg_manager._graph.query(
                q_create,
                params={
                    "src_uuid": src_uuid, "uuid_a": uuid_a,
                    "w": weight, "desc": desc, "cat": cat,
                    "pri": pri, "sid": sid, "created": created, "updated": updated,
                },
            )
    except Exception:
        logger.warning(
            "Redirect incoming rels failed", exc_info=True,
        )
    # Step 4: remove B and any relationships still attached to it.
    q_delete = (
        "MATCH (b {uuid: $uuid_b}) "
        "DETACH DELETE b"
    )
    await kg_manager._graph.query(q_delete, params={
        "uuid_b": uuid_b,
    })
    # Step 5: A's description changed, so its embedding must be
    # recomputed from the same truncated text stored in step 1.
    embed_text = f"{name_a}: {merged_desc[:500]}"
    vec = await kg_manager._embed(embed_text)
    q_re_embed = (
        "MATCH (a {uuid: $uuid_a}) "
        "SET a.embedding = vecf32($vec) "
        "RETURN a.name"
    )
    await kg_manager._graph.query(q_re_embed, params={
        "uuid_a": uuid_a, "vec": vec,
    })
async def _prune_orphans(
    kg_manager: KnowledgeGraphManager,
    stats: dict[str, int],
) -> None:
    """Delete stale, unconnected entities from the graph.

    An entity is pruned only when it is outside the protected tiers
    ('core'/'basic'), is not pinned, has no relationships at all, was
    mentioned at most once, and has not been touched for 30 days.
    Increments ``stats['pruned_entities']`` by the number deleted.
    """
    import time

    thirty_days_ago = time.time() - 30 * 86400  # 30 days
    prune_query = (
        "MATCH (e) "
        "WHERE e.category <> 'core' "
        " AND e.category <> 'basic' "
        " AND (e.pinned IS NULL OR e.pinned = false) "
        " AND NOT (e)-[]-() "
        " AND e.mention_count <= 1 "
        " AND e.updated_at < $cutoff "
        "DETACH DELETE e "
        "RETURN count(e) AS pruned"
    )
    try:
        outcome = await kg_manager._graph.query(
            prune_query, params={"cutoff": thirty_days_ago},
        )
        rows = outcome.result_set
        if rows:
            stats["pruned_entities"] += rows[0][0]
    except Exception:
        logger.debug(
            "Orphan prune query failed", exc_info=True,
        )
async def _prune_weak_relationships(
    kg_manager: KnowledgeGraphManager,
    stats: dict[str, int],
) -> None:
    """Delete relationships whose weight has decayed to near zero.

    Core-tier relationships (priority 100) are exempt regardless of
    weight; everything else below 0.05 weight is removed. Increments
    ``stats['pruned_rels']`` by the number deleted.
    """
    prune_query = (
        "MATCH ()-[r]->() "
        "WHERE r.weight < 0.05 AND r.priority < 100 "
        "DELETE r "
        "RETURN count(r) AS pruned"
    )
    try:
        outcome = await kg_manager._graph.query(prune_query)
        rows = outcome.result_set
        if rows:
            stats["pruned_rels"] += rows[0][0]
    except Exception:
        logger.debug(
            "Weak relationship prune failed", exc_info=True,
        )
# -------------------------------------------------------------------
# Relationship decay
# -------------------------------------------------------------------
# [docs]
async def decay_relationships(
    kg_manager: KnowledgeGraphManager,
    decay_factor: float = 0.95,
) -> int:
    """Apply multiplicative weight decay to non-core relationships.

    Core relationships (priority 100) never decay. Guild-tier ones
    (priority 80-99) decay at half rate — sqrt(decay_factor), so two
    passes equal one full-rate pass. Everything else decays by
    *decay_factor*. Relationships whose weight drops below 0.05 are
    then deleted.

    Returns:
        int: Number of weak relationships removed (0 on failure).
    """
    import math

    half_rate_factor = math.sqrt(decay_factor)

    decay_standard = (
        "MATCH ()-[r]->() "
        "WHERE r.priority < 80 "
        "SET r.weight = r.weight * $factor "
        "RETURN count(r)"
    )
    decay_guild = (
        "MATCH ()-[r]->() "
        "WHERE r.priority >= 80 AND r.priority < 100 "
        "SET r.weight = r.weight * $factor "
        "RETURN count(r)"
    )
    remove_weak = (
        "MATCH ()-[r]->() "
        "WHERE r.weight < 0.05 AND r.priority < 100 "
        "DELETE r "
        "RETURN count(r)"
    )

    removed_total = 0
    try:
        await kg_manager._graph.query(
            decay_standard, params={"factor": decay_factor},
        )
        await kg_manager._graph.query(
            decay_guild, params={"factor": half_rate_factor},
        )
        cleanup = await kg_manager._graph.query(remove_weak)
        rows = cleanup.result_set
        removed = rows[0][0] if rows else 0
        removed_total = removed
        if removed:
            logger.info(
                "Relationship decay: removed "
                "%d weak relationships",
                removed,
            )
    except Exception:
        logger.debug(
            "Relationship decay failed", exc_info=True,
        )
    return removed_total