Source code for classifiers.build_tool_index

#!/usr/bin/env python3
"""Build the tool index used by the vector classifier.

Auto-discovers every registered tool via :mod:`tool_loader`, then
calls an LLM to generate 50 diverse synthetic user queries per tool
(reverse-HyDE).  Results are saved to ``tool_index_data.json`` in
this directory.

Usage::

    python -m classifiers.build_tool_index [--tools-dir tools]
"""

from __future__ import annotations

import asyncio
import argparse
import jsonutil as json
import logging
import os
import re
import sys
from typing import Any

import httpx

# Ensure the project root is importable when running as a script.
# Put the repository root (the directory above this package) at the front of
# sys.path so the project-level imports below resolve when run as a script.
_PROJECT_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir),
)
sys.path.insert(0, _PROJECT_ROOT)

# python-dotenv is optional: when it is installed, seed os.environ from
# <project root>/.env; when it is not, rely on the real environment only.
try:
    from dotenv import load_dotenv

    load_dotenv(os.path.join(_PROJECT_ROOT, ".env"))
except ImportError:
    pass

from tools import ToolRegistry  # noqa: E402
from tool_loader import load_tools  # noqa: E402

# Script-style logging: plain "LEVEL: message" lines, INFO and above.
logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)

# Index file written by build_index, stored next to this module.
OUTPUT_FILE = os.path.join(os.path.dirname(__file__), "tool_index_data.json")

# Number of synthetic queries requested (and required) per tool.
SYNTHETIC_QUERY_COUNT = 50


def _env_int(name: str, default: str) -> int:
    """Return env var *name* parsed as ``int``, falling back to *default*."""
    return int(os.environ.get(name, default))


def _env_float(name: str, default: str) -> float:
    """Return env var *name* parsed as ``float``, falling back to *default*."""
    return float(os.environ.get(name, default))


# Gemini synthetic-query generation (used by build_index, refresh_tool_embeddings,
# update_tool_embeddings). Tune via env vars.
_QUERY_GEN_MAX_ATTEMPTS = _env_int("GEMINI_QUERY_GEN_MAX_ATTEMPTS", "25")
_QUERY_GEN_BACKOFF_BASE_S = _env_float("GEMINI_QUERY_GEN_BACKOFF_BASE", "0.5")
_QUERY_GEN_BACKOFF_MAX_S = _env_float("GEMINI_QUERY_GEN_BACKOFF_MAX", "120.0")
_QUERY_GEN_PAID_AFTER_FAILURES = _env_int(
    "GEMINI_QUERY_GEN_PAID_AFTER_FAILURES", "5",
)
_QUERY_GEN_MAX_OUTPUT_TOKENS = _env_int(
    "GEMINI_QUERY_GEN_MAX_OUTPUT_TOKENS", "30000",
)

# Per-model retries when Gemini returns 429 (exponential sleep before re-POST).
_QUERY_GEN_429_MAX_RETRIES = _env_int("GEMINI_QUERY_GEN_429_MAX_RETRIES", "8")
_QUERY_GEN_429_BACKOFF_BASE_S = _env_float(
    "GEMINI_QUERY_GEN_429_BACKOFF_BASE", "2.0",
)
_QUERY_GEN_429_BACKOFF_MAX_S = _env_float(
    "GEMINI_QUERY_GEN_429_BACKOFF_MAX", "120.0",
)

# Same Gemini/OpenRouter model: on invalid JSON, empty body, or too few
# queries, re-POST immediately until success or this cap (then try the next
# model / outer round).
_JSON_PARSE_MAX_RETRIES_SAME_MODEL = _env_int(
    "GEMINI_QUERY_GEN_JSON_PARSE_MAX_RETRIES_SAME_MODEL", "100",
)

# OpenRouter 429 retries for the query-gen fallback only.
_OPENROUTER_QUERY_GEN_429_MAX_RETRIES = _env_int(
    "OPENROUTER_QUERY_GEN_429_MAX_RETRIES", "6",
)
_OPENROUTER_QUERY_GEN_429_BACKOFF_BASE_S = _env_float(
    "OPENROUTER_QUERY_GEN_429_BACKOFF_BASE", "2.0",
)
_OPENROUTER_QUERY_GEN_429_BACKOFF_MAX_S = _env_float(
    "OPENROUTER_QUERY_GEN_429_BACKOFF_MAX", "120.0",
)


def discover_invalid_query_index_tools(
    index_data: dict[str, Any],
    registered: dict[str, Any],
    *,
    expected_count: int = SYNTHETIC_QUERY_COUNT,
) -> list[str]:
    """List registered tools whose index entry is unusable for embeddings.

    An entry is unusable when any of these holds:

    * the tool name is absent from *index_data*;
    * the stored value is not a dict;
    * its ``synthetic_queries`` is missing or not a list;
    * the list has fewer than *expected_count* items;
    * any of the first *expected_count* items is not a string that is
      non-empty after ``strip()``.

    :mod:`classifiers.refresh_tool_embeddings` uses this to regenerate
    queries only for tools whose ``tool_index_data.json`` entry is incomplete
    or malformed (e.g. after a failed or partial LLM JSON response).

    Args:
        index_data: Parsed contents of the index file, keyed by tool name.
        registered: Mapping whose keys are the currently registered tool names.
        expected_count: Number of leading queries that must be valid.

    Returns:
        Sorted list of tool names needing regeneration.
    """

    def _entry_is_usable(entry: Any) -> bool:
        if not isinstance(entry, dict):
            return False
        queries = entry.get("synthetic_queries")
        if not isinstance(queries, list) or len(queries) < expected_count:
            return False
        return all(
            isinstance(q, str) and q.strip() for q in queries[:expected_count]
        )

    return sorted(
        {
            name
            for name in registered
            if name not in index_data or not _entry_is_usable(index_data[name])
        },
    )
REVERSE_HYDE_PROMPT = """\
You are an expert prompt engineer building a semantic search \
index for an AI assistant's tool-calling system.

## Tool under analysis

- **Name:** {tool_name}
- **Description:** {tool_description}

## Your task

Generate exactly {count} synthetic user messages that a human \
would write in a chat with an AI assistant when they *need* \
this tool -- even if they don't know the tool exists. The \
messages must be realistic, varied, and collectively span the \
full semantic surface area of the tool.

**Single response:** emit **exactly** {count} query strings in **one** JSON \
object in this turn only. The `"queries"` array must have length **exactly** \
{count} — no truncation, no follow-ups, no batching across calls.

Distribute the {count} messages across the following \
categories. Exact counts per category are guidelines -- some \
tools may naturally skew toward certain categories -- but \
every category MUST have at least one entry.

### Categories

1. **Direct commands** (~5): Explicit imperative sentences.
   "Run ...", "Execute ...", "Do ...", "Set up ..."
2. **Natural-language requests** (~8): Polite or \
conversational requests that clearly imply the tool.
   "Could you ...", "I'd like you to ...", "Please ..."
3. **Vague / ambiguous intents** (~8): The user's goal is \
clear but no specific tool is named; this tool is one \
plausible match.
4. **Questions** (~5): "Can you ...?", "Is it possible \
to ...?", "How do I ...?"
5. **Contextual / embedded** (~5): The request is buried \
inside a longer message giving background.
   "I was working on X earlier and now I need ..."
6. **Multi-step scenarios** (~5): The user describes a \
workflow where this tool is one step.
   "First do A, then B, and finally ..."
7. **Error / frustration** (~4): The user is stuck or \
something broke and they need this tool to fix it.
   "X isn't working ...", "I keep getting ...", \
"Help, I can't ..."
8. **Follow-up / conversational** (~5): Short messages \
that only make sense in context.
   "Actually, also ...", "And then ...", \
"One more thing -- ..."
9. **Domain-specific jargon** (~5): Technical or domain \
language that implies the tool without naming it.

## Constraints

- Do NOT mention the tool name in more than 5 of the {count} \
messages.
- Focus on *user intent*, not tool internals.
- Vary sentence length (short single-clause to \
multi-sentence).
- Include at least 5 messages that are <= 8 words.
- Return a JSON object with a single key `"queries"` whose \
value is the array of strings. No markdown fences, no \
extra keys, no explanation.

Example (abbreviated):
{{"queries": ["find the file", \
"search for documents matching sales Q3", \
"where is that report I uploaded yesterday"]}}
"""

_GEMINI_GENERATE_BASE = "https://generativelanguage.googleapis.com/v1beta/models"
_DEFAULT_GENERATE_MODEL = "gemini-3.1-flash-lite"
# OpenRouter chat model id fragment for ``google/<id>`` when
# :func:`generate_synthetic_queries` is called with ``openrouter_only=True``
# (skips Gemini ``generateContent`` entirely).
_OPENROUTER_ONLY_QUERY_MODEL = "gemini-3.1-flash-lite"
# Additional Gemini model ids to try (after *primary*) when generateContent fails
# (e.g. 429). Used only for synthetic query generation, not embeddings.
_GEMINI_QUERY_FALLBACK_MODELS = (
    "gemini-2.5-flash",
    "gemini-3-flash-preview",
)
_OPENROUTER_CHAT_URL = "https://openrouter.ai/api/v1/chat/completions"


def _gemini_query_model_order(primary: str) -> list[str]:
    """Build the ordered, deduplicated list of Gemini models to try.

    The caller-chosen *primary* model comes first, followed by
    ``_GEMINI_QUERY_FALLBACK_MODELS``. Duplicates are dropped so the same
    model is never tried twice in one round. Pure list construction, no I/O.

    Args:
        primary: The preferred Gemini model id to place first.

    Returns:
        list[str]: Model ids in try-order with duplicates removed.
    """
    # dict keeps insertion order, so fromkeys() dedupes while preserving the
    # first occurrence of each id.
    return list(dict.fromkeys((primary, *_GEMINI_QUERY_FALLBACK_MODELS)))


def _extract_json_object(text: str) -> str | None:
    """Carve out the JSON object substring when the model wraps it in prose.

    Slices from the first ``{`` to the last ``}``. This is best-effort
    recovery, not validation: the slice is not guaranteed to be valid JSON.

    Args:
        text: Raw model output that may contain a JSON object somewhere inside.

    Returns:
        str | None: The brace-delimited substring, or ``None`` when there is
        no ``{`` or no ``}`` after it.
    """
    start = text.find("{")
    if start < 0:
        return None
    end = text.rfind("}")
    if end <= start:
        return None
    return text[start : end + 1]


def _parse_queries_from_response_text(
    raw_text: str,
    tool_name: str,
    count: int,
) -> list[str] | None:
    """Parse a model response into a validated list of synthetic query strings.

    Processing steps:

    1. Strip whitespace, closed ``<thinking>...</thinking>`` blocks, and
       markdown code fences.
    2. Decode the JSON, retrying on a brace-delimited slice from
       :func:`_extract_json_object` if the direct decode fails.
    3. Take the ``queries`` array. If that key is not a list, take the first
       list-valued key. If the payload itself is a list, use it directly.
    4. Keep only string or integer items (bools are excluded). Strip each
       one and drop any that are blank.

    Blank entries are dropped because :func:`discover_invalid_query_index_tools`
    treats them as invalid. Storing them would force a later regeneration.

    Failures are logged at debug level. No network I/O.

    Args:
        raw_text: The raw text body returned by Gemini or OpenRouter.
        tool_name: Tool name, used only for log context.
        count: Required number of queries. Responses with fewer usable
            queries are rejected; extras are truncated.

    Returns:
        list[str] | None: Exactly *count* stripped, non-empty query strings,
        or ``None`` when the payload is unparseable, empty, or too short.
    """
    content = raw_text.strip()
    content = re.sub(
        r"<thinking>.*?</thinking>\s*",
        "",
        content,
        flags=re.DOTALL,
    )
    if content.startswith("```json"):
        content = content[7:]
    if content.startswith("```"):
        content = content[3:]
    if content.endswith("```"):
        content = content[:-3]
    content = content.strip()
    if not content:
        logger.debug("No JSON payload after stripping for %s", tool_name)
        return None

    data: Any = None
    try:
        data = json.loads(content)
    except json.JSONDecodeError:
        blob = _extract_json_object(content)
        if not blob:
            logger.debug("JSON decode failed for %s (no extractable object)", tool_name)
            return None
        try:
            data = json.loads(blob)
        except json.JSONDecodeError:
            logger.debug(
                "JSON decode failed for %s (even after object extract)",
                tool_name,
            )
            return None

    if isinstance(data, dict):
        queries = data.get("queries")
        if not isinstance(queries, list):
            queries = next(
                (v for v in data.values() if isinstance(v, list)),
                [],
            )
    elif isinstance(data, list):
        queries = data
    else:
        queries = []

    out: list[str] = []
    for q in queries:
        # bool is an int subclass; "True"/"False" are not user queries.
        if isinstance(q, bool) or not isinstance(q, (str, int)):
            continue
        text = str(q).strip()
        if text:
            out.append(text)

    if not out:
        logger.debug("Parsed empty queries for %s", tool_name)
        return None
    if len(out) < count:
        logger.debug(
            "Expected %d queries for %s, got %d (will regenerate)",
            count,
            tool_name,
            len(out),
        )
        return None
    return out[:count]


def _openrouter_query_gen_api_key() -> str:
    """API key for OpenRouter query generation (refresh / build index).

    Prefer ``OPENROUTER_QUERY_GEN_API_KEY``, then ``OPENROUTER_API_KEY``,
    then ``API_KEY``. Never log the value.
    """
    for name in (
        "OPENROUTER_QUERY_GEN_API_KEY",
        "OPENROUTER_API_KEY",
        "API_KEY",
    ):
        v = os.environ.get(name, "").strip()
        if v:
            return v
    return ""


async def _sleep_429_backoff(
    attempt_index: int,
    *,
    base_s: float,
    max_s: float,
    label: str,
) -> None:
    """Sleep an exponential backoff delay after a rate-limited (429) attempt.

    Computes ``base_s * 2 ** attempt_index``, with *attempt_index* clamped to
    0..20 and the result capped at *max_s*. It logs the delay under *label*
    and then awaits it.

    Awaited from two places when the API returns HTTP 429: the Gemini
    ``generateContent`` loop in :func:`generate_synthetic_queries` and the
    OpenRouter loop in :func:`_try_openrouter_query_gen`.

    Args:
        attempt_index: Zero-based retry counter; the first 429 is ``0``.
        base_s: Base delay in seconds, doubled for each subsequent attempt.
        max_s: Upper bound on the computed delay.
        label: Short tag identifying the call site in the log line.
    """
    exp = min(max(attempt_index, 0), 20)
    delay = min(base_s * (2.0**exp), max_s)
    logger.info(
        "429 backoff %s: sleeping %.1fs (attempt %d)",
        label,
        delay,
        attempt_index + 1,
    )
    await asyncio.sleep(delay)


