# Source code for knowledge_graph.manager

"""FalkorDB-backed knowledge graph with hybrid vector+graph retrieval.

Replaces the flat Redis memory system (``memory_manager.py``) with a
property graph that stores entities as nodes and relationships as typed
edges.  Each node carries a 3072-dim embedding vector indexed via HNSW
for semantic search, while multi-hop Cypher traversal connects related
facts across the graph.

The five-tier authority hierarchy (core > guild > channel > general > user)
is preserved as ``category`` / ``priority`` properties on every node and edge.
"""

from __future__ import annotations

import asyncio
import json
import logging
import time
from typing import Any, TYPE_CHECKING

import numpy as np
from falkordb.asyncio import FalkorDB
from uuid6 import uuid7

from .constants import (
    CATEGORY_PRIORITY,
    ENTITY_LABELS,
    RECOMMENDED_RELATION_TYPES,
    _NO_SCOPE,
    _PRIORITY_NORM,
    _SENTINEL_USER_ID,
    _vec_literal,
)

if TYPE_CHECKING:
    import redis.asyncio as aioredis
    from openrouter_client import OpenRouterClient

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# KnowledgeGraphManager
# ---------------------------------------------------------------------------

class KnowledgeGraphManager:
    """FalkorDB-backed knowledge graph: entity/relationship CRUD plus
    hybrid vector + graph retrieval.
    """

    # Name of the FalkorDB graph that every query operates on.
    GRAPH_NAME = "knowledge"
