Source code for kg_agentic_extraction

"""Agentic knowledge-graph extraction for bulk chat import.

Bulk agentic chat can use native Gemini (pool keys + :mod:`gemini_kg_bulk_client`)
or OpenRouter (:func:`create_kg_bulk_openrouter_client`).  A small read-only tool
set is backed by :class:`~knowledge_graph.KnowledgeGraphManager`.
"""

from __future__ import annotations

import hashlib
import jsonutil as json
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Protocol

from jinja2.sandbox import SandboxedEnvironment
from jinja2 import FileSystemLoader
from prompt_renderer import sanitize_context, LoggingSandboxedEnvironment
from tool_context import ToolContext
from tools import ToolRegistry

from kg_extraction import _parse_llm_json, apply_parsed_extraction
from openrouter_client import OpenRouterClient

if TYPE_CHECKING:
    from gemini_kg_bulk_client import GeminiPoolToolChatClient
    from knowledge_graph import KnowledgeGraphManager

logger = logging.getLogger(__name__)


class KgBulkLlmClient(Protocol):
    """Structural type for the chat client that drives bulk agentic KG extraction.

    Only three async members are required -- ``chat``, ``count_input_tokens``
    and ``close`` -- so any backend exposing them can be passed as the
    ``bulk_client`` without inheriting from a common base class.  Being a
    :class:`typing.Protocol`, this class carries no behaviour of its own.
    """

    async def chat(
        self,
        messages: list[dict[str, Any]],
        user_id: str = "",
        ctx: ToolContext | None = None,
        tool_names: list[str] | None = None,
        validate_header: bool = False,
        token_count: int | None = None,
        on_intermediate_text: Callable[[str], Awaitable[None]] | None = None,
    ) -> str:
        """Run one tool-enabled chat turn and return the model's final text.

        Declaration only; concrete clients supply the implementation.

        Args:
            messages: OpenAI-style ``{"role", "content"}`` message dicts.
            user_id: Identifier of the owner of the turn (empty by default).
            ctx: Tool execution context handed to the tools, or None.
            tool_names: Names of the tools the model may call this turn;
                None leaves the choice to the client.
            validate_header: Whether the client should enforce a structured
                header on the reply.
            token_count: Optional pre-computed input token count.
            on_intermediate_text: Optional async callback receiving
                intermediate assistant text.

        Returns:
            The final assistant message as a plain string.
        """
        ...

    async def count_input_tokens(
        self,
        messages: list[dict[str, Any]],
        *,
        gemini_model: str | None = None,
    ) -> int | None:
        """Estimate the input token count of *messages* before sending them.

        Declaration only; concrete clients supply the implementation.

        Args:
            messages: OpenAI-style message dicts to measure.
            gemini_model: Optional explicit model id to count against; None
                lets the implementation pick its default.

        Returns:
            The estimated token count, or None when no estimate is available.
        """
        ...

    async def close(self) -> None:
        """Release whatever resources the client holds.

        Declaration only; concrete clients supply the implementation.
        """
        ...
KG_BULK_OPENROUTER_BASE = "https://openrouter.ai/api/v1"
KG_BULK_CHAT_MODEL = "google/gemini-3.1-flash-lite"
KG_BULK_TOOL_NAMES = [
    "kg_search_entities",
    "kg_get_entity",
    "kg_inspect_entity",
]

# Rendered system prompts, keyed by the value of _system_prompt_cache_key().
_SYSTEM_PROMPT_CACHE: dict[str, str] = {}

# Fragments that mark a settings key as secret.  They are matched against the
# key after lower-casing and removing "-", "_" and "." so that spellings such
# as ``api_key``, ``api-key`` and ``apiKey`` are all recognised.
_SECRET_KEY_FRAGMENTS = (
    "token",
    "password",
    "passwd",
    "secret",
    "credential",
    "apikey",
    "privatekey",
    "accesskey",
)


def _print_dry_run_llm_output(
    *,
    channel_id: str,
    chunk_index: int,
    raw: str,
) -> None:
    """Print the model's final text to stdout inside a banner.

    Nothing is printed when the stripped text is empty.

    Args:
        channel_id: Channel scope id shown in the banner.
        chunk_index: Index of the chunk shown in the banner.
        raw: The model's final text; stripped before printing.
    """
    body = (raw or "").strip()
    if not body:
        return
    sep = "=" * 72
    print(
        f"\n{sep}\n"
        f"KG agentic dry-run — LLM output "
        f"(chunk {chunk_index}, scope {channel_id})\n"
        f"{sep}\n"
        f"{body}\n"
        f"{sep}\n",
        flush=True,
    )


def _project_root() -> Path:
    """Return the absolute directory that contains this module."""
    return Path(__file__).resolve().parent


def _is_secret_setting_key(key: str) -> bool:
    """Return True if a platform-settings key likely holds a secret.

    The key is lower-cased and stripped of ``-``, ``_`` and ``.`` before a
    substring match against :data:`_SECRET_KEY_FRAGMENTS`.  Every key that the
    previous plain-substring check flagged is still flagged; separator and
    camel-case variants (``api-key``, ``apiKey``) plus ``private_key``,
    ``access_key`` and ``passwd`` are now caught as well, so those values are
    not rendered into the prompt.

    Args:
        key: A platform settings key name to test.

    Returns:
        True if the normalised key contains any secret fragment.
    """
    normalized = str(key).lower()
    for separator in ("-", "_", "."):
        normalized = normalized.replace(separator, "")
    return any(fragment in normalized for fragment in _SECRET_KEY_FRAGMENTS)


def _platform_type_is_ncm(ptype: str) -> bool:
    """Return True when a platform ``type`` value mentions ``ncm``.

    The check is a case-insensitive substring match and tolerates None.

    Args:
        ptype: The platform type string to test.
    """
    return "ncm" in (ptype or "").strip().lower()