def _text_from_gemini_generate_response(resp_data: dict[str, Any]) -> str:
    """Pull the first text part out of a Gemini ``generateContent`` body.

    Walks ``candidates[0].content.parts[0].text``. At every level it
    tolerates missing keys, explicit ``null`` values, and values of the wrong
    type by returning ``""``. For example, a candidate whose ``content`` is
    ``null`` yields ``""`` instead of raising ``AttributeError``. An empty
    result lets the caller trigger an immediate regenerate.

    Pure dictionary access; no I/O or logging.

    Args:
        resp_data: The parsed JSON body of a Gemini ``generateContent`` response.

    Returns:
        The first candidate's first text part, or ``""`` when it is absent.
    """
    if not isinstance(resp_data, dict):
        return ""
    candidates = resp_data.get("candidates")
    if not isinstance(candidates, list) or not candidates:
        return ""
    first = candidates[0]
    content = first.get("content") if isinstance(first, dict) else None
    if not isinstance(content, dict):
        return ""
    parts = content.get("parts")
    if not isinstance(parts, list) or not parts or not isinstance(parts[0], dict):
        return ""
    text = parts[0].get("text")
    return text if isinstance(text, str) else ""


async def _try_openrouter_query_gen(
    client: httpx.AsyncClient,
    *,
    tool_name: str,
    prompt: str,
    count: int,
    primary_model_name: str,
) -> list[str] | None:
    """Generate synthetic queries via OpenRouter chat completions.

    Posts *prompt* to the OpenRouter chat endpoint with model
    ``google/<primary_model_name>`` and parses the reply with
    :func:`_parse_queries_from_response_text`.

    Retry behaviour:

    * HTTP 429 is retried after :func:`_sleep_429_backoff`, up to
      ``_OPENROUTER_QUERY_GEN_429_MAX_RETRIES`` times.
    * Any other HTTP status >= 400 returns ``None``.
    * Each of these causes an immediate re-POST: a non-JSON or non-object
      body, no usable choice, empty content, or invalid/short query JSON.
      Immediate re-POSTs are capped at ``_JSON_PARSE_MAX_RETRIES_SAME_MODEL``.

    Network exceptions from ``client.post`` propagate to the caller.

    Args:
        client: Shared async HTTP client used for the POST.
        tool_name: Tool name used in log messages.
        prompt: The fully formatted reverse-HyDE prompt to send.
        count: Required number of queries, enforced by the parser.
        primary_model_name: Bare model id; sent as ``google/<primary_model_name>``.

    Returns:
        list[str] | None: The parsed queries, or ``None`` when no key is
        configured, the API returns an error status, or the retry budget is
        exhausted.
    """
    api_key = _openrouter_query_gen_api_key()
    if not api_key:
        logger.info(
            "Skipping OpenRouter query-gen fallback for %s: "
            "set OPENROUTER_QUERY_GEN_API_KEY, OPENROUTER_API_KEY, or API_KEY",
            tool_name,
        )
        return None

    or_model = f"google/{primary_model_name}"
    payload: dict[str, Any] = {
        "model": or_model,
        "messages": [
            {
                "role": "system",
                "content": "You are a helpful assistant that outputs strict JSON.",
            },
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.8,
        "max_tokens": min(_QUERY_GEN_MAX_OUTPUT_TOKENS, 65_536),
    }
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    logger.info(
        "OpenRouter query-gen fallback: model=%s tool=%s",
        or_model,
        tool_name,
    )

    parse_attempt = 0
    while parse_attempt < _JSON_PARSE_MAX_RETRIES_SAME_MODEL:
        for or_429_try in range(_OPENROUTER_QUERY_GEN_429_MAX_RETRIES + 1):
            resp = await client.post(
                _OPENROUTER_CHAT_URL,
                json=payload,
                headers=headers,
            )
            if (
                resp.status_code == 429
                and or_429_try < _OPENROUTER_QUERY_GEN_429_MAX_RETRIES
            ):
                await _sleep_429_backoff(
                    or_429_try,
                    base_s=_OPENROUTER_QUERY_GEN_429_BACKOFF_BASE_S,
                    max_s=_OPENROUTER_QUERY_GEN_429_BACKOFF_MAX_S,
                    label=f"OpenRouter tool={tool_name}",
                )
                continue
            break

        if resp.status_code >= 400:
            logger.warning(
                "OpenRouter query-gen HTTP %s for %s: %s",
                resp.status_code,
                tool_name,
                resp.text[:300],
            )
            return None

        try:
            body: Any = resp.json()
        except ValueError:
            body = None
        # A JSON array or scalar body would make body.get() raise; treat it
        # the same as an undecodable body.
        if not isinstance(body, dict):
            parse_attempt += 1
            logger.info(
                "OpenRouter response body not JSON for %s — immediate regenerate (%d/%d)",
                tool_name,
                parse_attempt,
                _JSON_PARSE_MAX_RETRIES_SAME_MODEL,
            )
            continue

        choices = body.get("choices")
        first = choices[0] if isinstance(choices, list) and choices else None
        if not isinstance(first, dict):
            parse_attempt += 1
            logger.info(
                "OpenRouter query-gen no choices for %s — immediate regenerate (%d/%d)",
                tool_name,
                parse_attempt,
                _JSON_PARSE_MAX_RETRIES_SAME_MODEL,
            )
            continue

        message = first.get("message")
        content = message.get("content") if isinstance(message, dict) else None
        raw = content.strip() if isinstance(content, str) else ""
        if not raw:
            parse_attempt += 1
            logger.info(
                "OpenRouter query-gen empty content for %s — immediate regenerate (%d/%d)",
                tool_name,
                parse_attempt,
                _JSON_PARSE_MAX_RETRIES_SAME_MODEL,
            )
            continue

        parsed = _parse_queries_from_response_text(raw, tool_name, count)
        if parsed is not None:
            return parsed
        parse_attempt += 1
        logger.info(
            "OpenRouter invalid or incomplete query JSON for %s — immediate regenerate (%d/%d)",
            tool_name,
            parse_attempt,
            _JSON_PARSE_MAX_RETRIES_SAME_MODEL,
        )
    return None


async def _generate_synthetic_queries_openrouter_only(
    client: httpx.AsyncClient,
    tool_name: str,
    prompt: str,
    count: int,
) -> list[str]:
    """Generate synthetic queries through OpenRouter only, never touching Gemini.

    Used when :func:`generate_synthetic_queries` is called with
    ``openrouter_only=True``. It first requires an OpenRouter key. It then
    makes up to ``_QUERY_GEN_MAX_ATTEMPTS`` rounds of
    :func:`_try_openrouter_query_gen` with model
    ``_OPENROUTER_ONLY_QUERY_MODEL``, sleeping with exponential backoff
    (``_QUERY_GEN_BACKOFF_*``) between rounds. Per-round exceptions are
    logged and the next round continues.

    Args:
        client: Shared async HTTP client passed through to the OpenRouter call.
        tool_name: Tool name for log context.
        prompt: The formatted reverse-HyDE prompt to send.
        count: Required number of queries.

    Returns:
        list[str]: The generated queries.

    Raises:
        RuntimeError: When no OpenRouter key is configured, or when a valid
            query list cannot be obtained within ``_QUERY_GEN_MAX_ATTEMPTS``.
    """
    if not _openrouter_query_gen_api_key():
        raise RuntimeError(
            "openrouter_only synthetic query generation requires "
            "OPENROUTER_QUERY_GEN_API_KEY, OPENROUTER_API_KEY, or API_KEY",
        )
    max_attempts = _QUERY_GEN_MAX_ATTEMPTS
    consecutive_failures = 0
    for attempt in range(max_attempts):
        if attempt > 0:
            exp = min(max(consecutive_failures - 1, 0), 20)
            delay = min(
                _QUERY_GEN_BACKOFF_BASE_S * (2.0**exp),
                _QUERY_GEN_BACKOFF_MAX_S,
            )
            await asyncio.sleep(delay)
        try:
            or_out = await _try_openrouter_query_gen(
                client,
                tool_name=tool_name,
                prompt=prompt,
                count=count,
                primary_model_name=_OPENROUTER_ONLY_QUERY_MODEL,
            )
            if or_out is not None:
                return or_out
        except Exception as exc:
            logger.warning(
                "OpenRouter-only query-gen error tool=%s: %s",
                tool_name,
                exc,
                exc_info=True,
            )
        consecutive_failures += 1
    raise RuntimeError(
        f"Exhausted OpenRouter-only synthetic query generation for {tool_name!r}: "
        f"could not obtain {count} valid queries after {max_attempts} attempts.",
    )
async def generate_synthetic_queries(
    client: httpx.AsyncClient,
    base_url: str | None,
    api_key: str | None,
    tool_name: str,
    tool_description: str,
    count: int = SYNTHETIC_QUERY_COUNT,
    model: str = _DEFAULT_GENERATE_MODEL,
    *,
    openrouter_only: bool = False,
) -> list[str]:
    """Produce *count* synthetic queries via Gemini, with model fallbacks.

    When *openrouter_only* is True, Gemini is skipped entirely. Generation is
    delegated to :func:`_generate_synthetic_queries_openrouter_only`, which
    uses ``google/{_OPENROUTER_ONLY_QUERY_MODEL}`` and requires
    ``OPENROUTER_QUERY_GEN_API_KEY``, ``OPENROUTER_API_KEY``, or ``API_KEY``.

    Otherwise, each outer round (up to ``_QUERY_GEN_MAX_ATTEMPTS``) works as
    follows:

    * **Key choice.** The round uses a pool key from
      :func:`gemini_embed_pool.next_gemini_flash_key`. After
      ``_QUERY_GEN_PAID_AFTER_FAILURES`` consecutive failed rounds it switches
      to :func:`gemini_embed_pool.get_paid_fallback_key`, if one is set.
    * **Model order.** It walks :func:`_gemini_query_model_order`: the
      primary *model* first, then ``_GEMINI_QUERY_FALLBACK_MODELS``.
    * **Retrying a model.** A transient 429 is retried with exponential
      backoff (``GEMINI_QUERY_GEN_429_*``). An empty, non-JSON, invalid, or
      too-short response is re-POSTed immediately, up to
      ``_JSON_PARSE_MAX_RETRIES_SAME_MODEL`` times.
    * **Moving on.** The next model is tried after:

      - an HTTP error status;
      - a request exception;
      - an exhausted 429 budget;
      - a daily-quota 429 (the key is marked spent first);
      - an exhausted regenerate budget.

    * **OpenRouter fallback.** If every Gemini model fails the round,
      :func:`_try_openrouter_query_gen` is tried with ``google/<model>``.
    * **Between rounds.** Rounds are separated by exponential backoff
      (``_QUERY_GEN_BACKOFF_*``).

    The Gemini key is sent in the ``x-goog-api-key`` header, not the URL.
    httpx logs request URLs at INFO level and this module configures
    INFO logging, so a ``?key=`` query parameter would leak the key to logs.

    Args:
        client: Shared async HTTP client.
        base_url: Unused; kept for call-site compatibility.
        api_key: Unused; Gemini keys come from :mod:`gemini_embed_pool`.
        tool_name: Tool name inserted into the prompt and used in logs.
        tool_description: Tool description inserted into the prompt.
        count: Number of queries required.
        model: Primary Gemini model id (also the OpenRouter ``google/`` id).
        openrouter_only: Skip Gemini and use OpenRouter only.

    Returns:
        list[str]: Exactly *count* query strings.

    Raises:
        RuntimeError: If valid queries cannot be produced after all attempts.
    """
    from gemini_embed_pool import (
        get_paid_fallback_key,
        is_daily_quota_429,
        mark_key_daily_spent,
        next_gemini_flash_key,
    )

    prompt = REVERSE_HYDE_PROMPT.format(
        tool_name=tool_name,
        tool_description=tool_description,
        count=count,
    )
    if openrouter_only:
        return await _generate_synthetic_queries_openrouter_only(
            client,
            tool_name,
            prompt,
            count,
        )

    payload = {
        "contents": [{"parts": [{"text": prompt}]}],
        "systemInstruction": {
            "parts": [
                {"text": "You are a helpful assistant that outputs strict JSON."}
            ],
        },
        "generationConfig": {
            "temperature": 0.8,
            "maxOutputTokens": _QUERY_GEN_MAX_OUTPUT_TOKENS,
            "responseMimeType": "application/json",
        },
    }
    paid_key = get_paid_fallback_key()
    max_attempts = _QUERY_GEN_MAX_ATTEMPTS
    consecutive_failures = 0
    logged_paid_fallback = False
    model_order = _gemini_query_model_order(model)

    for attempt in range(max_attempts):
        if attempt > 0:
            exp = min(max(consecutive_failures - 1, 0), 20)
            delay = min(
                _QUERY_GEN_BACKOFF_BASE_S * (2.0**exp),
                _QUERY_GEN_BACKOFF_MAX_S,
            )
            await asyncio.sleep(delay)

        use_paid = bool(
            paid_key and consecutive_failures >= _QUERY_GEN_PAID_AFTER_FAILURES,
        )
        gemini_key = paid_key if use_paid else next_gemini_flash_key()
        if use_paid and not logged_paid_fallback:
            logger.info(
                "Using paid Gemini key for %s after %d consecutive failures",
                tool_name,
                consecutive_failures,
            )
            logged_paid_fallback = True
        # Key goes in a header so it never appears in logged request URLs.
        headers = {"x-goog-api-key": gemini_key} if gemini_key else {}

        for gm in model_order:
            url = f"{_GEMINI_GENERATE_BASE}/{gm}:generateContent"
            gemini_429_attempt = 0
            same_model_immediate_retry = 0
            while True:
                try:
                    resp = await client.post(url, json=payload, headers=headers)
                    if resp.status_code == 429:
                        if is_daily_quota_429(resp):
                            await mark_key_daily_spent(gemini_key, "generate")
                            # Presumably a daily quota cannot recover within
                            # the backoff window, so skip the sleeps and move
                            # to the next model.
                            logger.info(
                                "Gemini query-gen daily quota 429 model=%s tool=%s — next",
                                gm,
                                tool_name,
                            )
                            break
                        if gemini_429_attempt < _QUERY_GEN_429_MAX_RETRIES:
                            await _sleep_429_backoff(
                                gemini_429_attempt,
                                base_s=_QUERY_GEN_429_BACKOFF_BASE_S,
                                max_s=_QUERY_GEN_429_BACKOFF_MAX_S,
                                label=f"Gemini model={gm} tool={tool_name}",
                            )
                            gemini_429_attempt += 1
                            continue
                        logger.info(
                            "Gemini query-gen 429 exhausted model=%s tool=%s — next",
                            gm,
                            tool_name,
                        )
                        break
                    if resp.status_code in (500, 502, 503, 504):
                        logger.info(
                            "Gemini query-gen HTTP %s model=%s tool=%s — next",
                            resp.status_code,
                            gm,
                            tool_name,
                        )
                        break
                    if resp.status_code >= 400:
                        logger.warning(
                            "Gemini query-gen HTTP %s model=%s tool=%s",
                            resp.status_code,
                            gm,
                            tool_name,
                        )
                        break

                    try:
                        resp_data = resp.json()
                    except ValueError:
                        # Undecodable body: handled like empty content, i.e.
                        # regenerate on the same model (as the OpenRouter path does).
                        resp_data = {}
                    raw_text = _text_from_gemini_generate_response(
                        resp_data if isinstance(resp_data, dict) else {},
                    )
                    if not raw_text.strip():
                        same_model_immediate_retry += 1
                        if (
                            same_model_immediate_retry
                            >= _JSON_PARSE_MAX_RETRIES_SAME_MODEL
                        ):
                            logger.warning(
                                "Empty Gemini content model=%s tool=%s after %d "
                                "immediate regenerates — next model",
                                gm,
                                tool_name,
                                same_model_immediate_retry,
                            )
                            break
                        logger.info(
                            "Empty Gemini content model=%s tool=%s — immediate "
                            "regenerate (%d/%d)",
                            gm,
                            tool_name,
                            same_model_immediate_retry,
                            _JSON_PARSE_MAX_RETRIES_SAME_MODEL,
                        )
                        continue

                    parsed = _parse_queries_from_response_text(
                        raw_text,
                        tool_name,
                        count,
                    )
                    if parsed is not None:
                        return parsed
                    same_model_immediate_retry += 1
                    if same_model_immediate_retry >= _JSON_PARSE_MAX_RETRIES_SAME_MODEL:
                        logger.warning(
                            "Invalid or incomplete query JSON for %s model=%s after "
                            "%d immediate regenerates — next model",
                            tool_name,
                            gm,
                            same_model_immediate_retry,
                        )
                        break
                    logger.info(
                        "Invalid or incomplete query JSON for %s model=%s — "
                        "immediate regenerate (%d/%d)",
                        tool_name,
                        gm,
                        same_model_immediate_retry,
                        _JSON_PARSE_MAX_RETRIES_SAME_MODEL,
                    )
                except Exception as exc:
                    logger.warning(
                        "Gemini query-gen error model=%s tool=%s: %s",
                        gm,
                        tool_name,
                        exc,
                    )
                    break

        # All Gemini models failed this round — OpenRouter (query gen only).
        try:
            or_out = await _try_openrouter_query_gen(
                client,
                tool_name=tool_name,
                prompt=prompt,
                count=count,
                primary_model_name=model,
            )
            if or_out is not None:
                return or_out
        except Exception as exc:
            logger.error(
                "OpenRouter query-gen fallback failed for %s: %s",
                tool_name,
                exc,
            )

        consecutive_failures += 1
        logger.debug(
            "Query-gen round failed for %s (failures=%d)",
            tool_name,
            consecutive_failures,
        )

    raise RuntimeError(
        f"Exhausted synthetic query generation for tool {tool_name!r}: "
        f"could not obtain {count} valid queries after {max_attempts} outer "
        f"attempt(s) (each attempt tries all Gemini models with up to "
        f"{_JSON_PARSE_MAX_RETRIES_SAME_MODEL} immediate regenerates per model, "
        f"then OpenRouter).",
    )
def _load_existing_index(path: str) -> dict[str, Any]:
    """Return the previously saved index at *path*, or ``{}`` if unusable.

    A missing file is the normal first-run case and returns ``{}`` quietly.
    An unreadable file, unparsable JSON, or a non-object top level is logged
    as a warning and treated as empty. This keeps the resume step
    best-effort without hiding the problem.
    """
    if not os.path.exists(path):
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception as exc:  # best-effort resume: any read/parse failure
        logger.warning(
            "Could not load existing %s (%s); starting from an empty index.",
            path,
            exc,
        )
        return {}
    if not isinstance(data, dict):
        logger.warning(
            "Existing %s is not a JSON object; starting from an empty index.",
            path,
        )
        return {}
    logger.info("Loaded existing data with %d entries.", len(data))
    return data


def _write_index_atomically(path: str, data: dict[str, Any]) -> None:
    """Write *data* as indented JSON to *path* through a temp file.

    The payload goes to ``<path>.tmp`` first and then replaces *path* with
    :func:`os.replace`. An interrupted write therefore leaves the previous
    index intact rather than truncated.
    """
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
    os.replace(tmp_path, path)


async def build_index(tools_dir: str = "tools") -> None:
    """Discover every tool and generate its synthetic queries into the index file.

    Top-level driver for the reverse-HyDE index build:

    1. Load all tools into a :class:`tools.ToolRegistry` with
       :func:`tool_loader.load_tools`.
    2. Read any existing ``OUTPUT_FILE`` (resumable). Tools whose entries
       pass :func:`discover_invalid_query_index_tools` are skipped.
    3. Generate ``SYNTHETIC_QUERY_COUNT`` queries for every remaining tool
       with :func:`generate_synthetic_queries`. At most three tools run
       concurrently, sharing one ``httpx.AsyncClient``; the client is closed
       on exit.
    4. Write the merged result atomically to ``OUTPUT_FILE``.

    A tool whose generation fails does not discard the others. Failures
    are collected, every successful result is saved, and a
    ``RuntimeError`` is raised afterwards. A re-run then resumes with only
    the failed tools.

    Args:
        tools_dir: Directory scanned for tool modules. Defaults to ``"tools"``.

    Returns:
        None. The side effect is the write to ``tool_index_data.json``.

    Raises:
        RuntimeError: If generation failed for one or more tools. This is
            raised after saving all successful results.
    """
    logger.info("--- Auto-discovering tools from %s ---", tools_dir)
    registry = ToolRegistry()
    load_tools(tools_dir, registry)
    all_tools = registry.list_tools()
    logger.info("Found %d tools.", len(all_tools))

    index_data = _load_existing_index(OUTPUT_FILE)
    # Regenerate entries that are missing, malformed, too short, or contain
    # blank queries. This is the same rule refresh_tool_embeddings applies.
    needs_generation = set(
        discover_invalid_query_index_tools(
            index_data,
            {tool.name: tool for tool in all_tools},
        ),
    )

    logger.info(
        "--- Generating %d queries per tool ---",
        SYNTHETIC_QUERY_COUNT,
    )
    sem = asyncio.Semaphore(3)

    async with httpx.AsyncClient(
        timeout=httpx.Timeout(600.0, connect=30.0),
    ) as client:

        async def process_tool(tool: Any) -> None:
            """Generate and record synthetic queries for one tool.

            Skips tools not in ``needs_generation``. Otherwise it acquires
            ``sem``, calls :func:`generate_synthetic_queries` over ``client``,
            and stores the result in ``index_data`` under ``tool.name``.
            Exceptions propagate to ``asyncio.gather``, which collects them.
            """
            entry = index_data.get(tool.name)
            existing = (
                entry.get("synthetic_queries") if isinstance(entry, dict) else None
            )
            n_existing = len(existing) if isinstance(existing, list) else 0
            if tool.name not in needs_generation:
                logger.info("Skipping %s (%d queries)", tool.name, n_existing)
                return
            if tool.name in index_data:
                logger.info("Regenerating %s (%d queries)", tool.name, n_existing)

            async with sem:
                logger.info("Generating for: %s", tool.name)
                queries = await generate_synthetic_queries(
                    client,
                    None,
                    None,
                    tool.name,
                    tool.description or "",
                )
            index_data[tool.name] = {
                "name": tool.name,
                "description": tool.description,
                "synthetic_queries": queries,
            }
            logger.info("Finished %s (%d queries)", tool.name, len(queries))

        # return_exceptions=True so one failing tool does not abort the run
        # before any completed work is saved.
        results = await asyncio.gather(
            *(process_tool(tool) for tool in all_tools),
            return_exceptions=True,
        )

    failures = [
        (tool.name, outcome)
        for tool, outcome in zip(all_tools, results)
        if isinstance(outcome, BaseException)
    ]
    for name, exc in failures:
        logger.error("Query generation failed for %s: %s", name, exc)

    logger.info("--- Saving to %s ---", OUTPUT_FILE)
    _write_index_atomically(OUTPUT_FILE, index_data)
    logger.info("Done! %d tools indexed.", len(index_data))

    if failures:
        raise RuntimeError(
            f"Synthetic query generation failed for {len(failures)} tool(s): "
            + ", ".join(sorted(name for name, _ in failures)),
        ) from failures[0][1]
if __name__ == "__main__":
    # CLI entry point: parse --tools-dir and run the async index build.
    _cli = argparse.ArgumentParser(
        description="Build tool index (synthetic queries) for vector classifier",
    )
    _cli.add_argument(
        "--tools-dir",
        default="tools",
        help="Directory containing tool scripts",
    )
    _ns = _cli.parse_args()
    asyncio.run(build_index(tools_dir=_ns.tools_dir))