"""Agentic knowledge-graph extraction for bulk chat import.
Bulk agentic chat can use native Gemini (pool keys + :mod:`gemini_kg_bulk_client`)
or OpenRouter (:func:`create_kg_bulk_openrouter_client`). A small read-only tool
set is backed by :class:`~knowledge_graph.KnowledgeGraphManager`.
"""
from __future__ import annotations
import hashlib
import jsonutil as json
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Protocol
from jinja2.sandbox import SandboxedEnvironment
from jinja2 import FileSystemLoader
from prompt_renderer import sanitize_context, LoggingSandboxedEnvironment
from tool_context import ToolContext
from tools import ToolRegistry
from kg_extraction import _parse_llm_json, apply_parsed_extraction
from openrouter_client import OpenRouterClient
# Imported only for static type checking: annotations in this module are lazy
# strings (``from __future__ import annotations``), so this guard never loads
# these modules at runtime.
if TYPE_CHECKING:
    from gemini_kg_bulk_client import GeminiPoolToolChatClient
    from knowledge_graph import KnowledgeGraphManager
# Module-level logger; used for the missing-template warning and for debug
# logging of failed prefetch searches.
logger = logging.getLogger(__name__)
class KgBulkLlmClient(Protocol):
    """Structural protocol for a bulk agentic-KG chat client.

    Declares the minimal async surface — ``chat`` and ``close`` — that the
    bulk-extraction code depends on, so either concrete backend can be used
    interchangeably without a shared base class. The two implementations are
    :class:`~gemini_kg_bulk_client.GeminiPoolToolChatClient` (native Gemini over
    the shared embedding-pool keys with automatic function calling) and
    :class:`~openrouter_client.OpenRouterClient` (OpenRouter HTTP).

    Token counting is not declared here. The concrete clients presumably
    expose it as ``count_input_tokens``, which the bulk runner uses to size
    chunks (verify against ``kg_bulk_runner``). Add it to this Protocol if code
    typed against ``KgBulkLlmClient`` ever needs to call it.

    As a :class:`typing.Protocol` it has no runtime behaviour and is never
    instantiated. It is used as the type of the ``bulk_client`` parameter of
    :func:`run_agentic_kg_extraction_chunk`.
    """

    async def chat(
        self,
        messages: list[dict[str, Any]],
        user_id: str = "",
        ctx: ToolContext | None = None,
        tool_names: list[str] | None = None,
        validate_header: bool = False,
        token_count: int | None = None,
        on_intermediate_text: Callable[[str], Awaitable[None]] | None = None,
    ) -> str:
        """Run one tool-enabled chat turn and return the model's final text.

        Structural (Protocol) declaration with no body. It documents the
        contract that both concrete bulk clients satisfy. The implementation
        is expected to:

        - drive the read-only KG tool loop, calling the registered
          ``kg_search_entities`` / ``kg_get_entity`` / ``kg_inspect_entity``
          tools (which reach the :class:`~knowledge_graph.KnowledgeGraphManager`
          via ``ctx``) for up to the client's ``max_tool_rounds``;
        - then return the assistant's final message as a plain string.

        Within this module the only caller is
        :func:`run_agentic_kg_extraction_chunk`. It awaits ``bulk_client.chat``
        with the assembled system/user messages and :data:`KG_BULK_TOOL_NAMES`.

        Args:
            messages: OpenAI-style ``{"role", "content"}`` message dicts forming
                the system + user prompt for this turn.
            user_id: Identifier of the speaker/owner for the turn; defaults to
                an empty string for bulk extraction.
            ctx: Tool execution context carrying ``kg_manager`` and ids so the
                read-only KG tools can run; may be None.
            tool_names: Explicit subset of tool names the model may call this
                turn; None lets the client use its default registry view.
            validate_header: Whether the implementation should enforce a
                structured header on the reply (unused/False for bulk KG).
            token_count: Optional pre-computed input token count the client may
                use to size context instead of recounting.
            on_intermediate_text: Optional async callback invoked with
                intermediate assistant text chunks.

        Returns:
            str: The model's final assistant message text.
        """
        ...

    async def close(self) -> None:
        """Release any network/session resources held by the client.

        Structural (Protocol) declaration with no body. Concrete clients are
        expected to close their underlying HTTP session or pooled connections
        so bulk runs shut down cleanly; idempotent by convention. The bulk
        runner (``kg_bulk_runner.py``) awaits the concrete ``close`` methods
        during teardown.

        Returns:
            None.
        """
        ...
