"""FalkorDB-backed knowledge graph with hybrid vector+graph retrieval.
Replaces the flat Redis memory system (``memory_manager.py``) with a
property graph that stores entities as nodes and relationships as typed
edges. Each node carries a 3072-dim embedding vector indexed via HNSW
for semantic search, while multi-hop Cypher traversal connects related
facts across the graph.
The five-tier authority hierarchy (core > guild > channel > general > user)
is preserved as ``category`` / ``priority`` properties on every node and edge.
"""
from __future__ import annotations
import asyncio
import json
import logging
import time
from typing import Any, TYPE_CHECKING
import numpy as np
from falkordb.asyncio import FalkorDB
from uuid6 import uuid7
from .constants import (
CATEGORY_PRIORITY,
ENTITY_LABELS,
RECOMMENDED_RELATION_TYPES,
_NO_SCOPE,
_PRIORITY_NORM,
_SENTINEL_USER_ID,
_vec_literal,
)
if TYPE_CHECKING:
import redis.asyncio as aioredis
from openrouter_client import OpenRouterClient
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# KnowledgeGraphManager
# ---------------------------------------------------------------------------
class KnowledgeGraphManager:
    """Manages the FalkorDB knowledge graph for entity/relationship
    CRUD and hybrid retrieval.
    """

    # FalkorDB graph key under which all knowledge nodes/edges live.
    GRAPH_NAME = "knowledge"
