"""Knowledge-graph consolidation, pruning, and relationship decay.
Replaces ``background_agents/memory_consolidation.py``. Runs as a
periodic background task to:
* Merge near-duplicate entities (same category tier only).
* Prune orphan entities with no relationships and low mention counts.
* Decay relationship weights over time (tier-aware).
"""
from __future__ import annotations
import asyncio
import json
import re
import logging
from typing import Any, TYPE_CHECKING
from observability import observability
if TYPE_CHECKING:
from knowledge_graph import KnowledgeGraphManager
from openrouter_client import OpenRouterClient
logger = logging.getLogger(__name__)
[docs]
async def transaction_safe_entity_merge(
    graph_client: Any,  # FalkorDB / Neo4j Graph client
    target_node_id: str,
    source_node_id: str,
    merged_properties: dict[str, Any],
) -> bool:
    """Merge one graph entity into another inside a single transaction.

    Folds the *source* node into the *target* node: it sets
    ``merged_properties`` on the target, re-routes every outgoing relationship
    of the source onto the target, deletes the source node, and commits. Any
    failure rolls the transaction back and is reported as ``False`` instead of
    being raised.

    A relationship type cannot be computed inside a ``MERGE`` pattern, so the
    distinct outgoing types are read first and one re-route query is issued per
    type. Each type is passed through :func:`_sanitize_rel_type` before it is
    interpolated into the query text; the original type is only ever sent as a
    query parameter. Both the source and the target are bound by ``uuid`` in
    the re-route query so the new edge attaches to the existing target node.

    This function is not called by the other functions in this module (the
    consolidation path uses :func:`_do_merge`).

    Args:
        graph_client: Graph client exposing ``new_transaction()``.
        target_node_id: ``uuid`` of the surviving node that absorbs the source.
        source_node_id: ``uuid`` of the node being merged away and deleted.
        merged_properties: Property map to set on the target node.

    Returns:
        bool: ``True`` if the merge committed, ``False`` if it failed and a
        rollback was attempted.
    """
    tx = graph_client.new_transaction()
    try:
        # 1. Fold the merged property map into the surviving node.
        update_query = (
            "MATCH (target) WHERE target.uuid = $target_id "
            "SET target += $props"
        )
        await tx.query(
            update_query, {"target_id": target_node_id, "props": merged_properties}
        )

        # 2. Discover which relationship types leave the source node.
        #    Assumes the result exposes ``result_set`` rows, like the other
        #    graph results used in this module.
        types_query = (
            "MATCH (source)-[r]->() WHERE source.uuid = $source_id "
            "RETURN DISTINCT type(r) AS rel_type"
        )
        types_result = await tx.query(types_query, {"source_id": source_node_id})

        # 3. Re-route the relationships one type at a time.
        for row in getattr(types_result, "result_set", None) or []:
            raw_type = row[0]
            safe_type = _sanitize_rel_type(raw_type)
            route_query = (
                "MATCH (source)-[r]->(other) "
                "WHERE source.uuid = $source_id AND type(r) = $rel_type "
                "MATCH (target) WHERE target.uuid = $target_id "
                f"MERGE (target)-[new_r:{safe_type}]->(other) "
                "ON CREATE SET new_r += properties(r) "
                "DELETE r"
            )
            await tx.query(
                route_query,
                {
                    "source_id": source_node_id,
                    "target_id": target_node_id,
                    "rel_type": raw_type,
                },
            )

        # 4. Delete the now redundant source node.
        # NOTE(review): incoming relationships of the source are not re-routed
        # here; confirm how the backing store treats a plain DELETE of a node
        # that still has them.
        delete_query = "MATCH (source) WHERE source.uuid = $source_id DELETE source"
        await tx.query(delete_query, {"source_id": source_node_id})

        # 5. Commit everything at once.
        await tx.commit()
        logger.info(
            "Atomically merged node %s into %s successfully.",
            source_node_id,
            target_node_id,
        )
        return True
    except Exception as e:
        logger.error(
            "Transaction failed during entity merge! Rolling back. Error: %s", str(e)
        )
        # 6. Roll back; a failing rollback must not turn the documented
        #    ``False`` result into an exception.
        try:
            await tx.rollback()
        except Exception:
            logger.warning(
                "Rollback after failed entity merge also failed", exc_info=True
            )
        return False