def _settings_key_is_ncm_related(key: str) -> bool:
    """Return True when a settings key mentions ``ncm`` (case-insensitive).

    Args:
        key: A platform settings key name to test.
    """
    return "ncm" in str(key).lower()
def build_platform_context_markdown(cfg: Any | None) -> str:
    """Build a secret-free markdown summary of the enabled chat platforms.

    Each enabled, non-NCM platform gets a ``### Platform #n`` heading followed
    by platform-specific hints (Matrix homeserver and bot id, Discord id keys)
    and then the remaining settings.  Keys flagged by
    :func:`_is_secret_setting_key` or :func:`_settings_key_is_ncm_related` are
    dropped, and dict/list values are replaced by a placeholder.  A key that a
    platform-specific branch has already rendered is not repeated in the
    generic settings list.

    Args:
        cfg: Loaded application config (or None).  ``platforms`` and the
            legacy ``homeserver`` / ``user_id`` attributes are read from it.

    Returns:
        The markdown summary, or an explanatory placeholder sentence when
        there is no config, no platform entries, only NCM platforms, or only
        disabled platforms.
    """
    if cfg is None:
        return (
            "(No configuration passed — treat `platform` / `channel_id` / "
            "`user_id` in logs as opaque platform-specific identifiers.)"
        )

    platforms = getattr(cfg, "platforms", None) or []
    if not platforms:
        lines_out = [
            "(No `platforms:` entries in config.yaml — if legacy Matrix "
            "fields exist, only the Matrix bot may be active.)",
        ]
        hs = getattr(cfg, "homeserver", "") or ""
        uid = getattr(cfg, "user_id", "") or ""
        if hs:
            lines_out.append(f"- Default Matrix homeserver URL: `{hs}`")
        if uid:
            lines_out.append(
                f"- Legacy Matrix bot user id (format reference): `{uid}`",
            )
        return "\n".join(lines_out)

    lines: list[str] = []
    idx = 0
    for p in platforms:
        if not getattr(p, "enabled", True):
            continue
        ptype = getattr(p, "type", "") or "unknown"
        if _platform_type_is_ncm(ptype):
            continue
        idx += 1
        lines.append(f"### Platform #{idx}: `{ptype}`")

        # getattr keeps a platform object without ``settings`` from raising,
        # matching how ``enabled`` and ``type`` are read above.
        raw_settings = getattr(p, "settings", None) or {}
        st = {
            k: v
            for k, v in raw_settings.items()
            if not _is_secret_setting_key(str(k))
            and not _settings_key_is_ncm_related(str(k))
        }

        # Keys that must not appear in the generic dump below; keys rendered
        # by a platform-specific branch are added so they are not repeated.
        skip_keys = {
            "homeserver",
            "user_id",
            "store_path",
            "credentials_file",
            "password",
        }

        if ptype == "matrix":
            hs = st.get("homeserver") or getattr(cfg, "homeserver", "")
            if hs:
                lines.append(f"- Matrix homeserver (public URL): `{hs}`")
            bot_uid = st.get("user_id") or getattr(cfg, "user_id", "")
            if bot_uid:
                lines.append(
                    "- Matrix user ids in logs look like "
                    "`@localpart:domain`."
                )
                lines.append(
                    f"- This deployment's bot Matrix id: `{bot_uid}`",
                )
        elif ptype in ("discord", "discord-self"):
            lines.append(
                "- Discord numeric ids in logs (`user_id`, `channel_id`, "
                "etc.) are **snowflakes** (large integers as strings)."
            )
            for key in (
                "application_id",
                "guild_id",
                "primary_guild_id",
                "default_guild_id",
            ):
                if st.get(key):
                    lines.append(f"- `{key}`: `{st[key]}`")
                    skip_keys.add(key)

        for k, v in sorted(st.items()):
            if k in skip_keys:
                continue
            if isinstance(v, (dict, list)):
                lines.append(
                    f"- `{k}`: _(structured value; omitted from prompt)_",
                )
            else:
                lines.append(f"- `{k}`: `{v}`")

    if not lines:
        enabled = [x for x in platforms if getattr(x, "enabled", True)]
        if enabled and all(
            _platform_type_is_ncm(getattr(x, "type", "") or "") for x in enabled
        ):
            return (
                "(Only NCM / non-chat platform entries are configured — omitted "
                "from KG extraction context; treat ids in logs as opaque.)"
            )
        return "(All platforms disabled in configuration.)"
    return "\n".join(lines)
def _system_template_path() -> Path:
    """Locate the Jinja2 template of the agentic-extraction system prompt.

    Returns:
        ``<project root>/prompts/kg_agentic_extraction_system.j2``, where the
        project root is whatever :func:`_project_root` returns.
    """
    prompts_dir = _project_root() / "prompts"
    return prompts_dir / "kg_agentic_extraction_system.j2"
def render_kg_agentic_system_prompt(cfg: Any | None = None) -> str:
    """Render the agentic-extraction system prompt.

    When the template file exists it is rendered through a
    ``LoggingSandboxedEnvironment`` (autoescape off) with a single variable,
    ``platform_context``, produced by :func:`build_platform_context_markdown`
    and passed through ``sanitize_context``.  When the file is missing a
    warning is logged and a short hard-coded prompt, followed by the same
    platform context, is returned instead.

    Args:
        cfg: Loaded application config (or None), forwarded to
            :func:`build_platform_context_markdown`.

    Returns:
        The rendered system prompt text.
    """
    template_file = _system_template_path()

    if template_file.is_file():
        environment = LoggingSandboxedEnvironment(
            loader=FileSystemLoader(str(template_file.parent)),
            autoescape=False,
            template_name=template_file.name,
        )
        compiled = environment.get_template(template_file.name)
        variables = sanitize_context(
            {"platform_context": build_platform_context_markdown(cfg)},
        )
        return compiled.render(**variables)

    logger.warning("Missing %s — using fallback system prompt", template_file)
    fallback_intro = (
        "You extract knowledge graphs from chat. Use tools to search "
        "the graph before creating entities. Final message: JSON only.\n\n"
    )
    return fallback_intro + build_platform_context_markdown(cfg)