[docs]
def __init__(
self,
redis_client: aioredis.Redis,
openrouter: OpenRouterClient,
embedding_model: str = "google/gemini-embedding-001",
admin_user_ids: set[str] | None = None,
) -> None:
"""Initialize the instance.
Args:
redis_client (aioredis.Redis): Redis connection client.
openrouter (OpenRouterClient): The openrouter value.
embedding_model (str): The embedding model value.
admin_user_ids (set[str] | None): The admin user ids value.
"""
self._redis = redis_client
self._db = FalkorDB(
connection_pool=redis_client.connection_pool,
)
self._graph = self._db.select_graph(self.GRAPH_NAME)
self._openrouter = openrouter
self._embedding_model = embedding_model
self._admin_user_ids: set[str] = (
admin_user_ids or set()
)
# ------------------------------------------------------------------
# Embedding helper
# ------------------------------------------------------------------
async def _embed(self, text: str) -> list[float]:
"""Embed text using Gemini direct API (free tier).
Forces the Gemini-direct path to avoid paid OpenRouter
embedding calls. The generic ``openrouter.embed()`` randomly
routes through OpenRouter ~50% of the time.
"""
return await self._openrouter._embed_gemini(
text, self._embedding_model,
)
async def _embed_batch(self, texts: list[str]) -> list[list[float]]:
"""Batch-embed texts using Gemini direct batchEmbedContents API."""
if not texts:
return []
return await self._openrouter._embed_gemini_batch(
texts, self._embedding_model,
)
# ------------------------------------------------------------------
# Index management
# ------------------------------------------------------------------
[docs]
async def ensure_indexes(self) -> None:
"""Create vector + range indexes for every entity label.
This operation is idempotent.
"""
for label in ENTITY_LABELS:
try:
await self._graph.create_node_vector_index(
label, "embedding",
dim=3072,
similarity_function="cosine",
)
except Exception as exc:
msg = str(exc).lower()
if (
"already indexed" in msg
or "already exists" in msg
):
pass
else:
logger.warning(
"Vector index creation for %s: %s",
label, exc,
)
for prop in ("name", "category", "scope_id", "pinned", "uuid", "user_id"):
try:
await self._graph.create_node_range_index(label, prop)
except Exception as exc:
msg = str(exc).lower()
if "already exists" in msg or "already indexed" in msg:
pass
else:
logger.warning(
"Range index creation for %s.%s: %s",
label, prop, exc,
)
await self._backfill_user_ids()
logger.info(
"Knowledge graph indexes ensured for %d labels",
len(ENTITY_LABELS),
)
async def _backfill_user_ids(self) -> None:
"""Set ``user_id`` to the sentinel value on entities that lack one.
Idempotent -- only touches nodes where ``user_id IS NULL``.
"""
for label in ENTITY_LABELS:
try:
q = (
f"MATCH (e:{label}) "
f"WHERE e.user_id IS NULL "
f"SET e.user_id = $uid "
f"RETURN count(e)"
)
result = await self._graph.query(
q, params={"uid": _SENTINEL_USER_ID},
)
count = (
result.result_set[0][0]
if result.result_set else 0
)
if count:
logger.info(
"Backfilled user_id on %d %s nodes",
count, label,
)
except Exception:
logger.debug(
"user_id backfill for %s failed",
label, exc_info=True,
)
# ------------------------------------------------------------------
# Entity CRUD
# ------------------------------------------------------------------
[docs]
async def add_entity(
self,
name: str,
entity_type: str,
description: str,
category: str = "general",
scope_id: str = "_",
created_by: str = "unknown",
pinned: bool = False,
metadata: str = "{}",
user_id: str = _SENTINEL_USER_ID,
embedding: list[float] | None = None,
) -> dict[str, str]:
"""Create or update an entity.
Returns ``{"name": ..., "uuid": ...}``.
"""
if entity_type not in ENTITY_LABELS:
raise ValueError(
f"Unknown entity type: {entity_type}"
)
if category not in CATEGORY_PRIORITY:
raise ValueError(
f"Unknown category: {category}"
)
priority = CATEGORY_PRIORITY[category]
now = time.time()
name_lower = name.strip().lower()
new_uuid = str(uuid7())
if embedding is not None:
vec = embedding
else:
embed_text = (
f"{name}: {description}" if description else name
)
vec = await self._embed(embed_text)
q = (
f"MERGE (e:{entity_type} {{name: $name, "
f"scope_id: $sid, category: $cat}}) "
f"ON CREATE SET "
f"e.uuid = $uuid, "
f"e.description = $desc, "
f"e.priority = $pri, "
f"e.embedding = vecf32($vec), "
f"e.pinned = $pinned, "
f"e.mention_count = 1, "
f"e.created_at = $now, "
f"e.updated_at = $now, "
f"e.created_by = $creator, "
f"e.user_id = $user_id, "
f"e.metadata = $meta "
f"ON MATCH SET "
f"e.description = $desc, "
f"e.embedding = vecf32($vec), "
f"e.mention_count = e.mention_count + 1, "
f"e.updated_at = $now "
f"RETURN e.name, e.uuid"
)
params = {
"name": name_lower,
"sid": scope_id or _NO_SCOPE,
"cat": category,
"desc": description,
"pri": priority,
"vec": vec,
"pinned": pinned,
"now": now,
"creator": created_by,
"uuid": new_uuid,
"meta": metadata,
"user_id": user_id or _SENTINEL_USER_ID,
}
result = await self._graph.query(q, params=params)
row = result.result_set[0] if result.result_set else None
return {
"name": row[0] if row else name_lower,
"uuid": row[1] if row else new_uuid,
}
[docs]
async def update_entity_description(
self,
name: str,
entity_type: str,
new_description: str,
category: str | None = None,
scope_id: str | None = None,
) -> bool:
"""Update an entity's description and re-embed."""
name_lower = name.strip().lower()
embed_text = (
f"{name}: {new_description}"
if new_description else name
)
vec = await self._embed(embed_text)
now = time.time()
where_parts = ["e.name = $name"]
params: dict[str, Any] = {
"name": name_lower,
"desc": new_description,
"vec": vec,
"now": now,
}
if category:
where_parts.append("e.category = $cat")
params["cat"] = category
if scope_id:
where_parts.append("e.scope_id = $sid")
params["sid"] = scope_id
where = " AND ".join(where_parts)
q = (
f"MATCH (e:{entity_type}) WHERE {where} "
f"SET e.description = $desc, "
f"e.embedding = vecf32($vec), "
f"e.updated_at = $now "
f"RETURN e.name"
)
result = await self._graph.query(
q, params=params,
)
return len(result.result_set) > 0
[docs]
async def edit_entity(
self,
uuid: str,
description: str | None = None,
append_text: str | None = None,
pinned: bool | None = None,
category: str | None = None,
metadata_updates: dict | None = None,
) -> dict | None:
"""Selectively update fields on an existing entity.
Looks up by *uuid*. Only the provided fields are changed;
everything else is preserved.
*description* replaces the text entirely.
*append_text* is concatenated to the existing description.
(Mutually exclusive -- caller must pick one.)
*metadata_updates* is shallow-merged into the existing
metadata JSON (new keys added, existing overwritten,
unmentioned preserved).
Returns the full updated entity dict via :meth:`get_entity`,
or ``None`` if the UUID was not found.
"""
# -- Read current state -------------------------------------------
read_q = (
"MATCH (e) WHERE e.uuid = $uuid "
"RETURN e.name, labels(e)[0], "
"e.description, e.metadata, e.category"
)
result = await self._graph.query(
read_q, params={"uuid": uuid},
)
if not result.result_set:
return None
row = result.result_set[0]
cur_name: str = row[0]
entity_type: str = row[1]
cur_desc: str = row[2] or ""
raw_meta: str = row[3] or "{}"
cur_cat: str = row[4] or "general"
try:
cur_meta = json.loads(raw_meta) if isinstance(
raw_meta, str,
) else {}
except (json.JSONDecodeError, TypeError):
cur_meta = {}
# -- Compute new values -------------------------------------------
new_desc: str | None = None
if description is not None:
new_desc = description
elif append_text:
new_desc = (
(cur_desc + "\n" + append_text)
if cur_desc else append_text
)
new_meta: str | None = None
if metadata_updates:
merged = {**cur_meta, **metadata_updates}
new_meta = json.dumps(merged, ensure_ascii=False)
new_cat = category
if new_cat and new_cat not in CATEGORY_PRIORITY:
raise ValueError(
f"Unknown category: {new_cat}"
)
# -- Build SET clause dynamically ---------------------------------
now = time.time()
set_parts: list[str] = ["e.updated_at = $now"]
params: dict[str, Any] = {"uuid": uuid, "now": now}
if new_desc is not None:
set_parts.append("e.description = $desc")
params["desc"] = new_desc
embed_text = (
f"{cur_name}: {new_desc}"
if new_desc else cur_name
)
vec = await self._embed(embed_text)
set_parts.append("e.embedding = vecf32($vec)")
params["vec"] = vec
if pinned is not None:
set_parts.append("e.pinned = $pinned")
params["pinned"] = pinned
if new_cat:
set_parts.append("e.category = $cat")
params["cat"] = new_cat
set_parts.append("e.priority = $pri")
params["pri"] = CATEGORY_PRIORITY[new_cat]
if new_meta is not None:
set_parts.append("e.metadata = $meta")
params["meta"] = new_meta
set_clause = ", ".join(set_parts)
write_q = (
f"MATCH (e:{entity_type}) "
f"WHERE e.uuid = $uuid "
f"SET {set_clause} "
f"RETURN e.uuid"
)
await self._graph.query(write_q, params=params)
return await self.get_entity(uuid=uuid)
[docs]
async def delete_entity(
self,
name: str,
entity_type: str,
category: str,
scope_id: str = "_",
) -> bool:
"""Delete the specified entity.
Args:
name (str): Human-readable name.
entity_type (str): The entity type value.
category (str): The category value.
scope_id (str): The scope id value.
Returns:
bool: True on success, False otherwise.
"""
name_lower = name.strip().lower()
q = (
f"MATCH (e:{entity_type} {{name: $name, "
f"scope_id: $sid, category: $cat}}) "
f"DETACH DELETE e "
f"RETURN count(e) AS deleted"
)
params = {
"name": name_lower,
"sid": scope_id,
"cat": category,
}
result = await self._graph.query(
q, params=params,
)
if result.result_set:
return result.result_set[0][0] > 0
return False
[docs]
async def delete_entity_by_uuid(
self,
uuid: str,
) -> bool:
"""Delete an entity by UUID (detach-deletes all relationships)."""
q = (
"MATCH (e {uuid: $uuid}) "
"DETACH DELETE e "
"RETURN count(e) AS deleted"
)
result = await self._graph.query(
q, params={"uuid": uuid},
)
if result.result_set:
return result.result_set[0][0] > 0
return False
[docs]
async def pin_entity(
self,
name: str,
entity_type: str,
pinned: bool = True,
category: str | None = None,
scope_id: str | None = None,
) -> bool:
"""Set or clear the pinned flag on an entity.
When category and/or scope_id are provided, only entities
matching those filters are updated. This avoids pinning
the wrong entity when the same name exists in multiple scopes.
"""
name_lower = name.strip().lower()
now = time.time()
where_parts = ["e.name = $name"]
params: dict[str, Any] = {
"name": name_lower,
"pinned": pinned,
"now": now,
}
if category is not None:
where_parts.append("e.category = $cat")
params["cat"] = category
if scope_id is not None:
where_parts.append("e.scope_id = $sid")
params["sid"] = scope_id or _NO_SCOPE
where_clause = " AND ".join(where_parts)
q = (
f"MATCH (e:{entity_type}) "
f"WHERE {where_clause} "
f"SET e.pinned = $pinned, "
f"e.updated_at = $now "
f"RETURN e.name"
)
result = await self._graph.query(q, params=params)
return len(result.result_set) > 0
    async def get_entity(
        self,
        name: str = "",
        entity_type: str | None = None,
        category: str | None = None,
        scope_id: str | None = None,
        uuid: str | None = None,
    ) -> dict | None:
        """Fetch an entity with its immediate connections.

        Can look up by *name* or by *uuid*; *uuid* takes precedence
        when both are supplied. *category* / *scope_id* only narrow a
        name lookup.

        Returns:
            dict | None: Entity properties plus a ``connections`` list
            of 1-hop neighbors, or ``None`` when nothing matched.
        """
        # An optional label narrows the MATCH to one node type.
        label_filter = (
            f":{entity_type}" if entity_type else ""
        )
        where_parts: list[str] = []
        params: dict[str, Any] = {}
        if uuid:
            where_parts.append("e.uuid = $uuid")
            params["uuid"] = uuid
        else:
            # Names are stored lowercased (see add_entity).
            name_lower = name.strip().lower()
            where_parts.append("e.name = $name")
            params["name"] = name_lower
            if category:
                where_parts.append("e.category = $cat")
                params["cat"] = category
            if scope_id:
                where_parts.append("e.scope_id = $sid")
                params["sid"] = scope_id
        where = " AND ".join(where_parts)
        # OPTIONAL MATCH keeps entities with zero neighbors in the result;
        # collect() folds the 1-hop neighborhood into a single row.
        q = (
            f"MATCH (e{label_filter}) WHERE {where} "
            f"OPTIONAL MATCH (e)-[r]-(neighbor) "
            f"RETURN e.name AS name, "
            f"labels(e)[0] AS type, "
            f"e.description AS description, "
            f"e.category AS category, "
            f"e.priority AS priority, "
            f"e.scope_id AS scope_id, "
            f"e.mention_count AS mention_count, "
            f"e.created_at AS created_at, "
            f"e.updated_at AS updated_at, "
            f"e.created_by AS created_by, "
            f"e.uuid AS uuid, "
            f"e.metadata AS metadata, "
            f"e.user_id AS user_id, "
            f"collect(DISTINCT {{rel: type(r), "
            f"target: neighbor.name, "
            f"target_uuid: neighbor.uuid, "
            f"target_type: labels(neighbor)[0], "
            f"target_category: neighbor.category, "
            f"weight: r.weight, "
            f"description: r.description, "
            f"priority: r.priority}}) AS connections "
            f"LIMIT 1"
        )
        result = await self._graph.query(
            q, params=params,
        )
        if not result.result_set:
            return None
        row = result.result_set[0]
        # metadata is stored as a JSON string; fall back to {} on junk.
        raw_meta = row[11] or "{}"
        try:
            meta = json.loads(raw_meta) if isinstance(
                raw_meta, str,
            ) else {}
        except (json.JSONDecodeError, TypeError):
            meta = {}
        entity_user_id = row[12] or _SENTINEL_USER_ID
        conns = row[13] if row[13] else []
        # With zero OPTIONAL matches, collect() yields one all-null map;
        # drop entries whose relation type is missing.
        conns = [
            c for c in conns if c.get("rel") is not None
        ]
        return {
            "name": row[0],
            "type": row[1],
            "description": row[2],
            "category": row[3],
            "priority": row[4],
            "scope_id": row[5],
            "mention_count": row[6],
            "created_at": row[7],
            "updated_at": row[8],
            "created_by": row[9],
            "uuid": row[10],
            "metadata": meta,
            "user_id": entity_user_id,
            "connections": conns,
        }
    async def inspect_entity(
        self,
        name: str = "",
        uuid: str | None = None,
        max_depth: int = 2,
        neighbor_limit: int = 50,
    ) -> dict | None:
        """Deep inspection of an entity and its full neighborhood.

        Returns the entity's properties plus all outgoing and
        incoming relationships (up to *max_depth* hops), with
        each neighbor's core properties included.

        Args:
            name (str): Entity name (matched lowercased); ignored when
                *uuid* is given.
            uuid (str | None): Exact UUID lookup; takes precedence.
            max_depth (int): When >= 2, also gathers 2-hop neighbors.
            neighbor_limit (int): Max rows per relationship direction.

        Returns:
            dict | None: ``{"entity", "outgoing", "incoming",
            "second_hop", "summary"}``, or ``None`` when neither lookup
            key is usable or nothing matched.
        """
        where_parts: list[str] = []
        params: dict[str, Any] = {}
        if uuid:
            where_parts.append("e.uuid = $uuid")
            params["uuid"] = uuid
        elif name:
            where_parts.append("e.name = $name")
            params["name"] = name.strip().lower()
        else:
            # Neither lookup key supplied -- nothing to inspect.
            return None
        where = " AND ".join(where_parts)
        entity_q = (
            f"MATCH (e) WHERE {where} "
            f"RETURN e.name, labels(e)[0], e.description, "
            f"e.category, e.priority, e.scope_id, "
            f"e.mention_count, e.created_at, e.updated_at, "
            f"e.created_by, e.uuid, e.metadata, e.user_id, "
            f"e.pinned "
            f"LIMIT 1"
        )
        result = await self._graph.query(entity_q, params=params)
        if not result.result_set:
            return None
        row = result.result_set[0]
        # metadata is stored as a JSON string; tolerate junk values.
        raw_meta = row[11] or "{}"
        try:
            meta = (
                json.loads(raw_meta)
                if isinstance(raw_meta, str) else {}
            )
        except (json.JSONDecodeError, TypeError):
            meta = {}
        entity = {
            "name": row[0],
            "type": row[1],
            "description": row[2],
            "category": row[3],
            "priority": row[4],
            "scope_id": row[5],
            "mention_count": row[6],
            "created_at": row[7],
            "updated_at": row[8],
            "created_by": row[9],
            "uuid": row[10],
            "metadata": meta,
            "user_id": row[12] or _SENTINEL_USER_ID,
            "pinned": bool(row[13]),
        }
        entity_uuid = entity["uuid"]
        # Outgoing edges, heaviest first.
        out_q = (
            "MATCH (e {uuid: $uuid})-[r]->(t) "
            "RETURN type(r) AS rel, r.weight AS weight, "
            "r.description AS rdesc, r.priority AS rpri, "
            "t.name AS tname, labels(t)[0] AS ttype, "
            "t.uuid AS tuuid, t.category AS tcat, "
            "t.description AS tdesc "
            "ORDER BY r.weight DESC "
            "LIMIT $lim"
        )
        out_result = await self._graph.query(
            out_q,
            params={"uuid": entity_uuid, "lim": neighbor_limit},
        )
        # Incoming edges, heaviest first.
        in_q = (
            "MATCH (s)-[r]->(e {uuid: $uuid}) "
            "RETURN type(r) AS rel, r.weight AS weight, "
            "r.description AS rdesc, r.priority AS rpri, "
            "s.name AS sname, labels(s)[0] AS stype, "
            "s.uuid AS suuid, s.category AS scat, "
            "s.description AS sdesc "
            "ORDER BY r.weight DESC "
            "LIMIT $lim"
        )
        in_result = await self._graph.query(
            in_q,
            params={"uuid": entity_uuid, "lim": neighbor_limit},
        )
        outgoing = [
            {
                "relation": row[0],
                "weight": row[1],
                "rel_description": row[2],
                "rel_priority": row[3],
                "target_name": row[4],
                "target_type": row[5],
                "target_uuid": row[6],
                "target_category": row[7],
                "target_description": row[8],
            }
            for row in (out_result.result_set or [])
        ]
        incoming = [
            {
                "relation": row[0],
                "weight": row[1],
                "rel_description": row[2],
                "rel_priority": row[3],
                "source_name": row[4],
                "source_type": row[5],
                "source_uuid": row[6],
                "source_category": row[7],
                "source_description": row[8],
            }
            for row in (in_result.result_set or [])
        ]
        # Optional: 2-hop neighbors (neighbors of neighbors)
        second_hop: list[dict] = []
        if max_depth >= 2:
            hop2_q = (
                "MATCH (e {uuid: $uuid})-[r1]-(n1)-[r2]-(n2) "
                "WHERE n2.uuid <> $uuid "
                "RETURN DISTINCT n2.name AS name, "
                "labels(n2)[0] AS type, "
                "n2.uuid AS uuid, n2.category AS cat, "
                "type(r2) AS via_rel, "
                "n1.name AS via_node "
                "LIMIT $lim"
            )
            try:
                hop2 = await self._graph.query(
                    hop2_q,
                    params={
                        "uuid": entity_uuid,
                        "lim": neighbor_limit,
                    },
                    # Tight 10s (ms) cap: the undirected 2-hop scan can be
                    # expensive on dense graphs; on timeout we degrade
                    # gracefully to an empty second_hop list.
                    timeout=10_000,
                )
                second_hop = [
                    {
                        "name": row[0],
                        "type": row[1],
                        "uuid": row[2],
                        "category": row[3],
                        "via_relation": row[4],
                        "via_node": row[5],
                    }
                    for row in (hop2.result_set or [])
                ]
            except Exception:
                logger.debug(
                    "inspect_entity: 2-hop query timed out",
                    exc_info=True,
                )
        return {
            "entity": entity,
            "outgoing": outgoing,
            "incoming": incoming,
            "second_hop": second_hop,
            "summary": {
                "outgoing_count": len(outgoing),
                "incoming_count": len(incoming),
                "second_hop_count": len(second_hop),
            },
        }