# OpenRouter API root passed as ``base_url`` to the OpenRouter bulk client.
KG_BULK_OPENROUTER_BASE: str = "https://openrouter.ai/api/v1"
# Vendor-prefixed (OpenRouter-style) bulk chat model id; the native Gemini
# path strips the ``google/`` prefix from this same constant.
KG_BULK_CHAT_MODEL: str = "google/gemini-3.1-flash-lite"
# Read-only KG tools the bulk extraction model is allowed to call.
KG_BULK_TOOL_NAMES: list[str] = list(
    ("kg_search_entities", "kg_get_entity", "kg_inspect_entity")
)
# Rendered system prompts keyed by "{template mtime}:{config fingerprint}".
_SYSTEM_PROMPT_CACHE: dict[str, str] = dict()
def _print_dry_run_llm_output(
    *,
    channel_id: str,
    chunk_index: int,
    raw: str,
) -> None:
    """Echo the model's final text to stdout, framed by banner rules.

    Used for non-persisting dry runs so an operator can read what the model
    proposed. The text is stripped first; blank (or None) text prints nothing.
    Output is a 72-character ``=`` rule, a header naming the chunk index and
    channel scope, another rule, the text, and a closing rule, written with
    ``flush=True``. Only stdout is touched. Called by
    :func:`run_agentic_kg_extraction_chunk` when ``persist_extraction`` is
    false.

    Args:
        channel_id: Default channel scope id for the chunk, shown in the header.
        chunk_index: Zero-based index of the chunk being reported.
        raw: The model's final assistant text.

    Returns:
        None.
    """
    text = raw.strip() if raw else ""
    if text:
        rule = "=" * 72
        banner = (
            f"KG agentic dry-run — LLM output "
            f"(chunk {chunk_index}, scope {channel_id})"
        )
        print("\n".join(["", rule, banner, rule, text, rule, ""]), flush=True)
def _project_root() -> Path:
    """Return the absolute directory that holds this module.

    Anchors prompt-template lookups to the module's own location rather than
    the process working directory. :func:`_system_template_path` uses it to
    locate ``prompts/``.

    Returns:
        Path: Resolved parent directory of ``kg_agentic_extraction.py``.
    """
    module_file = Path(__file__).resolve()
    return module_file.parent
def _is_secret_setting_key(key: str) -> bool:
    """Return True if a platform-settings key likely holds a secret.

    The key is lower-cased, and ``-`` is normalised to ``_``. It is treated as
    secret when either:

    - it contains one of ``token``, ``password``, ``passphrase``, ``secret``,
      ``credential``, ``cookie``, ``api_key``, ``client_secret``; or
    - it is exactly ``key`` or ends in ``_key`` (e.g. ``private_key``,
      ``access_key``, ``recovery_key``).

    The check deliberately over-matches. A false positive only omits a
    harmless setting from the prompt, whereas a false negative would leak a
    credential to the LLM.

    Called by :func:`build_platform_context_markdown` when filtering each
    platform's settings.

    Args:
        key: A platform settings key name to test (coerced with ``str``).

    Returns:
        bool: True if the key looks secret-bearing, else False.
    """
    lk = str(key).lower().replace("-", "_")
    # Generic "<something>_key" names (private_key, access_key, ...) are
    # almost always key material.
    if lk == "key" or lk.endswith("_key"):
        return True
    return any(
        frag in lk
        for frag in (
            "token",
            "password",
            "passphrase",
            "secret",
            "credential",
            "cookie",
            "api_key",
            "client_secret",
        )
    )
def _platform_type_is_ncm(ptype: str) -> bool:
    """Report whether a platform ``type`` refers to the NCM (neurochemical) subsystem.

    NCM platform entries carry no conversational meaning, so
    :func:`build_platform_context_markdown` uses this check for two things:
    skipping NCM platforms, and detecting a configuration in which every
    enabled platform is NCM. The match is a case-insensitive search for the
    substring ``ncm``.

    Args:
        ptype: Platform ``type`` value; None is treated as empty.

    Returns:
        bool: True when ``ncm`` occurs in the type, otherwise False.
    """
    normalized = (ptype or "").strip().lower()
    return normalized.find("ncm") != -1
