kg_extraction

Knowledge-graph extraction from conversations.

Replaces background_agents/auto_memory_extraction.py. Uses an LLM to extract structured entities and relationships from conversation text, then writes them into the FalkorDB knowledge graph via KnowledgeGraphManager.

async kg_extraction.apply_parsed_extraction(data, kg_manager, channel_id, user_id='000000000000', created_by='system:extraction')[source]

Apply LLM extraction JSON (entities + relationships) to FalkorDB.

Each entity may include existing_uuid — if set, that UUID is registered for relationship resolution and no new node is created. Per-entity user_id overrides the default user_id for category user scope resolution.

Return type:

dict[str, Any]

Parameters:
async kg_extraction.extract_knowledge(conversation, openrouter, kg_manager, channel_id, guild_id=None, user_id='000000000000', conversation_char_limit=4000)[source]

Full extraction pipeline for a block of conversation text.

  1. Call LLM with extraction prompt

  2. Parse structured JSON

  3. Validate categories (reject core/guild)

  4. Resolve or create each entity

  5. Create/reinforce each relationship

  6. Return stats

Parameters:
  • conversation_char_limit (int | None) – If set, truncate conversation to this many characters before sending to the LLM. None means no truncation.

  • conversation (str)

  • openrouter (OpenRouterClient)

  • kg_manager (KnowledgeGraphManager)

  • channel_id (str)

  • guild_id (str | None)

  • user_id (str)

Return type:

dict[str, Any]

async kg_extraction.extract_from_message(message_text, user_id, user_name, channel_id, guild_id, openrouter, kg_manager, redis=None, per_user_limit=5)[source]

Per-message extraction with cheap pre-filtering.

This function is called fire-and-forget but is gated by three layers before any LLM call is made:

  1. Length gate – message must be >= 100 chars.

  2. Heuristic gate – message must match at least one regex pattern that signals knowledge content.

  3. Rate limit – max per_user_limit extractions per user per hour (via Redis INCR + EXPIRE).

Return type:

None

Parameters:
async kg_extraction.run_batch_extraction(redis, kg_manager, openrouter, messages_limit=100)[source]

Scan recent messages and extract KG entities.

Called periodically by the background scheduler.

Return type:

dict[str, Any]

Parameters: