knowledge_graph
FalkorDB-backed knowledge graph with hybrid vector+graph retrieval.
- class knowledge_graph.KnowledgeGraphManager(redis_client, openrouter, embedding_model='google/gemini-embedding-001', admin_user_ids=None, dedup_threshold=0.9)[source]
Bases:
objectManages the FalkorDB knowledge graph for entity/relationship CRUD and hybrid retrieval.
- Parameters:
redis_client (aioredis.Redis)
openrouter (OpenRouterClient)
embedding_model (str)
dedup_threshold (float)
- GRAPH_NAME = 'knowledge'
- __init__(redis_client, openrouter, embedding_model='google/gemini-embedding-001', admin_user_ids=None, dedup_threshold=0.9)[source]
Initialize the instance.
- property indexes_ready: bool
True once
ensure_indexes()has completed all phases.Retrieval code checks this flag before issuing HNSW KNN queries; if False it means the index warm-up is still in progress and vector search will return no results from FalkorDB.
- async wait_for_foreground_idle()[source]
Block until no foreground queries are pending or executing.
Called by background workers before each query attempt. Because background callers hold _query_semaphore for only one query at a time, a foreground caller that arrives mid-background-query only waits for the current background query to finish (not the entire batch).
- Return type:
- async wait_for_idle()[source]
Alias for
wait_for_foreground_idle().- Return type:
- async query(q, params=None, is_background=False, **kwargs)[source]
Execute a Cypher query against the knowledge graph.
Foreground callers (is_background=False) increment the waiter counter before acquiring the concurrency semaphore so that background workers will yield to them at their next boundary.
- async ro_query(q, params=None, is_background=False, **kwargs)[source]
Execute a read-only Cypher query against the knowledge graph.
Proxies the underlying FalkorDB ro_query method. Applies the same concurrency and priority logic as
query().
- async ensure_indexes()[source]
Create vector + range indexes for every entity label.
Optimized with two pre-flight checks to eliminate unnecessary FalkorDB round-trips on warm restarts:
Strategy 1 — existence pre-check:
CALL db.indexes()is issued once before the Phase 1 loop. Labels whose HNSW index is already present are skipped entirely; noCREATE VECTOR INDEXcall is made for them.Strategy 2 — zero-node skip: A single
MATCHaggregation counts nodes per label. Labels with zero nodes are also skipped because building an empty index wastes a full server round-trip.Strategy 4 — readiness flag:
self._indexes_readyis set toTrueonly after all phases complete successfully. Retrieval code checks this flag before issuing KNN queries.
Both pre-checks fail safe: if the introspection query errors (e.g. older FalkorDB that lacks
db.indexes()), the corresponding skip-set is empty and all labels are attempted as before.- Return type:
- async add_entity(name, entity_type, description, category='general', scope_id='_', created_by='unknown', pinned=False, metadata='{}', user_id='000000000000', embedding=None)[source]
Create or update an entity.
Returns
{"name": ..., "uuid": ...}.
- async update_entity_description(name, entity_type, new_description, category=None, scope_id=None)[source]
Update an entity’s description and re-embed.
- async edit_entity(uuid, description=None, append_text=None, pinned=None, category=None, metadata_updates=None)[source]
Selectively update fields on an existing entity.
Looks up by uuid. Only the provided fields are changed; everything else is preserved.
description replaces the text entirely. append_text is concatenated to the existing description. (Mutually exclusive – caller must pick one.)
metadata_updates is shallow-merged into the existing metadata JSON (new keys added, existing overwritten, unmentioned preserved).
Returns the full updated entity dict via
get_entity(), orNoneif the UUID was not found.
- async delete_entity(name, entity_type, category, scope_id='_')[source]
Delete the specified entity.
- async delete_entity_by_uuid(uuid)[source]
Delete an entity by UUID (detach-deletes all relationships).
- async pin_entity(name, entity_type, pinned=True, category=None, scope_id=None)[source]
Set or clear the pinned flag on an entity.
When category and/or scope_id are provided, only entities matching those filters are updated. This avoids pinning the wrong entity when the same name exists in multiple scopes.
- async get_entity(name='', entity_type=None, category=None, scope_id=None, uuid=None)[source]
Fetch an entity with its immediate connections.
Can look up by name or by uuid.
- async inspect_entity(name='', uuid=None, max_depth=2, neighbor_limit=50)[source]
Deep inspection of an entity and its full neighborhood.
Returns the entity’s properties plus all outgoing and incoming relationships (up to max_depth hops), with each neighbor’s core properties included.
- async list_entities(entity_type=None, category=None, scope_id=None, limit=50, offset=0, search=None)[source]
List entities with optional filtering, pagination, and text search.
- async add_relationship(source_uuid, target_uuid, relation_type, weight=0.5, description='', evidence='')[source]
Create or reinforce a relationship between two entities identified by UUID.
The edge inherits
priority = min(source, target)and thecategory/scope_idfrom the lower-priority endpoint. Cross-category edges are fully supported.
- async delete_relationship(source_uuid, target_uuid, relation_type)[source]
Delete the specified relationship.
- async list_relationships(entity_uuid=None, relation_type=None, category=None, limit=50, order_by=True, timeout=None)[source]
List relationships.
- async resolve_entity_cross_category(name, entity_type)[source]
Find an entity by name across all categories.
Used for cross-category linking.
- async retrieve_context(query, query_embedding=None, user_ids=None, channel_id=None, guild_id=None, max_hops=2, max_per_user=60, max_channel=15, max_guild=15, max_general=30, max_per_lore=20, seed_top_k=64, seed_similarity_threshold=0.38, seed_limit=15, min_edge_weight=0.0, default_edge_weight=0.8, semantic_hop_decay=0.8, expansion_neighbor_limit=500, dynamic_threshold_enabled=True, dynamic_threshold_target_ratio=0.1, dynamic_threshold_min=0.2, dynamic_threshold_min_stored=5, full_user_memory_ids=None, user_seed_min=10, user_candidate_limit=100, lore_candidate_limit=40, lore_seed_min=5, lore_amplified=False, max_per_meta=20, meta_candidate_limit=40, meta_seed_min=5, meta_amplified=False)[source]
- Return type:
- Parameters:
query (str)
channel_id (str | None)
guild_id (str | None)
max_hops (int)
max_per_user (int)
max_channel (int)
max_guild (int)
max_general (int)
max_per_lore (int)
seed_top_k (int)
seed_similarity_threshold (float)
seed_limit (int)
min_edge_weight (float)
default_edge_weight (float)
semantic_hop_decay (float)
expansion_neighbor_limit (int)
dynamic_threshold_enabled (bool)
dynamic_threshold_target_ratio (float)
dynamic_threshold_min (float)
dynamic_threshold_min_stored (int)
user_seed_min (int)
user_candidate_limit (int)
lore_candidate_limit (int)
lore_seed_min (int)
lore_amplified (bool)
max_per_meta (int)
meta_candidate_limit (int)
meta_seed_min (int)
meta_amplified (bool)
- async search_entities(query, query_embedding=None, category=None, scope_id=None, entity_type=None, top_k=10)[source]