gemini_kg_bulk_client

Native Gemini (google.genai) client for bulk agentic KG extraction.

Uses gemini_embed_pool.next_gemini_flash_key() for API key rotation, client.aio for async I/O, and automatic function calling (AFC) with thin async callables that delegate to tools.ToolRegistry.call().
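The "thin async callables" pattern can be sketched as a small factory. This is a minimal sketch that assumes a registry whose async call(name, arguments, user_id=...) returns a string. ToyRegistry and make_afc_callable are hypothetical names, and the real ToolRegistry.call() signature is not documented on this page. The SDK derives function declarations from a callable's signature, so the real wrappers built in _afc_tool_fns() presumably expose typed parameters rather than the **kwargs used here.

```python
import asyncio
from typing import Any, Awaitable, Callable


class ToyRegistry:
    """Hypothetical stand-in for tools.ToolRegistry; only an async call() is modeled."""

    async def call(self, name: str, arguments: dict[str, Any], user_id: str = "") -> str:
        # A real registry would run a KG read tool; this toy just echoes its inputs.
        return f"{name}({sorted(arguments.items())}) for {user_id or 'anon'}"


def make_afc_callable(
    registry: ToyRegistry, name: str, description: str, user_id: str
) -> Callable[..., Awaitable[str]]:
    """Wrap one registry tool as a named, documented async function (hypothetical helper)."""

    async def _tool(**kwargs: Any) -> str:
        # Thin wrapper: no logic of its own, it only forwards to the registry.
        return await registry.call(name, kwargs, user_id=user_id)

    # Give the wrapper the tool's name and description so it is identifiable.
    _tool.__name__ = name
    _tool.__doc__ = description
    return _tool


if __name__ == "__main__":
    search = make_afc_callable(ToyRegistry(), "kg_search", "Search the KG.", "u1")
    print(asyncio.run(search(query="x")))
```

Keeping the wrappers free of logic means tool behavior lives in one place, the registry, regardless of which chat backend drives it.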

gemini_kg_bulk_client.openai_messages_to_gemini(messages)

Split out the system instruction and build Gemini contents (contents never include system turns).

Translates an OpenAI-style chat history into the two-part shape the google-genai SDK expects: a single concatenated system instruction and a list of google.genai.types.Content turns for the user/assistant/tool messages. The system instruction is taken only from leading system turns, because the Gemini developer API receives system text out of band rather than inside contents. Assistant tool_calls become types.FunctionCall parts (arguments are JSON-decoded with jsonutil, which the module imports under the alias json), and tool results become types.FunctionResponse parts, so an OpenAI-shaped transcript round-trips into Gemini’s function-calling format. This is a pure transformation with no I/O; it relies on _openai_message_text() to flatten message content to plain text.
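The translation can be sketched with plain dicts shaped like Gemini REST contents instead of google.genai.types objects, so it runs without the SDK. Assumptions, all labeled: message content is already a string (the real code flattens it with _openai_message_text()), leading system texts are joined with a blank line, later system turns are treated as user text, function responses travel in a "user" turn wrapped as {"result": ...}, and openai_messages_to_gemini_sketch is a hypothetical name.

```python
import json
from typing import Any, Optional


def openai_messages_to_gemini_sketch(
    messages: list[dict[str, Any]],
) -> tuple[Optional[str], list[dict[str, Any]]]:
    """Split leading system turns off, then convert the rest to Gemini-style dicts."""
    system_parts: list[str] = []
    i = 0
    # Only the leading run of system turns feeds the system instruction.
    while i < len(messages) and messages[i].get("role") == "system":
        system_parts.append(str(messages[i].get("content") or ""))
        i += 1

    contents: list[dict[str, Any]] = []
    for msg in messages[i:]:
        role = msg.get("role")
        text = str(msg.get("content") or "")
        if role == "assistant":
            parts: list[dict[str, Any]] = [{"text": text}] if text else []
            for call in msg.get("tool_calls") or []:
                fn = call["function"]
                # OpenAI encodes arguments as a JSON string; Gemini wants an object.
                args = json.loads(fn.get("arguments") or "{}")
                parts.append({"function_call": {"name": fn["name"], "args": args}})
            contents.append({"role": "model", "parts": parts})
        elif role == "tool":
            # Assumption: the tool output is wrapped as {"result": ...} in a user turn.
            response = {"name": msg.get("name", ""), "response": {"result": text}}
            contents.append({"role": "user", "parts": [{"function_response": response}]})
        else:
            # User turns, and (by assumption) any non-leading system turns.
            contents.append({"role": "user", "parts": [{"text": text}]})

    system = "\n\n".join(p for p in system_parts if p) or None
    return system, contents
```

A transcript made only of system turns yields (system_text, []), which matches the "empty when there are no non-system turns" return described below.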

Called by GeminiPoolToolChatClient.chat() (to build the contents sent to generate_content) and by GeminiPoolToolChatClient.count_input_tokens() (before reshaping via _contents_for_count_tokens()).

Parameters:

messages (list[dict[str, Any]]) – OpenAI-style chat messages with role/content (and, for assistant turns, optional tool_calls; for tool turns, name).

Returns:

The merged system instruction (None when no leading system text exists) and the Gemini contents list (empty when there are no non-system turns).

Return type:

tuple[str | None, list[Content]]

class gemini_kg_bulk_client.GeminiPoolToolChatClient(*, tool_registry, model_id, max_tool_rounds=48, max_tokens=60000, max_tool_output_chars=3000000, temperature=0.25)

Bases: object

Async Gemini chat with pool keys, countTokens, and AFC tool execution.

Drop-in chat client for bulk knowledge-graph extraction that talks to the native Gemini API (google.genai) instead of OpenRouter, exposing the same chat/count_input_tokens/close surface as the pooled OpenRouter client so kg_agentic_extraction can swap backends transparently. Each request constructs a short-lived google.genai.Client keyed by a rotating pool key from gemini_embed_pool, runs generate_content with automatic function calling so Gemini drives the KG read tools itself (delegated to tools.ToolRegistry.call() via the wrappers built in _afc_tool_fns()), and tears the client down through _safe_aclose(). The KG/FalkorDB side effects are produced indirectly by those tools; this class itself holds no long-lived session.
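The shared chat/count_input_tokens/close surface that makes the backends swappable can be written down as a typing.Protocol. This is a hypothetical sketch: BulkChatClient and run_once are illustrative names, the signatures are copied from this page, and whether the OpenRouter client matches them exactly (for example the gemini_model keyword) is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional, Protocol, runtime_checkable


@runtime_checkable
class BulkChatClient(Protocol):
    """Hypothetical protocol for the chat/count_input_tokens/close surface."""

    async def chat(
        self,
        messages: list[dict[str, Any]],
        user_id: str = "",
        ctx: Any = None,
        tool_names: Optional[list[str]] = None,
        validate_header: bool = False,
        token_count: Optional[int] = None,
        on_intermediate_text: Optional[Callable[[str], Awaitable[None]]] = None,
    ) -> str: ...

    async def count_input_tokens(
        self, messages: list[dict[str, Any]], *, gemini_model: Optional[str] = None
    ) -> Optional[int]: ...

    async def close(self) -> None: ...


async def run_once(client: BulkChatClient, messages: list[dict[str, Any]]) -> str:
    # Callers depend only on the shared surface, so either backend can be passed in.
    try:
        return await client.chat(messages)
    finally:
        await client.close()


if __name__ == "__main__":
    print(BulkChatClient.__name__)
```

Note that runtime_checkable isinstance checks only confirm the three methods exist; they do not verify parameter lists.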

Instances are built by kg_agentic_extraction.create_kg_bulk_gemini_pool_client(), which passes the bulk tool registry and native Gemini model id.

__init__(*, tool_registry, model_id, max_tool_rounds=48, max_tokens=60000, max_tool_output_chars=3000000, temperature=0.25)

Store the tool registry, model id, and generation/AFC limits.

Holds configuration only; no network/SDK clients are created here (each request constructs a short-lived genai.Client). Instances are built by kg_agentic_extraction.create_kg_bulk_gemini_pool_client(), which supplies the KG bulk tool registry and the native Gemini model id.

Parameters:
  • tool_registry (ToolRegistry) – Registry whose tools the AFC wrappers delegate to.

  • model_id (str) – Native Gemini model id (no google/ prefix).

  • max_tool_rounds (int) – Cap on AFC remote function-call rounds per request.

  • max_tokens (int) – max_output_tokens for generation.

  • max_tool_output_chars (int) – Per-call character cap for tool output.

  • temperature (float) – Sampling temperature for generation.

Return type:

None
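One way a per-call cap such as max_tool_output_chars could be enforced is a truncation helper applied to each tool result. This is a hypothetical sketch: cap_tool_output and the "...[truncated]" marker are illustrative, and whether the real client truncates with a marker, truncates silently, or handles oversize output some other way is not stated on this page.

```python
def cap_tool_output(text: str, max_chars: int, marker: str = "...[truncated]") -> str:
    """Return text limited to max_chars characters, marker included (hypothetical helper)."""
    if len(text) <= max_chars:
        return text  # already within the cap
    if max_chars <= len(marker):
        return text[:max_chars]  # no room for the marker, so hard-cut
    # Keep the head of the output and signal that the rest was dropped.
    return text[: max_chars - len(marker)] + marker


if __name__ == "__main__":
    print(cap_tool_output("x" * 100, 20))
```

With the default cap of 3,000,000 characters, the cap only matters for unusually large tool results, such as a broad KG dump.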

async close()

No-op async close for interface parity with the OpenRouter client.

This client holds no long-lived session (genai clients are created and closed per request via _safe_aclose()), so there is nothing to tear down. Provided so callers can treat it like the pooled OpenRouter client; invoked during cleanup in kg_bulk_runner (e.g. the bulk_client.close() / token_counter.close() calls).

Return type:

None

async count_input_tokens(messages, *, gemini_model=None)

Count Gemini input tokens for OpenAI-style messages via countTokens.

Converts messages into a system instruction plus Gemini contents using openai_messages_to_gemini(), then reshapes them with _contents_for_count_tokens() (the developer API rejects system_instruction on countTokens, so the system text is folded into the opening turn). It rotates pool keys via gemini_embed_pool.next_gemini_flash_key(), escalating to gemini_embed_pool.get_paid_fallback_key() on the final two attempts, and issues client.aio.models.count_tokens against a short-lived genai.Client that is always released through _safe_aclose(). On a 429 attributable to a daily-quota violation (per _error_body_daily_quota()) the spent key is marked with gemini_embed_pool.mark_key_daily_spent(); retryable codes (429/500/502/503/504) back off and retry while other APIError codes and unexpected exceptions abort with None.
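Because countTokens on the developer API rejects system_instruction, the system text has to travel inside contents. The reshaping can be sketched with plain dicts in place of types.Content. contents_for_count_tokens_sketch is hypothetical, and prepending the system text as a part of the first user turn (or as a new leading user turn) is an assumption about what _contents_for_count_tokens() does.

```python
from typing import Any, Optional


def contents_for_count_tokens_sketch(
    system: Optional[str], contents: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Fold system text into the opening turn for countTokens (hypothetical helper)."""
    if not system:
        return list(contents)
    if contents and contents[0].get("role") == "user":
        first = contents[0]
        # Prepend the system text as an extra part of the first user turn.
        merged = {"role": "user", "parts": [{"text": system}, *first.get("parts", [])]}
        return [merged, *contents[1:]]
    # Otherwise open the conversation with a new user turn holding the system text.
    return [{"role": "user", "parts": [{"text": system}]}, *contents]


if __name__ == "__main__":
    print(contents_for_count_tokens_sketch("SYS", [{"role": "user", "parts": [{"text": "hi"}]}]))
```

The helper builds new lists and dicts rather than mutating its input, so the same contents can still be passed unchanged to generate_content afterwards.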

Called by chat() (when its token_count is None) to fill the system prompt’s __INPUT_TOKEN_COUNT__ placeholder, and by kg_bulk_runner as a standalone token counter.

Parameters:
  • messages (list[dict[str, Any]]) – OpenAI-style chat messages to measure.

  • gemini_model (str | None) – Optional model id override; defaults to self.model_id.

Returns:

The total input-token count, or None when there are no non-system contents, the API returns a non-integer total, a non-retryable APIError or unexpected exception occurs, or all key attempts are exhausted without success.

Return type:

int | None

async chat(messages, user_id='', ctx=None, tool_names=None, validate_header=False, token_count=None, on_intermediate_text=None)

Run one bulk agentic generation with Gemini automatic function calling.

Converts OpenAI-style messages to Gemini contents plus a system instruction, substitutes the resolved input-token count into the system prompt’s __INPUT_TOKEN_COUNT__ placeholder, and issues a single generate_content call with AFC enabled so Gemini can drive the KG read tools itself (up to max_tool_rounds rounds). The model’s final text is returned. validate_header and on_intermediate_text are accepted for interface parity with the OpenRouter client but unused on this bulk path.
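The token-count resolution and placeholder substitution can be sketched as follows, assuming an injected async counter in place of count_input_tokens(). resolve_system_prompt is hypothetical, and so is the "unknown" text used when counting fails, since the page does not say what the real client substitutes in that case.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

PLACEHOLDER = "__INPUT_TOKEN_COUNT__"


async def resolve_system_prompt(
    system: Optional[str],
    messages: list[dict[str, Any]],
    token_count: Optional[int],
    count_fn: Callable[[list[dict[str, Any]]], Awaitable[Optional[int]]],
) -> Optional[str]:
    """Fill the token-count placeholder, counting on demand (hypothetical helper)."""
    # Count only when the caller did not pass a precomputed value.
    if token_count is None:
        token_count = await count_fn(messages)
    if system is None:
        return None
    # Hypothetical fallback text for when counting failed (count_fn returned None).
    rendered = str(token_count) if token_count is not None else "unknown"
    return system.replace(PLACEHOLDER, rendered)


if __name__ == "__main__":
    async def fixed_count(msgs: list[dict[str, Any]]) -> Optional[int]:
        return 1234

    print(asyncio.run(resolve_system_prompt(f"Input: {PLACEHOLDER}", [], None, fixed_count)))
```

Passing a precomputed token_count skips the extra countTokens round trip, which matters when the caller has already measured the chunk.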

Calls count_input_tokens() (when token_count is None), openai_messages_to_gemini(), and _afc_tool_fns() to build the tool callables, then rotates pool keys via gemini_embed_pool.next_gemini_flash_key() (escalating to gemini_embed_pool.get_paid_fallback_key() on the last two attempts) across a short-lived genai.Client per attempt, each closed with _safe_aclose(). On a 429 caused by a daily-quota violation (per _error_body_daily_quota()) it marks the key spent via gemini_embed_pool.mark_key_daily_spent(); retryable codes (429/500/502/503/504) trigger backoff, other APIError codes re-raise immediately. The KG side effects are produced indirectly by the tools invoked through AFC.
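The rotation-and-retry policy described above can be sketched as a generic loop. This is a minimal sketch under assumptions. FakeAPIError stands in for the SDK's APIError. The key and quota callables are injected stand-ins for the gemini_embed_pool functions and _error_body_daily_quota(). The attempt count, exponential backoff, and delay values are invented, because the page does not document them. In the real client, op would build a short-lived genai.Client with the given key and release it through _safe_aclose().

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

MAX_KEY_ATTEMPTS = 6  # hypothetical; the real _MAX_KEY_ATTEMPTS value is not stated here
RETRYABLE_CODES = frozenset({429, 500, 502, 503, 504})


class FakeAPIError(Exception):
    """Stand-in for the SDK's APIError: carries an HTTP status code and body."""

    def __init__(self, code: int, body: str = "") -> None:
        super().__init__(f"{code} {body}".strip())
        self.code = code
        self.body = body


async def call_with_key_rotation(
    op: Callable[[str], Awaitable[T]],
    next_pool_key: Callable[[], str],
    paid_fallback_key: Callable[[], str],
    mark_daily_spent: Callable[[str], None],
    is_daily_quota: Callable[[str], bool],
    base_delay: float = 0.5,
) -> T:
    for attempt in range(MAX_KEY_ATTEMPTS):
        # The final two attempts escalate from the pool to the paid fallback key.
        if attempt >= MAX_KEY_ATTEMPTS - 2:
            key = paid_fallback_key()
        else:
            key = next_pool_key()
        try:
            return await op(key)
        except FakeAPIError as err:
            if err.code == 429 and is_daily_quota(err.body):
                mark_daily_spent(key)  # assumed: lets the pool skip this key later
            if err.code not in RETRYABLE_CODES:
                raise  # non-retryable codes propagate immediately
            if attempt < MAX_KEY_ATTEMPTS - 1:
                await asyncio.sleep(base_delay * 2**attempt)  # exponential backoff
    raise RuntimeError(f"all {MAX_KEY_ATTEMPTS} key attempts exhausted")
```

count_input_tokens() uses the same key schedule but, per its description, returns None rather than raising on non-retryable errors or exhaustion.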

Called by kg_agentic_extraction.run_agentic_kg_extraction_chunk() for each conversation chunk during bulk KG extraction.

Parameters:
  • messages (list[dict[str, Any]]) – OpenAI-style chat messages (system/user/assistant/tool).

  • user_id (str) – Identity threaded into AFC tool calls.

  • ctx (ToolContext | None) – Optional tool_context.ToolContext for tool execution (carries the KG manager, channel id, etc.).

  • tool_names (list[str] | None) – Optional allow-list restricting which KG tools are exposed to AFC.

  • validate_header (bool) – Ignored on this bulk path (interface parity).

  • token_count (int | None) – Precomputed input-token count; counted on demand when None.

  • on_intermediate_text (Optional[Callable[[str], Awaitable[None]]]) – Ignored on this bulk path (interface parity).

Returns:

The model’s final response text, or "" when there are no non-system contents to send.

Return type:

str

Raises:
  • APIError – Propagated for non-retryable Gemini API error codes.

  • RuntimeError – When all _MAX_KEY_ATTEMPTS attempts are exhausted without a successful response.