kg_agentic_extraction

Agentic knowledge-graph extraction for bulk chat import.

Bulk agentic chat can run against either native Gemini (pool keys + gemini_kg_bulk_client) or OpenRouter (create_kg_bulk_openrouter_client()). The agent's small read-only tool set is backed by KnowledgeGraphManager.

class kg_agentic_extraction.KgBulkLlmClient(*args, **kwargs)

Bases: Protocol

Structural type for bulk chunking + agentic KG (OpenRouter or native Gemini).

async chat(messages, user_id='', ctx=None, tool_names=None, validate_header=False, token_count=None, on_intermediate_text=None)
Return type:

str

async count_input_tokens(messages, *, gemini_model=None)
Return type:

int | None

async close()
Return type:

None
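
Because KgBulkLlmClient is a Protocol, a client only has to provide matching methods; it does not have to inherit from anything. The sketch below illustrates that idea with a hypothetical stand-in protocol (BulkLlmClientLike) and a toy EchoClient. Both names, the type annotations, and the echo behaviour are assumptions made for illustration and are not part of kg_agentic_extraction.

```python
from __future__ import annotations

import asyncio
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class BulkLlmClientLike(Protocol):
    """Hypothetical stand-in that mirrors the documented method names."""

    async def chat(
        self, messages: list[dict[str, str]], user_id: str = "", **kwargs: Any
    ) -> str: ...

    async def count_input_tokens(
        self, messages: list[dict[str, str]], *, gemini_model: str | None = None
    ) -> int | None: ...

    async def close(self) -> None: ...


class EchoClient:
    """Toy client: satisfies the protocol structurally, without inheritance."""

    def __init__(self) -> None:
        self.closed = False

    async def chat(self, messages, user_id="", **kwargs):
        # Echo the last message instead of calling a model.
        return messages[-1]["content"]

    async def count_input_tokens(self, messages, *, gemini_model=None):
        # None signals "count unavailable", matching the int | None return type.
        return None

    async def close(self):
        self.closed = True


async def demo() -> str:
    client = EchoClient()
    try:
        return await client.chat([{"role": "user", "content": "hello"}])
    finally:
        await client.close()
```

A stub like this can be handy in tests where a real Gemini or OpenRouter client is not available.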

kg_agentic_extraction.build_platform_context_markdown(cfg)

Human-readable platform summary for the system prompt (no secrets).

Return type:

str

Parameters:

cfg (Any | None)

kg_agentic_extraction.render_kg_agentic_system_prompt(cfg=None)

Render the Jinja2 system prompt including platform context.

Return type:

str

Parameters:

cfg (Any | None)

kg_agentic_extraction.load_kg_agentic_system_prompt(config=None)

Rendered system prompt (cached per template mtime + config fingerprint).

Return type:

str

Parameters:

config (Any | None)

kg_agentic_extraction.format_chunk_channels_section(channel_pairs, cfg, default_channel_scope, channel_metadata=None)

Describe which rooms/sources appear in this chunk.

Return type:

str

kg_agentic_extraction.format_chunk_speakers_section(speaker_pairs)

Unique speakers (user_id, display name) in this chunk, sorted by id.

Return type:

str

Parameters:

speaker_pairs (list[tuple[str, str]])
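
As a rough illustration of "unique speakers, sorted by id", the sketch below deduplicates (user_id, display name) pairs by id and emits one line per speaker. The function name format_speakers_sketch, the heading text, and the bullet layout are assumptions; the real output format is not documented here.

```python
def format_speakers_sketch(speaker_pairs: list[tuple[str, str]]) -> str:
    """Hypothetical formatter: one line per unique user_id, sorted by id."""
    by_id: dict[str, str] = {}
    for user_id, display_name in speaker_pairs:
        # Keep the first display name seen for each id.
        by_id.setdefault(user_id, display_name)
    if not by_id:
        return "Speakers in this chunk: (none)"
    lines = ["Speakers in this chunk:"]
    # Ids are strings, so the sort is lexicographic ("10" sorts before "2").
    lines += [f"- {uid}: {name}" for uid, name in sorted(by_id.items())]
    return "\n".join(lines)
```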

kg_agentic_extraction.format_speaker_user_id_mapping_markdown(speaker_pairs)

Markdown table: user_id → display name for the current chunk (system prompt).

Return type:

str

Parameters:

speaker_pairs (list[tuple[str, str]])
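
A user_id → display name table in Markdown might be produced along the following lines. This is a minimal sketch under assumptions: the function name speaker_mapping_table, the column headings, and the pipe escaping are illustrative choices, not the module's documented output.

```python
def speaker_mapping_table(speaker_pairs: list[tuple[str, str]]) -> str:
    """Hypothetical Markdown table builder for user_id -> display name."""
    rows = ["| user_id | display name |", "| --- | --- |"]
    # dict() keeps the last display name seen for a repeated user_id.
    for user_id, name in sorted(dict(speaker_pairs).items()):
        # Escape pipes so a display name cannot break the table layout.
        safe_name = name.replace("|", "\\|")
        rows.append(f"| {user_id} | {safe_name} |")
    return "\n".join(rows)
```

Escaping matters here because display names come from chat users and may contain characters that are meaningful in Markdown.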

kg_agentic_extraction.augment_system_prompt_with_speaker_mapping(system, speaker_pairs)
Return type:

str

async kg_agentic_extraction.prefetch_speaker_kg_context(kg, speakers, *, max_speakers=8, hits_per_speaker=3, min_score=0.0)

Vector search prefetch for chunk speakers; full entity text for user prompt.

Return type:

str

kg_agentic_extraction.build_kg_bulk_user_message(conversation_text, *, channel_id, chunk_index, time_start_iso='', time_end_iso='', platforms_channels='', config=None, chunk_channel_pairs=None, chunk_speaker_pairs=None, speaker_kg_prefetch='', channel_metadata=None)

User message: metadata, channel context, speakers, optional prefetch.

Return type:

str

kg_agentic_extraction.messages_for_agentic_token_estimate(conversation_text, *, channel_id, chunk_index=0, time_start_iso='', time_end_iso='', platforms_channels='', config=None, chunk_channel_pairs=None, chunk_speaker_pairs=None, speaker_kg_prefetch='', channel_metadata=None)

OpenAI-shaped messages for Gemini countTokens (same shape as a run).

Return type:

list[dict[str, str]]
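
"OpenAI-shaped" here means a list of dicts with role and content keys, which matches the list[dict[str, str]] return type. The sketch below shows that shape only; that the list holds exactly one system and one user message is an assumption, and openai_shaped_messages is a hypothetical helper name.

```python
def openai_shaped_messages(system_prompt: str, user_message: str) -> list[dict[str, str]]:
    """Hypothetical helper: build a system + user message list."""
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_message},
    ]


# Example: the same structure would be passed to a token-count call
# and to the actual chat run, so the estimate reflects the real input.
messages = openai_shaped_messages("You extract entities.", "alice: hi bob")
```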

kg_agentic_extraction.build_kg_bulk_tool_registry()

Read-only KG tools for the bulk extraction agent.

Return type:

ToolRegistry

kg_agentic_extraction.kg_bulk_native_model_id()

Gemini API model id (strip OpenRouter google/ prefix).

Return type:

str
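
OpenRouter model ids carry a provider prefix such as google/, which the native Gemini API does not use. Stripping it can be sketched as below; strip_openrouter_prefix is a hypothetical helper taking the id as an argument, whereas the documented function takes no arguments and presumably reads the id from configuration.

```python
def strip_openrouter_prefix(model_id: str, prefix: str = "google/") -> str:
    """Hypothetical helper: drop the provider prefix if present."""
    # str.removeprefix (Python 3.9+) leaves the string unchanged
    # when it does not start with the prefix.
    return model_id.removeprefix(prefix)
```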

kg_agentic_extraction.create_kg_bulk_gemini_pool_client(*, max_tool_rounds=48, max_tokens=60000, max_tool_output_chars=3000000, temperature=0.25)

Native Gemini via embed pool keys + AFC (gemini_kg_bulk_client).

Return type:

GeminiPoolToolChatClient

Parameters:
  • max_tool_rounds (int)

  • max_tokens (int)

  • max_tool_output_chars (int)

  • temperature (float)

kg_agentic_extraction.create_kg_bulk_openrouter_client(api_key, *, gemini_api_key='', max_tool_rounds=48, max_tokens=60000, max_tool_output_chars=3000000, temperature=0.25)

OpenRouter client: production endpoint + gemini-3.1-flash-lite-preview.

Return type:

OpenRouterClient

Parameters:
  • api_key (str)

  • gemini_api_key (str)

  • max_tool_rounds (int)

  • max_tokens (int)

  • max_tool_output_chars (int)

  • temperature (float)

async kg_agentic_extraction.run_agentic_kg_extraction_chunk(*, conversation_text, channel_id, kg_manager, bulk_client, user_id='000000000000', chunk_index=0, time_start_iso='', time_end_iso='', platforms_channels='', config=None, chunk_channel_pairs=None, chunk_speaker_pairs=None, speaker_kg_prefetch='', channel_metadata=None, persist_extraction=True)

One agentic extraction pass over conversation_text.

When persist_extraction is False, the model still runs (including read-only KG tools), but the parsed JSON is not applied to the graph.

Return type:

dict[str, Any]
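
The dry-run behaviour of persist_extraction can be pictured as "always parse, conditionally write". The sketch below is illustrative only: handle_extraction, the entities/name JSON schema, the plain dict standing in for the graph, and the keys of the returned dict are all assumptions, not the module's real data model.

```python
import json
from typing import Any


def handle_extraction(
    raw_reply: str, graph: dict[str, Any], persist_extraction: bool = True
) -> dict[str, Any]:
    """Hypothetical post-processing step for one model reply."""
    # Parsing happens in both modes, so a dry run still validates the JSON.
    parsed = json.loads(raw_reply)
    applied = 0
    if persist_extraction:
        # Only this branch mutates the (stand-in) graph.
        for entity in parsed.get("entities", []):
            graph[entity["name"]] = entity
            applied += 1
    return {"parsed": parsed, "applied": applied, "persisted": persist_extraction}
```

Running with persist_extraction=False is useful for inspecting what the model would extract before letting it change stored data.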