[docs]
async def list_entities(
self,
entity_type: str | None = None,
category: str | None = None,
scope_id: str | None = None,
limit: int = 50,
) -> list[dict]:
"""List entities with optional filtering."""
label_filter = (
f":{entity_type}" if entity_type else ""
)
where_parts: list[str] = []
params: dict[str, Any] = {"lim": limit}
if category:
where_parts.append("e.category = $cat")
params["cat"] = category
if scope_id:
where_parts.append("e.scope_id = $sid")
params["sid"] = scope_id
where = (
(" WHERE " + " AND ".join(where_parts))
if where_parts else ""
)
q = (
f"MATCH (e{label_filter}){where} "
f"RETURN e.name AS name, "
f"labels(e)[0] AS type, "
f"e.description AS description, "
f"e.category AS category, "
f"e.priority AS priority, "
f"e.scope_id AS scope_id, "
f"e.mention_count AS mention_count, "
f"e.updated_at AS updated_at, "
f"e.uuid AS uuid, "
f"e.metadata AS metadata, "
f"e.user_id AS user_id "
f"ORDER BY e.updated_at DESC "
f"LIMIT $lim"
)
result = await self._graph.query(
q, params=params,
)
entities = []
for row in result.result_set:
raw_meta = row[9] or "{}"
try:
meta = json.loads(raw_meta) if isinstance(
raw_meta, str,
) else {}
except (json.JSONDecodeError, TypeError):
meta = {}
entities.append({
"name": row[0],
"type": row[1],
"description": row[2],
"category": row[3],
"priority": row[4],
"scope_id": row[5],
"mention_count": row[6],
"updated_at": row[7],
"uuid": row[8],
"metadata": meta,
"user_id": row[10] or _SENTINEL_USER_ID,
})
return entities
# ------------------------------------------------------------------
# Relationship CRUD
# ------------------------------------------------------------------
[docs]
async def add_relationship(
self,
source_uuid: str,
target_uuid: str,
relation_type: str,
weight: float = 0.5,
description: str = "",
evidence: str = "",
) -> bool:
"""Create or reinforce a relationship between two
entities identified by UUID.
The edge inherits ``priority = min(source, target)``
and the ``category`` / ``scope_id`` from the
lower-priority endpoint. Cross-category edges are
fully supported.
"""
relation_type = (
relation_type.strip().upper().replace(" ", "_")
)
if not relation_type:
raise ValueError("Relation type cannot be empty")
now = time.time()
q = (
f"MATCH (a {{uuid: $src_uuid}}) "
f"MATCH (b {{uuid: $tgt_uuid}}) "
f"MERGE (a)-[r:{relation_type}]->(b) "
f"ON CREATE SET "
f"r.src_uuid = $src_uuid, "
f"r.tgt_uuid = $tgt_uuid, "
f"r.weight = $w, r.description = $desc, "
f"r.priority = CASE "
f"WHEN a.priority < b.priority "
f"THEN a.priority "
f"ELSE b.priority END, "
f"r.category = CASE "
f"WHEN a.priority < b.priority "
f"THEN a.category "
f"ELSE b.category END, "
f"r.scope_id = CASE "
f"WHEN a.priority < b.priority "
f"THEN a.scope_id "
f"ELSE b.scope_id END, "
f"r.evidence = $ev, "
f"r.created_at = $now, "
f"r.updated_at = $now "
f"ON MATCH SET "
f"r.weight = CASE "
f"WHEN r.weight + 0.1 > 1.0 "
f"THEN 1.0 "
f"ELSE r.weight + 0.1 END, "
f"r.updated_at = $now "
f"RETURN type(r)"
)
params = {
"src_uuid": source_uuid,
"tgt_uuid": target_uuid,
"w": weight, "desc": description,
"ev": evidence, "now": now,
}
result = await self._graph.query(
q, params=params,
)
return len(result.result_set) > 0
[docs]
async def delete_relationship(
self,
source_uuid: str,
target_uuid: str,
relation_type: str,
) -> bool:
"""Delete the specified relationship.
Args:
source_uuid (str): The source uuid value.
target_uuid (str): The target uuid value.
relation_type (str): The relation type value.
Returns:
bool: True on success, False otherwise.
"""
relation_type = relation_type.upper()
q = (
f"MATCH (a {{uuid: $src_uuid}})"
f"-[r:{relation_type}]->"
f"(b {{uuid: $tgt_uuid}}) "
f"DELETE r RETURN count(r) AS deleted"
)
params = {
"src_uuid": source_uuid,
"tgt_uuid": target_uuid,
}
result = await self._graph.query(
q, params=params,
)
if result.result_set:
return result.result_set[0][0] > 0
return False
[docs]
async def list_relationships(
self,
entity_uuid: str | None = None,
relation_type: str | None = None,
category: str | None = None,
limit: int = 50,
order_by: bool = True,
timeout: int | None = None,
) -> list[dict]:
"""List relationships.
Args:
entity_uuid (str | None): The entity uuid value.
relation_type (str | None): The relation type value.
category (str | None): The category value.
limit (int): Maximum number of items.
order_by (bool): Sort by updated_at DESC. Disable for
large-graph visualization queries where the sort
dominates query time.
timeout (int | None): Per-query timeout in ms. When
``None`` the server default is used.
Returns:
list[dict]: The result.
"""
where_parts: list[str] = []
params: dict[str, Any] = {"lim": limit}
if entity_uuid:
where_parts.append(
"(a.uuid = $euuid OR b.uuid = $euuid)"
)
params["euuid"] = entity_uuid
if category:
where_parts.append("r.category = $cat")
params["cat"] = category
rel_pattern = (
f":{relation_type.upper()}"
if relation_type else ""
)
where = (
(" WHERE " + " AND ".join(where_parts))
if where_parts else ""
)
order_clause = "ORDER BY r.updated_at DESC " if order_by else ""
q = (
f"MATCH (a)-[r{rel_pattern}]->(b){where} "
f"RETURN a.name AS source, "
f"a.uuid AS source_uuid, "
f"labels(a)[0] AS source_type, "
f"type(r) AS relation, "
f"b.name AS target, "
f"b.uuid AS target_uuid, "
f"labels(b)[0] AS target_type, "
f"r.weight AS weight, "
f"r.description AS description, "
f"r.category AS category, "
f"r.priority AS priority "
f"{order_clause}"
f"LIMIT $lim"
)
query_kwargs: dict[str, Any] = {"params": params}
if timeout is not None:
query_kwargs["timeout"] = timeout
result = await self._graph.query(q, **query_kwargs)
rels = []
for row in result.result_set:
rels.append({
"source": row[0],
"source_uuid": row[1],
"source_type": row[2],
"relation": row[3],
"target": row[4],
"target_uuid": row[5],
"target_type": row[6],
"weight": row[7],
"description": row[8],
"category": row[9],
"priority": row[10],
})
return rels
    # ------------------------------------------------------------------
    # Entity resolution
    # ------------------------------------------------------------------
    async def _resolve_or_create(
        self,
        name: str,
        entity_type: str,
        category: str,
        scope_id: str,
        description: str = "",
        created_by: str = "unknown",
        user_id: str = _SENTINEL_USER_ID,
        embedding: list[float] | None = None,
    ) -> dict[str, str]:
        """Resolve an existing entity or create a new one.

        Returns ``{"name": ..., "uuid": ...}``.

        Resolution strategy:
        1. Exact name match within same category+scope+type.
        2. Vector similarity > 0.90 within same
           category+scope+type.
        3. Create new if no match.

        A resolved (matched) entity is reinforced via
        :meth:`_reinforce_entity` rather than rewritten.
        """
        name_lower = name.strip().lower()
        sid = scope_id or _NO_SCOPE
        # Strategy 1: exact name match
        q = (
            f"MATCH (e:{entity_type} "
            f"{{category: $cat, scope_id: $sid}}) "
            f"WHERE toLower(e.name) = $name "
            f"RETURN e.name, e.uuid LIMIT 1"
        )
        params = {
            "cat": category,
            "sid": sid,
            "name": name_lower,
        }
        result = await self._graph.query(
            q, params=params,
        )
        if result.result_set:
            existing = result.result_set[0][0]
            existing_uuid = result.result_set[0][1]
            await self._reinforce_entity(
                existing, entity_type, description,
            )
            return {
                "name": existing, "uuid": existing_uuid,
            }
        # Strategy 2: vector similarity
        # (vec is computed here and reused by Strategy 3 below so the
        # text is embedded at most once.)
        if embedding is not None:
            vec = embedding
        else:
            embed_text = (
                f"{name}: {description}"
                if description else name
            )
            vec = await self._embed(embed_text)
        try:
            # NOTE(review): assumes queryNodes' score is similarity-like
            # (higher = closer) for the 0.90 cutoff -- confirm against
            # the FalkorDB vector-index semantics for 'cosine'.
            vec_q = (
                f"CALL db.idx.vector.queryNodes("
                f"'{entity_type}', 'embedding', "
                f"5, vecf32($vec)) "
                f"YIELD node, score "
                f"WHERE node.category = $cat "
                f"AND node.scope_id = $sid "
                f"AND score > 0.90 "
                f"RETURN node.name, node.uuid, score "
                f"ORDER BY score DESC LIMIT 1"
            )
            vec_params = {
                "vec": vec,
                "cat": category,
                "sid": sid,
            }
            vec_result = await self._graph.query(
                vec_q, params=vec_params,
            )
            if vec_result.result_set:
                existing = vec_result.result_set[0][0]
                existing_uuid = vec_result.result_set[0][1]
                await self._reinforce_entity(
                    existing,
                    entity_type,
                    description,
                )
                return {
                    "name": existing,
                    "uuid": existing_uuid,
                }
        except Exception:
            # Vector search is best-effort; fall through to creation.
            logger.debug(
                "Vector resolution failed, "
                "creating new entity",
                exc_info=True,
            )
        # Strategy 3: create new (reuse vector to avoid re-embedding)
        info = await self.add_entity(
            name, entity_type, description,
            category=category, scope_id=sid,
            created_by=created_by,
            user_id=user_id,
            embedding=vec,
        )
        return info
