kg_agentic_extraction

Agentic knowledge-graph extraction for bulk chat import.

Bulk agentic chat runs against one of two backends: native Gemini (pool keys plus gemini_kg_bulk_client) or OpenRouter (create_kg_bulk_openrouter_client()). In both cases the model is given a small set of read-only tools backed by KnowledgeGraphManager.

class kg_agentic_extraction.KgBulkLlmClient(*args, **kwargs)[source]

Bases: Protocol

Structural protocol for a bulk chunking and agentic-KG chat client.

Defines the minimal async surface (chat, count_input_tokens, and close) that the bulk-extraction code depends on, so either concrete backend can be used interchangeably without a shared base class. The two implementations are GeminiPoolToolChatClient (native Gemini over the shared embedding-pool keys with automatic function calling) and OpenRouterClient (OpenRouter HTTP). As a typing.Protocol it has no runtime behaviour and is never instantiated; it appears here only as the type of the bulk_client parameter of run_agentic_kg_extraction_chunk().
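To illustrate the structural typing described above, here is a minimal sketch. BulkClientSketch, EchoClient, and run_once are hypothetical names invented for this example, and the gemini_model annotation is an assumption; the real KgBulkLlmClient.chat takes more parameters than shown here.

```python
import asyncio
from typing import Any, Optional, Protocol


class BulkClientSketch(Protocol):
    """Hypothetical, reduced stand-in for KgBulkLlmClient: three async methods."""

    async def chat(self, messages: list[dict[str, Any]], **kwargs: Any) -> str: ...

    async def count_input_tokens(
        self, messages: list[dict[str, Any]], *, gemini_model: Optional[str] = None
    ) -> Optional[int]: ...

    async def close(self) -> None: ...


class EchoClient:
    """Toy backend. It never inherits from the protocol; matching methods suffice."""

    def __init__(self) -> None:
        self.closed = False

    async def chat(self, messages: list[dict[str, Any]], **kwargs: Any) -> str:
        return str(messages[-1]["content"])

    async def count_input_tokens(
        self, messages: list[dict[str, Any]], *, gemini_model: Optional[str] = None
    ) -> Optional[int]:
        # Crude whitespace count, standing in for a provider token counter.
        return sum(len(str(m["content"]).split()) for m in messages)

    async def close(self) -> None:
        self.closed = True


async def run_once(client: BulkClientSketch) -> tuple[str, Optional[int]]:
    """Depend only on the protocol surface, then release the client."""
    messages = [{"role": "user", "content": "hello graph"}]
    try:
        return await client.chat(messages), await client.count_input_tokens(messages)
    finally:
        await client.close()
```

A static type checker accepts EchoClient wherever BulkClientSketch is expected because the method signatures line up, which is the property that lets the two real backends be swapped without a shared base class.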

async chat(messages, user_id='', ctx=None, tool_names=None, validate_header=False, token_count=None, on_intermediate_text=None)[source]

Run one tool-enabled chat turn and return the model’s final text.

Structural (Protocol) declaration with no body; it documents the contract that both concrete bulk clients, GeminiPoolToolChatClient and OpenRouterClient, satisfy. An implementation is expected to drive the read-only KG tool loop, calling the registered kg_search_entities, kg_get_entity, and kg_inspect_entity tools (which reach the KnowledgeGraphManager via ctx) for up to the client’s max_tool_rounds, and then return the assistant’s final message as a plain string.

Within this module the only caller is run_agentic_kg_extraction_chunk(), which awaits bulk_client.chat with the assembled system/user messages and KG_BULK_TOOL_NAMES. Concrete chat methods are exercised more broadly across the codebase (e.g. message_processor, kg_consolidation, kg_extraction) but those go through other client instances, not this Protocol.

Parameters:
  • messages (list[dict[str, Any]]) – OpenAI-style {"role", "content"} message dicts forming the system + user prompt for this turn.

  • user_id (str) – Identifier of the speaker/owner for the turn; defaults to an empty string for bulk extraction.

  • ctx (ToolContext | None) – Tool execution context carrying kg_manager and ids so the read-only KG tools can run; may be None.

  • tool_names (list[str] | None) – Explicit subset of tool names the model may call this turn; None lets the client use its default registry view.

  • validate_header (bool) – Whether the implementation should enforce a structured header on the reply (unused/False for bulk KG).

  • token_count (int | None) – Optional pre-computed input token count the client may use to size context instead of recounting.

  • on_intermediate_text (Optional[Callable[[str], Awaitable[None]]]) – Optional async callback invoked with intermediate assistant text chunks as they stream.

Returns:

The model’s final assistant message text.

Return type:

str

async count_input_tokens(messages, *, gemini_model=None)[source]

Estimate the input token count for messages before sending.

Structural (Protocol) declaration with no body. Concrete clients are expected to call the provider’s token-counting endpoint (e.g. Gemini countTokens) so bulk callers can pre-size chunks and decide whether a conversation fits in the model’s context window. May return None when no estimate is available.

This Protocol method has no callers inside this module; concrete implementations are awaited by the bulk-import sizing paths: kg_bulk_runner.py through its token-counter client, and GeminiPoolToolChatClient internally.

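Parameters:
  • messages – The messages whose input tokens are to be estimated.

  • gemini_model – Keyword-only; defaults to None.
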
Returns:

The estimated input token count, or None when the implementation cannot produce one.

Return type:

int | None

async close()[source]

Release any network/session resources held by the client.

Structural (Protocol) declaration with no body. Concrete clients are expected to close their underlying HTTP session or pooled connections so bulk runs shut down cleanly. Idempotent by convention.

Not called for this Protocol inside this module; concrete close methods are awaited during teardown by the bulk runner (kg_bulk_runner.py) and other lifecycle paths.

Returns:

None.

Return type:

None

kg_agentic_extraction.build_platform_context_markdown(cfg)[source]

Build a human-readable, secret-free platform summary for the system prompt.

Walks the config’s enabled platforms and emits a markdown block describing each one (Matrix homeserver and bot id, Discord snowflake ids, and remaining non-secret settings) so the extraction model can interpret platform-specific identifiers in the chat log. Secret-looking and NCM-related setting keys are dropped via _is_secret_setting_key() and _settings_key_is_ncm_related(), NCM platforms are skipped via _platform_type_is_ncm(), and structured values are summarized rather than dumped. Reads only the passed config object; no Redis, KG, or network access. Falls back to explicit placeholder text for a missing config, no platform entries, an all-NCM set, or all-disabled platforms.

Called by render_kg_agentic_system_prompt() and _system_prompt_cache_key() (both in this module) to inject and fingerprint the platform context; no external callers.

Parameters:

cfg (Any | None) – Loaded application config (or None), inspected for platforms and legacy Matrix homeserver / user_id attributes.

Returns:

A markdown summary of enabled chat platforms, or an explanatory placeholder string when no usable platform context exists.

Return type:

str
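The filtering and summarizing behaviour described for build_platform_context_markdown() can be sketched as follows. This is only an illustration under stated assumptions: SECRET_MARKERS, looks_secret, summarize_value, platform_summary, the dict-shaped platform entries, and the placeholder text are all hypothetical, and the real rules inside _is_secret_setting_key() are not shown in this reference.

```python
from typing import Any, Mapping

# Hypothetical marker list; the real _is_secret_setting_key() rules may differ.
SECRET_MARKERS = ("token", "secret", "password", "api_key")


def looks_secret(key: str) -> bool:
    """Treat a setting key as secret if it contains any marker substring."""
    lowered = key.lower()
    return any(marker in lowered for marker in SECRET_MARKERS)


def summarize_value(value: Any) -> str:
    """Summarize structured values instead of dumping them verbatim."""
    if isinstance(value, Mapping):
        return f"(mapping, {len(value)} keys)"
    if isinstance(value, (list, tuple, set)):
        return f"(list, {len(value)} items)"
    return str(value)


def platform_summary(platforms: list[dict[str, Any]]) -> str:
    """Emit one markdown block per enabled platform, skipping secret keys."""
    lines: list[str] = []
    for platform in platforms:
        if not platform.get("enabled", False):
            continue
        lines.append(f"### {platform['type']}")
        for key, value in sorted(platform.get("settings", {}).items()):
            if looks_secret(key):
                continue
            lines.append(f"- {key}: {summarize_value(value)}")
    # Explicit placeholder rather than an empty string when nothing is usable.
    return "\n".join(lines) if lines else "(no enabled platforms)"
```

Returning a placeholder instead of an empty string keeps the system prompt explicit about the absence of platform context, which matches the fallback behaviour described above.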

kg_agentic_extraction.render_kg_agentic_system_prompt(cfg=None)[source]

Render the agentic-extraction system prompt from its Jinja2 template.

Loads prompts/kg_agentic_extraction_system.j2 (resolved via _system_template_path()) in a sandboxed, non-autoescaping LoggingSandboxedEnvironment, injecting the secret-free platform_context produced by build_platform_context_markdown() (passed through sanitize_context()). Reads the template file from disk; if it is missing, logs a warning and returns a hard-coded fallback prompt with the same platform context appended. No Redis, KG, or network access.

Called only by load_kg_agentic_system_prompt(), which memoizes the result; no external callers.

Parameters:

cfg (Any | None) – Loaded application config (or None) forwarded to build_platform_context_markdown().

Returns:

The fully rendered system prompt text (or the fallback prompt when the template file is absent).

Return type:

str

kg_agentic_extraction.load_kg_agentic_system_prompt(config=None)[source]

Return the rendered system prompt, memoized by template mtime and config.

Computes a cache key via _system_prompt_cache_key() (template mtime plus a hash of platform context and channel hints) and, on a miss, renders the prompt via render_kg_agentic_system_prompt() and stores it in the module-level _SYSTEM_PROMPT_CACHE dict. This avoids re-rendering the Jinja2 template for every chunk while still picking up template or config edits. Touches only that in-process cache; no Redis, KG, or network access.

Called by messages_for_agentic_token_estimate() and run_agentic_kg_extraction_chunk() (both in this module) to assemble the system message; no external callers.

Parameters:

config (Any | None) – Loaded application config (or None) used both as the cache fingerprint input and for rendering.

Returns:

The rendered (and now cached) system prompt text.

Return type:

str
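The memoization scheme described for load_kg_agentic_system_prompt() (key on template mtime plus a hash of config-derived text) can be sketched like this. All names below are hypothetical, the renderer is a plain file read rather than Jinja2, and the real cache key also covers channel hints.

```python
import hashlib
import os
import tempfile

_CACHE: dict[tuple[int, str], str] = {}
RENDER_STATS = {"calls": 0}  # how often the stand-in renderer actually ran


def cache_key(template_path: str, platform_context: str) -> tuple[int, str]:
    """Fingerprint: template mtime plus a hash of the config-derived context."""
    mtime_ns = os.stat(template_path).st_mtime_ns
    digest = hashlib.sha256(platform_context.encode("utf-8")).hexdigest()
    return (mtime_ns, digest)


def render(template_path: str, platform_context: str) -> str:
    """Stand-in for the real render: read the file and append the context."""
    RENDER_STATS["calls"] += 1
    with open(template_path, encoding="utf-8") as handle:
        return handle.read() + "\n" + platform_context


def load_prompt(template_path: str, platform_context: str) -> str:
    key = cache_key(template_path, platform_context)
    if key not in _CACHE:  # miss: render once, then reuse
        _CACHE[key] = render(template_path, platform_context)
    return _CACHE[key]


def demo() -> list[int]:
    """Return the number of renders performed after each of four loads."""
    start = RENDER_STATS["calls"]
    counts = []
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "system.j2")
        with open(path, "w", encoding="utf-8") as handle:
            handle.write("PROMPT")
        steps = [("a", False), ("a", False), ("b", False), ("a", True)]
        for context, bump_mtime in steps:
            if bump_mtime:  # simulate a template edit by moving mtime forward
                stat = os.stat(path)
                os.utime(path, ns=(stat.st_atime_ns, stat.st_mtime_ns + 10**10))
            load_prompt(path, context)
            counts.append(RENDER_STATS["calls"] - start)
    return counts
```

In demo(), the second load hits the cache, while a changed context or a changed mtime each force a fresh render.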

kg_agentic_extraction.format_chunk_channels_section(channel_pairs, cfg, default_channel_scope, channel_metadata=None)[source]

Build the markdown Channels in this chunk section for the user prompt.

Lists each (platform, channel_id) pair appearing in the chunk, enriched with optional operator-supplied descriptions from the config’s kg_extraction_channel_hints and any resolved channel name/topic from channel_metadata, then appends the chunk’s default_channel_scope_id for scoped facts. This orients the model about which rooms/sources the log lines come from. Reads only the passed config object and metadata dict; no Redis, KG, or network access. Returns a placeholder section when no channel pairs are resolved.

Called by build_kg_bulk_user_message() (in this module) while assembling the user message; no external callers.

Parameters:
  • channel_pairs (list[tuple[str, str]]) – (platform, channel_id) tuples present in the chunk.

  • cfg (Any | None) – Loaded application config (or None), read for kg_extraction_channel_hints.

  • default_channel_scope (str) – Channel scope id used as the chunk default for scoped facts.

  • channel_metadata (dict[str, dict[str, str]] | None) – Optional "platform:channel_id" to {"name", "topic"} map for resolved channel display info.

Returns:

The markdown channels section, joined with newlines.

Return type:

str
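A rough sketch of how such a channels section might be assembled is shown below. The heading text, the line layout, the placeholder wording, and the assumption that channel hints are a flat "platform:channel_id" to description mapping are all hypothetical; only the "platform:channel_id" metadata key shape comes from the parameter description above.

```python
from typing import Optional


def channels_section(
    channel_pairs: list[tuple[str, str]],
    hints: dict[str, str],
    default_scope: str,
    metadata: Optional[dict[str, dict[str, str]]] = None,
) -> str:
    """List each channel with any known name, topic, and operator hint."""
    metadata = metadata or {}
    lines = ["## Channels in this chunk"]
    if not channel_pairs:
        lines.append("(no channels resolved)")
    for platform, channel_id in sorted(set(channel_pairs)):
        key = f"{platform}:{channel_id}"
        parts = [f"- `{key}`"]
        info = metadata.get(key, {})
        if info.get("name"):
            parts.append(f"name: {info['name']}")
        if info.get("topic"):
            parts.append(f"topic: {info['topic']}")
        if hints.get(key):
            parts.append(f"hint: {hints[key]}")
        lines.append("; ".join(parts))
    # The default scope is appended last, for facts scoped to a channel.
    lines.append(f"Default channel scope id: `{default_scope}`")
    return "\n".join(lines)
```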

kg_agentic_extraction.format_chunk_speakers_section(speaker_pairs)[source]

Build the markdown Speakers in this chunk section for the user prompt.

Deduplicates the (user_id, display_name) pairs (first non-empty name wins per id, blanks become ?) and renders them as a sorted bullet list, so the model has the roster of participants for the chunk. Pure string formatting with no external side effects. Returns a placeholder section when there are no usable speakers.

Called by build_kg_bulk_user_message() (in this module); no external callers.

Parameters:

speaker_pairs (list[tuple[str, str]]) – (user_id, display_name) tuples for the chunk.

Returns:

The markdown speakers section, joined with newlines.

Return type:

str
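The deduplication rule described for format_chunk_speakers_section() (first non-empty name wins per id, blanks become ?) can be sketched as below. The function names, heading, and placeholder text are assumptions made for illustration.

```python
def dedupe_speakers(pairs: list[tuple[str, str]]) -> dict[str, str]:
    """First non-empty display name wins per user id; blanks become '?'."""
    names: dict[str, str] = {}
    for user_id, display_name in pairs:
        uid = (user_id or "").strip()
        if not uid:
            continue
        name = (display_name or "").strip()
        # Record a new id, or upgrade a '?' placeholder once a real name shows up.
        if uid not in names or (names[uid] == "?" and name):
            names[uid] = name or "?"
    return names


def speakers_section(pairs: list[tuple[str, str]]) -> str:
    """Render the deduplicated roster as a bullet list sorted by user id."""
    names = dedupe_speakers(pairs)
    lines = ["## Speakers in this chunk"]
    if not names:
        lines.append("(no speakers resolved)")
    else:
        lines.extend(f"- `{uid}`: {names[uid]}" for uid in sorted(names))
    return "\n".join(lines)
```

Sorting by id keeps the section stable across runs, so identical chunks produce identical prompts.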

kg_agentic_extraction.format_speaker_user_id_mapping_markdown(speaker_pairs)[source]

Render a markdown user_id to display-name table for the chunk.

Deduplicates the speaker pairs (first non-empty name wins per id, blanks become ?), escapes pipe characters in names, and emits a markdown table that helps the model map raw sender ids in the log lines to human names. Returns an empty string when there are no usable speakers. Pure string formatting with no external side effects.

Called by augment_system_prompt_with_speaker_mapping() (in this module) to build the block appended to the system prompt; no external callers.

Parameters:

speaker_pairs (list[tuple[str, str]]) – (user_id, display_name) tuples for the chunk.

Returns:

The markdown table block, or an empty string when no speakers remain after deduplication.

Return type:

str

kg_agentic_extraction.augment_system_prompt_with_speaker_mapping(system, speaker_pairs)[source]

Append a per-chunk user_id to display-name table to the system prompt.

Builds a markdown mapping block via format_speaker_user_id_mapping_markdown() and concatenates it onto the rendered system prompt so the model can resolve sender ids in the log lines. Returns the prompt unchanged when no speakers are supplied (empty block).

Called internally by both messages_for_agentic_token_estimate() and run_agentic_kg_extraction_chunk() to assemble the final system message; no external callers.

Parameters:
  • system (str) – The base rendered system prompt text.

  • speaker_pairs (list[tuple[str, str]] | None) – List of (user_id, display_name) tuples for the chunk, or None.

Returns:

The system prompt with the speaker-mapping block appended, or the original system when there is nothing to add.

Return type:

str
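The table rendering and the append-or-pass-through behaviour of the two functions above can be sketched together. The sketch takes an already deduplicated id-to-name mapping; mapping_table, augment, and the column headings are hypothetical.

```python
def mapping_table(names: dict[str, str]) -> str:
    """Render an id-to-name markdown table; empty input yields an empty string."""
    if not names:
        return ""
    rows = ["| user_id | display name |", "| --- | --- |"]
    for uid in sorted(names):
        # A raw "|" in a name would split the table cell, so escape it.
        safe = (names[uid] or "?").replace("|", "\\|")
        rows.append(f"| `{uid}` | {safe} |")
    return "\n".join(rows)


def augment(system: str, names: dict[str, str]) -> str:
    """Append the table to the system prompt, or return the prompt unchanged."""
    block = mapping_table(names)
    return f"{system}\n\n{block}" if block else system
```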

async kg_agentic_extraction.prefetch_speaker_kg_context(kg, speakers, *, max_speakers=8, hits_per_speaker=3, min_score=0.0)[source]

Prefetch existing KG entities for a chunk’s speakers as user-prompt context.

For each unique speaker (capped at max_speakers) it runs two vector searches on the knowledge graph via kg.search_entities: one scoped to the speaker’s user category and id, and one general query over name and id. It then deduplicates hits by uuid, drops anything below min_score, and formats the survivors into a markdown block. This seeds the model with entities that are likely already known, so it reuses them instead of creating duplicates. The function only reads the KG (FalkorDB/vector index behind KnowledgeGraphManager) and never mutates it; each search is wrapped so that a failure is logged at debug level and skipped rather than raised. Returns an empty string when there are no speakers or no surviving hits.

Called by kg_bulk_runner.run_agentic_kg_bulk (in kg_bulk_runner.py) when speaker prefetch is enabled; the result is passed through as the speaker_kg_prefetch argument to the user-message builder.

Parameters:
  • kg (Any) – A KnowledgeGraphManager (typed Any) exposing an async search_entities method.

  • speakers (list[tuple[str, str]]) – (user_id, display_name) tuples for the chunk.

  • max_speakers (int) – Maximum distinct speakers to prefetch, clamped to 1-128.

  • hits_per_speaker (int) – Vector hits requested per search, clamped to 1-48.

  • min_score (float) – Minimum similarity score a hit must meet to be included.

Returns:

A markdown Existing knowledge graph (speakers prefetch) block, or an empty string when nothing relevant was found.

Return type:

str
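The prefetch flow above (two searches per speaker, dedup by uuid, score filter, failures skipped) can be sketched against a fake graph. FakeKG, its search_entities(query, limit) signature, the hit dict shape, and the query strings are assumptions; the real KnowledgeGraphManager.search_entities signature is not given in this reference.

```python
import asyncio
import logging
from typing import Any

logger = logging.getLogger(__name__)


class FakeKG:
    """Hypothetical stand-in; the real search_entities signature may differ."""

    async def search_entities(self, query: str, limit: int = 3) -> list[dict[str, Any]]:
        if "boom" in query:
            raise RuntimeError("search backend unavailable")
        hits = [
            {"uuid": "e1", "name": "Ann", "score": 0.9},
            {"uuid": "e2", "name": "Annex", "score": 0.2},
        ]
        return hits[:limit]


async def prefetch(kg: Any, speakers: list[tuple[str, str]], *, max_speakers: int = 8,
                   hits_per_speaker: int = 3, min_score: float = 0.0) -> str:
    max_speakers = max(1, min(128, max_speakers))          # clamp to 1-128
    hits_per_speaker = max(1, min(48, hits_per_speaker))   # clamp to 1-48
    unique: dict[str, str] = {}
    for user_id, name in speakers:
        unique.setdefault(user_id, name)
    seen: dict[str, dict[str, Any]] = {}
    for user_id, name in list(unique.items())[:max_speakers]:
        # Two queries per speaker: one id-focused, one general name + id.
        for query in (f"user {user_id}", f"{name} {user_id}"):
            try:
                hits = await kg.search_entities(query, limit=hits_per_speaker)
            except Exception as exc:
                logger.debug("prefetch search failed for %r: %s", query, exc)
                continue  # a failed search is skipped, not raised
            for hit in hits:
                if hit["score"] >= min_score:
                    seen.setdefault(hit["uuid"], hit)  # dedupe by uuid
    if not seen:
        return ""
    lines = ["## Existing knowledge graph (speakers prefetch)"]
    lines += [f"- {h['name']} ({h['uuid']}, score {h['score']:.2f})" for h in seen.values()]
    return "\n".join(lines)
```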

kg_agentic_extraction.build_kg_bulk_user_message(conversation_text, *, channel_id, chunk_index, time_start_iso='', time_end_iso='', platforms_channels='', config=None, chunk_channel_pairs=None, chunk_speaker_pairs=None, speaker_kg_prefetch='', channel_metadata=None)[source]

Assemble the full user message for one agentic extraction chunk.

Concatenates chunk metadata (index, default channel scope, UTC time range, and optional corpus-wide platform/channel list), the channel section from format_chunk_channels_section(), the speaker section from format_chunk_speakers_section(), any speaker-KG prefetch text, the chronological conversation, and a final instruction to emit the JSON extraction. This is the single source of truth for the user turn so token estimation and the real run see the same shape. Pure string assembly with no Redis, KG, or network access.

Called by messages_for_agentic_token_estimate() and run_agentic_kg_extraction_chunk() (both in this module); no external callers.

Parameters:
  • conversation_text (str) – The chronological chat transcript for the chunk.

  • channel_id (str) – Default channel scope id for scoped facts.

  • chunk_index (int) – Zero-based index of this chunk.

  • time_start_iso (str) – ISO-8601 start of the chunk’s time range (optional).

  • time_end_iso (str) – ISO-8601 end of the chunk’s time range (optional).

  • platforms_channels (str) – Optional summary of all platforms/channels in the full corpus.

  • config (Any | None) – Loaded application config (or None) forwarded to the section builders.

  • chunk_channel_pairs (list[tuple[str, str]] | None) – (platform, channel_id) pairs in the chunk.

  • chunk_speaker_pairs (list[tuple[str, str]] | None) – (user_id, display_name) pairs in the chunk.

  • speaker_kg_prefetch (str) – Optional prefetched speaker-KG markdown block.

  • channel_metadata (dict[str, dict[str, str]] | None) – Optional per-channel name/topic metadata map.

Returns:

The assembled user message, joined with newlines.

Return type:

str

kg_agentic_extraction.messages_for_agentic_token_estimate(conversation_text, *, channel_id, chunk_index=0, time_start_iso='', time_end_iso='', platforms_channels='', config=None, chunk_channel_pairs=None, chunk_speaker_pairs=None, speaker_kg_prefetch='', channel_metadata=None)[source]

Build the system + user message pair used to pre-count input tokens.

Produces the same OpenAI-style [system, user] message list that a real extraction run sends, so a token count over these messages accurately reflects the run. The system prompt comes from load_kg_agentic_system_prompt(), augmented with the speaker mapping via augment_system_prompt_with_speaker_mapping(); the user message comes from build_kg_bulk_user_message(). Pure assembly with no Redis, KG, or network access (it does not itself call countTokens).

Called by kg_bulk_runner (in kg_bulk_runner.py) to size chunks before dispatching them to the bulk client’s token counter; no other callers.

Parameters:
  • conversation_text (str) – The chronological chat transcript for the chunk.

  • channel_id (str) – Default channel scope id for scoped facts.

  • chunk_index (int) – Zero-based index of this chunk (default 0).

  • time_start_iso (str) – ISO-8601 start of the chunk’s time range (optional).

  • time_end_iso (str) – ISO-8601 end of the chunk’s time range (optional).

  • platforms_channels (str) – Optional corpus-wide platform/channel summary.

  • config (Any | None) – Loaded application config (or None) forwarded to the builders.

  • chunk_channel_pairs (list[tuple[str, str]] | None) – (platform, channel_id) pairs in the chunk.

  • chunk_speaker_pairs (list[tuple[str, str]] | None) – (user_id, display_name) pairs in the chunk.

  • speaker_kg_prefetch (str) – Optional prefetched speaker-KG markdown block.

  • channel_metadata (dict[str, dict[str, str]] | None) – Optional per-channel name/topic metadata map.

Returns:

A two-element [system, user] message list.

Return type:

list[dict[str, str]]

kg_agentic_extraction.build_kg_bulk_tool_registry()[source]

Build a registry of the read-only KG tools the bulk agent may call.

Creates a ToolRegistry (with no task manager) and registers the three read-only knowledge-graph tools (kg_search_entities, kg_get_entity, and kg_inspect_entity) as nested closures decorated with @reg.tool. Each tool reaches the KnowledgeGraphManager through the ctx passed at call time and only queries the graph, never mutating it. The returned registry is handed to whichever bulk client drives the tool loop. Building the registry itself has no Redis, KG, or network side effects.

Called by create_kg_bulk_gemini_pool_client() and create_kg_bulk_openrouter_client() (both in this module) to supply each client’s tool_registry; no external callers.

Returns:

A registry exposing the read-only KG search and inspection tools.

Return type:

ToolRegistry

kg_agentic_extraction.kg_bulk_native_model_id()[source]

Return the native Gemini model id for the bulk chat model.

Strips the OpenRouter-style google/ vendor prefix off KG_BULK_CHAT_MODEL so the native Gemini client (which expects a bare model id) can be configured from the same constant the OpenRouter path uses. Pure string transform with no side effects.

Called by create_kg_bulk_gemini_pool_client() (in this module) to set the pool client’s model_id; no external callers.

Returns:

The bulk chat model id without the google/ prefix.

Return type:

str
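The prefix stripping amounts to a one-line string transform. A sketch, with native_model_id as a hypothetical name and a placeholder model id in place of the real constant:

```python
def native_model_id(openrouter_model: str, vendor_prefix: str = "google/") -> str:
    """Drop a leading vendor prefix if present; otherwise return the id unchanged."""
    if openrouter_model.startswith(vendor_prefix):
        return openrouter_model[len(vendor_prefix):]
    return openrouter_model
```

For example, native_model_id("google/some-model") yields "some-model", and an id without the prefix passes through untouched.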

kg_agentic_extraction.create_kg_bulk_gemini_pool_client(*, max_tool_rounds=48, max_tokens=60000, max_tool_output_chars=3000000, temperature=0.25)[source]

Construct a native-Gemini bulk chat client backed by the embed-pool keys.

Lazily imports and instantiates GeminiPoolToolChatClient, wiring it to the read-only KG tool registry from build_kg_bulk_tool_registry(), the bare model id from kg_bulk_native_model_id(), and the supplied generation limits. This client talks to Gemini directly over the shared embedding-pool API keys and uses native automatic function calling for the tool loop; the underlying network/session resources are established by the client, not here.

Called by kg_bulk_runner (in kg_bulk_runner.py) to create both the token-counter client (with max_tool_rounds=1) and the main bulk client; no other callers.

Parameters:
  • max_tool_rounds (int) – Maximum tool-call rounds per chat turn (default 48).

  • max_tokens (int) – Maximum output tokens per turn (default 60000).

  • max_tool_output_chars (int) – Cap on characters of tool output fed back to the model (default 3000000).

  • temperature (float) – Sampling temperature (default 0.25).

Returns:

A configured native-Gemini bulk chat client.

Return type:

GeminiPoolToolChatClient

kg_agentic_extraction.create_kg_bulk_openrouter_client(api_key, *, gemini_api_key='', max_tool_rounds=48, max_tokens=60000, max_tool_output_chars=3000000, temperature=0.25, top_p=0.99)[source]

Construct an OpenRouter-backed bulk chat client for KG extraction.

Instantiates OpenRouterClient against the OpenRouter base URL KG_BULK_OPENROUTER_BASE and model KG_BULK_CHAT_MODEL (gemini-3.1-flash-lite, carrying the google/ vendor prefix that kg_bulk_native_model_id() strips for the native path), wiring in the read-only KG tool registry from build_kg_bulk_tool_registry() plus the supplied generation and tool-loop limits. The client drives the KG tool loop over OpenRouter HTTP; an optional gemini_api_key lets it use native Gemini token counting. Constructing the client does not itself open a connection.

Called by kg_bulk_runner (in kg_bulk_runner.py) to create both the token-counter client (with max_tool_rounds=1) and the main bulk client when the OpenRouter backend is selected; no other callers.

Parameters:
  • api_key (str) – OpenRouter API key for authentication.

  • gemini_api_key (str) – Optional Gemini API key enabling native token counting.

  • max_tool_rounds (int) – Maximum tool-call rounds per chat turn (default 48).

  • max_tokens (int) – Maximum output tokens per turn (default 60000).

  • max_tool_output_chars (int) – Cap on characters of tool output fed back to the model (default 3000000).

  • temperature (float) – Sampling temperature (default 0.25).

  • top_p (float) – Nucleus-sampling top-p (default 0.99).

Returns:

A configured OpenRouter bulk chat client.

Return type:

OpenRouterClient

async kg_agentic_extraction.run_agentic_kg_extraction_chunk(*, conversation_text, channel_id, kg_manager, bulk_client, user_id='000000000000', chunk_index=0, time_start_iso='', time_end_iso='', platforms_channels='', config=None, chunk_channel_pairs=None, chunk_speaker_pairs=None, speaker_kg_prefetch='', channel_metadata=None, persist_extraction=True)[source]

Run one agentic KG-extraction pass over a single conversation chunk.

Assembles the system message (via load_kg_agentic_system_prompt() and augment_system_prompt_with_speaker_mapping()) and the user message (via build_kg_bulk_user_message()), builds a ToolContext bound to kg_manager, and awaits bulk_client.chat with the read-only KG tool names. The model may search and inspect the graph during its tool rounds; its final reply is parsed with _parse_llm_json().

When persist_extraction is true, the parsed entities and relationships are written to the knowledge graph through apply_parsed_extraction(), attributed to the creator system:kg_agentic_bulk. When it is false, the model still runs (read-only tools included), but nothing is written and the raw output is printed via _print_dry_run_llm_output().

LLM-call failures, empty replies, and JSON parse errors are caught, logged, and reported as a stats dict with parse_error set rather than raised. The function interacts with the LLM (Gemini AFC or OpenRouter HTTP behind bulk_client) and, when persisting, with the KG (FalkorDB/vector index behind KnowledgeGraphManager).

Called by kg_bulk_runner.run_agentic_kg_bulk (in kg_bulk_runner.py), which awaits it once per chunk; no other callers.

Parameters:
  • conversation_text (str) – The chronological chat transcript for the chunk.

  • channel_id (str) – Default channel scope id for scoped facts and the tool context.

  • kg_manager (KnowledgeGraphManager) – The KnowledgeGraphManager the tools and persistence operate against.

  • bulk_client (KgBulkLlmClient) – The bulk LLM client (native Gemini or OpenRouter) satisfying KgBulkLlmClient that drives the tool-enabled chat turn.

  • user_id (str) – Speaker/owner id for the turn and persisted facts (defaults to the all-zero placeholder '000000000000').

  • chunk_index (int) – Zero-based index of this chunk.

  • time_start_iso (str) – ISO-8601 start of the chunk’s time range (optional).

  • time_end_iso (str) – ISO-8601 end of the chunk’s time range (optional).

  • platforms_channels (str) – Optional corpus-wide platform/channel summary.

  • config (Any | None) – Loaded application config (or None) forwarded to prompt assembly.

  • chunk_channel_pairs (list[tuple[str, str]] | None) – (platform, channel_id) pairs in the chunk.

  • chunk_speaker_pairs (list[tuple[str, str]] | None) – (user_id, display_name) pairs in the chunk.

  • speaker_kg_prefetch (str) – Optional prefetched speaker-KG markdown block.

  • channel_metadata (dict[str, dict[str, str]] | None) – Optional per-channel name/topic metadata map.

  • persist_extraction (bool) – When True, apply parsed results to the graph; when False, run read-only and only report proposed counts.

Returns:

A stats dict. On success it carries the persistence outcome (added entity/relationship counts and persisted/parse_error flags, or proposed counts in dry-run mode); on failure it reports errors with parse_error set.

Return type:

dict[str, Any]
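The error-handling shape of a chunk run (failures reported in the stats dict rather than raised, plus a dry-run branch) can be sketched as follows. CannedClient and run_chunk are hypothetical, json.loads stands in for _parse_llm_json(), no graph is touched, and every stats key other than parse_error and persisted is an invented name.

```python
import asyncio
import json
from typing import Any


class CannedClient:
    """Hypothetical client whose chat() returns a fixed reply."""

    def __init__(self, reply: str) -> None:
        self.reply = reply

    async def chat(self, messages: list[dict[str, Any]], **kwargs: Any) -> str:
        return self.reply


async def run_chunk(client: Any, system: str, user: str, *,
                    persist: bool = True) -> dict[str, Any]:
    messages = [{"role": "system", "content": system},
                {"role": "user", "content": user}]
    try:
        raw = await client.chat(messages)
    except Exception as exc:  # LLM-call failure: report it, do not raise
        return {"parse_error": True, "errors": [f"llm call failed: {exc}"]}
    if not raw or not raw.strip():
        return {"parse_error": True, "errors": ["empty reply"]}
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as exc:
        return {"parse_error": True, "errors": [f"invalid JSON: {exc}"]}
    if not isinstance(parsed, dict):
        return {"parse_error": True, "errors": ["reply is not a JSON object"]}
    entities = parsed.get("entities", [])
    relationships = parsed.get("relationships", [])
    if not persist:  # dry run: report what would have been written
        return {"parse_error": False, "persisted": False,
                "proposed_entities": len(entities),
                "proposed_relationships": len(relationships)}
    # A real run would write to the graph here; this sketch only counts.
    return {"parse_error": False, "persisted": True,
            "entities_added": len(entities),
            "relationships_added": len(relationships)}
```

Returning a stats dict on every path lets a per-chunk loop keep going after a bad reply and tally failures at the end.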