def _settings_key_is_ncm_related(key: str) -> bool:
    """Report whether a settings key belongs to the NCM (neurochemical) subsystem.

    Neurochemical configuration has no place in the chat-extraction prompt.
    :func:`build_platform_context_markdown` therefore applies this check
    together with :func:`_is_secret_setting_key` to drop such keys. The
    match is a case-insensitive search for ``ncm`` in ``str(key)``.

    Args:
        key: A platform settings key name.

    Returns:
        bool: True when the key mentions ``ncm``, otherwise False.
    """
    lowered = str(key).lower()
    return lowered.find("ncm") >= 0
def build_platform_context_markdown(cfg: Any | None) -> str:
    """Build a human-readable, secret-free platform summary for the system prompt.

    Walks the config's enabled platforms and emits a markdown block describing
    each one, so the extraction model can interpret platform-specific
    identifiers in the chat log. Each block covers the Matrix homeserver and
    bot id, the Discord snowflake ids, and the remaining non-secret settings.

    Filtering rules:

    - Secret-looking and NCM-related setting keys are dropped via
      :func:`_is_secret_setting_key` and :func:`_settings_key_is_ncm_related`.
    - NCM platforms are skipped via :func:`_platform_type_is_ncm`.
    - Structured (dict/list) values are summarised rather than dumped.
    - Each setting appears at most once. Discord id keys shown in the Discord
      section are not repeated by the generic listing, and ``homeserver`` /
      ``user_id`` / ``store_path`` / ``credentials_file`` / ``password`` are
      never listed generically.

    Reads only the passed config object; no Redis, KG, or network access.
    Returns explicit placeholder text for a missing config, no platform
    entries, an all-NCM set, or all-disabled platforms.

    Called by :func:`render_kg_agentic_system_prompt` and
    :func:`_system_prompt_cache_key` (both in this module) to inject and
    fingerprint the platform context.

    Args:
        cfg: Loaded application config (or None), inspected for ``platforms``
            and legacy Matrix ``homeserver`` / ``user_id`` attributes. Platform
            entries without a ``settings`` attribute are treated as having no
            settings.

    Returns:
        str: A markdown summary of enabled chat platforms, or an explanatory
        placeholder string when no usable platform context exists.
    """
    if cfg is None:
        return (
            "(No configuration passed — treat `platform` / `channel_id` / "
            "`user_id` in logs as opaque platform-specific identifiers.)"
        )
    platforms = getattr(cfg, "platforms", None) or []
    if not platforms:
        lines_out = [
            "(No `platforms:` entries in config.yaml — if legacy Matrix "
            "fields exist, only the Matrix bot may be active.)",
        ]
        hs = getattr(cfg, "homeserver", "") or ""
        uid = getattr(cfg, "user_id", "") or ""
        if hs:
            lines_out.append(f"- Default Matrix homeserver URL: `{hs}`")
        if uid:
            lines_out.append(
                f"- Legacy Matrix bot user id (format reference): `{uid}`",
            )
        return "\n".join(lines_out)

    # Never emitted by the generic settings loop: Matrix identity keys (handled
    # by the Matrix section) and local file paths / passwords.
    always_skip = frozenset(
        {"homeserver", "user_id", "store_path", "credentials_file", "password"}
    )
    discord_id_keys = (
        "application_id",
        "guild_id",
        "primary_guild_id",
        "default_guild_id",
    )
    lines: list[str] = []
    idx = 0
    for p in platforms:
        if not getattr(p, "enabled", True):
            continue
        ptype = getattr(p, "type", "") or "unknown"
        if _platform_type_is_ncm(ptype):
            continue
        idx += 1
        lines.append(f"### Platform #{idx}: `{ptype}`")
        # getattr keeps entries without a ``settings`` attribute from crashing.
        raw_settings = getattr(p, "settings", None) or {}
        st = {
            k: v
            for k, v in raw_settings.items()
            if not _is_secret_setting_key(str(k))
            and not _settings_key_is_ncm_related(str(k))
        }
        skip_keys = set(always_skip)
        if ptype == "matrix":
            hs = st.get("homeserver") or getattr(cfg, "homeserver", "")
            if hs:
                lines.append(f"- Matrix homeserver (public URL): `{hs}`")
            bot_uid = st.get("user_id") or getattr(cfg, "user_id", "")
            if bot_uid:
                lines.append(
                    "- Matrix user ids in logs look like `@localpart:domain`."
                )
                lines.append(
                    f"- This deployment's bot Matrix id: `{bot_uid}`",
                )
        elif ptype in ("discord", "discord-self"):
            lines.append(
                "- Discord numeric ids in logs (`user_id`, `channel_id`, "
                "etc.) are **snowflakes** (large integers as strings)."
            )
            for key in discord_id_keys:
                if st.get(key):
                    lines.append(f"- `{key}`: `{st[key]}`")
            # Already rendered above; keep the generic loop from repeating them.
            skip_keys.update(discord_id_keys)
        # Sort on the key's string form so mixed-type YAML keys (e.g. ints
        # next to strs) cannot raise TypeError during comparison.
        for k, v in sorted(st.items(), key=lambda kv: str(kv[0])):
            if k in skip_keys:
                continue
            if isinstance(v, (dict, list)):
                lines.append(
                    f"- `{k}`: _(structured value; omitted from prompt)_",
                )
            else:
                lines.append(f"- `{k}`: `{v}`")
    if not lines:
        # Empty output means every platform was either disabled or NCM.
        enabled = [x for x in platforms if getattr(x, "enabled", True)]
        if enabled and all(
            _platform_type_is_ncm(getattr(x, "type", "") or "") for x in enabled
        ):
            return (
                "(Only NCM / non-chat platform entries are configured — omitted "
                "from KG extraction context; treat ids in logs as opaque.)"
            )
        return "(All platforms disabled in configuration.)"
    return "\n".join(lines)
