knowledge_graph

FalkorDB-backed knowledge graph with hybrid vector+graph retrieval.

class knowledge_graph.KnowledgeGraphManager(redis_client, openrouter, embedding_model='google/gemini-embedding-001', admin_user_ids=None, dedup_threshold=0.9)[source]

Bases: object

Manages the FalkorDB knowledge graph for entity/relationship CRUD and hybrid retrieval.

GRAPH_NAME = 'knowledge'
__init__(redis_client, openrouter, embedding_model='google/gemini-embedding-001', admin_user_ids=None, dedup_threshold=0.9)[source]

Initialize the instance.

Parameters:
  • redis_client (Redis) – Redis connection client.

  • openrouter (OpenRouterClient) – OpenRouter API client.

  • embedding_model (str) – Name of the embedding model.

  • admin_user_ids (set[str] | None) – IDs of admin users, if any.

  • dedup_threshold (float) – Deduplication threshold.

Return type:

None

property indexes_ready: bool

True once ensure_indexes() has completed all phases.

Retrieval code checks this flag before issuing HNSW KNN queries. While it is False, index warm-up is still in progress and vector search against FalkorDB returns no results.

async wait_for_foreground_idle()[source]

Block until no foreground queries are pending or executing.

Called by background workers before each query attempt. Because background callers hold _query_semaphore for only one query at a time, a foreground caller that arrives mid-background-query only waits for the current background query to finish (not the entire batch).

Return type:

None
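The priority scheme described for wait_for_foreground_idle() and query() can be illustrated with a toy model. This is a minimal sketch, not the class's actual implementation: PriorityGate, its run() method, the _foreground_waiters counter, and the _idle event are hypothetical names, and only _query_semaphore is mentioned in the text above.

```python
import asyncio


class PriorityGate:
    """Toy model of foreground-over-background query scheduling (hypothetical)."""

    def __init__(self, max_concurrent: int = 1) -> None:
        self._query_semaphore = asyncio.Semaphore(max_concurrent)
        self._foreground_waiters = 0  # foreground queries pending or executing
        self._idle = asyncio.Event()
        self._idle.set()  # nothing in the foreground at start

    async def wait_for_foreground_idle(self) -> None:
        # Background workers call this before each query attempt.
        await self._idle.wait()

    async def run(self, work, is_background: bool = False):
        if is_background:
            # Hold the semaphore for exactly one query, then release it.
            async with self._query_semaphore:
                return await work()
        # Foreground: register as a waiter *before* acquiring the semaphore,
        # so background workers yield at their next query boundary.
        self._foreground_waiters += 1
        self._idle.clear()
        try:
            async with self._query_semaphore:
                return await work()
        finally:
            self._foreground_waiters -= 1
            if self._foreground_waiters == 0:
                self._idle.set()


async def demo() -> list[str]:
    gate = PriorityGate()
    order: list[str] = []

    async def job(tag: str) -> None:
        order.append(tag)
        await asyncio.sleep(0.05)  # stands in for a FalkorDB round-trip

    async def background_batch() -> None:
        for i in range(3):
            await gate.wait_for_foreground_idle()
            await gate.run(lambda: job(f"bg{i}"), is_background=True)

    async def foreground() -> None:
        await asyncio.sleep(0.01)  # arrive while bg0 is still executing
        await gate.run(lambda: job("fg"))

    await asyncio.gather(background_batch(), foreground())
    return order
```

In this model the foreground job waits only for the background query already in flight and then runs ahead of the rest of the batch.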

async wait_for_idle()[source]

Alias for wait_for_foreground_idle().

Return type:

None

async query(q, params=None, is_background=False, **kwargs)[source]

Execute a Cypher query against the knowledge graph.

Foreground callers (is_background=False) increment the waiter counter before acquiring the concurrency semaphore so that background workers will yield to them at their next boundary.

Return type:

Any

async ro_query(q, params=None, is_background=False, **kwargs)[source]

Execute a read-only Cypher query against the knowledge graph.

Proxies the underlying FalkorDB ro_query method. Applies the same concurrency and priority logic as query().

Return type:

Any

async embed(text)[source]

Public proxy for the internal _embed method.

Return type:

list[float]

Parameters:

text (str)

async embed_batch(texts)[source]

Public proxy for the internal _embed_batch method.

Return type:

list[list[float]]

Parameters:

texts (list[str])

async ensure_indexes()[source]

Create vector + range indexes for every entity label.

Two pre-flight checks eliminate unnecessary FalkorDB round-trips on warm restarts, and a readiness flag marks completion:

  • Strategy 1 — existence pre-check: CALL db.indexes() is issued once before the Phase 1 loop. Labels whose HNSW index is already present are skipped entirely; no CREATE VECTOR INDEX call is made for them.

  • Strategy 2 — zero-node skip: A single MATCH aggregation counts nodes per label. Labels with zero nodes are also skipped because building an empty index wastes a full server round-trip.

  • Strategy 4 — readiness flag: self._indexes_ready is set to True only after all phases complete successfully. Retrieval code checks this flag before issuing KNN queries.

Both pre-checks fail safe: if the introspection query errors (e.g. older FalkorDB that lacks db.indexes()), the corresponding skip-set is empty and all labels are attempted as before.

Return type:

None

async add_entity(name, entity_type, description, category='general', scope_id='_', created_by='unknown', pinned=False, metadata='{}', user_id='000000000000', embedding=None)[source]

Create or update an entity.

Returns {"name": ..., "uuid": ...}.

Return type:

dict[str, str]

async update_entity_description(name, entity_type, new_description, category=None, scope_id=None)[source]

Update an entity’s description and re-embed.

Return type:

bool

Parameters:
  • name (str)

  • entity_type (str)

  • new_description (str)

  • category (str | None)

  • scope_id (str | None)

async edit_entity(uuid, description=None, append_text=None, pinned=None, category=None, metadata_updates=None)[source]

Selectively update fields on an existing entity.

Looks up by uuid. Only the provided fields are changed; everything else is preserved.

description replaces the text entirely. append_text is concatenated to the existing description. (Mutually exclusive – caller must pick one.)

metadata_updates is shallow-merged into the existing metadata JSON (new keys added, existing overwritten, unmentioned preserved).

Returns the full updated entity dict via get_entity(), or None if the UUID was not found.

Return type:

dict | None

Parameters:
  • uuid (str)

  • description (str | None)

  • append_text (str | None)

  • pinned (bool | None)

  • category (str | None)

  • metadata_updates (dict | None)
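The field-update rules for edit_entity() can be modelled on a plain dict. This is a sketch of the documented semantics only, not the method's implementation: apply_entity_edit is a hypothetical helper, raising ValueError when both description and append_text are given is an assumption (the documentation only says the caller must pick one), and plain string concatenation without a separator is also assumed.

```python
import json


def apply_entity_edit(
    entity: dict,
    description=None,
    append_text=None,
    pinned=None,
    category=None,
    metadata_updates=None,
) -> dict:
    """Return a copy of `entity` with only the provided fields changed."""
    if description is not None and append_text is not None:
        # Assumed behaviour: reject the ambiguous combination.
        raise ValueError("description and append_text are mutually exclusive")

    updated = dict(entity)
    if description is not None:
        updated["description"] = description  # full replacement
    elif append_text is not None:
        updated["description"] = entity.get("description", "") + append_text
    if pinned is not None:
        updated["pinned"] = pinned
    if category is not None:
        updated["category"] = category
    if metadata_updates is not None:
        # Shallow merge: new keys added, existing keys overwritten,
        # unmentioned keys preserved. Nested dicts are replaced, not merged.
        merged = json.loads(entity.get("metadata") or "{}")
        merged.update(metadata_updates)
        updated["metadata"] = json.dumps(merged)
    return updated


example_entity = {
    "uuid": "u1",
    "description": "Likes tea.",
    "pinned": False,
    "category": "general",
    "metadata": '{"a": 1, "b": {"x": 1}}',
}
```

Because the merge is shallow, passing metadata_updates={"b": {"y": 2}} replaces the whole value stored under "b" rather than merging into it.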

async delete_entity(name, entity_type, category, scope_id='_')[source]

Delete the specified entity.

Parameters:
  • name (str) – Name of the entity to delete.

  • entity_type (str) – Type of the entity.

  • category (str) – Category the entity belongs to.

  • scope_id (str) – Scope the entity belongs to.

Returns:

True on success, False otherwise.

Return type:

bool

async delete_entity_by_uuid(uuid)[source]

Delete an entity by UUID (detach-deletes all relationships).

Return type:

bool

Parameters:

uuid (str)

async pin_entity(name, entity_type, pinned=True, category=None, scope_id=None)[source]

Set or clear the pinned flag on an entity.

When category and/or scope_id are provided, only entities matching those filters are updated. This avoids pinning the wrong entity when the same name exists in multiple scopes.

Return type:

bool

Parameters:
  • name (str)

  • entity_type (str)

  • pinned (bool)

  • category (str | None)

  • scope_id (str | None)

async get_entity(name='', entity_type=None, category=None, scope_id=None, uuid=None)[source]

Fetch an entity with its immediate connections.

Can look up by name or by uuid.

Return type:

dict | None

Parameters:
  • name (str)

  • entity_type (str | None)

  • category (str | None)

  • scope_id (str | None)

  • uuid (str | None)

async inspect_entity(name='', uuid=None, max_depth=2, neighbor_limit=50)[source]

Deep inspection of an entity and its full neighborhood.

Returns the entity’s properties plus all outgoing and incoming relationships (up to max_depth hops), with each neighbor’s core properties included.

Return type:

dict | None

Parameters:
  • name (str)

  • uuid (str | None)

  • max_depth (int)

  • neighbor_limit (int)
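A bounded neighborhood walk of the kind described for inspect_entity() can be sketched over an in-memory edge list. This is only an illustration: the neighborhood function and the (source, relation, target) tuple format are hypothetical, and treating neighbor_limit as a cap on the total number of neighbors returned is an assumption, since the documentation does not say whether the limit is global or per node.

```python
from collections import deque


def neighborhood(edges, start, max_depth=2, neighbor_limit=50):
    """Breadth-first walk over outgoing and incoming edges, up to max_depth hops."""
    adjacency = {}
    for src, rel, dst in edges:
        adjacency.setdefault(src, []).append((rel, dst, "out"))
        adjacency.setdefault(dst, []).append((rel, src, "in"))

    seen = {start}
    found = []
    queue = deque([(start, 0)])
    while queue and len(found) < neighbor_limit:
        node, depth = queue.popleft()
        if depth == max_depth:
            continue  # do not expand beyond the hop budget
        for rel, other, direction in adjacency.get(node, []):
            if other in seen:
                continue
            seen.add(other)
            found.append(
                {"name": other, "relation": rel, "direction": direction, "hops": depth + 1}
            )
            if len(found) >= neighbor_limit:
                break
            queue.append((other, depth + 1))
    return found


example_edges = [
    ("A", "KNOWS", "B"),
    ("B", "KNOWS", "C"),
    ("C", "KNOWS", "D"),
    ("E", "OWNS", "A"),
]
```

With max_depth=2, a walk from "A" reaches "B" and "E" at one hop and "C" at two hops, while "D" stays out of range.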

async list_entities(entity_type=None, category=None, scope_id=None, limit=50, offset=0, search=None)[source]

List entities with optional filtering, pagination, and text search.

Return type:

list[dict]

Parameters:
  • entity_type (str | None)

  • category (str | None)

  • scope_id (str | None)

  • limit (int)

  • offset (int)

  • search (str | None)
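The limit and offset parameters allow a caller to page through large result sets. The helper below is a hedged sketch of one way to do that: iter_entities is a hypothetical wrapper, and _fake_list_entities is a stand-in that only mimics the limit/offset behaviour so the example runs without a FalkorDB instance.

```python
import asyncio


async def iter_entities(list_entities, page_size=50, **filters):
    """Yield every entity by repeatedly calling a list_entities-like coroutine."""
    offset = 0
    while True:
        page = await list_entities(limit=page_size, offset=offset, **filters)
        for entity in page:
            yield entity
        if len(page) < page_size:
            return  # a short (or empty) page means there is nothing left
        offset += page_size


async def _fake_list_entities(limit=50, offset=0, **_filters):
    # Stand-in for KnowledgeGraphManager.list_entities (assumption for the demo).
    data = [{"name": f"entity-{i}"} for i in range(7)]
    return data[offset:offset + limit]


async def collect_names(page_size):
    return [
        entity["name"]
        async for entity in iter_entities(_fake_list_entities, page_size=page_size)
    ]
```

With a real manager instance, the bound method would be passed in place of the stand-in, together with any of the documented filters as keyword arguments.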

async add_relationship(source_uuid, target_uuid, relation_type, weight=0.5, description='', evidence='')[source]

Create or reinforce a relationship between two entities identified by UUID.

The edge inherits priority = min(source, target) and the category / scope_id from the lower-priority endpoint. Cross-category edges are fully supported.

Return type:

bool

Parameters:
  • source_uuid (str)

  • target_uuid (str)

  • relation_type (str)

  • weight (float)

  • description (str)

  • evidence (str)
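The inheritance rule for new edges can be written out as a small function. This is a sketch, assuming each endpoint is available as a dict with priority, category, and scope_id keys; inherit_edge_properties is a hypothetical name, and choosing the source endpoint when both priorities are equal is an assumption, as the documentation does not specify a tie-break.

```python
def inherit_edge_properties(source: dict, target: dict) -> dict:
    """Derive an edge's priority, category, and scope_id from its endpoints."""
    # The lower-priority endpoint donates category and scope_id.
    # Assumption: on a tie, the source endpoint is used.
    lower = source if source["priority"] <= target["priority"] else target
    return {
        "priority": min(source["priority"], target["priority"]),
        "category": lower["category"],
        "scope_id": lower["scope_id"],
    }


user_node = {"priority": 3, "category": "user", "scope_id": "1234"}
general_node = {"priority": 1, "category": "general", "scope_id": "_"}
```

Here an edge between user_node and general_node would carry priority 1 together with the "general" category and the "_" scope, regardless of edge direction.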

async delete_relationship(source_uuid, target_uuid, relation_type)[source]

Delete the specified relationship.

Parameters:
  • source_uuid (str) – UUID of the source entity.

  • target_uuid (str) – UUID of the target entity.

  • relation_type (str) – Type of the relationship to delete.

Returns:

True on success, False otherwise.

Return type:

bool

async list_relationships(entity_uuid=None, relation_type=None, category=None, limit=50, order_by=True, timeout=None)[source]

List relationships, optionally filtered by entity, relation type, and category.

Parameters:
  • entity_uuid (str | None) – Optional entity UUID to filter by.

  • relation_type (str | None) – Optional relation type to filter by.

  • category (str | None) – Optional category to filter by.

  • limit (int) – Maximum number of items.

  • order_by (bool) – Sort by updated_at DESC. Disable for large-graph visualization queries where the sort dominates query time.

  • timeout (int | None) – Per-query timeout in ms. When None the server default is used.

Returns:

The matching relationships, one dict per relationship.

Return type:

list[dict]

async resolve_entity_cross_category(name, entity_type)[source]

Find an entity by name across all categories.

Used for cross-category linking.

Return type:

dict | None

Parameters:
  • name (str)

  • entity_type (str)

async retrieve_context(query, query_embedding=None, user_ids=None, channel_id=None, guild_id=None, max_hops=2, max_per_user=60, max_channel=15, max_guild=15, max_general=30, max_per_lore=20, seed_top_k=64, seed_similarity_threshold=0.38, seed_limit=15, min_edge_weight=0.0, default_edge_weight=0.8, semantic_hop_decay=0.8, expansion_neighbor_limit=500, dynamic_threshold_enabled=True, dynamic_threshold_target_ratio=0.1, dynamic_threshold_min=0.2, dynamic_threshold_min_stored=5, full_user_memory_ids=None, user_seed_min=10, user_candidate_limit=100, lore_candidate_limit=40, lore_seed_min=5, lore_amplified=False, max_per_meta=20, meta_candidate_limit=40, meta_seed_min=5, meta_amplified=False)[source]

Return type:

dict[str, list[dict]]

Parameters:
  • query (str)

  • query_embedding (list[float] | ndarray | None)

  • user_ids (list[str] | None)

  • channel_id (str | None)

  • guild_id (str | None)

  • max_hops (int)

  • max_per_user (int)

  • max_channel (int)

  • max_guild (int)

  • max_general (int)

  • max_per_lore (int)

  • seed_top_k (int)

  • seed_similarity_threshold (float)

  • seed_limit (int)

  • min_edge_weight (float)

  • default_edge_weight (float)

  • semantic_hop_decay (float)

  • expansion_neighbor_limit (int)

  • dynamic_threshold_enabled (bool)

  • dynamic_threshold_target_ratio (float)

  • dynamic_threshold_min (float)

  • dynamic_threshold_min_stored (int)

  • full_user_memory_ids (list[str] | None)

  • user_seed_min (int)

  • user_candidate_limit (int)

  • lore_candidate_limit (int)

  • lore_seed_min (int)

  • lore_amplified (bool)

  • max_per_meta (int)

  • meta_candidate_limit (int)

  • meta_seed_min (int)

  • meta_amplified (bool)

async get_core_knowledge()[source]

Return type:

list[dict]

async search_entities(query, query_embedding=None, category=None, scope_id=None, entity_type=None, top_k=10)[source]

Return type:

list[dict]

Parameters:
  • query (str)

  • query_embedding (list[float] | None)

  • category (str | None)

  • scope_id (str | None)

  • entity_type (str | None)

  • top_k (int)

async reconsolidate_embeddings_on_recall(targets, query_embedding, *, learning_rate=0.03, max_step=0.25)[source]

Nudge recalled entities’ embeddings toward the query with a tanh clamp.

Fire-and-forget from prompt build; failures are logged at debug.

Return type:

None
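One plausible reading of "nudge toward the query with a tanh clamp" is sketched below. The exact update rule is not given in this documentation, so the formula is an assumption: the raw step is learning_rate times the distance to the query, and tanh squashes it so that the move never exceeds max_step. nudge_toward_query is a hypothetical helper, and any re-normalization the real method may apply is omitted.

```python
import math


def nudge_toward_query(embedding, query_embedding, learning_rate=0.03, max_step=0.25):
    """Move `embedding` a small, bounded distance toward `query_embedding`."""
    delta = [q - e for e, q in zip(embedding, query_embedding)]
    distance = math.sqrt(sum(d * d for d in delta))
    if distance == 0.0:
        return list(embedding)  # already at the query; nothing to do

    raw_step = learning_rate * distance
    # Assumed clamp: tanh keeps the step length strictly below max_step
    # while leaving small steps almost unchanged.
    step = max_step * math.tanh(raw_step / max_step)
    scale = step / distance
    return [e + scale * d for e, d in zip(embedding, delta)]
```

For small raw steps the clamp is nearly the identity, so learning_rate dominates; for large ones the move saturates at max_step.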

async get_graph_stats()[source]

Return high-level graph statistics.

Each sub-query runs independently so a timeout on one (e.g. the expensive relationship-type scan) does not prevent the other stats from being returned.

Return type:

dict