def _system_prompt_cache_key(cfg: Any | None) -> str:
    """Fingerprint everything the rendered system prompt depends on.

    The key joins the template file's mtime (``0.0`` when ``stat`` raises
    :class:`OSError`) with the first 24 hex characters of a SHA-256 digest
    taken over the platform-context markdown and, when the config has a
    ``kg_extraction_channel_hints`` attribute, its JSON dump with sorted keys.

    Args:
        cfg: Loaded application config (or None).

    Returns:
        A string of the form ``"{mtime:.6f}:{digest_prefix}"``.
    """
    template_file = _system_template_path()
    try:
        modified = template_file.stat().st_mtime
    except OSError:
        modified = 0.0

    platform_md = build_platform_context_markdown(cfg)

    if cfg is not None and hasattr(cfg, "kg_extraction_channel_hints"):
        hints_json = json.dumps(cfg.kg_extraction_channel_hints, sort_keys=True)
    else:
        hints_json = ""

    fingerprint = f"{platform_md}\n{hints_json}".encode()
    digest = hashlib.sha256(fingerprint).hexdigest()
    return f"{modified:.6f}:{digest[:24]}"
def load_kg_agentic_system_prompt(config: Any | None = None) -> str:
    """Return the system prompt, rendering it only on a cache miss.

    The prompt is stored in the module-level ``_SYSTEM_PROMPT_CACHE`` dict
    under the key computed by :func:`_system_prompt_cache_key`, so a changed
    key leads to a fresh render.

    Args:
        config: Loaded application config (or None); used for both the cache
            key and the render.

    Returns:
        The rendered system prompt text.
    """
    cache_key = _system_prompt_cache_key(config)
    prompt = _SYSTEM_PROMPT_CACHE.get(cache_key)
    if prompt is None:
        prompt = render_kg_agentic_system_prompt(config)
        _SYSTEM_PROMPT_CACHE[cache_key] = prompt
    return prompt
def format_chunk_channels_section(
    channel_pairs: list[tuple[str, str]],
    cfg: Any | None,
    default_channel_scope: str,
    channel_metadata: dict[str, dict[str, str]] | None = None,
) -> str:
    """Render the ``## Channels in this chunk`` markdown section.

    One bullet is produced per ``(platform, channel_id)`` pair.  A bullet is
    extended with the operator hint stored under ``"platform:channel_id"`` in
    the config's ``kg_extraction_channel_hints`` (when present) and with the
    ``name`` / ``topic`` found under the same key in ``channel_metadata``.
    The last bullet always states the default channel scope id.

    Args:
        channel_pairs: ``(platform, channel_id)`` tuples present in the chunk.
        cfg: Loaded application config (or None).
        default_channel_scope: Scope id used as the chunk default.
        channel_metadata: Optional ``"platform:channel_id"`` ->
            ``{"name", "topic"}`` mapping.

    Returns:
        The section text; a two-line placeholder when ``channel_pairs`` is
        empty.
    """
    heading = "## Channels in this chunk"
    if not channel_pairs:
        return (
            f"{heading}\n"
            f"- _(none resolved — use default_channel_scope_id)_: "
            f"`{default_channel_scope}`"
        )

    metadata = channel_metadata or {}
    operator_hints: dict[str, str] = {}
    if cfg is not None and hasattr(cfg, "kg_extraction_channel_hints"):
        operator_hints = dict(getattr(cfg, "kg_extraction_channel_hints", {}) or {})

    section = [heading]
    for platform, chan in channel_pairs:
        lookup = f"{platform}:{chan}"
        pieces = [f"- **{platform}** / channel id `{chan}`"]

        note = operator_hints.get(lookup, "")
        if note:
            pieces.append(f" — _{note}_")

        info = metadata.get(lookup) or {}
        resolved_name = (info.get("name") or "").strip()
        resolved_topic = (info.get("topic") or "").strip()
        if resolved_name:
            pieces.append(f"\n - resolved name: **{resolved_name}**")
        if resolved_topic:
            pieces.append(f"\n - topic: {resolved_topic}")

        section.append("".join(pieces))

    section.append(
        f"- **default_channel_scope_id** (chunk default for scoped facts): "
        f"`{default_channel_scope}`",
    )
    return "\n".join(section)
def format_chunk_speakers_section(
    speaker_pairs: list[tuple[str, str]],
) -> str:
    """Build the markdown ``Speakers in this chunk`` section for the user prompt.

    Pairs are deduplicated by stripped ``user_id``; pairs whose id is blank
    are ignored.  For each id the first *non-empty* display name wins, so an
    early pair with a blank name no longer hides a later real name.  Ids that
    never carry a name are shown as ``?``.  Bullets are sorted by user id.

    Args:
        speaker_pairs: ``(user_id, display_name)`` tuples for the chunk.

    Returns:
        The markdown section, or a placeholder section when no usable speaker
        remains.
    """
    names: dict[str, str] = {}
    for uid, name in speaker_pairs or []:
        user_id = (uid or "").strip()
        if not user_id:
            continue
        display = (name or "").strip()
        if display and not names.get(user_id):
            # First real name seen for this id (possibly replacing a blank).
            names[user_id] = display
        else:
            # Remember the id even when no name is known yet.
            names.setdefault(user_id, "")

    if not names:
        return "## Speakers in this chunk\n- _(none)_"

    lines = ["## Speakers in this chunk"]
    lines.extend(f"- `{uid}` — **{names[uid] or '?'}**" for uid in sorted(names))
    return "\n".join(lines)