def _system_template_path() -> Path:
    """Locate the Jinja2 template for the agentic-extraction system prompt.

    The template lives at ``<project root>/prompts/kg_agentic_extraction_system.j2``,
    where the root comes from :func:`_project_root`. Two callers use it:
    :func:`render_kg_agentic_system_prompt` loads the template from this
    path, and :func:`_system_prompt_cache_key` reads its mtime for cache
    invalidation.

    Returns:
        Path: Absolute path of the system-prompt template.
    """
    prompts_dir = _project_root() / "prompts"
    return prompts_dir / "kg_agentic_extraction_system.j2"
def render_kg_agentic_system_prompt(cfg: Any | None = None) -> str:
    """Render the agentic-extraction system prompt from its Jinja2 template.

    When the template returned by :func:`_system_template_path` exists:

    1. Load it through a non-autoescaping
       :class:`~prompt_renderer.LoggingSandboxedEnvironment` rooted at the
       template's directory.
    2. Render it with a single variable, ``platform_context``. The value comes
       from :func:`build_platform_context_markdown` and is passed through
       :func:`~prompt_renderer.sanitize_context`.

    When the template file is absent, log a warning and return a short
    hard-coded prompt with the same platform context appended. No Redis, KG,
    or network access.

    Called by :func:`load_kg_agentic_system_prompt`, which memoizes the result.

    Args:
        cfg: Loaded application config (or None) forwarded to
            :func:`build_platform_context_markdown`.

    Returns:
        str: The rendered prompt, or the fallback prompt when the template is
        missing.
    """
    template_path = _system_template_path()
    if template_path.is_file():
        template_name = template_path.name
        env = LoggingSandboxedEnvironment(
            loader=FileSystemLoader(str(template_path.parent)),
            autoescape=False,
            template_name=template_name,
        )
        template = env.get_template(template_name)
        render_vars = sanitize_context(
            {"platform_context": build_platform_context_markdown(cfg)},
        )
        return template.render(**render_vars)
    logger.warning("Missing %s — using fallback system prompt", template_path)
    fallback_intro = (
        "You extract knowledge graphs from chat. Use tools to search "
        "the graph before creating entities. Final message: JSON only.\n\n"
    )
    return fallback_intro + build_platform_context_markdown(cfg)
def _system_prompt_cache_key(cfg: Any | None) -> str:
    """Fingerprint the inputs that determine the rendered system prompt.

    The key has two parts joined by ``:``:

    - the template file's mtime, formatted with six decimals. It is ``0.0``
      when the file cannot be stat'ed (:class:`OSError`).
    - the first 24 hex characters of a SHA-256 digest. The digest covers the
      platform-context markdown and, when the config defines
      ``kg_extraction_channel_hints``, their key-sorted JSON serialisation.

    Changing the template or those config-derived inputs therefore yields a
    new key in :data:`_SYSTEM_PROMPT_CACHE`. Called by
    :func:`load_kg_agentic_system_prompt`.

    Args:
        cfg: Loaded application config (or None).

    Returns:
        str: ``"{mtime:.6f}:{24-char-sha256-prefix}"``.
    """
    template_path = _system_template_path()
    try:
        stamp = template_path.stat().st_mtime
    except OSError:
        stamp = 0.0
    platform_md = build_platform_context_markdown(cfg)
    if cfg is not None and hasattr(cfg, "kg_extraction_channel_hints"):
        hints_json = json.dumps(
            getattr(cfg, "kg_extraction_channel_hints", {}),
            sort_keys=True,
        )
    else:
        hints_json = ""
    payload = f"{platform_md}\n{hints_json}".encode()
    digest = hashlib.sha256(payload).hexdigest()[:24]
    return f"{stamp:.6f}:{digest}"
