gemini_kg_bulk_client
Native Gemini (google.genai) client for bulk agentic KG extraction.
Uses gemini_embed_pool.next_gemini_flash_key() for API key rotation,
client.aio for async I/O, and automatic function calling (AFC) with
thin async callables that delegate to tools.ToolRegistry.call().
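
The delegation described above can be pictured with a minimal sketch. `FakeToolRegistry`, `make_tool_fn`, and the `call(name, arguments)` signature are hypothetical stand-ins, not the actual `tools.ToolRegistry` API, and truncating the output to a character cap is an assumption about how a per-call limit might be applied.

```python
import asyncio
from typing import Any, Awaitable, Callable


class FakeToolRegistry:
    """Hypothetical stand-in for tools.ToolRegistry (real signature may differ)."""

    async def call(self, name: str, arguments: dict[str, Any]) -> str:
        return f"{name} called with {sorted(arguments.items())}"


def make_tool_fn(
    registry: FakeToolRegistry, name: str, max_chars: int
) -> Callable[..., Awaitable[str]]:
    """Build one thin async wrapper that forwards keyword args to the registry."""

    async def tool_fn(**kwargs: Any) -> str:
        result = await registry.call(name, kwargs)
        # Assumed behaviour: cap oversized tool output at max_chars characters.
        return result[:max_chars]

    tool_fn.__name__ = name  # name the callable after the tool it wraps
    return tool_fn


lookup = make_tool_fn(FakeToolRegistry(), "kg_lookup", max_chars=20)
output = asyncio.run(lookup(entity="Ada"))
```

The `**kwargs` form is used only for brevity; wrappers handed to the SDK for automatic function calling would presumably carry explicit, typed parameters per tool.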
- gemini_kg_bulk_client.openai_messages_to_gemini(messages)[source]
Split system instruction and build Gemini contents (no system in contents).

Translates an OpenAI-style chat history into the two-part shape the google-genai SDK expects: a single concatenated system instruction (peeled off only from leading system turns, since the Gemini developer API takes system text out of band rather than inside contents) and a list of google.genai.types.Content turns for the user/assistant/tool messages. Assistant tool_calls become types.FunctionCall parts (arguments JSON-decoded via the module-aliased jsonutil, imported as json) and tool results become types.FunctionResponse parts, so an OpenAI-shaped transcript round-trips into Gemini’s function-calling format. Pure transformation with no I/O; relies on _openai_message_text() to flatten message content to plain text.

Called by GeminiPoolToolChatClient.chat() (to build the contents sent to generate_content) and by GeminiPoolToolChatClient.count_input_tokens() (before reshaping via _contents_for_count_tokens()).

- Parameters:
messages (list[dict[str, Any]]) – OpenAI-style chat messages with role/content (and, for assistant turns, optional tool_calls; for tool turns, name).

- Returns:
The merged system instruction (None when no leading system text exists) and the Gemini contents list (empty when there are no non-system turns).

- Return type:
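
As a rough illustration of the transformation described for openai_messages_to_gemini(), the sketch below uses plain dicts in place of google.genai.types.Content so that it runs without the SDK. The function name `split_messages`, the dict layout, the `"user"` role chosen for tool results, and the treatment of non-leading system turns are all assumptions made for this example, not the module's actual behaviour.

```python
import json
from typing import Any, Optional


def split_messages(
    messages: list[dict[str, Any]],
) -> tuple[Optional[str], list[dict[str, Any]]]:
    """Peel leading system turns off and map the rest to Gemini-like turns."""
    system_parts: list[str] = []
    index = 0
    # Only *leading* system messages become the system instruction.
    while index < len(messages) and messages[index].get("role") == "system":
        text = str(messages[index].get("content") or "")
        if text:
            system_parts.append(text)
        index += 1

    contents: list[dict[str, Any]] = []
    for msg in messages[index:]:
        role = msg.get("role")
        text = str(msg.get("content") or "")
        if role == "assistant":
            parts: list[dict[str, Any]] = [{"text": text}] if text else []
            for call in msg.get("tool_calls") or []:
                fn = call["function"]
                # OpenAI carries arguments as a JSON string; decode to a dict.
                args = json.loads(fn.get("arguments") or "{}")
                parts.append({"function_call": {"name": fn["name"], "args": args}})
            contents.append({"role": "model", "parts": parts})
        elif role == "tool":
            response = {"name": msg.get("name", ""), "response": {"result": text}}
            contents.append({"role": "user", "parts": [{"function_response": response}]})
        else:
            # Assumption: anything else (including late system turns) is a user turn.
            contents.append({"role": "user", "parts": [{"text": text}]})

    system = "\n\n".join(system_parts) if system_parts else None
    return system, contents


system, contents = split_messages([
    {"role": "system", "content": "You extract entities."},
    {"role": "user", "content": "Who is Ada?"},
    {"role": "assistant", "content": "", "tool_calls": [
        {"function": {"name": "kg_lookup", "arguments": "{\"entity\": \"Ada\"}"}},
    ]},
    {"role": "tool", "name": "kg_lookup", "content": "Ada Lovelace"},
])
```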
- class gemini_kg_bulk_client.GeminiPoolToolChatClient(*, tool_registry, model_id, max_tool_rounds=48, max_tokens=60000, max_tool_output_chars=3000000, temperature=0.25)[source]
Bases: object

Async Gemini chat with pool keys, countTokens, and AFC tool execution.

Drop-in chat client for bulk knowledge-graph extraction that talks to the native Gemini API (google.genai) instead of OpenRouter, exposing the same chat/count_input_tokens/close surface as the pooled OpenRouter client so kg_agentic_extraction can swap backends transparently. Each request constructs a short-lived google.genai.Client keyed by a rotating pool key from gemini_embed_pool, runs generate_content with automatic function calling so Gemini drives the KG read tools itself (delegated to tools.ToolRegistry.call() via the wrappers built in _afc_tool_fns()), and tears the client down through _safe_aclose(). The KG/FalkorDB side effects are produced indirectly by those tools; this class itself holds no long-lived session.

Instances are built by kg_agentic_extraction.create_kg_bulk_gemini_pool_client(), which passes the bulk tool registry and native Gemini model id.

- Parameters:
- __init__(*, tool_registry, model_id, max_tool_rounds=48, max_tokens=60000, max_tool_output_chars=3000000, temperature=0.25)[source]
Store the tool registry, model id, and generation/AFC limits.
Holds configuration only; no network/SDK clients are created here (each request constructs a short-lived genai.Client). Instances are built by kg_agentic_extraction.create_kg_bulk_gemini_pool_client(), which supplies the KG bulk tool registry and the native Gemini model id.

- Parameters:
tool_registry (ToolRegistry) – Registry whose tools the AFC wrappers delegate to.
model_id (str) – Native Gemini model id (no google/ prefix).
max_tool_rounds (int) – Cap on AFC remote function-call rounds per request.
max_tokens (int) – max_output_tokens for generation.
max_tool_output_chars (int) – Per-call character cap for tool output.
temperature (float) – Sampling temperature for generation.
- Return type:
None
- async close()[source]
No-op async close for interface parity with the OpenRouter client.
This client holds no long-lived session (genai clients are created and closed per request via _safe_aclose()), so there is nothing to tear down. Provided so callers can treat it like the pooled OpenRouter client; invoked during cleanup in kg_bulk_runner (e.g. the bulk_client.close() / token_counter.close() calls).

- Return type:
- async count_input_tokens(messages, *, gemini_model=None)[source]
Count Gemini input tokens for OpenAI-style messages via countTokens.
Converts messages into a system instruction plus Gemini contents using openai_messages_to_gemini(), then reshapes them with _contents_for_count_tokens() (the developer API rejects system_instruction on countTokens, so the system text is folded into the opening turn). It rotates pool keys via gemini_embed_pool.next_gemini_flash_key(), escalating to gemini_embed_pool.get_paid_fallback_key() on the final two attempts, and issues client.aio.models.count_tokens against a short-lived genai.Client that is always released through _safe_aclose(). On a 429 attributable to a daily-quota violation (per _error_body_daily_quota()) the spent key is marked with gemini_embed_pool.mark_key_daily_spent(); retryable codes (429/500/502/503/504) back off and retry, while other APIError codes and unexpected exceptions abort with None.

Called by chat() (when its token_count is None) to fill the system prompt’s __INPUT_TOKEN_COUNT__ placeholder, and by kg_bulk_runner as a standalone token counter.

- Parameters:
- Returns:
The total input-token count, or None when there are no non-system contents, the API returns a non-integer total, or every key attempt is exhausted without success.

- Return type:
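
The reshaping step mentioned for count_input_tokens(), where system text is folded into the opening turn because countTokens rejects system_instruction, might look roughly like the sketch below. It operates on plain dicts rather than SDK types, and `fold_system_into_first_turn` is a hypothetical name; how _contents_for_count_tokens() actually merges the text is not shown in this reference.

```python
import copy
from typing import Any, Optional


def fold_system_into_first_turn(
    system: Optional[str], contents: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Return contents with the system text prepended to the first turn's parts."""
    if not system or not contents:
        return contents  # nothing to fold, or no turn to fold it into
    folded = copy.deepcopy(contents)  # leave the caller's list untouched
    folded[0]["parts"].insert(0, {"text": system})
    return folded


original = [{"role": "user", "parts": [{"text": "Who is Ada?"}]}]
folded = fold_system_into_first_turn("You extract entities.", original)
```

Copying before mutating keeps the contents that are later sent to generate_content free of the duplicated system text.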
- async chat(messages, user_id='', ctx=None, tool_names=None, validate_header=False, token_count=None, on_intermediate_text=None)[source]
Run one bulk agentic generation with Gemini automatic function calling.
Converts OpenAI-style messages to Gemini contents plus a system instruction, substitutes the resolved input-token count into the system prompt’s __INPUT_TOKEN_COUNT__ placeholder, and issues a single generate_content call with AFC enabled so Gemini can drive the KG read tools itself (up to max_tool_rounds rounds). The model’s final text is returned. validate_header and on_intermediate_text are accepted for interface parity with the OpenRouter client but unused on this bulk path.

Calls count_input_tokens() (when token_count is None), openai_messages_to_gemini(), and _afc_tool_fns() to build the tool callables, then rotates pool keys via gemini_embed_pool.next_gemini_flash_key() (escalating to gemini_embed_pool.get_paid_fallback_key() on the last two attempts), using a short-lived genai.Client per attempt, each closed with _safe_aclose(). On a 429 caused by a daily-quota violation (per _error_body_daily_quota()) it marks the key spent via gemini_embed_pool.mark_key_daily_spent(); retryable codes (429/500/502/503/504) trigger backoff, while other APIError codes re-raise immediately. The KG side effects are produced indirectly by the tools invoked through AFC.

Called by kg_agentic_extraction.run_agentic_kg_extraction_chunk() for each conversation chunk during bulk KG extraction.

- Parameters:
messages (list[dict[str, Any]]) – OpenAI-style chat messages (system/user/assistant/tool).
user_id (str) – Identity threaded into AFC tool calls.
ctx (ToolContext | None) – Optional tool_context.ToolContext for tool execution (carries the KG manager, channel id, etc.).
tool_names (list[str] | None) – Optional allow-list restricting which KG tools are exposed to AFC.
validate_header (bool) – Ignored on this bulk path (interface parity).
token_count (int | None) – Precomputed input-token count; counted on demand when None.
on_intermediate_text (Optional[Callable[[str], Awaitable[None]]]) – Ignored on this bulk path (interface parity).
- Returns:
The model’s final response text, or "" when there are no non-system contents to send.

- Return type:
- Raises:
APIError – Propagated for non-retryable Gemini API error codes.
RuntimeError – When all _MAX_KEY_ATTEMPTS attempts are exhausted without a successful response.
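
The key-rotation and retry behaviour described for chat() can be sketched without the SDK as follows. Everything here is illustrative: `FakeAPIError` stands in for the SDK's APIError, `call_with_rotation` and its callable parameters are hypothetical, the value 5 for the attempt cap is a guess (the real _MAX_KEY_ATTEMPTS is not given in this reference), and exponential backoff is assumed since the text only says "backoff". Daily-quota marking is omitted.

```python
import asyncio
from typing import Awaitable, Callable

RETRYABLE_CODES = {429, 500, 502, 503, 504}  # codes listed in the description
MAX_KEY_ATTEMPTS = 5  # hypothetical value; the real constant is not shown


class FakeAPIError(Exception):
    """Hypothetical stand-in for the SDK's APIError, carrying an HTTP code."""

    def __init__(self, code: int) -> None:
        super().__init__(f"API error {code}")
        self.code = code


async def call_with_rotation(
    send: Callable[[str], Awaitable[str]],
    next_pool_key: Callable[[], str],
    paid_fallback_key: Callable[[], str],
    base_delay: float = 0.0,
) -> str:
    for attempt in range(MAX_KEY_ATTEMPTS):
        # Escalate to the paid fallback key on the last two attempts.
        last_two = attempt >= MAX_KEY_ATTEMPTS - 2
        key = paid_fallback_key() if last_two else next_pool_key()
        try:
            return await send(key)
        except FakeAPIError as exc:
            if exc.code not in RETRYABLE_CODES:
                raise  # non-retryable codes propagate immediately
            # Assumed exponential backoff before trying the next key.
            await asyncio.sleep(base_delay * (2 ** attempt))
    raise RuntimeError("all key attempts exhausted")


async def _demo() -> tuple[str, list[str]]:
    used: list[str] = []
    pool = iter(["free-1", "free-2", "free-3"])

    async def send(key: str) -> str:
        used.append(key)
        if key.startswith("free"):
            raise FakeAPIError(503)  # simulate a retryable failure
        return "final text"

    text = await call_with_rotation(send, lambda: next(pool), lambda: "paid")
    return text, used


result_text, keys_used = asyncio.run(_demo())
```

In the demo, three pool keys fail with a retryable 503 and the fourth attempt succeeds on the fallback key, so `keys_used` ends with `"paid"`.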