def format_speaker_user_id_mapping_markdown(
    speaker_pairs: list[tuple[str, str]],
) -> str:
    """Render a markdown ``user_id`` -> display-name table for the chunk.

    Pairs are deduplicated by stripped ``user_id``; pairs whose id is blank
    are ignored.  For each id the first *non-empty* display name wins, so an
    early pair with a blank name no longer hides a later real name; ids that
    never carry a name are shown as ``?``.  Inside a name, pipe characters
    are escaped and line breaks are replaced by spaces so that every speaker
    stays on exactly one table row.

    Args:
        speaker_pairs: ``(user_id, display_name)`` tuples for the chunk.

    Returns:
        The markdown table block, or an empty string when no usable speaker
        remains.
    """
    names: dict[str, str] = {}
    for uid, name in speaker_pairs or []:
        user_id = (uid or "").strip()
        if not user_id:
            continue
        display = (name or "").strip()
        if display and not names.get(user_id):
            # First real name seen for this id (possibly replacing a blank).
            names[user_id] = display
        else:
            # Remember the id even when no name is known yet.
            names.setdefault(user_id, "")

    if not names:
        return ""

    lines = [
        "## User ID → display name (this chunk)",
        "",
        "Use this table when interpreting sender ids in the log lines below:",
        "",
        "| `user_id` | display name |",
        "| --- | --- |",
    ]
    for uid in sorted(names):
        # A raw line break or an unescaped pipe would corrupt the table row.
        cell = " ".join((names[uid] or "?").splitlines()).replace("|", "\\|")
        lines.append(f"| `{uid}` | {cell} |")
    return "\n".join(lines)
def augment_system_prompt_with_speaker_mapping(
    system: str,
    speaker_pairs: list[tuple[str, str]] | None,
) -> str:
    """Append the chunk's ``user_id`` -> display-name table to a system prompt.

    The table comes from :func:`format_speaker_user_id_mapping_markdown`.
    When that block is blank the prompt is returned untouched; otherwise the
    prompt is right-stripped and the block is added after one empty line,
    followed by a trailing newline.

    Args:
        system: The base system prompt text.
        speaker_pairs: ``(user_id, display_name)`` tuples, or None.

    Returns:
        The prompt with the mapping block appended, or ``system`` unchanged.
    """
    mapping = format_speaker_user_id_mapping_markdown(list(speaker_pairs or []))
    if mapping.strip():
        return f"{system.rstrip()}\n\n{mapping}\n"
    return system
async def prefetch_speaker_kg_context(
    kg: Any,
    speakers: list[tuple[str, str]],
    *,
    max_speakers: int = 8,
    hits_per_speaker: int = 3,
    min_score: float = 0.0,
) -> str:
    """Prefetch existing KG entities for a chunk's speakers as prompt context.

    Speakers are deduplicated by stripped ``user_id`` (blank ids are ignored)
    and, in sorted id order, the first ``max_speakers`` of them are looked up
    with two ``kg.search_entities`` calls each: one restricted to
    ``category="user"`` and the speaker's id as ``scope_id``, and one general
    query over ``"<name> <user_id>"``.  Hits scoring below ``min_score`` or
    whose uuid was already listed are dropped.  A search that raises is
    logged at debug level and skipped.

    For each id the first *non-empty* display name is used, so an early pair
    with a blank name no longer forces ``?`` to be sent as the search query
    when a later pair carries the real name.

    Args:
        kg: Object exposing an async ``search_entities`` method.
        speakers: ``(user_id, display_name)`` tuples for the chunk.
        max_speakers: Maximum distinct speakers to look up, clamped to 1-128.
        hits_per_speaker: ``top_k`` passed to each search, clamped to 1-48.
        min_score: Minimum score a hit needs to be included.

    Returns:
        A markdown block listing the surviving hits per speaker, or an empty
        string when there are no speakers or no surviving hits.
    """
    if not speakers:
        return ""

    cap = max(1, min(128, int(max_speakers)))
    hits = max(1, min(48, int(hits_per_speaker)))

    names: dict[str, str] = {}
    for uid, name in speakers:
        u = (uid or "").strip()
        if not u:
            continue
        display = (name or "").strip()
        if display and not names.get(u):
            # First real name seen for this id (possibly replacing a blank).
            names[u] = display
        else:
            # Remember the id even when no name is known yet.
            names.setdefault(u, "")

    seen_uuid: set[str] = set()
    blocks: list[str] = []
    for uid in sorted(names)[:cap]:
        name = names[uid] or "?"
        hits_list: list[dict[str, Any]] = []

        # Both searches are best-effort: a failure only costs some context.
        try:
            by_scope = await kg.search_entities(
                name,
                category="user",
                scope_id=uid,
                top_k=hits,
            )
            hits_list.extend(by_scope or [])
        except Exception:
            logger.debug("prefetch user scope search failed", exc_info=True)
        try:
            general = await kg.search_entities(
                f"{name} {uid}",
                top_k=hits,
            )
            hits_list.extend(general or [])
        except Exception:
            logger.debug("prefetch general search failed", exc_info=True)

        rows: list[str] = []
        for ent in hits_list:
            sc = float(ent.get("score") or 0.0)
            if sc < min_score:
                continue
            uu = str(ent.get("uuid") or "")
            # seen_uuid spans all speakers, so an entity is listed only once.
            if not uu or uu in seen_uuid:
                continue
            seen_uuid.add(uu)
            nm = str(ent.get("name") or "")
            et = str(ent.get("type") or "")
            desc = str(ent.get("description") or "")
            cat = str(ent.get("category") or "")
            sid = str(ent.get("scope_id") or "")
            rows.append(
                f" - `{nm}` ({et}, cat={cat}, scope={sid}, "
                f"score={sc:.3f}, uuid={uu})"
                + (f": {desc}" if desc else ""),
            )
        if rows:
            blocks.append(f"**Speaker `{uid}` ({name})**:\n" + "\n".join(rows))

    if not blocks:
        return ""

    header = (
        "## Existing knowledge graph (speakers — prefetch)\n"
        "_Heuristic vector matches only; may be incomplete or noisy — "
        "use kg_search_entities / kg_get_entity to verify._\n\n"
    )
    return header + "\n\n".join(blocks)