def load_kg_agentic_system_prompt(config: Any | None = None) -> str:
    """Return the rendered system prompt, memoized by template mtime and config.

    Computes a cache key via :func:`_system_prompt_cache_key`. The key has the
    form ``"{template mtime}:{hash of platform context + channel hints}"``.
    A hit returns the cached text. On a miss:

    1. Render the prompt via :func:`render_kg_agentic_system_prompt`.
    2. Evict every cached entry whose mtime prefix differs from the new key's.
       Those entries were rendered from a different template version and are
       dead weight while the current file is in place.
    3. Store and return the result.

    Entries for other configs rendered from the current template are kept.
    Without the eviction step the cache would gain a permanent entry on every
    template edit. Eviction happens only after a successful render, so a
    render error leaves the cache untouched. Touches only this in-process
    cache; no Redis, KG, or network access.

    Called by :func:`messages_for_agentic_token_estimate` and
    :func:`run_agentic_kg_extraction_chunk` to assemble the system message.

    Args:
        config: Loaded application config (or None), used both as the cache
            fingerprint input and for rendering.

    Returns:
        str: The rendered (and cached) system prompt text.
    """
    key = _system_prompt_cache_key(config)
    cached = _SYSTEM_PROMPT_CACHE.get(key)
    if cached is not None:
        return cached
    rendered = render_kg_agentic_system_prompt(config)
    mtime_part = key.partition(":")[0]
    stale_keys = [
        k for k in _SYSTEM_PROMPT_CACHE if k.partition(":")[0] != mtime_part
    ]
    for stale_key in stale_keys:
        del _SYSTEM_PROMPT_CACHE[stale_key]
    _SYSTEM_PROMPT_CACHE[key] = rendered
    return rendered
def augment_system_prompt_with_speaker_mapping(
    system: str,
    speaker_pairs: list[tuple[str, str]] | None,
) -> str:
    """Append the chunk's ``user_id`` to display-name table to a system prompt.

    The mapping markdown comes from
    :func:`format_speaker_user_id_mapping_markdown`. If it is blank, ``system``
    is returned as-is. Otherwise the result is ``system`` with trailing
    whitespace removed, a blank line, the mapping block, and a final newline.
    Used by :func:`messages_for_agentic_token_estimate` and
    :func:`run_agentic_kg_extraction_chunk` so the model can resolve sender
    ids in the log lines.

    Args:
        system: The base rendered system prompt.
        speaker_pairs: ``(user_id, display_name)`` tuples for the chunk, or None.

    Returns:
        str: The augmented prompt, or ``system`` unchanged when there is no
        mapping to add.
    """
    mapping_md = format_speaker_user_id_mapping_markdown(list(speaker_pairs or []))
    if mapping_md.strip():
        return f"{system.rstrip()}\n\n{mapping_md}\n"
    return system