[docs]
async def merge_entity_descriptions_queued(
    redis: Any,
    graph_client: Any,
    target_node_id: str,
    entity_name: str,
    existing_desc: str | None,
    new_desc: str | None,
) -> str:
    """Combine two entity descriptions, queueing oversized results for the LLM.

    Strips both descriptions and joins the non-empty ones with ``" | "``. A
    missing (``None``) or blank description contributes nothing, so the result
    never carries a dangling separator: two real descriptions give
    ``"existing | new"``, one real description is returned on its own, and two
    blank ones give ``""``.

    When the combined text is longer than 50,000 characters it is additionally
    pushed (as a JSON payload, via ``RPUSH``) onto the
    ``queue:entity:consolidation`` Redis list that
    :func:`run_consolidation_daemon` drains, and two observability counters are
    bumped. The return value is still the raw combined text in that case.

    Args:
        redis: Async Redis client; only used when the result is oversized.
        graph_client: Accepted for signature symmetry; not used here.
        target_node_id: ``uuid`` of the entity whose description is combined.
        entity_name: Entity name, used in the log line, metric tag and payload.
        existing_desc: The entity's current description (may be ``None``).
        new_desc: The incoming description to fold in (may be ``None``).

    Returns:
        str: The combined description.
    """
    # Keep only descriptions that still have content after stripping.
    parts = [
        text.strip() for text in (existing_desc, new_desc) if text and text.strip()
    ]
    combined = " | ".join(parts)

    limit = 50000
    if len(combined) > limit:
        observability.increment("description_limit_exceeded", {"entity": entity_name})
        logger.info(
            "Entity '%s' (ID: %s) exceeds description limit (%d chars). Queueing for LLM consolidation.",
            entity_name,
            target_node_id,
            len(combined),
        )
        queue_payload = {
            "entity_id": target_node_id,
            "entity_name": entity_name,
            "char_count": len(combined),
            "raw_text": combined,
        }
        # Hand the oversized text to the background summarization queue.
        await redis.rpush("queue:entity:consolidation", json.dumps(queue_payload))
        observability.increment("consolidation_queued", {"entity_id": target_node_id})
    return combined
[docs]
async def run_consolidation_daemon(
    redis: Any, graph_client: Any, llm_client: Any, is_active: callable
) -> None:
    """Long-running worker that drains the entity-consolidation queue via the LLM.

    While *is_active* returns a truthy value, each iteration sleeps one second
    (yielding the event loop), pops one payload from the
    ``queue:entity:consolidation`` Redis list with ``LPOP``, asks
    ``llm_client.summarize_description`` for a summary of its ``raw_text``, and
    stores that summary as the entity's ``description`` inside a graph
    transaction. This is the consumer of the payloads enqueued by
    :func:`merge_entity_descriptions_queued`.

    Failures are contained per iteration: they bump the
    ``consolidation_error`` counter, are logged, and the loop carries on. Two
    safeguards protect the graph:

    * an empty summary is treated as an error instead of being written, so a
      bad LLM reply cannot blank out an entity's description;
    * if the write or commit fails, the transaction is rolled back before the
      error is reported.

    NOTE(review): a payload that fails is not re-queued, so its entity keeps
    the long description until it is merged again.

    Args:
        redis: Async Redis client backing the consolidation queue.
        graph_client: Graph client exposing ``new_transaction()``.
        llm_client: Client exposing an awaitable ``summarize_description``.
        is_active: Zero-argument predicate; the loop runs while it is truthy.
    """
    logger.info("Entity consolidation background daemon initialized.")
    while is_active():
        try:
            # Yield control back to the event loop between polls.
            await asyncio.sleep(1.0)

            task_raw = await redis.lpop("queue:entity:consolidation")
            if not task_raw:
                continue

            task = json.loads(task_raw)
            entity_id = task["entity_id"]
            raw_text = task["raw_text"]

            with observability.timer("llm_consolidation_duration", subsystem="knowledge_anchoring"):
                summary = await llm_client.summarize_description(raw_text)

            # Never replace a stored description with nothing.
            if not summary:
                raise ValueError(
                    f"LLM returned an empty summary for entity {entity_id}"
                )

            tx = graph_client.new_transaction()
            try:
                query = "MATCH (n) WHERE n.uuid = $id SET n.description = $desc"
                await tx.query(query, {"id": entity_id, "desc": summary})
                await tx.commit()
            except Exception:
                # Do not leave the transaction open; a rollback failure must
                # not hide the error that triggered it.
                try:
                    await tx.rollback()
                except Exception:
                    logger.warning(
                        "Rollback of consolidation write failed", exc_info=True
                    )
                raise

            observability.increment("consolidation_success", {"entity_id": entity_id})
            logger.info("Successfully consolidated entity ID: %s", entity_id)
        except Exception as e:
            observability.increment(
                "consolidation_error", {"error": e.__class__.__name__}
            )
            logger.error("Failed to consolidate entity: %s", str(e), exc_info=True)