def build_kg_bulk_user_message(
    conversation_text: str,
    *,
    channel_id: str,
    chunk_index: int,
    time_start_iso: str = "",
    time_end_iso: str = "",
    platforms_channels: str = "",
    config: Any | None = None,
    chunk_channel_pairs: list[tuple[str, str]] | None = None,
    chunk_speaker_pairs: list[tuple[str, str]] | None = None,
    speaker_kg_prefetch: str = "",
    channel_metadata: dict[str, dict[str, str]] | None = None,
) -> str:
    """Assemble the full user message for one agentic extraction chunk.

    The message consists of, in order: a ``## Chunk metadata`` list, the
    channel section from :func:`format_chunk_channels_section` (over the
    sorted, de-duplicated channel pairs), the speaker section from
    :func:`format_chunk_speakers_section`, the optional speaker-KG prefetch
    block, the conversation text, and a closing instruction to produce the
    JSON extraction.

    The ``time_range_utc`` metadata line joins the start and end timestamps
    with ``/`` (the ISO 8601 interval form) when both are given.  With only
    one of them the line carries that single value, and with neither it stays
    empty.

    Args:
        conversation_text: The chat transcript for the chunk.
        channel_id: Default channel scope id for scoped facts.
        chunk_index: Index of this chunk.
        time_start_iso: Start of the chunk's time range (optional).
        time_end_iso: End of the chunk's time range (optional).
        platforms_channels: Optional summary of all platforms/channels in the
            full corpus; adds one metadata line when non-empty.
        config: Loaded application config (or None), forwarded to the channel
            section builder.
        chunk_channel_pairs: ``(platform, channel_id)`` pairs in the chunk.
        chunk_speaker_pairs: ``(user_id, display_name)`` pairs in the chunk.
        speaker_kg_prefetch: Optional prefetched speaker-KG markdown block.
        channel_metadata: Optional per-channel name/topic metadata map.

    Returns:
        The assembled user message, joined with newlines.
    """
    pairs = sorted(set(chunk_channel_pairs or []))
    chan_block = format_chunk_channels_section(
        pairs,
        config,
        channel_id,
        channel_metadata=channel_metadata,
    )
    sp_block = format_chunk_speakers_section(
        list(chunk_speaker_pairs or []),
    )

    # Without a separator the two timestamps would run together into one
    # unreadable token; "/" is the ISO 8601 start/end interval separator.
    time_range = "/".join(
        bound for bound in (time_start_iso, time_end_iso) if bound
    )

    lines = [
        "## Chunk metadata",
        f"- chunk_index: {chunk_index}",
        f"- default_channel_scope_id: {channel_id}",
        f"- time_range_utc: {time_range}",
    ]
    if platforms_channels:
        lines.append(
            f"- all platforms/channels in full corpus: {platforms_channels}",
        )
    lines.extend(["", chan_block, "", sp_block])

    prefetch = (speaker_kg_prefetch or "").strip()
    if prefetch:
        lines.extend(["", prefetch])

    lines.extend(
        [
            "",
            "## Conversation (chronological)",
            conversation_text,
            "",
            "Now produce the final JSON extraction per system instructions.",
        ]
    )
    return "\n".join(lines)
def messages_for_agentic_token_estimate(
    conversation_text: str,
    *,
    channel_id: str,
    chunk_index: int = 0,
    time_start_iso: str = "",
    time_end_iso: str = "",
    platforms_channels: str = "",
    config: Any | None = None,
    chunk_channel_pairs: list[tuple[str, str]] | None = None,
    chunk_speaker_pairs: list[tuple[str, str]] | None = None,
    speaker_kg_prefetch: str = "",
    channel_metadata: dict[str, dict[str, str]] | None = None,
) -> list[dict[str, str]]:
    """Build the ``[system, user]`` message pair used for token pre-counting.

    The system message is the prompt from
    :func:`load_kg_agentic_system_prompt` extended by
    :func:`augment_system_prompt_with_speaker_mapping`; the user message comes
    from :func:`build_kg_bulk_user_message`, which receives every chunk
    argument unchanged.  This function only assembles the messages and does
    not count tokens itself.

    Args:
        conversation_text: The chat transcript for the chunk.
        channel_id: Default channel scope id for scoped facts.
        chunk_index: Index of this chunk (default 0).
        time_start_iso: Start of the chunk's time range (optional).
        time_end_iso: End of the chunk's time range (optional).
        platforms_channels: Optional corpus-wide platform/channel summary.
        config: Loaded application config (or None).
        chunk_channel_pairs: ``(platform, channel_id)`` pairs in the chunk.
        chunk_speaker_pairs: ``(user_id, display_name)`` pairs in the chunk.
        speaker_kg_prefetch: Optional prefetched speaker-KG markdown block.
        channel_metadata: Optional per-channel name/topic metadata map.

    Returns:
        A two-element list: the system message dict, then the user message
        dict.
    """
    base_prompt = load_kg_agentic_system_prompt(config)
    system_content = augment_system_prompt_with_speaker_mapping(
        base_prompt,
        chunk_speaker_pairs,
    )
    user_content = build_kg_bulk_user_message(
        conversation_text,
        channel_id=channel_id,
        chunk_index=chunk_index,
        time_start_iso=time_start_iso,
        time_end_iso=time_end_iso,
        platforms_channels=platforms_channels,
        config=config,
        chunk_channel_pairs=chunk_channel_pairs,
        chunk_speaker_pairs=chunk_speaker_pairs,
        speaker_kg_prefetch=speaker_kg_prefetch,
        channel_metadata=channel_metadata,
    )
    system_message = {"role": "system", "content": system_content}
    user_message = {"role": "user", "content": user_content}
    return [system_message, user_message]