async def prefetch_speaker_kg_context(
    kg: Any,
    speakers: list[tuple[str, str]],
    *,
    max_speakers: int = 8,
    hits_per_speaker: int = 3,
    min_score: float = 0.0,
) -> str:
    """Collect likely-existing KG entities for a chunk's speakers.

    Speakers are deduplicated by stripped ``user_id``: blank ids are ignored,
    and the first display name seen wins (``"?"`` when blank). Up to
    ``max_speakers`` ids are taken in sorted order. For each one,
    ``kg.search_entities`` is awaited twice:

    - a ``category="user"`` search scoped to the speaker's id, using the
      display name as the query;
    - an unscoped search for ``"<name> <id>"``.

    Both requests use ``top_k=hits_per_speaker``. A failing search is logged
    at debug level and skipped rather than raised.

    Hit filtering, in order:

    1. Hits scoring below ``min_score`` are discarded.
    2. Hits without a uuid, or whose uuid was already listed for any speaker,
       are discarded.
    3. The rest are rendered as bullet rows under a per-speaker heading.

    The KG is only read, never mutated. The result is passed by
    ``kg_bulk_runner.run_agentic_kg_bulk`` as ``speaker_kg_prefetch`` into the
    user-message builder.

    Args:
        kg: Object exposing an async ``search_entities`` method (a
            :class:`~knowledge_graph.KnowledgeGraphManager` in practice).
        speakers: ``(user_id, display_name)`` tuples for the chunk.
        max_speakers: Distinct speakers to query, clamped to 1-128.
        hits_per_speaker: ``top_k`` per search, clamped to 1-48.
        min_score: Minimum score for a hit to be listed.

    Returns:
        str: A markdown block headed ``## Existing knowledge graph (speakers —
        prefetch)``, or ``""`` when there are no speakers or no listed hits.
    """
    if not speakers:
        return ""
    speaker_cap = max(1, min(128, int(max_speakers)))
    per_search = max(1, min(48, int(hits_per_speaker)))

    names_by_id: dict[str, str] = {}
    for raw_id, raw_name in speakers:
        speaker_id = (raw_id or "").strip()
        if speaker_id:
            names_by_id.setdefault(speaker_id, (raw_name or "").strip() or "?")

    already_listed: set[str] = set()
    sections: list[str] = []
    for speaker_id in sorted(names_by_id)[:speaker_cap]:
        speaker_name = names_by_id[speaker_id]
        # (debug message on failure, positional args, keyword args)
        searches = (
            (
                "prefetch user scope search failed",
                (speaker_name,),
                {"category": "user", "scope_id": speaker_id, "top_k": per_search},
            ),
            (
                "prefetch general search failed",
                (f"{speaker_name} {speaker_id}",),
                {"top_k": per_search},
            ),
        )
        candidates: list[dict[str, Any]] = []
        for failure_note, args, kwargs in searches:
            try:
                found = await kg.search_entities(*args, **kwargs)
                candidates.extend(found or [])
            except Exception:
                logger.debug(failure_note, exc_info=True)

        entries: list[str] = []
        for hit in candidates:
            score = float(hit.get("score") or 0.0)
            if score < min_score:
                continue
            hit_uuid = str(hit.get("uuid") or "")
            if not hit_uuid or hit_uuid in already_listed:
                continue
            already_listed.add(hit_uuid)
            name_text = str(hit.get("name") or "")
            type_text = str(hit.get("type") or "")
            description = str(hit.get("description") or "")
            category = str(hit.get("category") or "")
            scope = str(hit.get("scope_id") or "")
            suffix = f": {description}" if description else ""
            entries.append(
                f" - `{name_text}` ({type_text}, cat={category}, scope={scope}, "
                f"score={score:.3f}, uuid={hit_uuid})" + suffix,
            )
        if entries:
            heading = f"**Speaker `{speaker_id}` ({speaker_name})**:\n"
            sections.append(heading + "\n".join(entries))

    if not sections:
        return ""
    preamble = (
        "## Existing knowledge graph (speakers — prefetch)\n"
        "_Heuristic vector matches only; may be incomplete or noisy — "
        "use kg_search_entities / kg_get_entity to verify._\n\n"
    )
    return preamble + "\n\n".join(sections)