# ---------------------------------------------------------------------------
# Kill-switch for the legacy LLM-driven entity-merge pipeline below
# (consolidate_graph / _merge_similar_entities / _ask_llm_merge / _do_merge).
# That pipeline has been superseded by knowledge_anchoring.dedup_worker
# (Phase A + B), which uses neighbourhood context, Pydantic-validated JSON
# output, the correct extraction model, and pair-memory via
# [:DEDUP_EVALUATED] edges.
#
# While this is False, consolidate_graph skips the merge passes but still
# runs orphan pruning and weak-relationship pruning.
#
# Set to True to re-enable if the dedup_worker is ever disabled.
# ---------------------------------------------------------------------------
_CONSOLIDATION_ENABLED: bool = False
_SAFE_REL_TYPE = re.compile(r"^[A-Z0-9_]+$")
def _sanitize_rel_type(rel_type: str | None) -> str:
"""Coerce an arbitrary relationship label into a Cypher-injection-safe type.
Relationship types are interpolated directly into Cypher (they cannot be
parameterized), so this normalizes the input — uppercasing and turning spaces
into underscores — and only accepts the result if it matches the strict
``^[A-Z0-9_]+$`` whitelist (:data:`_SAFE_REL_TYPE`) and is at most 64
characters. Anything blank, non-string, or unsafe falls back to
``"RELATED_TO"``, guaranteeing the value can never break out of the query.
This is a pure helper called by :func:`_do_merge` for each relationship it
re-routes between the merged nodes, sanitizing the source relationship type
before it is spliced into the ``CREATE`` clause.
Args:
rel_type: Raw relationship type from the graph, possibly ``None`` or unsafe.
Returns:
str: A whitelist-safe uppercase relationship type, or ``"RELATED_TO"``.
"""
if not rel_type or not isinstance(rel_type, str):
return "RELATED_TO"
t = rel_type.strip().upper().replace(" ", "_")
if _SAFE_REL_TYPE.match(t) and len(t) <= 64:
return t
return "RELATED_TO"
[docs]
# Strong references to in-flight kg_consolidation debug-event tasks. The event
# loop only keeps weak references to tasks, so an otherwise unreferenced
# fire-and-forget task could be garbage-collected before it finishes.
_CONSOLIDATION_EVENT_TASKS: set = set()


def _publish_consolidation_event(
    status: str, preview: str, stats: dict[str, int]
) -> None:
    """Schedule a fire-and-forget ``kg_consolidation`` debug event from *stats*."""
    try:
        from observability import publish_debug_event as _pde

        task = asyncio.create_task(
            _pde(
                "kg_consolidation",
                "kg_consolidation",
                phase="consolidation",
                status=status,
                preview=preview,
                payload={
                    "merged": stats["merged"],
                    "pruned_entities": stats["pruned_entities"],
                    "pruned_rels": stats["pruned_rels"],
                    "error_count": stats["errors"],
                },
            )
        )
        _CONSOLIDATION_EVENT_TASKS.add(task)
        task.add_done_callback(_CONSOLIDATION_EVENT_TASKS.discard)
    except Exception:
        # Telemetry is best-effort: never let it break consolidation.
        logger.debug(
            "kg_consolidation debug event could not be scheduled", exc_info=True
        )


async def _run_prune_passes(
    kg_manager: KnowledgeGraphManager, stats: dict[str, int]
) -> None:
    """Run orphan pruning then weak-relationship pruning, counting failures."""
    passes = (
        ("Orphan pruning", _prune_orphans),
        ("Weak relationship pruning", _prune_weak_relationships),
    )
    for description, prune in passes:
        try:
            await prune(kg_manager, stats)
        except Exception:
            logger.debug("%s failed", description, exc_info=True)
            stats["errors"] += 1