def build_kg_bulk_tool_registry() -> ToolRegistry:
    """Build a registry of the read-only KG tools the bulk agent may call.

    Creates a :class:`~tools.ToolRegistry` (with ``task_manager=None``) and
    registers three tools as nested closures decorated with ``@reg.tool``:
    ``kg_search_entities``, ``kg_get_entity`` and ``kg_inspect_entity``.
    Each tool reaches the knowledge graph through ``ctx.kg_manager`` supplied
    at call time and only calls its query methods (``search_entities``,
    ``get_entity``, ``inspect_entity``).

    Every tool returns a JSON string and reports all failures -- including
    malformed numeric arguments supplied by the model -- through a
    ``{"success": False, "error": ...}`` envelope instead of raising.

    Used by :func:`create_kg_bulk_gemini_pool_client` and
    :func:`create_kg_bulk_openrouter_client` to supply ``tool_registry``.

    Returns:
        ToolRegistry: A registry exposing the read-only KG search and
        inspection tools.
    """
    reg = ToolRegistry(task_manager=None)

    @reg.tool(
        name="kg_search_entities",
        description=(
            "Semantic search over knowledge-graph entities. "
            "Use short queries (names, projects, topics from the chat)."
        ),
        parameters={
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "category": {
                    "type": "string",
                    "description": "Optional: user, channel, general, basic",
                },
                "scope_id": {"type": "string"},
                "top_k": {"type": "integer", "description": "Max hits (default 12)"},
            },
            "required": ["query"],
        },
    )
    async def kg_search_entities(
        query: str,
        category: str = "",
        scope_id: str = "",
        top_k: int = 12,
        ctx: ToolContext | None = None,
    ) -> str:
        """Search knowledge-graph entities and return the hits as JSON.

        Args:
            query: Free-text search query.
            category: Optional category filter; empty means no filter.
            scope_id: Optional scope filter; empty means no filter.
            top_k: Requested maximum hits, clamped to 1-24 (default 12).
            ctx: Tool execution context supplying ``kg_manager``.

        Returns:
            str: JSON ``{"success", "count", "results"}`` on success, or
            ``{"success": False, "error"}`` when the KG is unavailable, an
            argument is malformed, or the search fails.
        """
        if ctx is None or ctx.kg_manager is None:
            return json.dumps({"success": False, "error": "kg unavailable"})
        kg = ctx.kg_manager
        try:
            # Coerce inside the try: tool arguments come from the model, so a
            # non-numeric or null ``top_k`` must become a JSON error, not an
            # exception escaping the tool.
            tk = max(1, min(24, int(top_k)))
            results = await kg.search_entities(
                query,
                category=category or None,
                scope_id=scope_id or None,
                top_k=tk,
            )
            return json.dumps(
                {"success": True, "count": len(results), "results": results},
                default=str,
            )
        except Exception as e:
            return json.dumps({"success": False, "error": str(e)})

    @reg.tool(
        name="kg_get_entity",
        description=(
            "Look up one entity by name and/or uuid; includes immediate "
            "relationship summaries."
        ),
        parameters={
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "uuid": {"type": "string"},
            },
            "required": [],
        },
    )
    async def kg_get_entity(
        name: str = "",
        uuid: str = "",
        ctx: ToolContext | None = None,
    ) -> str:
        """Look up a single KG entity by name and/or uuid.

        Args:
            name: Entity name to look up; empty if unused.
            uuid: Entity uuid to look up; empty if unused.
            ctx: Tool execution context supplying ``kg_manager``.

        Returns:
            str: JSON ``{"success": True, "entity": ...}`` on a hit, otherwise
            ``{"success": False, "error"}`` (KG unavailable, no identifier
            given, entity not found, or lookup failure).
        """
        if ctx is None or ctx.kg_manager is None:
            return json.dumps({"success": False, "error": "kg unavailable"})
        # Normalise once; whitespace-only identifiers count as missing.
        clean_name = (name or "").strip()
        clean_uuid = (uuid or "").strip()
        if not clean_name and not clean_uuid:
            return json.dumps(
                {
                    "success": False,
                    "error": "Provide name or uuid",
                }
            )
        kg = ctx.kg_manager
        try:
            ent = await kg.get_entity(
                name=clean_name or None,
                uuid=clean_uuid or None,
            )
            if ent:
                return json.dumps({"success": True, "entity": ent}, default=str)
            return json.dumps({"success": False, "error": "Entity not found"})
        except Exception as e:
            return json.dumps({"success": False, "error": str(e)})

    @reg.tool(
        name="kg_inspect_entity",
        description=(
            "Deep inspection: entity plus inbound/outbound edges, "
            "optional 2-hop neighborhood. max_depth 1 or 2."
        ),
        parameters={
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "uuid": {"type": "string"},
                "max_depth": {"type": "integer"},
            },
            "required": [],
        },
    )
    async def kg_inspect_entity(
        name: str = "",
        uuid: str = "",
        max_depth: int = 2,
        ctx: ToolContext | None = None,
    ) -> str:
        """Inspect a KG entity together with its surrounding edges.

        Args:
            name: Entity name to inspect; empty if unused.
            uuid: Entity uuid to inspect; empty if unused.
            max_depth: Neighborhood depth, clamped to 1 or 2 (default 2).
            ctx: Tool execution context supplying ``kg_manager``.

        Returns:
            str: JSON ``{"success": True, ...inspection fields...}`` on a
            hit, otherwise ``{"success": False, "error"}`` (KG unavailable,
            no identifier given, malformed ``max_depth``, entity not found,
            or inspection failure).
        """
        if ctx is None or ctx.kg_manager is None:
            return json.dumps({"success": False, "error": "kg unavailable"})
        # Normalise once; whitespace-only identifiers count as missing.
        clean_name = (name or "").strip()
        clean_uuid = (uuid or "").strip()
        if not clean_name and not clean_uuid:
            return json.dumps(
                {
                    "success": False,
                    "error": "Provide name or uuid",
                }
            )
        kg = ctx.kg_manager
        try:
            # Coerce inside the try so a malformed ``max_depth`` from the
            # model is reported in the JSON envelope rather than raised.
            depth = max(1, min(2, int(max_depth)))
            result = await kg.inspect_entity(
                name=clean_name or None,
                uuid=clean_uuid or None,
                max_depth=depth,
                neighbor_limit=30,
            )
            if result:
                return json.dumps({"success": True, **result}, default=str)
            return json.dumps({"success": False, "error": "Entity not found"})
        except Exception as e:
            return json.dumps({"success": False, "error": str(e)})

    return reg