def build_kg_bulk_user_message(
    conversation_text: str,
    *,
    channel_id: str,
    chunk_index: int,
    time_start_iso: str = "",
    time_end_iso: str = "",
    platforms_channels: str = "",
    config: Any | None = None,
    chunk_channel_pairs: list[tuple[str, str]] | None = None,
    chunk_speaker_pairs: list[tuple[str, str]] | None = None,
    speaker_kg_prefetch: str = "",
    channel_metadata: dict[str, dict[str, str]] | None = None,
) -> str:
    """Compose the user turn for one agentic extraction chunk.

    Sections, in order:

    1. A ``## Chunk metadata`` list (index, default channel scope, UTC time
       range, and the corpus-wide platform/channel summary when given).
    2. The channel section from :func:`format_chunk_channels_section`, fed the
       deduplicated and sorted channel pairs.
    3. The speaker section from :func:`format_chunk_speakers_section`.
    4. The stripped speaker-KG prefetch block, when non-blank.
    5. The chronological conversation.
    6. A closing instruction to emit the JSON extraction.

    Token estimation and the real run both build their user turn here, so
    they see the same text. Pure string assembly; no Redis, KG, or network
    access.

    Args:
        conversation_text: The chronological chat transcript for the chunk.
        channel_id: Default channel scope id for scoped facts.
        chunk_index: Zero-based index of this chunk.
        time_start_iso: ISO-8601 start of the chunk's time range (optional).
        time_end_iso: ISO-8601 end of the chunk's time range (optional).
        platforms_channels: Optional corpus-wide platform/channel summary.
        config: Loaded application config (or None) for the section builders.
        chunk_channel_pairs: ``(platform, channel_id)`` pairs in the chunk.
        chunk_speaker_pairs: ``(user_id, display_name)`` pairs in the chunk.
        speaker_kg_prefetch: Optional prefetched speaker-KG markdown block.
        channel_metadata: Optional per-channel name/topic metadata map.

    Returns:
        str: The sections joined with newlines.
    """
    channel_pairs = sorted(set(chunk_channel_pairs or []))
    channels_md = format_chunk_channels_section(
        channel_pairs,
        config,
        channel_id,
        channel_metadata=channel_metadata,
    )
    speakers_md = format_chunk_speakers_section(list(chunk_speaker_pairs or []))

    parts: list[str] = [
        "## Chunk metadata",
        f"- chunk_index: {chunk_index}",
        f"- default_channel_scope_id: {channel_id}",
        f"- time_range_utc: {time_start_iso} → {time_end_iso}",
    ]
    if platforms_channels:
        parts.append(f"- all platforms/channels in full corpus: {platforms_channels}")
    parts += ["", channels_md, "", speakers_md]

    prefetch_md = (speaker_kg_prefetch or "").strip()
    if prefetch_md:
        parts += ["", prefetch_md]

    parts += [
        "",
        "## Conversation (chronological)",
        conversation_text,
        "",
        "Now produce the final JSON extraction per system instructions.",
    ]
    return "\n".join(parts)
def messages_for_agentic_token_estimate(
    conversation_text: str,
    *,
    channel_id: str,
    chunk_index: int = 0,
    time_start_iso: str = "",
    time_end_iso: str = "",
    platforms_channels: str = "",
    config: Any | None = None,
    chunk_channel_pairs: list[tuple[str, str]] | None = None,
    chunk_speaker_pairs: list[tuple[str, str]] | None = None,
    speaker_kg_prefetch: str = "",
    channel_metadata: dict[str, dict[str, str]] | None = None,
) -> list[dict[str, str]]:
    """Assemble the ``[system, user]`` messages used to pre-count input tokens.

    The two messages are built the same way as in a real extraction run, so
    their token count reflects the real request:

    - system: :func:`load_kg_agentic_system_prompt` plus the speaker mapping
      added by :func:`augment_system_prompt_with_speaker_mapping`;
    - user: :func:`build_kg_bulk_user_message`.

    No token counting happens here. Used by ``kg_bulk_runner`` to size chunks
    before handing them to the bulk client's token counter.

    Args:
        conversation_text: The chronological chat transcript for the chunk.
        channel_id: Default channel scope id for scoped facts.
        chunk_index: Zero-based index of this chunk (default 0).
        time_start_iso: ISO-8601 start of the chunk's time range (optional).
        time_end_iso: ISO-8601 end of the chunk's time range (optional).
        platforms_channels: Optional corpus-wide platform/channel summary.
        config: Loaded application config (or None) forwarded to the builders.
        chunk_channel_pairs: ``(platform, channel_id)`` pairs in the chunk.
        chunk_speaker_pairs: ``(user_id, display_name)`` pairs in the chunk.
        speaker_kg_prefetch: Optional prefetched speaker-KG markdown block.
        channel_metadata: Optional per-channel name/topic metadata map.

    Returns:
        list[dict[str, str]]: ``[system_message, user_message]``.
    """
    base_prompt = load_kg_agentic_system_prompt(config)
    system_text = augment_system_prompt_with_speaker_mapping(
        base_prompt,
        chunk_speaker_pairs,
    )
    user_text = build_kg_bulk_user_message(
        conversation_text,
        channel_id=channel_id,
        chunk_index=chunk_index,
        time_start_iso=time_start_iso,
        time_end_iso=time_end_iso,
        platforms_channels=platforms_channels,
        config=config,
        chunk_channel_pairs=chunk_channel_pairs,
        chunk_speaker_pairs=chunk_speaker_pairs,
        speaker_kg_prefetch=speaker_kg_prefetch,
        channel_metadata=channel_metadata,
    )
    system_message = dict(role="system", content=system_text)
    user_message = dict(role="user", content=user_text)
    return [system_message, user_message]