async def consolidate_graph(
    kg_manager: KnowledgeGraphManager,
    openrouter: OpenRouterClient,
) -> dict[str, Any]:
    """Find and merge duplicate entities, prune orphans.

    Only merges entities within the SAME category tier. Core entities
    are never pruned.

    .. note::
        The LLM-driven merge path is currently **disabled** via
        ``_CONSOLIDATION_ENABLED = False``. Entity deduplication is
        handled by ``knowledge_anchoring.dedup_worker`` (Phase A + B).
        Orphan pruning and weak-relationship pruning still run normally.

    When the merge path is disabled, a ``status="disabled"`` debug event is
    scheduled *before* pruning (so its payload counts are all zero), the two
    prune passes run, and the stats are returned. When it is enabled, one
    merge pass runs per ``(label, category)`` combination, followed by the
    prune passes, a summary log line if anything changed, and a debug event
    whose status is ``"ok"`` or ``"partial_error"``.

    Args:
        kg_manager: Knowledge-graph manager passed to the merge/prune passes.
        openrouter: LLM client passed through to the merge passes.

    Returns:
        dict[str, Any]: Counters ``merged``, ``pruned_entities``,
        ``pruned_rels`` and ``errors`` for this run.
    """
    stats: dict[str, int] = {
        "merged": 0,
        "pruned_entities": 0,
        "pruned_rels": 0,
        "errors": 0,
    }

    if not _CONSOLIDATION_ENABLED:
        logger.debug(
            "consolidate_graph: LLM merge path disabled "
            "(_CONSOLIDATION_ENABLED=False); skipping entity merge passes."
        )
        _publish_consolidation_event(
            "disabled",
            "LLM merge path disabled; superseded by dedup_worker",
            stats,
        )
        # Orphan pruning and weak-rel pruning still run.
        await _run_prune_passes(kg_manager, stats)
        return stats

    from knowledge_graph import ENTITY_LABELS, CATEGORY_PRIORITY

    for label in ENTITY_LABELS:
        for category in CATEGORY_PRIORITY:
            try:
                await _merge_similar_entities(
                    kg_manager,
                    openrouter,
                    label,
                    category,
                    stats,
                )
            except Exception:
                logger.warning(
                    "Merge pass failed for %s/%s",
                    label,
                    category,
                    exc_info=True,
                )
                stats["errors"] += 1

    await _run_prune_passes(kg_manager, stats)

    if stats["merged"] or stats["pruned_entities"] or stats["pruned_rels"]:
        logger.info(
            "Consolidation: merged=%d, pruned_entities=%d, pruned_rels=%d",
            stats["merged"],
            stats["pruned_entities"],
            stats["pruned_rels"],
        )

    # Emit kg_consolidation debug event (fire-and-forget).
    _publish_consolidation_event(
        "ok" if stats["errors"] == 0 else "partial_error",
        (
            f"merged={stats['merged']} pruned_entities={stats['pruned_entities']} "
            f"pruned_rels={stats['pruned_rels']} errors={stats['errors']}"
        ),
        stats,
    )
    return stats