def kg_bulk_native_model_id() -> str:
    """Return :data:`KG_BULK_CHAT_MODEL` without a leading ``google/`` prefix.

    The same constant is passed unchanged to the OpenRouter client, while
    :func:`create_kg_bulk_gemini_pool_client` uses this stripped form as its
    ``model_id``.  If the constant does not start with ``google/`` it is
    returned as-is.

    Returns:
        str: The bulk chat model id with any ``google/`` prefix removed.
    """
    vendor_prefix = "google/"
    model = KG_BULK_CHAT_MODEL
    if model.startswith(vendor_prefix):
        return model[len(vendor_prefix):]
    return model
def create_kg_bulk_gemini_pool_client(
    *,
    max_tool_rounds: int = 48,
    max_tokens: int = 60_000,
    max_tool_output_chars: int = 3_000_000,
    temperature: float = 0.25,
) -> GeminiPoolToolChatClient:
    """Create the native-Gemini bulk chat client for KG extraction.

    Imports :class:`~gemini_kg_bulk_client.GeminiPoolToolChatClient` lazily
    (inside the function) and instantiates it with the read-only KG tool
    registry from :func:`build_kg_bulk_tool_registry`, the model id from
    :func:`kg_bulk_native_model_id`, and the supplied limits.

    Args:
        max_tool_rounds: Maximum tool-call rounds per chat turn (default 48).
        max_tokens: Maximum output tokens per turn (default 60000).
        max_tool_output_chars: Cap on characters of tool output fed back to
            the model (default 3000000).
        temperature: Sampling temperature (default 0.25).

    Returns:
        GeminiPoolToolChatClient: The configured client.
    """
    # Deferred import: the module is only imported at the top of the file
    # under TYPE_CHECKING, so the class must be loaded here at call time.
    from gemini_kg_bulk_client import GeminiPoolToolChatClient

    registry = build_kg_bulk_tool_registry()
    native_model = kg_bulk_native_model_id()
    limits = {
        "max_tool_rounds": max_tool_rounds,
        "max_tokens": max_tokens,
        "max_tool_output_chars": max_tool_output_chars,
    }
    return GeminiPoolToolChatClient(
        tool_registry=registry,
        model_id=native_model,
        temperature=temperature,
        **limits,
    )
def create_kg_bulk_openrouter_client(
    api_key: str,
    *,
    gemini_api_key: str = "",
    max_tool_rounds: int = 48,
    max_tokens: int = 60_000,
    max_tool_output_chars: int = 3_000_000,
    temperature: float = 0.25,
    top_p: float = 0.99,
) -> OpenRouterClient:
    """Create the OpenRouter-backed bulk chat client for KG extraction.

    Instantiates :class:`~openrouter_client.OpenRouterClient` with model
    :data:`KG_BULK_CHAT_MODEL`, base URL :data:`KG_BULK_OPENROUTER_BASE`, the
    read-only KG tool registry from :func:`build_kg_bulk_tool_registry`, and
    the supplied sampling and tool-loop limits.

    Args:
        api_key: OpenRouter API key.
        gemini_api_key: Optional Gemini API key forwarded to the client
            (default empty).
        max_tool_rounds: Maximum tool-call rounds per chat turn (default 48).
        max_tokens: Maximum output tokens per turn (default 60000).
        max_tool_output_chars: Cap on characters of tool output fed back to
            the model (default 3000000).
        temperature: Sampling temperature (default 0.25).
        top_p: Nucleus-sampling top-p (default 0.99).

    Returns:
        OpenRouterClient: The configured client.
    """
    sampling = {
        "temperature": temperature,
        "max_tokens": max_tokens,
        "top_p": top_p,
    }
    tool_loop = {
        "tool_registry": build_kg_bulk_tool_registry(),
        "max_tool_rounds": max_tool_rounds,
        "max_tool_output_chars": max_tool_output_chars,
    }
    return OpenRouterClient(
        api_key=api_key,
        model=KG_BULK_CHAT_MODEL,
        base_url=KG_BULK_OPENROUTER_BASE,
        gemini_api_key=gemini_api_key,
        **sampling,
        **tool_loop,
    )
