kg_agentic_extraction
Agentic knowledge-graph extraction for bulk chat import.
Bulk agentic chat can use native Gemini (pool keys + gemini_kg_bulk_client)
or OpenRouter (create_kg_bulk_openrouter_client()). A small read-only tool
set is backed by KnowledgeGraphManager.
- class kg_agentic_extraction.KgBulkLlmClient(*args, **kwargs)[source]
Bases: Protocol
Structural protocol for a bulk chunking and agentic-KG chat client.
Defines the minimal async surface (chat, count_input_tokens, and close) that the bulk-extraction code depends on, so either concrete backend can be used interchangeably without a shared base class. The two implementations are GeminiPoolToolChatClient (native Gemini over the shared embedding-pool keys with automatic function calling) and OpenRouterClient (OpenRouter HTTP). As a typing.Protocol it has no runtime behaviour and is never instantiated; it appears here only as the type of the bulk_client parameter of run_agentic_kg_extraction_chunk().
- async chat(messages, user_id='', ctx=None, tool_names=None, validate_header=False, token_count=None, on_intermediate_text=None)[source]
Run one tool-enabled chat turn and return the model’s final text.
Structural (Protocol) declaration with no body. It documents the contract that both concrete bulk clients satisfy: GeminiPoolToolChatClient (native Gemini automatic function calling over the embed-pool keys) and OpenRouterClient (OpenRouter HTTP). The implementation is expected to drive the read-only KG tool loop, calling the registered kg_search_entities / kg_get_entity / kg_inspect_entity tools (which reach the KnowledgeGraphManager via ctx) for up to the client’s max_tool_rounds, then return the assistant’s final message as a plain string.
Within this module the only caller is run_agentic_kg_extraction_chunk(), which awaits bulk_client.chat with the assembled system/user messages and KG_BULK_TOOL_NAMES. Concrete chat methods are exercised more broadly across the codebase (e.g. message_processor, kg_consolidation, kg_extraction), but those go through other client instances, not this Protocol.
- Parameters:
messages (list[dict[str, Any]]) – OpenAI-style {"role", "content"} message dicts forming the system + user prompt for this turn.
user_id (str) – Identifier of the speaker/owner for the turn; defaults to an empty string for bulk extraction.
ctx (ToolContext | None) – Tool execution context carrying kg_manager and ids so the read-only KG tools can run; may be None.
tool_names (list[str] | None) – Explicit subset of tool names the model may call this turn; None lets the client use its default registry view.
validate_header (bool) – Whether the implementation should enforce a structured header on the reply (unused/False for bulk KG).
token_count (int | None) – Optional pre-computed input token count the client may use to size context instead of recounting.
on_intermediate_text (Optional[Callable[[str], Awaitable[None]]]) – Optional async callback invoked with intermediate assistant text chunks as they stream.
- Returns:
The model’s final assistant message text.
- Return type:
- async count_input_tokens(messages, *, gemini_model=None)[source]
Estimate the input token count for messages before sending.
Structural (Protocol) declaration with no body. Concrete clients are expected to call the provider’s token-counting endpoint (e.g. Gemini countTokens) so bulk callers can pre-size chunks and decide whether a conversation fits in the model’s context window. May return None when no estimate is available.
This Protocol method has no callers inside this module; concrete implementations are awaited by the bulk-import sizing paths (kg_bulk_runner.py via a token counter, and GeminiPoolToolChatClient internally).
- Parameters:
- Returns:
The estimated input token count, or None when the implementation cannot produce one.
- Return type:
- async close()[source]
Release any network/session resources held by the client.
Structural (Protocol) declaration with no body. Concrete clients are expected to close their underlying HTTP session or pooled connections so bulk runs shut down cleanly. Idempotent by convention.
Not called for this Protocol inside this module; concrete close methods are awaited during teardown by the bulk runner (kg_bulk_runner.py) and other lifecycle paths.
- Return type:
- Returns:
None.
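
The structural-typing idea behind this class can be illustrated with a minimal sketch. BulkClient, EchoClient, and run_chunk are hypothetical stand-ins rather than the module's actual classes, and the method signatures are simplified; only the three-method shape (chat, count_input_tokens, close) mirrors the description above.

```python
import asyncio
from typing import Any, Optional, Protocol


class BulkClient(Protocol):
    """Hypothetical stand-in for KgBulkLlmClient: three async methods, no bodies."""

    async def chat(self, messages: list[dict[str, Any]], user_id: str = "") -> str: ...

    async def count_input_tokens(self, messages: list[dict[str, Any]]) -> Optional[int]: ...

    async def close(self) -> None: ...


class EchoClient:
    """Toy backend; it does not inherit from BulkClient, it only matches its shape."""

    def __init__(self) -> None:
        self.closed = False

    async def chat(self, messages: list[dict[str, Any]], user_id: str = "") -> str:
        # Return the last message's content instead of calling a real model.
        return str(messages[-1]["content"])

    async def count_input_tokens(self, messages: list[dict[str, Any]]) -> Optional[int]:
        # Crude whitespace-based estimate; a real client would ask the provider.
        return sum(len(str(m["content"]).split()) for m in messages)

    async def close(self) -> None:
        self.closed = True  # safe to call more than once


async def run_chunk(
    client: BulkClient, messages: list[dict[str, Any]]
) -> tuple[Optional[int], str]:
    """Depend only on the protocol, so any conforming backend can be passed in."""
    try:
        tokens = await client.count_input_tokens(messages)
        reply = await client.chat(messages)
        return tokens, reply
    finally:
        await client.close()


demo_messages = [
    {"role": "system", "content": "Extract entities."},
    {"role": "user", "content": "alice likes tea"},
]
demo_client = EchoClient()
demo_result = asyncio.run(run_chunk(demo_client, demo_messages))
```

Because the caller is typed against the protocol only, swapping one backend for another needs no shared base class, which is the property the description above relies on.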
- kg_agentic_extraction.build_platform_context_markdown(cfg)[source]
Build a human-readable, secret-free platform summary for the system prompt.
Walks the config’s enabled platforms and emits a markdown block describing each one (Matrix homeserver and bot id, Discord snowflake ids, and remaining non-secret settings) so the extraction model can interpret platform-specific identifiers in the chat log. Secret-looking and NCM-related setting keys are dropped via _is_secret_setting_key() and _settings_key_is_ncm_related(), NCM platforms are skipped via _platform_type_is_ncm(), and structured values are summarized rather than dumped. Reads only the passed config object; no Redis, KG, or network access. Falls back to explicit placeholder text for a missing config, no platform entries, an all-NCM set, or all-disabled platforms.
Called by render_kg_agentic_system_prompt() and _system_prompt_cache_key() (both in this module) to inject and fingerprint the platform context; no external callers.
- kg_agentic_extraction.render_kg_agentic_system_prompt(cfg=None)[source]
Render the agentic-extraction system prompt from its Jinja2 template.
Loads prompts/kg_agentic_extraction_system.j2 (resolved via _system_template_path()) in a sandboxed, non-autoescaping LoggingSandboxedEnvironment, injecting the secret-free platform_context produced by build_platform_context_markdown() (passed through sanitize_context()). Reads the template file from disk; if it is missing, logs a warning and returns a hard-coded fallback prompt with the same platform context appended. No Redis, KG, or network access.
Called only by load_kg_agentic_system_prompt(), which memoizes the result; no external callers.
- Parameters:
cfg (Any | None) – Loaded application config (or None) forwarded to build_platform_context_markdown().
- Returns:
The fully rendered system prompt text (or the fallback prompt when the template file is absent).
- Return type:
- kg_agentic_extraction.load_kg_agentic_system_prompt(config=None)[source]
Return the rendered system prompt, memoized by template mtime and config.
Computes a cache key via _system_prompt_cache_key() (template mtime plus a hash of platform context and channel hints) and, on a miss, renders the prompt via render_kg_agentic_system_prompt() and stores it in the module-level _SYSTEM_PROMPT_CACHE dict. This avoids re-rendering the Jinja2 template for every chunk while still picking up template or config edits. Touches only that in-process cache; no Redis, KG, or network access.
Called by messages_for_agentic_token_estimate() and run_agentic_kg_extraction_chunk() (both in this module) to assemble the system message; no external callers.
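
A minimal sketch of memoizing by template mtime and a config hash is shown here. It is not the module's code: cache_key, render, load_prompt, and RENDER_STATS are hypothetical names, plain string substitution stands in for the Jinja2 render, and only the platform context is hashed (the real key also covers channel hints).

```python
import hashlib
import os
import tempfile

_PROMPT_CACHE: dict[tuple[int, str], str] = {}
RENDER_STATS = {"calls": 0}  # counts real renders so the caching effect is observable


def cache_key(template_path: str, platform_context: str) -> tuple[int, str]:
    """Key on the template's mtime plus a hash of the config-derived context."""
    mtime_ns = os.stat(template_path).st_mtime_ns
    digest = hashlib.sha256(platform_context.encode("utf-8")).hexdigest()
    return (mtime_ns, digest)


def render(template_path: str, platform_context: str) -> str:
    """Stand-in for the real template render: plain placeholder substitution."""
    RENDER_STATS["calls"] += 1
    with open(template_path, encoding="utf-8") as fh:
        return fh.read().replace("{{ platform_context }}", platform_context)


def load_prompt(template_path: str, platform_context: str) -> str:
    """Render only on a cache miss; otherwise return the stored prompt."""
    key = cache_key(template_path, platform_context)
    if key not in _PROMPT_CACHE:
        _PROMPT_CACHE[key] = render(template_path, platform_context)
    return _PROMPT_CACHE[key]


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "system.j2")
    with open(path, "w", encoding="utf-8") as fh:
        fh.write("You extract facts.\n{{ platform_context }}")
    first = load_prompt(path, "matrix: example.org")
    second = load_prompt(path, "matrix: example.org")  # same key, cache hit
    third = load_prompt(path, "discord: 1234")  # context changed, re-render
```

Keying on the mtime means an edited template produces a new key on the next call, so stale prompts are never served, at the cost of one stat call per lookup.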
- kg_agentic_extraction.format_chunk_channels_section(channel_pairs, cfg, default_channel_scope, channel_metadata=None)[source]
Build the markdown "Channels in this chunk" section for the user prompt.
Lists each (platform, channel_id) pair appearing in the chunk, enriched with optional operator-supplied descriptions from the config’s kg_extraction_channel_hints and any resolved channel name/topic from channel_metadata, then appends the chunk’s default_channel_scope_id for scoped facts. This orients the model about which rooms/sources the log lines come from. Reads only the passed config object and metadata dict; no Redis, KG, or network access. Returns a placeholder section when no channel pairs are resolved.
Called by build_kg_bulk_user_message() (in this module) while assembling the user message; no external callers.
- Parameters:
channel_pairs (list[tuple[str, str]]) – (platform, channel_id) tuples present in the chunk.
cfg (Any | None) – Loaded application config (or None), read for kg_extraction_channel_hints.
default_channel_scope (str) – Channel scope id used as the chunk default for scoped facts.
channel_metadata (dict[str, dict[str, str]] | None) – Optional "platform:channel_id" to {"name", "topic"} map for resolved channel display info.
- Returns:
The markdown channels section, joined with newlines.
- Return type:
- kg_agentic_extraction.format_chunk_speakers_section(speaker_pairs)[source]
Build the markdown "Speakers in this chunk" section for the user prompt.
Deduplicates the (user_id, display_name) pairs (first non-empty name wins per id, blanks become ?) and renders them as a sorted bullet list, so the model has the roster of participants for the chunk. Pure string formatting with no external side effects. Returns a placeholder section when there are no usable speakers.
Called by build_kg_bulk_user_message() (in this module); no external callers.
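
The deduplication rule can be sketched as follows. The function names, the heading text, the placeholder wording, and the choice to sort by user id are assumptions for illustration; only the "first non-empty name wins, blanks become ?" rule comes from the description above.

```python
def dedupe_speakers(pairs: list[tuple[str, str]]) -> dict[str, str]:
    """First non-empty display name wins per user id; blanks fall back to '?'."""
    names: dict[str, str] = {}
    for user_id, display_name in pairs:
        uid = user_id.strip()
        if not uid:
            continue  # skip pairs without a usable id
        name = display_name.strip()
        # Record a new id, or upgrade a '?' placeholder once a real name shows up.
        if uid not in names or (names[uid] == "?" and name):
            names[uid] = name or "?"
    return names


def speakers_section(pairs: list[tuple[str, str]]) -> str:
    """Render the roster as a bullet list sorted by user id (assumed sort key)."""
    names = dedupe_speakers(pairs)
    if not names:
        return "## Speakers in this chunk\n(none resolved)"
    lines = ["## Speakers in this chunk"]
    lines += [f"- {uid}: {names[uid]}" for uid in sorted(names)]
    return "\n".join(lines)


section = speakers_section(
    [("u2", ""), ("u1", "Alice"), ("u2", "Bob"), ("u1", "Alicia"), ("u3", "")]
)
```

In this example u2 first appears with a blank name and is later upgraded to Bob, u1 keeps Alice because a later name does not override an earlier one, and u3 stays as ?.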
- kg_agentic_extraction.format_speaker_user_id_mapping_markdown(speaker_pairs)[source]
Render a markdown user_id to display-name table for the chunk.
Deduplicates the speaker pairs (first non-empty name wins per id, blanks become ?), escapes pipe characters in names, and emits a markdown table that helps the model map raw sender ids in the log lines to human names. Returns an empty string when there are no usable speakers. Pure string formatting with no external side effects.
Called by augment_system_prompt_with_speaker_mapping() (in this module) to build the block appended to the system prompt; no external callers.
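
Pipe escaping matters because an unescaped | inside a display name would split a table cell. A small sketch under assumed names (user_id_table and the column headers are hypothetical):

```python
def user_id_table(pairs: list[tuple[str, str]]) -> str:
    """Build a two-column markdown table; return '' when there is nothing to show."""
    names: dict[str, str] = {}
    for user_id, display_name in pairs:
        uid, name = user_id.strip(), display_name.strip()
        if not uid:
            continue
        if uid not in names or (names[uid] == "?" and name):
            names[uid] = name or "?"
    if not names:
        return ""
    rows = ["| user_id | display name |", "| --- | --- |"]
    for uid in sorted(names):
        safe = names[uid].replace("|", "\\|")  # keep the pipe inside its cell
        rows.append(f"| {uid} | {safe} |")
    return "\n".join(rows)


table = user_id_table([("@a:example.org", "Al|ce"), ("@b:example.org", "")])
```

Returning an empty string for an empty roster lets the caller append the result unconditionally without adding a stray header to the prompt.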
- kg_agentic_extraction.augment_system_prompt_with_speaker_mapping(system, speaker_pairs)[source]
Append a per-chunk user_id to display-name table to the system prompt.
Builds a markdown mapping block via format_speaker_user_id_mapping_markdown() and concatenates it onto the rendered system prompt so the model can resolve sender ids in the log lines. Returns the prompt unchanged when no speakers are supplied (empty block).
Called internally by both messages_for_agentic_token_estimate() and run_agentic_kg_extraction_chunk() to assemble the final system message; no external callers.
- Parameters:
- Returns:
The system prompt with the speaker-mapping block appended, or the original system when there is nothing to add.
- Return type:
- async kg_agentic_extraction.prefetch_speaker_kg_context(kg, speakers, *, max_speakers=8, hits_per_speaker=3, min_score=0.0)[source]
Prefetch existing KG entities for a chunk’s speakers as user-prompt context.
For each unique speaker (capped at max_speakers) runs two vector searches on the knowledge graph via kg.search_entities: one scoped to the speaker’s user category and id, and one general query over name and id. It then deduplicates hits by uuid, drops anything below min_score, and formats the survivors into a markdown block. This seeds the model with likely already-known entities so it reuses them instead of creating duplicates. It reads the KG only (FalkorDB/vector index behind KnowledgeGraphManager) and never mutates it; each search is wrapped so a failure is logged at debug and skipped rather than raised. Returns an empty string when there are no speakers or no surviving hits.
Called by kg_bulk_runner.run_agentic_kg_bulk (in kg_bulk_runner.py) when speaker prefetch is enabled; the result is passed through as the speaker_kg_prefetch argument to the user-message builder.
- Parameters:
kg (Any) – A KnowledgeGraphManager (typed Any) exposing an async search_entities method.
speakers (list[tuple[str, str]]) – (user_id, display_name) tuples for the chunk.
max_speakers (int) – Maximum distinct speakers to prefetch, clamped to 1-128.
hits_per_speaker (int) – Vector hits requested per search, clamped to 1-48.
min_score (float) – Minimum similarity score a hit must meet to be included.
- Returns:
A markdown "Existing knowledge graph (speakers — prefetch)" block, or an empty string when nothing relevant was found.
- Return type:
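
The prefetch loop described for prefetch_speaker_kg_context() can be sketched with an in-memory stand-in. FakeKG, its search_entities(query, limit) signature, the hit dict keys, the query strings, and the output heading are all assumptions; the real KnowledgeGraphManager API is not documented in this section.

```python
import asyncio
from typing import Any


class FakeKG:
    """Hypothetical in-memory stand-in for the KG manager's async search."""

    async def search_entities(self, query: str, limit: int = 3) -> list[dict[str, Any]]:
        if "broken" in query:
            raise RuntimeError("index unavailable")
        hits = [
            {"uuid": "e1", "name": "Alice", "score": 0.9},
            {"uuid": "e2", "name": "Alice's cat", "score": 0.2},
        ]
        return hits[:limit]


async def prefetch(
    kg: Any,
    speakers: list[tuple[str, str]],
    *,
    max_speakers: int = 8,
    hits_per_speaker: int = 3,
    min_score: float = 0.0,
) -> str:
    max_speakers = max(1, min(128, max_speakers))  # clamp to 1-128
    hits_per_speaker = max(1, min(48, hits_per_speaker))  # clamp to 1-48
    seen: dict[str, dict[str, Any]] = {}  # uuid -> first hit seen
    unique_ids: list[str] = []
    for user_id, name in speakers:
        if user_id in unique_ids:
            continue
        if len(unique_ids) >= max_speakers:
            break
        unique_ids.append(user_id)
        # Two searches per speaker: one id-scoped, one general (assumed query text).
        for query in (f"user {user_id}", f"{name} {user_id}"):
            try:
                hits = await kg.search_entities(query, limit=hits_per_speaker)
            except Exception:
                continue  # a failed search is skipped, not raised
            for hit in hits:
                if hit["score"] >= min_score:
                    seen.setdefault(hit["uuid"], hit)
    if not seen:
        return ""
    lines = ["## Existing knowledge graph (speakers prefetch)"]
    lines += [f"- {h['name']} ({h['uuid']})" for h in seen.values()]
    return "\n".join(lines)


block = asyncio.run(
    prefetch(FakeKG(), [("u1", "Alice"), ("broken", "Bob")], min_score=0.5)
)
empty_block = asyncio.run(prefetch(FakeKG(), []))
```

Both searches for the second speaker fail and are skipped, the low-scoring hit is filtered out, and the entity returned by both of the first speaker's searches appears once.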
- kg_agentic_extraction.build_kg_bulk_user_message(conversation_text, *, channel_id, chunk_index, time_start_iso='', time_end_iso='', platforms_channels='', config=None, chunk_channel_pairs=None, chunk_speaker_pairs=None, speaker_kg_prefetch='', channel_metadata=None)[source]
Assemble the full user message for one agentic extraction chunk.
Concatenates chunk metadata (index, default channel scope, UTC time range, and optional corpus-wide platform/channel list), the channel section from format_chunk_channels_section(), the speaker section from format_chunk_speakers_section(), any speaker-KG prefetch text, the chronological conversation, and a final instruction to emit the JSON extraction. This is the single source of truth for the user turn so token estimation and the real run see the same shape. Pure string assembly with no Redis, KG, or network access.
Called by messages_for_agentic_token_estimate() and run_agentic_kg_extraction_chunk() (both in this module); no external callers.
- Parameters:
conversation_text (str) – The chronological chat transcript for the chunk.
channel_id (str) – Default channel scope id for scoped facts.
chunk_index (int) – Zero-based index of this chunk.
time_start_iso (str) – ISO-8601 start of the chunk’s time range (optional).
time_end_iso (str) – ISO-8601 end of the chunk’s time range (optional).
platforms_channels (str) – Optional summary of all platforms/channels in the full corpus.
config (Any | None) – Loaded application config (or None) forwarded to the section builders.
chunk_channel_pairs (list[tuple[str, str]] | None) – (platform, channel_id) pairs in the chunk.
chunk_speaker_pairs (list[tuple[str, str]] | None) – (user_id, display_name) pairs in the chunk.
speaker_kg_prefetch (str) – Optional prefetched speaker-KG markdown block.
channel_metadata (dict[str, dict[str, str]] | None) – Optional per-channel name/topic metadata map.
- Returns:
The assembled user message, joined with newlines.
- Return type:
- kg_agentic_extraction.messages_for_agentic_token_estimate(conversation_text, *, channel_id, chunk_index=0, time_start_iso='', time_end_iso='', platforms_channels='', config=None, chunk_channel_pairs=None, chunk_speaker_pairs=None, speaker_kg_prefetch='', channel_metadata=None)[source]
Build the system + user message pair used to pre-count input tokens.
Produces the same OpenAI-style [system, user] message list that a real extraction run sends (the system prompt from load_kg_agentic_system_prompt() augmented with the speaker mapping via augment_system_prompt_with_speaker_mapping(), and the user message from build_kg_bulk_user_message()), so a token count over these messages accurately reflects the run. Pure assembly with no Redis, KG, or network access (it does not itself call countTokens).
Called by kg_bulk_runner (in kg_bulk_runner.py) to size chunks before dispatching them to the bulk client’s token counter; no other callers.
- Parameters:
conversation_text (str) – The chronological chat transcript for the chunk.
channel_id (str) – Default channel scope id for scoped facts.
chunk_index (int) – Zero-based index of this chunk (default 0).
time_start_iso (str) – ISO-8601 start of the chunk’s time range (optional).
time_end_iso (str) – ISO-8601 end of the chunk’s time range (optional).
platforms_channels (str) – Optional corpus-wide platform/channel summary.
config (Any | None) – Loaded application config (or None) forwarded to the builders.
chunk_channel_pairs (list[tuple[str, str]] | None) – (platform, channel_id) pairs in the chunk.
chunk_speaker_pairs (list[tuple[str, str]] | None) – (user_id, display_name) pairs in the chunk.
speaker_kg_prefetch (str) – Optional prefetched speaker-KG markdown block.
channel_metadata (dict[str, dict[str, str]] | None) – Optional per-channel name/topic metadata map.
- Returns:
A two-element [system, user] message list.
- Return type:
- kg_agentic_extraction.build_kg_bulk_tool_registry()[source]
Build a registry of the read-only KG tools the bulk agent may call.
Creates a ToolRegistry (with no task manager) and registers the three read-only knowledge-graph tools (kg_search_entities, kg_get_entity, and kg_inspect_entity) as nested closures decorated with @reg.tool. Each tool reaches the KnowledgeGraphManager through the ctx passed at call time and only queries the graph, never mutating it. The returned registry is handed to whichever bulk client drives the tool loop. Building the registry itself has no Redis, KG, or network side effects.
Called by create_kg_bulk_gemini_pool_client() and create_kg_bulk_openrouter_client() (both in this module) to supply each client’s tool_registry; no external callers.
- Returns:
A registry exposing the read-only KG search and inspection tools.
- Return type:
- kg_agentic_extraction.kg_bulk_native_model_id()[source]
Return the native Gemini model id for the bulk chat model.
Strips the OpenRouter-style google/ vendor prefix off KG_BULK_CHAT_MODEL so the native Gemini client (which expects a bare model id) can be configured from the same constant the OpenRouter path uses. Pure string transform with no side effects.
Called by create_kg_bulk_gemini_pool_client() (in this module) to set the pool client’s model_id; no external callers.
- Returns:
The bulk chat model id without the google/ prefix.
- Return type:
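
A prefix strip like the one performed by kg_bulk_native_model_id() might be written as follows. The function name native_model_id and the example constant value are assumptions; the actual value of KG_BULK_CHAT_MODEL, including whether it carries the prefix, is not shown here. str.removeprefix requires Python 3.9 or later.

```python
def native_model_id(model: str, prefix: str = "google/") -> str:
    """Drop a leading vendor prefix if present; leave bare ids untouched."""
    return model.removeprefix(prefix)


# Assumed OpenRouter-style id, used only for illustration.
prefixed = native_model_id("google/gemini-3.1-flash-lite")
already_bare = native_model_id("gemini-3.1-flash-lite")
```

Using removeprefix rather than replace ensures that only a leading occurrence is stripped, so an id that happens to contain the prefix text elsewhere is left intact.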
- kg_agentic_extraction.create_kg_bulk_gemini_pool_client(*, max_tool_rounds=48, max_tokens=60000, max_tool_output_chars=3000000, temperature=0.25)[source]
Construct a native-Gemini bulk chat client backed by the embed-pool keys.
Lazily imports and instantiates GeminiPoolToolChatClient, wiring it to the read-only KG tool registry from build_kg_bulk_tool_registry(), the bare model id from kg_bulk_native_model_id(), and the supplied generation limits. This client talks to Gemini directly over the shared embedding-pool API keys and uses native automatic function calling for the tool loop; the underlying network/session resources are established by the client, not here.
Called by kg_bulk_runner (in kg_bulk_runner.py) to create both the token-counter client (with max_tool_rounds=1) and the main bulk client; no other callers.
- Parameters:
max_tool_rounds (int) – Maximum tool-call rounds per chat turn (default 48).
max_tokens (int) – Maximum output tokens per turn (default 60000).
max_tool_output_chars (int) – Cap on characters of tool output fed back to the model (default 3000000).
temperature (float) – Sampling temperature (default 0.25).
- Returns:
A configured native-Gemini bulk chat client.
- Return type:
- kg_agentic_extraction.create_kg_bulk_openrouter_client(api_key, *, gemini_api_key='', max_tool_rounds=48, max_tokens=60000, max_tool_output_chars=3000000, temperature=0.25, top_p=0.99)[source]
Construct an OpenRouter-backed bulk chat client for KG extraction.
Instantiates OpenRouterClient against the OpenRouter base URL KG_BULK_OPENROUTER_BASE and model KG_BULK_CHAT_MODEL (gemini-3.1-flash-lite), wiring in the read-only KG tool registry from build_kg_bulk_tool_registry() plus the supplied generation and tool-loop limits. The client drives the KG tool loop over OpenRouter HTTP; an optional gemini_api_key lets it use native Gemini token counting. Constructing the client does not itself open a connection.
Called by kg_bulk_runner (in kg_bulk_runner.py) to create both the token-counter client (with max_tool_rounds=1) and the main bulk client when the OpenRouter backend is selected; no other callers.
- Parameters:
api_key (str) – OpenRouter API key for authentication.
gemini_api_key (str) – Optional Gemini API key enabling native token counting.
max_tool_rounds (int) – Maximum tool-call rounds per chat turn (default 48).
max_tokens (int) – Maximum output tokens per turn (default 60000).
max_tool_output_chars (int) – Cap on characters of tool output fed back to the model (default 3000000).
temperature (float) – Sampling temperature (default 0.25).
top_p (float) – Nucleus-sampling top-p (default 0.99).
- Returns:
A configured OpenRouter bulk chat client.
- Return type:
- async kg_agentic_extraction.run_agentic_kg_extraction_chunk(*, conversation_text, channel_id, kg_manager, bulk_client, user_id='000000000000', chunk_index=0, time_start_iso='', time_end_iso='', platforms_channels='', config=None, chunk_channel_pairs=None, chunk_speaker_pairs=None, speaker_kg_prefetch='', channel_metadata=None, persist_extraction=True)[source]
Run one agentic KG-extraction pass over a single conversation chunk.
Assembles the system message (via load_kg_agentic_system_prompt() and augment_system_prompt_with_speaker_mapping()) and user message (via build_kg_bulk_user_message()), builds a ToolContext bound to kg_manager, and awaits bulk_client.chat with the read-only KG tool names. The model may search/inspect the graph during its tool rounds, and its final reply is parsed with _parse_llm_json(). When persist_extraction is true the parsed entities and relationships are written to the knowledge graph through apply_parsed_extraction() (created by system:kg_agentic_bulk); when it is false the model still runs (read-only tools included) but nothing is written and the raw output is printed via _print_dry_run_llm_output(). LLM-call failures, empty replies, and JSON parse errors are caught, logged, and reported as a stats dict with parse_error set rather than raised. Interacts with the LLM (Gemini AFC or OpenRouter HTTP behind bulk_client) and, on persistence, with the KG (FalkorDB/vector index behind KnowledgeGraphManager).
Called by kg_bulk_runner.run_agentic_kg_bulk (in kg_bulk_runner.py), which awaits it once per chunk; no other callers.
- Parameters:
conversation_text (str) – The chronological chat transcript for the chunk.
channel_id (str) – Default channel scope id for scoped facts and the tool context.
kg_manager (KnowledgeGraphManager) – The KnowledgeGraphManager the tools and persistence operate against.
bulk_client (KgBulkLlmClient) – The bulk LLM client (native Gemini or OpenRouter) satisfying KgBulkLlmClient that drives the tool-enabled chat turn.
user_id (str) – Speaker/owner id for the turn and persisted facts (default the zero placeholder).
chunk_index (int) – Zero-based index of this chunk.
time_start_iso (str) – ISO-8601 start of the chunk’s time range (optional).
time_end_iso (str) – ISO-8601 end of the chunk’s time range (optional).
platforms_channels (str) – Optional corpus-wide platform/channel summary.
config (Any | None) – Loaded application config (or None) forwarded to prompt assembly.
chunk_channel_pairs (list[tuple[str, str]] | None) – (platform, channel_id) pairs in the chunk.
chunk_speaker_pairs (list[tuple[str, str]] | None) – (user_id, display_name) pairs in the chunk.
speaker_kg_prefetch (str) – Optional prefetched speaker-KG markdown block.
channel_metadata (dict[str, dict[str, str]] | None) – Optional per-channel name/topic metadata map.
persist_extraction (bool) – When True, apply parsed results to the graph; when False, run read-only and only report proposed counts.
- Returns:
A stats dict. On success it carries the persistence outcome (added entity/relationship counts and persisted / parse_error flags, or proposed counts in dry-run mode); on failure it reports errors with parse_error set.
- Return type:
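
The "report failures as stats rather than raising" behaviour of run_agentic_kg_extraction_chunk() can be sketched as below. This is an illustration under stated assumptions: run_chunk and the stats keys other than parse_error, persisted, and errors are hypothetical, strict json.loads stands in for _parse_llm_json() (which may be more lenient), and no graph write is performed.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable


async def run_chunk(chat: Callable[[], Awaitable[str]], *, persist: bool) -> dict[str, Any]:
    """Hypothetical sketch: turn LLM or parse failures into a stats dict."""
    try:
        reply = await chat()
    except Exception as exc:
        return {"parse_error": True, "errors": [f"llm call failed: {exc}"]}
    if not reply.strip():
        return {"parse_error": True, "errors": ["empty reply"]}
    try:
        parsed = json.loads(reply)
    except json.JSONDecodeError as exc:
        return {"parse_error": True, "errors": [f"invalid JSON: {exc.msg}"]}
    if not isinstance(parsed, dict):
        return {"parse_error": True, "errors": ["expected a JSON object"]}
    # A real run would write to the graph here when persist is true.
    return {
        "parse_error": False,
        "persisted": persist,
        "entities": len(parsed.get("entities", [])),
        "relationships": len(parsed.get("relationships", [])),
    }


async def good() -> str:
    return '{"entities": [{"name": "Alice"}], "relationships": []}'


async def bad() -> str:
    return "not json"


async def failing() -> str:
    raise TimeoutError("upstream timeout")


ok_stats = asyncio.run(run_chunk(good, persist=False))
bad_stats = asyncio.run(run_chunk(bad, persist=False))
fail_stats = asyncio.run(run_chunk(failing, persist=False))
```

Returning a dict in every branch lets a per-chunk loop keep going after one bad chunk and aggregate the error counts at the end, instead of aborting the whole import.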