async def _merge_similar_entities(
    kg_manager: KnowledgeGraphManager,
    openrouter: OpenRouterClient,
    label: str,
    category: str,
    stats: dict[str, int],
) -> None:
    """Merge near-duplicate entities of one label and category (embeddings + LLM).

    Fetches up to 50 candidate pairs that share *label*, *category* and
    ``scope_id``, hydrates both sides through ``kg_manager.get_entity``,
    batch-embeds ``"name: description"`` for every entity, and for pairs whose
    cosine similarity is at least 0.85 asks :func:`_ask_llm_merge` for
    confirmation before calling :func:`_do_merge`.

    Each entity takes part in at most one merge attempt per pass. The entity
    snapshots are loaded before any merge happens, so once an entity has been
    merged (or merged away) its snapshot is stale or its node is gone; reusing
    it for a later pair in the same pass would overwrite the fresh merge result
    or fold a live entity into a deleted one. Such pairs are skipped and can be
    picked up by a later run.

    Args:
        kg_manager: Knowledge-graph manager providing graph, embedding, and
            entity access.
        openrouter: LLM client passed through to :func:`_ask_llm_merge`.
        label: Entity label to scan; interpolated into the candidate query.
        category: Category tier to restrict candidate pairs to.
        stats: Mutable run-stats dict; ``merged`` and ``errors`` are incremented.
    """
    q = (
        f"MATCH (a:{label} {{category: $cat}}) "
        f"MATCH (b:{label} {{category: $cat}}) "
        f"WHERE a.name < b.name AND a.scope_id = b.scope_id "
        f"RETURN a.name, b.name, a.scope_id, "
        f"a.uuid, b.uuid "
        f"LIMIT 50"
    )
    try:
        result = await kg_manager._graph.query(
            q,
            params={"cat": category},
        )
    except Exception:
        logger.debug(
            "Candidate pair query failed for %s/%s",
            label,
            category,
            exc_info=True,
        )
        return
    if not result.result_set:
        return

    pairs: list[tuple[str, str, str, str, str, dict, dict]] = []
    embed_texts: list[str] = []
    for row in result.result_set:
        name_a, name_b, scope_id, uuid_a, uuid_b = row[:5]
        # The merge addresses nodes by uuid; without one it cannot be applied.
        if not uuid_a or not uuid_b:
            continue
        try:
            ent_a = await kg_manager.get_entity(
                name_a,
                entity_type=label,
            )
            ent_b = await kg_manager.get_entity(
                name_b,
                entity_type=label,
            )
            if not ent_a or not ent_b:
                continue
            # Build both texts before touching either list so that ``pairs``
            # and ``embed_texts`` can never get out of step on an error.
            text_a = f"{name_a}: {ent_a.get('description', '')}"
            text_b = f"{name_b}: {ent_b.get('description', '')}"
        except Exception:
            logger.debug(
                "Entity fetch failed for %s/%s",
                name_a,
                name_b,
                exc_info=True,
            )
            stats["errors"] += 1
            continue
        pairs.append((name_a, name_b, scope_id, uuid_a, uuid_b, ent_a, ent_b))
        embed_texts.extend((text_a, text_b))

    if not pairs:
        return

    try:
        all_vecs = await kg_manager._embed_batch(embed_texts)
    except Exception:
        logger.debug(
            "Batch embedding for consolidation failed",
            exc_info=True,
        )
        return

    from utils.cosine import cosine_pair

    # uuids already involved in a merge attempt during this pass.
    touched: set[str] = set()
    for i, (name_a, name_b, scope_id, uuid_a, uuid_b, ent_a, ent_b) in enumerate(pairs):
        if uuid_a in touched or uuid_b in touched:
            continue
        try:
            # Vectors come back in the order the texts were submitted:
            # two consecutive entries per pair.
            sim = cosine_pair(all_vecs[i * 2], all_vecs[i * 2 + 1])
            if sim < 0.85:
                continue
            should_merge = await _ask_llm_merge(
                openrouter,
                name_a,
                ent_a.get("description", ""),
                name_b,
                ent_b.get("description", ""),
            )
            if not should_merge:
                continue
            # Mark before merging: even a failed attempt may have changed
            # state (e.g. an error after the merge transaction committed).
            touched.update((uuid_a, uuid_b))
            await _do_merge(
                kg_manager,
                label,
                category,
                scope_id,
                name_a,
                ent_a,
                uuid_a,
                name_b,
                ent_b,
                uuid_b,
            )
            stats["merged"] += 1
        except Exception:
            logger.debug(
                "Merge check failed for %s/%s",
                name_a,
                name_b,
                exc_info=True,
            )
            stats["errors"] += 1