def kg_bulk_native_model_id() -> str:
    """Derive the bare native-Gemini model id from :data:`KG_BULK_CHAT_MODEL`.

    The OpenRouter constant carries a ``google/`` vendor prefix, which the
    native Gemini client does not accept. This strips that prefix when
    present and otherwise returns the constant unchanged, so both backends
    are configured from one value. Used by
    :func:`create_kg_bulk_gemini_pool_client` for the pool client's
    ``model_id``.

    Returns:
        str: The bulk chat model id without the ``google/`` prefix.
    """
    vendor_prefix = "google/"
    model = KG_BULK_CHAT_MODEL
    if model.startswith(vendor_prefix):
        return model[len(vendor_prefix):]
    return model
def create_kg_bulk_gemini_pool_client(
    *,
    max_tool_rounds: int = 48,
    max_tokens: int = 60_000,
    max_tool_output_chars: int = 3_000_000,
    temperature: float = 0.25,
) -> GeminiPoolToolChatClient:
    """Build a native-Gemini bulk chat client that uses the embed-pool keys.

    Imports :class:`~gemini_kg_bulk_client.GeminiPoolToolChatClient` lazily.
    It is configured with the read-only KG tool registry from
    :func:`build_kg_bulk_tool_registry`, the bare model id from
    :func:`kg_bulk_native_model_id`, and the supplied generation limits. The
    client itself owns its network/session setup. ``kg_bulk_runner`` uses
    this factory for both the token-counter client (``max_tool_rounds=1``)
    and the main bulk client.

    Args:
        max_tool_rounds: Maximum tool-call rounds per chat turn (default 48).
        max_tokens: Maximum output tokens per turn (default 60000).
        max_tool_output_chars: Cap on tool-output characters fed back to the
            model (default 3000000).
        temperature: Sampling temperature (default 0.25).

    Returns:
        GeminiPoolToolChatClient: The configured client.
    """
    from gemini_kg_bulk_client import GeminiPoolToolChatClient

    client_options = dict(
        tool_registry=build_kg_bulk_tool_registry(),
        model_id=kg_bulk_native_model_id(),
        max_tool_rounds=max_tool_rounds,
        max_tokens=max_tokens,
        max_tool_output_chars=max_tool_output_chars,
        temperature=temperature,
    )
    return GeminiPoolToolChatClient(**client_options)
def create_kg_bulk_openrouter_client(
    api_key: str,
    *,
    gemini_api_key: str = "",
    max_tool_rounds: int = 48,
    max_tokens: int = 60_000,
    max_tool_output_chars: int = 3_000_000,
    temperature: float = 0.25,
    top_p: float = 0.99,
) -> OpenRouterClient:
    """Build an OpenRouter-backed bulk chat client for KG extraction.

    The :class:`~openrouter_client.OpenRouterClient` is pointed at
    :data:`KG_BULK_OPENROUTER_BASE` with model :data:`KG_BULK_CHAT_MODEL`. It
    is given the read-only KG tool registry from
    :func:`build_kg_bulk_tool_registry` plus the supplied sampling and
    tool-loop limits. The optional ``gemini_api_key`` enables native Gemini
    token counting. Construction alone opens no connection. When the
    OpenRouter backend is selected, ``kg_bulk_runner`` uses this factory for
    both the token-counter client (``max_tool_rounds=1``) and the main bulk
    client.

    Args:
        api_key: OpenRouter API key.
        gemini_api_key: Optional Gemini API key for native token counting.
        max_tool_rounds: Maximum tool-call rounds per chat turn (default 48).
        max_tokens: Maximum output tokens per turn (default 60000).
        max_tool_output_chars: Cap on tool-output characters fed back to the
            model (default 3000000).
        temperature: Sampling temperature (default 0.25).
        top_p: Nucleus-sampling top-p (default 0.99).

    Returns:
        OpenRouterClient: The configured client.
    """
    registry = build_kg_bulk_tool_registry()
    return OpenRouterClient(
        api_key=api_key,
        base_url=KG_BULK_OPENROUTER_BASE,
        model=KG_BULK_CHAT_MODEL,
        gemini_api_key=gemini_api_key,
        tool_registry=registry,
        max_tool_rounds=max_tool_rounds,
        max_tool_output_chars=max_tool_output_chars,
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
    )