[docs] def __init__( self, redis_client: aioredis.Redis, openrouter: OpenRouterClient, embedding_model: str = "google/gemini-embedding-001", admin_user_ids: set[str] | None = None, ) -> None: """Initialize the instance. Args: redis_client (aioredis.Redis): Redis connection client. openrouter (OpenRouterClient): The openrouter value. embedding_model (str): The embedding model value. admin_user_ids (set[str] | None): The admin user ids value. """ self._redis = redis_client self._db = FalkorDB( connection_pool=redis_client.connection_pool, ) self._graph = self._db.select_graph(self.GRAPH_NAME) self._openrouter = openrouter self._embedding_model = embedding_model self._admin_user_ids: set[str] = ( admin_user_ids or set() )
# ------------------------------------------------------------------ # Embedding helper # ------------------------------------------------------------------ async def _embed(self, text: str) -> list[float]: """Embed text using Gemini direct API (free tier). Forces the Gemini-direct path to avoid paid OpenRouter embedding calls. The generic ``openrouter.embed()`` randomly routes through OpenRouter ~50% of the time. """ return await self._openrouter._embed_gemini( text, self._embedding_model, ) async def _embed_batch(self, texts: list[str]) -> list[list[float]]: """Batch-embed texts using Gemini direct batchEmbedContents API.""" if not texts: return [] return await self._openrouter._embed_gemini_batch( texts, self._embedding_model, ) # ------------------------------------------------------------------ # Index management # ------------------------------------------------------------------
[docs] async def ensure_indexes(self) -> None: """Create vector + range indexes for every entity label. This operation is idempotent. """ for label in ENTITY_LABELS: try: await self._graph.create_node_vector_index( label, "embedding", dim=3072, similarity_function="cosine", ) except Exception as exc: msg = str(exc).lower() if ( "already indexed" in msg or "already exists" in msg ): pass else: logger.warning( "Vector index creation for %s: %s", label, exc, ) for prop in ("name", "category", "scope_id", "pinned", "uuid", "user_id"): try: await self._graph.create_node_range_index(label, prop) except Exception as exc: msg = str(exc).lower() if "already exists" in msg or "already indexed" in msg: pass else: logger.warning( "Range index creation for %s.%s: %s", label, prop, exc, ) await self._backfill_user_ids() logger.info( "Knowledge graph indexes ensured for %d labels", len(ENTITY_LABELS), )
async def _backfill_user_ids(self) -> None: """Set ``user_id`` to the sentinel value on entities that lack one. Idempotent -- only touches nodes where ``user_id IS NULL``. """ for label in ENTITY_LABELS: try: q = ( f"MATCH (e:{label}) " f"WHERE e.user_id IS NULL " f"SET e.user_id = $uid " f"RETURN count(e)" ) result = await self._graph.query( q, params={"uid": _SENTINEL_USER_ID}, ) count = ( result.result_set[0][0] if result.result_set else 0 ) if count: logger.info( "Backfilled user_id on %d %s nodes", count, label, ) except Exception: logger.debug( "user_id backfill for %s failed", label, exc_info=True, ) # ------------------------------------------------------------------ # Entity CRUD # ------------------------------------------------------------------
[docs] async def add_entity( self, name: str, entity_type: str, description: str, category: str = "general", scope_id: str = "_", created_by: str = "unknown", pinned: bool = False, metadata: str = "{}", user_id: str = _SENTINEL_USER_ID, embedding: list[float] | None = None, ) -> dict[str, str]: """Create or update an entity. Returns ``{"name": ..., "uuid": ...}``. """ if entity_type not in ENTITY_LABELS: raise ValueError( f"Unknown entity type: {entity_type}" ) if category not in CATEGORY_PRIORITY: raise ValueError( f"Unknown category: {category}" ) priority = CATEGORY_PRIORITY[category] now = time.time() name_lower = name.strip().lower() new_uuid = str(uuid7()) if embedding is not None: vec = embedding else: embed_text = ( f"{name}: {description}" if description else name ) vec = await self._embed(embed_text) q = ( f"MERGE (e:{entity_type} {{name: $name, " f"scope_id: $sid, category: $cat}}) " f"ON CREATE SET " f"e.uuid = $uuid, " f"e.description = $desc, " f"e.priority = $pri, " f"e.embedding = vecf32($vec), " f"e.pinned = $pinned, " f"e.mention_count = 1, " f"e.created_at = $now, " f"e.updated_at = $now, " f"e.created_by = $creator, " f"e.user_id = $user_id, " f"e.metadata = $meta " f"ON MATCH SET " f"e.description = $desc, " f"e.embedding = vecf32($vec), " f"e.mention_count = e.mention_count + 1, " f"e.updated_at = $now " f"RETURN e.name, e.uuid" ) params = { "name": name_lower, "sid": scope_id or _NO_SCOPE, "cat": category, "desc": description, "pri": priority, "vec": vec, "pinned": pinned, "now": now, "creator": created_by, "uuid": new_uuid, "meta": metadata, "user_id": user_id or _SENTINEL_USER_ID, } result = await self._graph.query(q, params=params) row = result.result_set[0] if result.result_set else None return { "name": row[0] if row else name_lower, "uuid": row[1] if row else new_uuid, }
[docs] async def update_entity_description( self, name: str, entity_type: str, new_description: str, category: str | None = None, scope_id: str | None = None, ) -> bool: """Update an entity's description and re-embed.""" name_lower = name.strip().lower() embed_text = ( f"{name}: {new_description}" if new_description else name ) vec = await self._embed(embed_text) now = time.time() where_parts = ["e.name = $name"] params: dict[str, Any] = { "name": name_lower, "desc": new_description, "vec": vec, "now": now, } if category: where_parts.append("e.category = $cat") params["cat"] = category if scope_id: where_parts.append("e.scope_id = $sid") params["sid"] = scope_id where = " AND ".join(where_parts) q = ( f"MATCH (e:{entity_type}) WHERE {where} " f"SET e.description = $desc, " f"e.embedding = vecf32($vec), " f"e.updated_at = $now " f"RETURN e.name" ) result = await self._graph.query( q, params=params, ) return len(result.result_set) > 0
[docs] async def edit_entity( self, uuid: str, description: str | None = None, append_text: str | None = None, pinned: bool | None = None, category: str | None = None, metadata_updates: dict | None = None, ) -> dict | None: """Selectively update fields on an existing entity. Looks up by *uuid*. Only the provided fields are changed; everything else is preserved. *description* replaces the text entirely. *append_text* is concatenated to the existing description. (Mutually exclusive -- caller must pick one.) *metadata_updates* is shallow-merged into the existing metadata JSON (new keys added, existing overwritten, unmentioned preserved). Returns the full updated entity dict via :meth:`get_entity`, or ``None`` if the UUID was not found. """ # -- Read current state ------------------------------------------- read_q = ( "MATCH (e) WHERE e.uuid = $uuid " "RETURN e.name, labels(e)[0], " "e.description, e.metadata, e.category" ) result = await self._graph.query( read_q, params={"uuid": uuid}, ) if not result.result_set: return None row = result.result_set[0] cur_name: str = row[0] entity_type: str = row[1] cur_desc: str = row[2] or "" raw_meta: str = row[3] or "{}" cur_cat: str = row[4] or "general" try: cur_meta = json.loads(raw_meta) if isinstance( raw_meta, str, ) else {} except (json.JSONDecodeError, TypeError): cur_meta = {} # -- Compute new values ------------------------------------------- new_desc: str | None = None if description is not None: new_desc = description elif append_text: new_desc = ( (cur_desc + "\n" + append_text) if cur_desc else append_text ) new_meta: str | None = None if metadata_updates: merged = {**cur_meta, **metadata_updates} new_meta = json.dumps(merged, ensure_ascii=False) new_cat = category if new_cat and new_cat not in CATEGORY_PRIORITY: raise ValueError( f"Unknown category: {new_cat}" ) # -- Build SET clause dynamically --------------------------------- now = time.time() set_parts: list[str] = ["e.updated_at = $now"] params: dict[str, Any] 
= {"uuid": uuid, "now": now} if new_desc is not None: set_parts.append("e.description = $desc") params["desc"] = new_desc embed_text = ( f"{cur_name}: {new_desc}" if new_desc else cur_name ) vec = await self._embed(embed_text) set_parts.append("e.embedding = vecf32($vec)") params["vec"] = vec if pinned is not None: set_parts.append("e.pinned = $pinned") params["pinned"] = pinned if new_cat: set_parts.append("e.category = $cat") params["cat"] = new_cat set_parts.append("e.priority = $pri") params["pri"] = CATEGORY_PRIORITY[new_cat] if new_meta is not None: set_parts.append("e.metadata = $meta") params["meta"] = new_meta set_clause = ", ".join(set_parts) write_q = ( f"MATCH (e:{entity_type}) " f"WHERE e.uuid = $uuid " f"SET {set_clause} " f"RETURN e.uuid" ) await self._graph.query(write_q, params=params) return await self.get_entity(uuid=uuid)
[docs] async def delete_entity( self, name: str, entity_type: str, category: str, scope_id: str = "_", ) -> bool: """Delete the specified entity. Args: name (str): Human-readable name. entity_type (str): The entity type value. category (str): The category value. scope_id (str): The scope id value. Returns: bool: True on success, False otherwise. """ name_lower = name.strip().lower() q = ( f"MATCH (e:{entity_type} {{name: $name, " f"scope_id: $sid, category: $cat}}) " f"DETACH DELETE e " f"RETURN count(e) AS deleted" ) params = { "name": name_lower, "sid": scope_id, "cat": category, } result = await self._graph.query( q, params=params, ) if result.result_set: return result.result_set[0][0] > 0 return False
[docs] async def delete_entity_by_uuid( self, uuid: str, ) -> bool: """Delete an entity by UUID (detach-deletes all relationships).""" q = ( "MATCH (e {uuid: $uuid}) " "DETACH DELETE e " "RETURN count(e) AS deleted" ) result = await self._graph.query( q, params={"uuid": uuid}, ) if result.result_set: return result.result_set[0][0] > 0 return False
[docs] async def pin_entity( self, name: str, entity_type: str, pinned: bool = True, category: str | None = None, scope_id: str | None = None, ) -> bool: """Set or clear the pinned flag on an entity. When category and/or scope_id are provided, only entities matching those filters are updated. This avoids pinning the wrong entity when the same name exists in multiple scopes. """ name_lower = name.strip().lower() now = time.time() where_parts = ["e.name = $name"] params: dict[str, Any] = { "name": name_lower, "pinned": pinned, "now": now, } if category is not None: where_parts.append("e.category = $cat") params["cat"] = category if scope_id is not None: where_parts.append("e.scope_id = $sid") params["sid"] = scope_id or _NO_SCOPE where_clause = " AND ".join(where_parts) q = ( f"MATCH (e:{entity_type}) " f"WHERE {where_clause} " f"SET e.pinned = $pinned, " f"e.updated_at = $now " f"RETURN e.name" ) result = await self._graph.query(q, params=params) return len(result.result_set) > 0
[docs] async def get_entity( self, name: str = "", entity_type: str | None = None, category: str | None = None, scope_id: str | None = None, uuid: str | None = None, ) -> dict | None: """Fetch an entity with its immediate connections. Can look up by *name* or by *uuid*. """ label_filter = ( f":{entity_type}" if entity_type else "" ) where_parts: list[str] = [] params: dict[str, Any] = {} if uuid: where_parts.append("e.uuid = $uuid") params["uuid"] = uuid else: name_lower = name.strip().lower() where_parts.append("e.name = $name") params["name"] = name_lower if category: where_parts.append("e.category = $cat") params["cat"] = category if scope_id: where_parts.append("e.scope_id = $sid") params["sid"] = scope_id where = " AND ".join(where_parts) q = ( f"MATCH (e{label_filter}) WHERE {where} " f"OPTIONAL MATCH (e)-[r]-(neighbor) " f"RETURN e.name AS name, " f"labels(e)[0] AS type, " f"e.description AS description, " f"e.category AS category, " f"e.priority AS priority, " f"e.scope_id AS scope_id, " f"e.mention_count AS mention_count, " f"e.created_at AS created_at, " f"e.updated_at AS updated_at, " f"e.created_by AS created_by, " f"e.uuid AS uuid, " f"e.metadata AS metadata, " f"e.user_id AS user_id, " f"collect(DISTINCT {{rel: type(r), " f"target: neighbor.name, " f"target_uuid: neighbor.uuid, " f"target_type: labels(neighbor)[0], " f"target_category: neighbor.category, " f"weight: r.weight, " f"description: r.description, " f"priority: r.priority}}) AS connections " f"LIMIT 1" ) result = await self._graph.query( q, params=params, ) if not result.result_set: return None row = result.result_set[0] raw_meta = row[11] or "{}" try: meta = json.loads(raw_meta) if isinstance( raw_meta, str, ) else {} except (json.JSONDecodeError, TypeError): meta = {} entity_user_id = row[12] or _SENTINEL_USER_ID conns = row[13] if row[13] else [] conns = [ c for c in conns if c.get("rel") is not None ] return { "name": row[0], "type": row[1], "description": row[2], "category": row[3], 
"priority": row[4], "scope_id": row[5], "mention_count": row[6], "created_at": row[7], "updated_at": row[8], "created_by": row[9], "uuid": row[10], "metadata": meta, "user_id": entity_user_id, "connections": conns, }
[docs] async def inspect_entity( self, name: str = "", uuid: str | None = None, max_depth: int = 2, neighbor_limit: int = 50, ) -> dict | None: """Deep inspection of an entity and its full neighborhood. Returns the entity's properties plus all outgoing and incoming relationships (up to *max_depth* hops), with each neighbor's core properties included. """ where_parts: list[str] = [] params: dict[str, Any] = {} if uuid: where_parts.append("e.uuid = $uuid") params["uuid"] = uuid elif name: where_parts.append("e.name = $name") params["name"] = name.strip().lower() else: return None where = " AND ".join(where_parts) entity_q = ( f"MATCH (e) WHERE {where} " f"RETURN e.name, labels(e)[0], e.description, " f"e.category, e.priority, e.scope_id, " f"e.mention_count, e.created_at, e.updated_at, " f"e.created_by, e.uuid, e.metadata, e.user_id, " f"e.pinned " f"LIMIT 1" ) result = await self._graph.query(entity_q, params=params) if not result.result_set: return None row = result.result_set[0] raw_meta = row[11] or "{}" try: meta = ( json.loads(raw_meta) if isinstance(raw_meta, str) else {} ) except (json.JSONDecodeError, TypeError): meta = {} entity = { "name": row[0], "type": row[1], "description": row[2], "category": row[3], "priority": row[4], "scope_id": row[5], "mention_count": row[6], "created_at": row[7], "updated_at": row[8], "created_by": row[9], "uuid": row[10], "metadata": meta, "user_id": row[12] or _SENTINEL_USER_ID, "pinned": bool(row[13]), } entity_uuid = entity["uuid"] out_q = ( "MATCH (e {uuid: $uuid})-[r]->(t) " "RETURN type(r) AS rel, r.weight AS weight, " "r.description AS rdesc, r.priority AS rpri, " "t.name AS tname, labels(t)[0] AS ttype, " "t.uuid AS tuuid, t.category AS tcat, " "t.description AS tdesc " "ORDER BY r.weight DESC " "LIMIT $lim" ) out_result = await self._graph.query( out_q, params={"uuid": entity_uuid, "lim": neighbor_limit}, ) in_q = ( "MATCH (s)-[r]->(e {uuid: $uuid}) " "RETURN type(r) AS rel, r.weight AS weight, " "r.description AS 
rdesc, r.priority AS rpri, " "s.name AS sname, labels(s)[0] AS stype, " "s.uuid AS suuid, s.category AS scat, " "s.description AS sdesc " "ORDER BY r.weight DESC " "LIMIT $lim" ) in_result = await self._graph.query( in_q, params={"uuid": entity_uuid, "lim": neighbor_limit}, ) outgoing = [ { "relation": row[0], "weight": row[1], "rel_description": row[2], "rel_priority": row[3], "target_name": row[4], "target_type": row[5], "target_uuid": row[6], "target_category": row[7], "target_description": row[8], } for row in (out_result.result_set or []) ] incoming = [ { "relation": row[0], "weight": row[1], "rel_description": row[2], "rel_priority": row[3], "source_name": row[4], "source_type": row[5], "source_uuid": row[6], "source_category": row[7], "source_description": row[8], } for row in (in_result.result_set or []) ] # Optional: 2-hop neighbors (neighbors of neighbors) second_hop: list[dict] = [] if max_depth >= 2: hop2_q = ( "MATCH (e {uuid: $uuid})-[r1]-(n1)-[r2]-(n2) " "WHERE n2.uuid <> $uuid " "RETURN DISTINCT n2.name AS name, " "labels(n2)[0] AS type, " "n2.uuid AS uuid, n2.category AS cat, " "type(r2) AS via_rel, " "n1.name AS via_node " "LIMIT $lim" ) try: hop2 = await self._graph.query( hop2_q, params={ "uuid": entity_uuid, "lim": neighbor_limit, }, timeout=10_000, ) second_hop = [ { "name": row[0], "type": row[1], "uuid": row[2], "category": row[3], "via_relation": row[4], "via_node": row[5], } for row in (hop2.result_set or []) ] except Exception: logger.debug( "inspect_entity: 2-hop query timed out", exc_info=True, ) return { "entity": entity, "outgoing": outgoing, "incoming": incoming, "second_hop": second_hop, "summary": { "outgoing_count": len(outgoing), "incoming_count": len(incoming), "second_hop_count": len(second_hop), }, }
[docs] async def list_entities( self, entity_type: str | None = None, category: str | None = None, scope_id: str | None = None, limit: int = 50, ) -> list[dict]: """List entities with optional filtering.""" label_filter = ( f":{entity_type}" if entity_type else "" ) where_parts: list[str] = [] params: dict[str, Any] = {"lim": limit} if category: where_parts.append("e.category = $cat") params["cat"] = category if scope_id: where_parts.append("e.scope_id = $sid") params["sid"] = scope_id where = ( (" WHERE " + " AND ".join(where_parts)) if where_parts else "" ) q = ( f"MATCH (e{label_filter}){where} " f"RETURN e.name AS name, " f"labels(e)[0] AS type, " f"e.description AS description, " f"e.category AS category, " f"e.priority AS priority, " f"e.scope_id AS scope_id, " f"e.mention_count AS mention_count, " f"e.updated_at AS updated_at, " f"e.uuid AS uuid, " f"e.metadata AS metadata, " f"e.user_id AS user_id " f"ORDER BY e.updated_at DESC " f"LIMIT $lim" ) result = await self._graph.query( q, params=params, ) entities = [] for row in result.result_set: raw_meta = row[9] or "{}" try: meta = json.loads(raw_meta) if isinstance( raw_meta, str, ) else {} except (json.JSONDecodeError, TypeError): meta = {} entities.append({ "name": row[0], "type": row[1], "description": row[2], "category": row[3], "priority": row[4], "scope_id": row[5], "mention_count": row[6], "updated_at": row[7], "uuid": row[8], "metadata": meta, "user_id": row[10] or _SENTINEL_USER_ID, }) return entities
# ------------------------------------------------------------------ # Relationship CRUD # ------------------------------------------------------------------
[docs] async def add_relationship( self, source_uuid: str, target_uuid: str, relation_type: str, weight: float = 0.5, description: str = "", evidence: str = "", ) -> bool: """Create or reinforce a relationship between two entities identified by UUID. The edge inherits ``priority = min(source, target)`` and the ``category`` / ``scope_id`` from the lower-priority endpoint. Cross-category edges are fully supported. """ relation_type = ( relation_type.strip().upper().replace(" ", "_") ) if not relation_type: raise ValueError("Relation type cannot be empty") now = time.time() q = ( f"MATCH (a {{uuid: $src_uuid}}) " f"MATCH (b {{uuid: $tgt_uuid}}) " f"MERGE (a)-[r:{relation_type}]->(b) " f"ON CREATE SET " f"r.src_uuid = $src_uuid, " f"r.tgt_uuid = $tgt_uuid, " f"r.weight = $w, r.description = $desc, " f"r.priority = CASE " f"WHEN a.priority < b.priority " f"THEN a.priority " f"ELSE b.priority END, " f"r.category = CASE " f"WHEN a.priority < b.priority " f"THEN a.category " f"ELSE b.category END, " f"r.scope_id = CASE " f"WHEN a.priority < b.priority " f"THEN a.scope_id " f"ELSE b.scope_id END, " f"r.evidence = $ev, " f"r.created_at = $now, " f"r.updated_at = $now " f"ON MATCH SET " f"r.weight = CASE " f"WHEN r.weight + 0.1 > 1.0 " f"THEN 1.0 " f"ELSE r.weight + 0.1 END, " f"r.updated_at = $now " f"RETURN type(r)" ) params = { "src_uuid": source_uuid, "tgt_uuid": target_uuid, "w": weight, "desc": description, "ev": evidence, "now": now, } result = await self._graph.query( q, params=params, ) return len(result.result_set) > 0
[docs] async def delete_relationship( self, source_uuid: str, target_uuid: str, relation_type: str, ) -> bool: """Delete the specified relationship. Args: source_uuid (str): The source uuid value. target_uuid (str): The target uuid value. relation_type (str): The relation type value. Returns: bool: True on success, False otherwise. """ relation_type = relation_type.upper() q = ( f"MATCH (a {{uuid: $src_uuid}})" f"-[r:{relation_type}]->" f"(b {{uuid: $tgt_uuid}}) " f"DELETE r RETURN count(r) AS deleted" ) params = { "src_uuid": source_uuid, "tgt_uuid": target_uuid, } result = await self._graph.query( q, params=params, ) if result.result_set: return result.result_set[0][0] > 0 return False
[docs] async def list_relationships( self, entity_uuid: str | None = None, relation_type: str | None = None, category: str | None = None, limit: int = 50, order_by: bool = True, timeout: int | None = None, ) -> list[dict]: """List relationships. Args: entity_uuid (str | None): The entity uuid value. relation_type (str | None): The relation type value. category (str | None): The category value. limit (int): Maximum number of items. order_by (bool): Sort by updated_at DESC. Disable for large-graph visualization queries where the sort dominates query time. timeout (int | None): Per-query timeout in ms. When ``None`` the server default is used. Returns: list[dict]: The result. """ where_parts: list[str] = [] params: dict[str, Any] = {"lim": limit} if entity_uuid: where_parts.append( "(a.uuid = $euuid OR b.uuid = $euuid)" ) params["euuid"] = entity_uuid if category: where_parts.append("r.category = $cat") params["cat"] = category rel_pattern = ( f":{relation_type.upper()}" if relation_type else "" ) where = ( (" WHERE " + " AND ".join(where_parts)) if where_parts else "" ) order_clause = "ORDER BY r.updated_at DESC " if order_by else "" q = ( f"MATCH (a)-[r{rel_pattern}]->(b){where} " f"RETURN a.name AS source, " f"a.uuid AS source_uuid, " f"labels(a)[0] AS source_type, " f"type(r) AS relation, " f"b.name AS target, " f"b.uuid AS target_uuid, " f"labels(b)[0] AS target_type, " f"r.weight AS weight, " f"r.description AS description, " f"r.category AS category, " f"r.priority AS priority " f"{order_clause}" f"LIMIT $lim" ) query_kwargs: dict[str, Any] = {"params": params} if timeout is not None: query_kwargs["timeout"] = timeout result = await self._graph.query(q, **query_kwargs) rels = [] for row in result.result_set: rels.append({ "source": row[0], "source_uuid": row[1], "source_type": row[2], "relation": row[3], "target": row[4], "target_uuid": row[5], "target_type": row[6], "weight": row[7], "description": row[8], "category": row[9], "priority": row[10], }) return rels
# ------------------------------------------------------------------ # Entity resolution # ------------------------------------------------------------------ async def _resolve_or_create( self, name: str, entity_type: str, category: str, scope_id: str, description: str = "", created_by: str = "unknown", user_id: str = _SENTINEL_USER_ID, embedding: list[float] | None = None, ) -> dict[str, str]: """Resolve an existing entity or create a new one. Returns ``{"name": ..., "uuid": ...}``. Resolution strategy: 1. Exact name match within same category+scope+type. 2. Vector similarity > 0.90 within same category+scope+type. 3. Create new if no match. """ name_lower = name.strip().lower() sid = scope_id or _NO_SCOPE # Strategy 1: exact name match q = ( f"MATCH (e:{entity_type} " f"{{category: $cat, scope_id: $sid}}) " f"WHERE toLower(e.name) = $name " f"RETURN e.name, e.uuid LIMIT 1" ) params = { "cat": category, "sid": sid, "name": name_lower, } result = await self._graph.query( q, params=params, ) if result.result_set: existing = result.result_set[0][0] existing_uuid = result.result_set[0][1] await self._reinforce_entity( existing, entity_type, description, ) return { "name": existing, "uuid": existing_uuid, } # Strategy 2: vector similarity if embedding is not None: vec = embedding else: embed_text = ( f"{name}: {description}" if description else name ) vec = await self._embed(embed_text) try: vec_q = ( f"CALL db.idx.vector.queryNodes(" f"'{entity_type}', 'embedding', " f"5, vecf32($vec)) " f"YIELD node, score " f"WHERE node.category = $cat " f"AND node.scope_id = $sid " f"AND score > 0.90 " f"RETURN node.name, node.uuid, score " f"ORDER BY score DESC LIMIT 1" ) vec_params = { "vec": vec, "cat": category, "sid": sid, } vec_result = await self._graph.query( vec_q, params=vec_params, ) if vec_result.result_set: existing = vec_result.result_set[0][0] existing_uuid = vec_result.result_set[0][1] await self._reinforce_entity( existing, entity_type, description, ) return { 
"name": existing, "uuid": existing_uuid, } except Exception: logger.debug( "Vector resolution failed, " "creating new entity", exc_info=True, ) # Strategy 3: create new (reuse vector to avoid re-embedding) info = await self.add_entity( name, entity_type, description, category=category, scope_id=sid, created_by=created_by, user_id=user_id, embedding=vec, ) return info async def _reinforce_entity( self, name: str, entity_type: str, new_description: str = "", ) -> None: """Increment mention_count and optionally merge descriptions. """ now = time.time() if new_description: q = ( f"MATCH (e:{entity_type} " f"{{name: $name}}) " f"SET e.mention_count = " f"e.mention_count + 1, " f"e.updated_at = $now, " f"e.description = CASE " f"WHEN size(e.description) " f"> size($desc) " f"THEN e.description " f"ELSE $desc END " f"RETURN e.name" ) params = { "name": name, "desc": new_description, "now": now, } await self._graph.query( q, params=params, ) else: q = ( f"MATCH (e:{entity_type} " f"{{name: $name}}) " f"SET e.mention_count = " f"e.mention_count + 1, " f"e.updated_at = $now " f"RETURN e.name" ) await self._graph.query( q, params={"name": name, "now": now}, )
[docs] async def resolve_entity_cross_category( self, name: str, entity_type: str, ) -> dict | None: """Find an entity by name across all categories. Used for cross-category linking. """ name_lower = name.strip().lower() q = ( f"MATCH (e:{entity_type}) " f"WHERE toLower(e.name) = $name " f"RETURN e.name, e.category, " f"e.scope_id, e.priority, e.uuid " f"ORDER BY e.priority DESC LIMIT 1" ) result = await self._graph.query( q, params={"name": name_lower}, ) if result.result_set: row = result.result_set[0] return { "name": row[0], "category": row[1], "scope_id": row[2], "priority": row[3], "uuid": row[4], } return None
# ------------------------------------------------------------------ # Retrieval: hybrid vector + graph traversal # ------------------------------------------------------------------
[docs] async def retrieve_context( self, query: str, query_embedding: ( list[float] | np.ndarray | None ) = None, user_ids: list[str] | None = None, channel_id: str | None = None, guild_id: str | None = None, max_hops: int = 2, max_per_user: int = 20, max_channel: int = 15, max_guild: int = 15, max_general: int = 30, seed_top_k: int = 5000, ) -> dict[str, list[dict]]: from .retrieval import run_retrieve_context return await run_retrieve_context( self, query, query_embedding, user_ids, channel_id, guild_id, max_hops, max_per_user, max_channel, max_guild, max_general, seed_top_k, )
async def _fetch_core_knowledge(self) -> list[dict]: from .retrieval import run_fetch_core_knowledge return await run_fetch_core_knowledge(self) async def _fetch_basic_knowledge( self, max_entities: int = 50, ) -> list[dict]: from .retrieval import run_fetch_basic_knowledge return await run_fetch_basic_knowledge(self, max_entities) async def _fetch_pinned_entities( self, allowed: set[tuple[str, str]], ) -> list[dict]: from .retrieval import run_fetch_pinned_entities return await run_fetch_pinned_entities(self, allowed) async def _seed_vector_search( self, q_vec: list[float], allowed: set[tuple[str, str]], top_k: int, ) -> list[dict]: from .retrieval import run_seed_vector_search return await run_seed_vector_search(self, q_vec, allowed, top_k) async def _expand_graph( self, seed_uuids: list[str], max_hops: int, ) -> list[dict]: from .retrieval import run_expand_graph return await run_expand_graph(self, seed_uuids, max_hops) def _deconflict( self, entities: list[dict], ) -> list[dict]: from .retrieval import run_deconflict return run_deconflict(entities)
[docs] async def get_core_knowledge(self) -> list[dict]: from .retrieval import run_get_core_knowledge return await run_get_core_knowledge(self)
[docs] async def search_entities( self, query: str, query_embedding: list[float] | None = None, category: str | None = None, scope_id: str | None = None, entity_type: str | None = None, top_k: int = 10, ) -> list[dict]: from .retrieval import run_search_entities return await run_search_entities( self, query, query_embedding, category, scope_id, entity_type, top_k, )
[docs] async def get_graph_stats(self) -> dict: """Return high-level graph statistics. Each sub-query runs independently so a timeout on one (e.g. the expensive relationship-type scan) does not prevent the other stats from being returned. """ _TIMEOUT = 30_000 stats: dict[str, Any] = { "node_count": 0, "label_count": 0, "relationship_count": 0, "entities_by_label": {}, "entities_by_category": {}, "relationships_by_type": {}, } try: nr = await self._graph.query( "MATCH (n) " "RETURN count(n) AS node_count, " "count(DISTINCT labels(n)[0]) AS label_count", timeout=_TIMEOUT, ) if nr.result_set: stats["node_count"] = nr.result_set[0][0] stats["label_count"] = nr.result_set[0][1] except Exception: logger.warning("get_graph_stats: node count query failed", exc_info=True) try: rr = await self._graph.query( "MATCH ()-[r]->() RETURN count(r) AS rel_count", timeout=_TIMEOUT, ) if rr.result_set: stats["relationship_count"] = rr.result_set[0][0] except Exception: logger.warning("get_graph_stats: rel count query failed", exc_info=True) try: rl = await self._graph.query( "MATCH (n) " "RETURN labels(n)[0] AS label, count(n) AS cnt " "ORDER BY cnt DESC", timeout=_TIMEOUT, ) stats["entities_by_label"] = { str(row[0]): int(row[1]) for row in (rl.result_set or []) } except Exception: logger.warning("get_graph_stats: per-label query failed", exc_info=True) try: rc = await self._graph.query( "MATCH (n) " "RETURN coalesce(n.category, 'unknown') AS category, " "count(n) AS cnt " "ORDER BY cnt DESC", timeout=_TIMEOUT, ) stats["entities_by_category"] = { str(row[0]): int(row[1]) for row in (rc.result_set or []) } except Exception: logger.warning("get_graph_stats: per-category query failed", exc_info=True) try: types_result = await self._graph.query( "CALL db.relationshiptypes()", timeout=_TIMEOUT, ) rel_types = [ str(row[0]) for row in (types_result.result_set or []) ] counts: dict[str, int] = {} for rt in rel_types: try: cr = await self._graph.query( f"MATCH ()-[r:{rt}]->() " f"RETURN 
count(r) AS cnt", timeout=_TIMEOUT, ) cnt = ( int(cr.result_set[0][0]) if cr.result_set else 0 ) if cnt > 0: counts[rt] = cnt except Exception: pass stats["relationships_by_type"] = dict( sorted(counts.items(), key=lambda x: -x[1]) ) except Exception: logger.warning("get_graph_stats: per-rel-type query failed", exc_info=True) return stats