async def _ask_llm_merge(
openrouter: OpenRouterClient,
name_a: str,
desc_a: str,
name_b: str,
desc_b: str,
) -> bool:
"""Ask the LLM for a yes/no verdict on whether two entities should be merged.
Serves as the final, conservative gate after embedding similarity has already
flagged a pair as likely duplicates: it builds a tiny prompt presenting both
entities' names and descriptions and asks for a bare ``yes``/``no``, treating
only an affirmative answer as a green light to merge. Any LLM error is swallowed
and reported as "do not merge", so a flaky model can never trigger a spurious
merge.
It issues a single ``openrouter.chat`` call with a system and user message and
inspects the reply text. It is called by :func:`_merge_similar_entities` for
each high-similarity candidate pair.
Args:
openrouter: LLM client used for the one-shot yes/no chat completion.
name_a: Name of the first entity.
desc_a: Description of the first entity.
name_b: Name of the second entity.
desc_b: Description of the second entity.
Returns:
bool: ``True`` only if the model's reply starts with ``"yes"``; ``False``
otherwise, including on any error.
"""
prompt = (
"Should these two knowledge graph entities"
" be merged into one?\n\n"
f"Entity A: {name_a} — {desc_a}\n"
f"Entity B: {name_b} — {desc_b}\n\n"
f"Answer ONLY 'yes' or 'no'."
)
sys_msg = "You help maintain a knowledge graph." " Answer only 'yes' or 'no'."
msgs = [
{"role": "system", "content": sys_msg},
{"role": "user", "content": prompt},
]
try:
raw = await openrouter.chat(msgs)
return raw.strip().lower().startswith("yes")
except Exception:
return False
async def _reroute_relationships(
    tx: Any,
    uuid_a: str,
    uuid_b: str,
    now: float,
    *,
    outgoing: bool,
) -> None:
    """Copy B's relationships in one direction onto A inside transaction *tx*.

    Reads B's outgoing (``outgoing=True``) or incoming relationships and
    re-creates each one between A and the same other node, keeping the
    (sanitized) type and the copied properties. A relationship is skipped when
    an edge in that direction already exists between the two nodes, and
    relationships whose other end is A itself are ignored so merging two linked
    entities does not leave A pointing at itself.
    """
    pattern = (
        "(b {uuid: $uuid_b})-[r]->(other)"
        if outgoing
        else "(other)-[r]->(b {uuid: $uuid_b})"
    )
    q_fetch = (
        f"MATCH {pattern} "
        "WHERE other.uuid <> $uuid_a "
        "RETURN type(r) AS rel_type, other.uuid AS other_uuid, "
        "r.weight AS weight, r.description AS description, "
        "r.category AS category, r.priority AS priority, "
        "r.scope_id AS scope_id, r.created_at AS created_at, "
        "r.updated_at AS updated_at"
    )
    fetched = await tx.query(q_fetch, params={"uuid_a": uuid_a, "uuid_b": uuid_b})

    for row in fetched.result_set or []:
        # The type is spliced into the query text, so it must be whitelisted.
        rel_type = _sanitize_rel_type(row[0])
        other_uuid = row[1]
        if outgoing:
            src_uuid, tgt_uuid = uuid_a, other_uuid
        else:
            src_uuid, tgt_uuid = other_uuid, uuid_a
        q_create = (
            "MATCH (s {uuid: $src_uuid}) MATCH (t {uuid: $tgt_uuid}) "
            "WHERE NOT (s)-[]->(t) "
            f"CREATE (s)-[r2:{rel_type} "
            "{weight: $w, description: $desc, category: $cat, "
            "priority: $pri, scope_id: $sid, src_uuid: $src_uuid, "
            "tgt_uuid: $tgt_uuid, created_at: $created, updated_at: $updated}]->(t)"
        )
        await tx.query(
            q_create,
            params={
                "src_uuid": src_uuid,
                "tgt_uuid": tgt_uuid,
                # Missing properties fall back to fixed defaults.
                "w": row[2] or 0.5,
                "desc": row[3] or "",
                "cat": row[4] or "general",
                "pri": row[5] or 40,
                "sid": row[6] or "_",
                "created": row[7] or now,
                "updated": row[8] or now,
            },
        )