async def _reinforce_entity(
self,
name: str,
entity_type: str,
new_description: str = "",
) -> None:
"""Increment mention_count and optionally merge
descriptions.
"""
now = time.time()
if new_description:
q = (
f"MATCH (e:{entity_type} "
f"{{name: $name}}) "
f"SET e.mention_count = "
f"e.mention_count + 1, "
f"e.updated_at = $now, "
f"e.description = CASE "
f"WHEN size(e.description) "
f"> size($desc) "
f"THEN e.description "
f"ELSE $desc END "
f"RETURN e.name"
)
params = {
"name": name,
"desc": new_description,
"now": now,
}
await self._graph.query(
q, params=params,
)
else:
q = (
f"MATCH (e:{entity_type} "
f"{{name: $name}}) "
f"SET e.mention_count = "
f"e.mention_count + 1, "
f"e.updated_at = $now "
f"RETURN e.name"
)
await self._graph.query(
q, params={"name": name, "now": now},
)
[docs]
async def resolve_entity_cross_category(
self,
name: str,
entity_type: str,
) -> dict | None:
"""Find an entity by name across all categories.
Used for cross-category linking.
"""
name_lower = name.strip().lower()
q = (
f"MATCH (e:{entity_type}) "
f"WHERE toLower(e.name) = $name "
f"RETURN e.name, e.category, "
f"e.scope_id, e.priority, e.uuid "
f"ORDER BY e.priority DESC LIMIT 1"
)
result = await self._graph.query(
q, params={"name": name_lower},
)
if result.result_set:
row = result.result_set[0]
return {
"name": row[0],
"category": row[1],
"scope_id": row[2],
"priority": row[3],
"uuid": row[4],
}
return None
    # ------------------------------------------------------------------
    # Retrieval: hybrid vector + graph traversal
    # ------------------------------------------------------------------
    async def retrieve_context(
        self,
        query: str,
        query_embedding: (
            list[float] | np.ndarray | None
        ) = None,
        user_ids: list[str] | None = None,
        channel_id: str | None = None,
        guild_id: str | None = None,
        max_hops: int = 2,
        max_per_user: int = 20,
        max_channel: int = 15,
        max_guild: int = 15,
        max_general: int = 30,
        seed_top_k: int = 5000,
    ) -> dict[str, list[dict]]:
        """Hybrid retrieval entry point for *query*.

        Thin wrapper that forwards every argument, in order, to
        :func:`retrieval.run_retrieve_context`. The import is deferred
        to call time (presumably to break an import cycle with the
        retrieval module -- confirm).
        """
        from .retrieval import run_retrieve_context
        return await run_retrieve_context(
            self, query, query_embedding, user_ids, channel_id, guild_id,
            max_hops, max_per_user, max_channel, max_guild, max_general,
            seed_top_k,
        )
    # Private delegates into the retrieval module. Each defers its import
    # to call time (presumably to break an import cycle -- confirm).
    async def _fetch_core_knowledge(self) -> list[dict]:
        """Delegate to :func:`retrieval.run_fetch_core_knowledge`."""
        from .retrieval import run_fetch_core_knowledge
        return await run_fetch_core_knowledge(self)

    async def _fetch_basic_knowledge(
        self, max_entities: int = 50,
    ) -> list[dict]:
        """Delegate to :func:`retrieval.run_fetch_basic_knowledge`."""
        from .retrieval import run_fetch_basic_knowledge
        return await run_fetch_basic_knowledge(self, max_entities)

    async def _fetch_pinned_entities(
        self,
        allowed: set[tuple[str, str]],
    ) -> list[dict]:
        """Delegate to :func:`retrieval.run_fetch_pinned_entities`."""
        from .retrieval import run_fetch_pinned_entities
        return await run_fetch_pinned_entities(self, allowed)

    async def _seed_vector_search(
        self,
        q_vec: list[float],
        allowed: set[tuple[str, str]],
        top_k: int,
    ) -> list[dict]:
        """Delegate to :func:`retrieval.run_seed_vector_search`."""
        from .retrieval import run_seed_vector_search
        return await run_seed_vector_search(self, q_vec, allowed, top_k)

    async def _expand_graph(
        self,
        seed_uuids: list[str],
        max_hops: int,
    ) -> list[dict]:
        """Delegate to :func:`retrieval.run_expand_graph`."""
        from .retrieval import run_expand_graph
        return await run_expand_graph(self, seed_uuids, max_hops)

    def _deconflict(
        self, entities: list[dict],
    ) -> list[dict]:
        """Delegate to :func:`retrieval.run_deconflict`.

        Note: ``self`` is intentionally not forwarded -- run_deconflict
        operates on the entity list alone.
        """
        from .retrieval import run_deconflict
        return run_deconflict(entities)
    async def get_core_knowledge(self) -> list[dict]:
        """Public wrapper around the retrieval module's core-knowledge
        fetch (:func:`retrieval.run_get_core_knowledge`); imported
        lazily, presumably to break an import cycle -- confirm.
        """
        from .retrieval import run_get_core_knowledge
        return await run_get_core_knowledge(self)
    async def search_entities(
        self,
        query: str,
        query_embedding: list[float] | None = None,
        category: str | None = None,
        scope_id: str | None = None,
        entity_type: str | None = None,
        top_k: int = 10,
    ) -> list[dict]:
        """Search for entities matching *query*.

        Thin wrapper forwarding all arguments, in order, to
        :func:`retrieval.run_search_entities` (lazy import, presumably
        to break an import cycle -- confirm).
        """
        from .retrieval import run_search_entities
        return await run_search_entities(
            self, query, query_embedding, category, scope_id, entity_type,
            top_k,
        )