async def run_agentic_kg_extraction_chunk(
    *,
    conversation_text: str,
    channel_id: str,
    kg_manager: KnowledgeGraphManager,
    bulk_client: KgBulkLlmClient,
    user_id: str = "000000000000",
    chunk_index: int = 0,
    time_start_iso: str = "",
    time_end_iso: str = "",
    platforms_channels: str = "",
    config: Any | None = None,
    chunk_channel_pairs: list[tuple[str, str]] | None = None,
    chunk_speaker_pairs: list[tuple[str, str]] | None = None,
    speaker_kg_prefetch: str = "",
    channel_metadata: dict[str, dict[str, str]] | None = None,
    persist_extraction: bool = True,
) -> dict[str, Any]:
    """Run one agentic KG-extraction pass over a single conversation chunk.

    Steps:

    1. Assemble the system message (:func:`load_kg_agentic_system_prompt` +
       :func:`augment_system_prompt_with_speaker_mapping`) and the user
       message (:func:`build_kg_bulk_user_message`).
    2. Build a :class:`~tool_context.ToolContext` bound to ``kg_manager`` and
       await ``bulk_client.chat`` with :data:`KG_BULK_TOOL_NAMES`.
    3. Parse the reply with :func:`~kg_extraction._parse_llm_json`.
    4. If ``persist_extraction`` is true, write the result through
       :func:`~kg_extraction.apply_parsed_extraction` with
       ``created_by="system:kg_agentic_bulk"``.  Otherwise pass the raw reply
       to :func:`_print_dry_run_llm_output` and report proposed counts only.

    LLM-call failures, empty replies, JSON parse failures and replies whose
    top-level JSON value is not an object are logged and reported as a
    failure stats dict rather than raised.

    Args:
        conversation_text: Chat transcript for the chunk.
        channel_id: Default channel scope id; also set on the tool context.
        kg_manager: Knowledge-graph manager used by the tools and by
            persistence.
        bulk_client: Client satisfying :class:`KgBulkLlmClient` that drives
            the tool-enabled chat turn.
        user_id: Owner id for the turn and persisted facts (defaults to a
            zero placeholder).
        chunk_index: Index of this chunk.
        time_start_iso: Start of the chunk's time range as an ISO string.
        time_end_iso: End of the chunk's time range as an ISO string.
        platforms_channels: Optional platform/channel summary text.
        config: Application config (or None) forwarded to prompt assembly.
        chunk_channel_pairs: ``(platform, channel_id)`` pairs in the chunk.
        chunk_speaker_pairs: ``(user_id, display_name)`` pairs in the chunk.
        speaker_kg_prefetch: Optional prefetched speaker-KG text block.
        channel_metadata: Optional per-channel metadata map.
        persist_extraction: When True, apply parsed results to the graph;
            when False, write nothing and only report proposed counts.

    Returns:
        dict[str, Any]: A stats dict.  On failure:
        ``entities_added=0``, ``relationships_added=0``, ``errors=1``,
        ``parse_error=True``.  In dry-run mode: zero added counts,
        ``persisted=False`` and ``proposed_entities`` /
        ``proposed_relationships``.  On persistence: whatever
        :func:`~kg_extraction.apply_parsed_extraction` returns, plus
        ``parse_error=False`` and ``persisted=True``.
    """

    def _failure_stats() -> dict[str, Any]:
        # Build a fresh dict on every call so callers may mutate the result
        # without affecting later failures.
        return {
            "entities_added": 0,
            "relationships_added": 0,
            "errors": 1,
            "parse_error": True,
        }

    system = augment_system_prompt_with_speaker_mapping(
        load_kg_agentic_system_prompt(config),
        chunk_speaker_pairs,
    )
    user = build_kg_bulk_user_message(
        conversation_text,
        channel_id=channel_id,
        chunk_index=chunk_index,
        time_start_iso=time_start_iso,
        time_end_iso=time_end_iso,
        platforms_channels=platforms_channels,
        config=config,
        chunk_channel_pairs=chunk_channel_pairs,
        chunk_speaker_pairs=chunk_speaker_pairs,
        speaker_kg_prefetch=speaker_kg_prefetch,
        channel_metadata=channel_metadata,
    )
    msgs: list[dict[str, Any]] = [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]
    ctx = ToolContext(
        kg_manager=kg_manager,
        user_id=user_id,
        channel_id=channel_id,
    )

    try:
        raw = await bulk_client.chat(
            msgs,
            user_id=user_id,
            ctx=ctx,
            tool_names=list(KG_BULK_TOOL_NAMES),
            validate_header=False,
        )
    except Exception:
        # Per-chunk boundary: one failed LLM call must not abort the caller.
        logger.warning("Agentic KG extraction LLM call failed", exc_info=True)
        return _failure_stats()

    if not raw or not raw.strip():
        # Log this path too, so an empty reply is visible rather than only
        # showing up as a bare error count.
        logger.warning(
            "Agentic KG extraction returned an empty reply "
            "(channel_id=%s, chunk_index=%s)",
            channel_id,
            chunk_index,
        )
        return _failure_stats()

    if not persist_extraction:
        # Dry run: surface the raw model output before attempting to parse,
        # so it is shown even when parsing fails below.
        _print_dry_run_llm_output(
            channel_id=channel_id,
            chunk_index=chunk_index,
            raw=raw,
        )

    try:
        data = _parse_llm_json(raw)
    except Exception:
        # ``Exception`` already covers JSON decode errors; naming a decode
        # error class from the aliased ``json`` module here would add a
        # handler-time attribute lookup that could itself fail.
        logger.warning(
            "Agentic KG extraction JSON parse failed; raw=%r",
            raw,
            exc_info=True,
        )
        return _failure_stats()

    if not isinstance(data, dict):
        # The code below reads "entities"/"relationships" keys, so anything
        # other than a JSON object is unusable; treat it as a parse failure.
        logger.warning(
            "Agentic KG extraction returned non-object JSON (%s); raw=%r",
            type(data).__name__,
            raw,
        )
        return _failure_stats()

    if not persist_extraction:
        return {
            "entities_added": 0,
            "relationships_added": 0,
            "errors": 0,
            "parse_error": False,
            "persisted": False,
            "proposed_entities": len(data.get("entities") or []),
            "proposed_relationships": len(data.get("relationships") or []),
        }

    stats = await apply_parsed_extraction(
        data,
        kg_manager,
        channel_id,
        user_id=user_id,
        created_by="system:kg_agentic_bulk",
    )
    stats["parse_error"] = False
    stats["persisted"] = True
    return stats