async def _do_merge(
    kg_manager: KnowledgeGraphManager,
    label: str,
    category: str,
    scope_id: str,
    name_a: str,
    ent_a: dict,
    uuid_a: str,
    name_b: str,
    ent_b: dict,
    uuid_b: str,
) -> None:
    """Fold entity B into entity A inside one transaction, then re-embed A.

    Steps, in order:

    1. Combine both descriptions via :func:`merge_entity_descriptions_queued`
       (which may enqueue an LLM summary for an over-long result) and add up
       the mention counts (a missing or zero count is treated as 1).
    2. In a ``kg_manager._graph.new_transaction()`` transaction: update A's
       ``description``, ``mention_count`` and ``updated_at``; copy B's outgoing
       and then incoming relationships onto A; ``DETACH DELETE`` B; commit.
       Any failure triggers a rollback and the original error is re-raised.
    3. After the commit, compute a fresh embedding for ``"name_a: description"``
       with ``kg_manager._embed`` and store it on A.

    Relationships that run between A and B are dropped rather than turned into
    A→A self-loops.

    Args:
        kg_manager: Knowledge-graph manager providing graph, redis, and
            embedding access.
        label: Entity label of the merged nodes (not used in the body).
        category: Category tier of the merge (not used in the body).
        scope_id: Shared scope id of the two entities (not used in the body).
        name_a: Name of the surviving entity A.
        ent_a: Property dict of entity A.
        uuid_a: ``uuid`` of the surviving entity A.
        name_b: Name of the entity B being merged away (not used in the body).
        ent_b: Property dict of entity B.
        uuid_b: ``uuid`` of the entity B being deleted.

    Raises:
        ValueError: If both uuids are identical; merging a node into itself
            would delete the survivor.
        Exception: Re-raised after rollback if any query in the merge
            transaction fails. Errors from the re-embedding step also
            propagate, at which point the merge itself is already committed.
    """
    import time

    if uuid_a == uuid_b:
        raise ValueError(f"Cannot merge entity {uuid_a!r} into itself")

    desc_a = ent_a.get("description", "")
    desc_b = ent_b.get("description", "")
    # Concatenate descriptions with queueing fallback if >50,000 chars
    merged_desc = await merge_entity_descriptions_queued(
        kg_manager._redis, kg_manager._graph, uuid_a, name_a, desc_a, desc_b
    )
    mentions_a = ent_a.get("mention_count", 1) or 1
    mentions_b = ent_b.get("mention_count", 1) or 1
    now = time.time()

    tx = kg_manager._graph.new_transaction()
    try:
        q_update = (
            "MATCH (a {uuid: $uuid_a}) "
            "SET a.description = $desc, "
            " a.mention_count = $mc, a.updated_at = $now "
            "RETURN a.name"
        )
        await tx.query(
            q_update,
            params={
                "uuid_a": uuid_a,
                "desc": merged_desc,
                "mc": mentions_a + mentions_b,
                "now": now,
            },
        )

        # Outgoing first, then incoming; original relationship types are kept.
        await _reroute_relationships(tx, uuid_a, uuid_b, now, outgoing=True)
        await _reroute_relationships(tx, uuid_a, uuid_b, now, outgoing=False)

        q_delete = "MATCH (b {uuid: $uuid_b}) DETACH DELETE b"
        await tx.query(q_delete, params={"uuid_b": uuid_b})
        await tx.commit()
    except Exception as e:
        logger.error(
            "Transaction failed inside _do_merge! Rolling back. Error: %s", str(e)
        )
        # A failing rollback must not replace the error that caused it.
        try:
            await tx.rollback()
        except Exception:
            logger.warning("Rollback after failed _do_merge also failed", exc_info=True)
        raise

    # Refresh A's embedding so later similarity checks see the merged text.
    embed_text = f"{name_a}: {merged_desc}"
    vec = await kg_manager._embed(embed_text)
    q_re_embed = "MATCH (a {uuid: $uuid_a}) SET a.embedding = vecf32($vec) RETURN a.name"
    await kg_manager._graph.query(
        q_re_embed,
        params={
            "uuid_a": uuid_a,
            "vec": vec,
        },
    )