[docs]
async def get_graph_stats(self) -> dict:
"""Return high-level graph statistics.
Each sub-query runs independently so a timeout on one
(e.g. the expensive relationship-type scan) does not
prevent the other stats from being returned.
"""
_TIMEOUT = 30_000
stats: dict[str, Any] = {
"node_count": 0,
"label_count": 0,
"relationship_count": 0,
"entities_by_label": {},
"entities_by_category": {},
"relationships_by_type": {},
}
try:
nr = await self._graph.query(
"MATCH (n) "
"RETURN count(n) AS node_count, "
"count(DISTINCT labels(n)[0]) AS label_count",
timeout=_TIMEOUT,
)
if nr.result_set:
stats["node_count"] = nr.result_set[0][0]
stats["label_count"] = nr.result_set[0][1]
except Exception:
logger.warning("get_graph_stats: node count query failed", exc_info=True)
try:
rr = await self._graph.query(
"MATCH ()-[r]->() RETURN count(r) AS rel_count",
timeout=_TIMEOUT,
)
if rr.result_set:
stats["relationship_count"] = rr.result_set[0][0]
except Exception:
logger.warning("get_graph_stats: rel count query failed", exc_info=True)
try:
rl = await self._graph.query(
"MATCH (n) "
"RETURN labels(n)[0] AS label, count(n) AS cnt "
"ORDER BY cnt DESC",
timeout=_TIMEOUT,
)
stats["entities_by_label"] = {
str(row[0]): int(row[1])
for row in (rl.result_set or [])
}
except Exception:
logger.warning("get_graph_stats: per-label query failed", exc_info=True)
try:
rc = await self._graph.query(
"MATCH (n) "
"RETURN coalesce(n.category, 'unknown') AS category, "
"count(n) AS cnt "
"ORDER BY cnt DESC",
timeout=_TIMEOUT,
)
stats["entities_by_category"] = {
str(row[0]): int(row[1])
for row in (rc.result_set or [])
}
except Exception:
logger.warning("get_graph_stats: per-category query failed", exc_info=True)
try:
types_result = await self._graph.query(
"CALL db.relationshiptypes()",
timeout=_TIMEOUT,
)
rel_types = [
str(row[0])
for row in (types_result.result_set or [])
]
counts: dict[str, int] = {}
for rt in rel_types:
try:
cr = await self._graph.query(
f"MATCH ()-[r:{rt}]->() "
f"RETURN count(r) AS cnt",
timeout=_TIMEOUT,
)
cnt = (
int(cr.result_set[0][0])
if cr.result_set else 0
)
if cnt > 0:
counts[rt] = cnt
except Exception:
pass
stats["relationships_by_type"] = dict(
sorted(counts.items(), key=lambda x: -x[1])
)
except Exception:
logger.warning("get_graph_stats: per-rel-type query failed", exc_info=True)
return stats