async def _prune_orphans(
kg_manager: KnowledgeGraphManager,
stats: dict[str, int],
) -> None:
"""Delete stale, disconnected, rarely-mentioned non-core entities from the graph.
Garbage-collects entities that have clearly become noise: those with no
relationships at all, a mention count of one or less, and an ``updated_at``
older than 30 days. Core, basic, and pinned entities are explicitly excluded so
identity-level and human-curated knowledge is never pruned. Running this keeps
the graph from accumulating one-off mentions that were never reinforced.
It executes a single ``DETACH DELETE`` Cypher query against
``kg_manager._graph`` and adds the deleted count to ``stats["pruned_entities"]``.
It is called by :func:`consolidate_graph` (in both the disabled and enabled
branches) as part of every consolidation run.
Args:
kg_manager: Knowledge-graph manager providing graph access.
stats: Mutable run-stats dict; ``pruned_entities`` is incremented.
"""
import time
cutoff = time.time() - 30 * 86400 # 30 days
q = (
"MATCH (e) "
"WHERE e.category <> 'core' "
" AND e.category <> 'basic' "
" AND (e.pinned IS NULL OR e.pinned = false) "
" AND NOT (e)-[]-() "
" AND e.mention_count <= 1 "
" AND e.updated_at < $cutoff "
"DETACH DELETE e "
"RETURN count(e) AS pruned"
)
try:
result = await kg_manager._graph.query(
q,
params={"cutoff": cutoff},
)
if result.result_set:
stats["pruned_entities"] += result.result_set[0][0]
except Exception:
logger.debug(
"Orphan prune query failed",
exc_info=True,
)
async def _prune_weak_relationships(
kg_manager: KnowledgeGraphManager,
stats: dict[str, int],
) -> None:
"""Delete relationships whose weight has decayed below the usefulness floor.
Sweeps the graph for edges with ``weight`` under 0.05, sparing high-priority
(``priority >= 100``, i.e. core) relationships so foundational links are never
severed. This is the counterpart to entity orphan pruning: over time
:func:`decay_relationships` erodes edge weights, and this pass removes the ones
that have effectively faded to nothing.
It runs a single ``DELETE`` Cypher query against ``kg_manager._graph`` and adds
the deleted count to ``stats["pruned_rels"]``. It is called by
:func:`consolidate_graph` (in both the disabled and enabled branches) on every
consolidation run.
Args:
kg_manager: Knowledge-graph manager providing graph access.
stats: Mutable run-stats dict; ``pruned_rels`` is incremented.
"""
q = (
"MATCH ()-[r]->() "
"WHERE r.weight < 0.05 AND r.priority < 100 "
"DELETE r "
"RETURN count(r) AS pruned"
)
try:
result = await kg_manager._graph.query(q)
if result.result_set:
stats["pruned_rels"] += result.result_set[0][0]
except Exception:
logger.debug(
"Weak relationship prune failed",
exc_info=True,
)
# -------------------------------------------------------------------
# Relationship decay
# -------------------------------------------------------------------
[docs]
async def decay_relationships(
    kg_manager: KnowledgeGraphManager,
    decay_factor: float = 0.95,
) -> int:
    """Decay the weight of every non-core relationship, then sweep weak ones.

    Three queries run in order against ``kg_manager._graph``:

    * relationships with ``priority < 80`` have their weight multiplied by
      *decay_factor*;
    * relationships with ``80 <= priority < 100`` are multiplied by
      ``sqrt(decay_factor)``, i.e. they decay at half the rate;
    * relationships with ``weight < 0.05`` and ``priority < 100`` are deleted.

    Relationships with priority 100 or above are never touched. A failure in
    any query is logged at debug level and ends the pass early. A ``kg_decay``
    debug event is emitted at the end either way.

    Args:
        kg_manager: Knowledge-graph manager providing graph access.
        decay_factor: Multiplier applied to ordinary relationship weights.

    Returns:
        int: Number of relationships deleted by the sweep (0 on failure).
    """
    import math

    # Applying the square root twice equals one full decay step.
    half_rate_factor = math.sqrt(decay_factor)

    decay_ordinary = (
        "MATCH ()-[r]->() "
        "WHERE r.priority < 80 "
        "SET r.weight = r.weight * $factor "
        "RETURN count(r)"
    )
    decay_guild = (
        "MATCH ()-[r]->() "
        "WHERE r.priority >= 80 AND r.priority < 100 "
        "SET r.weight = r.weight * $factor "
        "RETURN count(r)"
    )
    sweep_weak = (
        "MATCH ()-[r]->() "
        "WHERE r.weight < 0.05 AND r.priority < 100 "
        "DELETE r "
        "RETURN count(r)"
    )

    removed = 0
    try:
        decay_steps = (
            (decay_ordinary, decay_factor),
            (decay_guild, half_rate_factor),
        )
        for statement, factor in decay_steps:
            await kg_manager._graph.query(
                statement,
                params={"factor": factor},
            )
        swept = await kg_manager._graph.query(sweep_weak)
        rows = swept.result_set
        removed = rows[0][0] if rows else 0
        if removed:
            logger.info(
                "Relationship decay: removed %d weak relationships",
                removed,
            )
    except Exception:
        logger.debug(
            "Relationship decay failed",
            exc_info=True,
        )
    _emit_kg_decay(removed)
    return removed
# Strong references to in-flight kg_decay debug-event tasks. The event loop
# only keeps weak references to tasks, so an otherwise unreferenced
# fire-and-forget task could be garbage-collected before it finishes.
_KG_DECAY_EVENT_TASKS: set = set()


def _emit_kg_decay(relationships_removed: int) -> None:
    """Publish a fire-and-forget ``kg_decay`` observability event.

    Schedules ``observability.publish_debug_event`` (imported lazily) on the
    running event loop via ``asyncio.create_task``, reporting how many
    relationships the decay pass removed. The task is kept in
    ``_KG_DECAY_EVENT_TASKS`` until it completes so it cannot be collected
    mid-flight.

    Failing to import or schedule the event never propagates to the caller; it
    is logged at debug level instead.

    Args:
        relationships_removed: Number of weak relationships deleted during
            decay, reported in the event preview and payload.
    """
    try:
        from observability import publish_debug_event as _pde

        task = asyncio.create_task(
            _pde(
                "kg_decay",
                "kg_consolidation",
                phase="decay",
                status="ok",
                preview=f"relationships_removed={relationships_removed}",
                payload={"relationships_removed": relationships_removed},
            )
        )
        _KG_DECAY_EVENT_TASKS.add(task)
        task.add_done_callback(_KG_DECAY_EVENT_TASKS.discard)
    except Exception:
        # Telemetry is best-effort: never let it break the decay pass.
        logger.debug("kg_decay debug event could not be scheduled", exc_info=True)