#!/usr/bin/env python3
"""Build the tool index used by the vector classifier.
Auto-discovers every registered tool via :mod:`tool_loader`, then
calls an LLM to generate 50 diverse synthetic user queries per tool
(reverse-HyDE). Results are saved to ``tool_index_data.json`` in
this directory.
Usage::
python -m classifiers.build_tool_index [--tools-dir tools]
"""
from __future__ import annotations
import asyncio
import argparse
import jsonutil as json
import logging
import os
import re
import sys
from typing import Any
import httpx
# Ensure the project root is importable when running as a script.
# The project-local imports below (``tools``, ``tool_loader``) only resolve
# after this insert, so the order of these statements matters.
_PROJECT_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), ".."),
)
sys.path.insert(0, _PROJECT_ROOT)
# Optionally load ``<project root>/.env``; python-dotenv is not a hard
# dependency, so a missing package is deliberately tolerated.
try:
    from dotenv import load_dotenv

    load_dotenv(os.path.join(_PROJECT_ROOT, ".env"))
except ImportError:
    pass
from tools import ToolRegistry  # noqa: E402
from tool_loader import load_tools  # noqa: E402

# NOTE(review): basicConfig runs at import time, so importing this module
# configures root logging at INFO as a side effect. That also surfaces
# third-party INFO logs (e.g. httpx request lines, which include full URLs).
logging.basicConfig(
    level=logging.INFO,
    format="%(levelname)s: %(message)s",
)
logger = logging.getLogger(__name__)
# Generated index (tool name -> synthetic queries), stored next to this module.
OUTPUT_FILE = os.path.join(os.path.dirname(__file__), "tool_index_data.json")
# How many synthetic user queries are requested per tool.
SYNTHETIC_QUERY_COUNT = 50


def _env_int(name: str, default: str) -> int:
    """Return environment variable *name* parsed as ``int`` (or *default*)."""
    return int(os.environ.get(name, default))


def _env_float(name: str, default: str) -> float:
    """Return environment variable *name* parsed as ``float`` (or *default*)."""
    return float(os.environ.get(name, default))


# --- Gemini synthetic-query generation: outer rounds ------------------------
# Shared by build_index, refresh_tool_embeddings and update_tool_embeddings;
# each knob can be overridden through the environment.
_QUERY_GEN_MAX_ATTEMPTS = _env_int("GEMINI_QUERY_GEN_MAX_ATTEMPTS", "25")
_QUERY_GEN_BACKOFF_BASE_S = _env_float("GEMINI_QUERY_GEN_BACKOFF_BASE", "0.5")
_QUERY_GEN_BACKOFF_MAX_S = _env_float("GEMINI_QUERY_GEN_BACKOFF_MAX", "120.0")
_QUERY_GEN_PAID_AFTER_FAILURES = _env_int(
    "GEMINI_QUERY_GEN_PAID_AFTER_FAILURES", "5"
)
_QUERY_GEN_MAX_OUTPUT_TOKENS = _env_int(
    "GEMINI_QUERY_GEN_MAX_OUTPUT_TOKENS", "30000"
)

# --- Gemini 429: per-model retries, exponential sleep before each re-POST ---
_QUERY_GEN_429_MAX_RETRIES = _env_int("GEMINI_QUERY_GEN_429_MAX_RETRIES", "8")
_QUERY_GEN_429_BACKOFF_BASE_S = _env_float("GEMINI_QUERY_GEN_429_BACKOFF_BASE", "2.0")
_QUERY_GEN_429_BACKOFF_MAX_S = _env_float("GEMINI_QUERY_GEN_429_BACKOFF_MAX", "120.0")

# --- Same-model immediate regenerates ---------------------------------------
# On invalid JSON, an empty body, or too few queries the same Gemini/OpenRouter
# model is re-POSTed right away, at most this many times, before moving on to
# the next model / outer round.
_JSON_PARSE_MAX_RETRIES_SAME_MODEL = _env_int(
    "GEMINI_QUERY_GEN_JSON_PARSE_MAX_RETRIES_SAME_MODEL", "100"
)

# --- OpenRouter 429 retries (query-gen fallback only) -----------------------
_OPENROUTER_QUERY_GEN_429_MAX_RETRIES = _env_int(
    "OPENROUTER_QUERY_GEN_429_MAX_RETRIES", "6"
)
_OPENROUTER_QUERY_GEN_429_BACKOFF_BASE_S = _env_float(
    "OPENROUTER_QUERY_GEN_429_BACKOFF_BASE", "2.0"
)
_OPENROUTER_QUERY_GEN_429_BACKOFF_MAX_S = _env_float(
    "OPENROUTER_QUERY_GEN_429_BACKOFF_MAX", "120.0"
)
# Reverse-HyDE prompt template, rendered with ``str.format`` using the
# ``tool_name``, ``tool_description`` and ``count`` placeholders. Literal braces
# in the JSON example at the end are doubled (``{{`` / ``}}``) so ``format``
# emits single braces. In this non-raw string a trailing backslash suppresses
# the newline, joining the next source line onto the current one.
# NOTE(review): the per-category "~N" guideline counts sum to 50, matching
# SYNTHETIC_QUERY_COUNT; they are not rescaled when another count is requested.
REVERSE_HYDE_PROMPT = """\
You are an expert prompt engineer building a semantic search \
index for an AI assistant's tool-calling system.
## Tool under analysis
- **Name:** {tool_name}
- **Description:** {tool_description}
## Your task
Generate exactly {count} synthetic user messages that a human \
would write in a chat with an AI assistant when they *need* \
this tool -- even if they don't know the tool exists. The \
messages must be realistic, varied, and collectively span the \
full semantic surface area of the tool.
**Single response:** emit **exactly** {count} query strings in **one** JSON \
object in this turn only. The `"queries"` array must have length **exactly** \
{count} — no truncation, no follow-ups, no batching across calls.
Distribute the {count} messages across the following \
categories. Exact counts per category are guidelines -- some \
tools may naturally skew toward certain categories -- but \
every category MUST have at least one entry.
### Categories
1. **Direct commands** (~5): Explicit imperative sentences.
"Run ...", "Execute ...", "Do ...", "Set up ..."
2. **Natural-language requests** (~8): Polite or \
conversational requests that clearly imply the tool.
"Could you ...", "I'd like you to ...", "Please ..."
3. **Vague / ambiguous intents** (~8): The user's goal is \
clear but no specific tool is named; this tool is one \
plausible match.
4. **Questions** (~5): "Can you ...?", "Is it possible \
to ...?", "How do I ...?"
5. **Contextual / embedded** (~5): The request is buried \
inside a longer message giving background.
"I was working on X earlier and now I need ..."
6. **Multi-step scenarios** (~5): The user describes a \
workflow where this tool is one step.
"First do A, then B, and finally ..."
7. **Error / frustration** (~4): The user is stuck or \
something broke and they need this tool to fix it.
"X isn't working ...", "I keep getting ...", \
"Help, I can't ..."
8. **Follow-up / conversational** (~5): Short messages \
that only make sense in context.
"Actually, also ...", "And then ...", \
"One more thing -- ..."
9. **Domain-specific jargon** (~5): Technical or domain \
language that implies the tool without naming it.
## Constraints
- Do NOT mention the tool name in more than 5 of the {count} \
messages.
- Focus on *user intent*, not tool internals.
- Vary sentence length (short single-clause to \
multi-sentence).
- Include at least 5 messages that are <= 8 words.
- Return a JSON object with a single key `"queries"` whose \
value is the array of strings. No markdown fences, no \
extra keys, no explanation.
Example (abbreviated):
{{"queries": ["find the file", \
"search for documents matching sales Q3", \
"where is that report I uploaded yesterday"]}}
"""
# Endpoints used by synthetic query generation.
_GEMINI_GENERATE_BASE = "https://generativelanguage.googleapis.com/v1beta/models"
_OPENROUTER_CHAT_URL = "https://openrouter.ai/api/v1/chat/completions"

# Primary Gemini model for ``generateContent`` query generation.
_DEFAULT_GENERATE_MODEL = "gemini-3.1-flash-lite"

# Bare model id (sent to OpenRouter as ``google/<id>``) used when
# :func:`generate_synthetic_queries` runs with ``openrouter_only=True``, a mode
# that bypasses Gemini ``generateContent`` entirely.
_OPENROUTER_ONLY_QUERY_MODEL = "gemini-3.1-flash-lite"

# Extra Gemini models tried, in order, after the primary one when
# ``generateContent`` fails (e.g. 429). Query generation only -- never embeddings.
_GEMINI_QUERY_FALLBACK_MODELS = ("gemini-2.5-flash", "gemini-3-flash-preview")
def _gemini_query_model_order(primary: str) -> list[str]:
    """Return the Gemini models to try for query generation, in try-order.

    *primary* comes first, followed by each entry of
    ``_GEMINI_QUERY_FALLBACK_MODELS``. Repeats are dropped (the first occurrence
    wins), so one round never calls the same model twice. Pure computation with
    no I/O. :func:`generate_synthetic_queries` calls it once per invocation to
    build its per-round model sweep.

    Args:
        primary: Preferred Gemini model id.

    Returns:
        list[str]: Deduplicated model ids, primary first.
    """
    # dicts keep insertion order, so fromkeys() is an order-stable dedupe.
    return list(dict.fromkeys((primary, *_GEMINI_QUERY_FALLBACK_MODELS)))
def _extract_json_object(text: str) -> str | None:
"""Carve out the JSON object substring when the model wraps it in prose.
Slices from the first opening brace to the last closing brace so a payload
embedded in chatter or partial output can still be parsed. This is a
best-effort recovery step, not a validator: the returned slice is not
guaranteed to be valid JSON, only to span a plausible object.
Pure string handling with no I/O or side effects. Called only by
:func:`_parse_queries_from_response_text` as a second-chance recovery after a
direct :func:`json.loads` fails; no other callers were found.
Args:
text: Raw model output that may contain a JSON object somewhere inside.
Returns:
str | None: The brace-delimited substring, or ``None`` when no opening
brace exists or no closing brace follows it.
"""
start = text.find("{")
if start < 0:
return None
end = text.rfind("}")
if end <= start:
return None
return text[start : end + 1]
def _parse_queries_from_response_text(
raw_text: str,
tool_name: str,
count: int,
) -> list[str] | None:
"""Parse a model response into a validated list of synthetic query strings.
Normalizes raw model output -- strips whitespace and ``<thinking>`` blocks,
peels off markdown code fences -- then decodes the JSON and pulls out the
``queries`` array (falling back to the first list-valued key, or the whole
payload when it is itself a list). The result is coerced to strings and length
checked so that an empty, malformed, or under-count response is rejected and
the caller can regenerate.
On a direct decode failure it retries via :func:`_extract_json_object` to
recover a brace-delimited blob. Failures and short counts are logged at debug
level; the function performs no network or Redis I/O. Called by
:func:`_try_openrouter_query_gen` and :func:`generate_synthetic_queries` (the
Gemini branch) to validate each response, and exercised by
``tests/test_discover_invalid_query_index.py``; no other callers were found.
Args:
raw_text: The raw text body returned by Gemini or OpenRouter.
tool_name: Tool name, used only for log context.
count: Required number of queries; responses with fewer are rejected, and
extras are truncated to this length.
Returns:
list[str] | None: Exactly *count* query strings, or ``None`` when the
payload is unparseable, empty, or has too few queries.
"""
content = raw_text.strip()
content = re.sub(
r"<thinking>.*?</thinking>\s*",
"",
content,
flags=re.DOTALL,
)
if content.startswith("```json"):
content = content[7:]
if content.startswith("```"):
content = content[3:]
if content.endswith("```"):
content = content[:-3]
content = content.strip()
if not content:
logger.debug("No JSON payload after stripping for %s", tool_name)
return None
data: Any = None
try:
data = json.loads(content)
except json.JSONDecodeError:
blob = _extract_json_object(content)
if blob:
try:
data = json.loads(blob)
except json.JSONDecodeError:
logger.debug(
"JSON decode failed for %s (even after object extract)",
tool_name,
)
return None
else:
logger.debug("JSON decode failed for %s (no extractable object)", tool_name)
return None
if isinstance(data, dict):
queries = data.get("queries")
if not isinstance(queries, list):
for v in data.values():
if isinstance(v, list):
queries = v
break
else:
queries = []
elif isinstance(data, list):
queries = data
else:
queries = []
out = [str(q) for q in queries if isinstance(q, (str, int))]
if not out:
logger.debug("Parsed empty queries for %s", tool_name)
return None
if len(out) < count:
logger.debug(
"Expected %d queries for %s, got %d (will regenerate)",
count,
tool_name,
len(out),
)
return None
if len(out) > count:
out = out[:count]
return out
def _openrouter_query_gen_api_key() -> str:
    """Resolve the API key for OpenRouter query generation (refresh / build index).

    Checks ``OPENROUTER_QUERY_GEN_API_KEY``, then ``OPENROUTER_API_KEY``, then
    ``API_KEY``. Returns the first one that is non-blank after whitespace is
    stripped, or ``""`` when none is set. Never log the value.
    """
    stripped_values = (
        os.environ.get(env_name, "").strip()
        for env_name in (
            "OPENROUTER_QUERY_GEN_API_KEY",
            "OPENROUTER_API_KEY",
            "API_KEY",
        )
    )
    return next((key for key in stripped_values if key), "")
async def _sleep_429_backoff(
    attempt_index: int,
    *,
    base_s: float,
    max_s: float,
    label: str,
) -> None:
    """Await an exponential backoff pause after an HTTP 429 response.

    The pause is ``base_s * 2**k`` seconds, capped at *max_s*, where ``k`` is
    *attempt_index* clamped into ``[0, 20]``. The chosen pause is logged at INFO
    (tagged with *label*, attempt shown 1-based) before sleeping.

    Awaited from the 429 branches of :func:`_try_openrouter_query_gen`
    (OpenRouter chat) and :func:`generate_synthetic_queries` (Gemini
    ``generateContent``); each passes its own zero-based retry counter.

    Args:
        attempt_index: Zero-based retry counter; the first 429 is ``0``.
        base_s: Pause in seconds for the first retry; doubles per attempt.
        max_s: Ceiling applied to the computed pause.
        label: Short call-site tag included in the log line.
    """
    capped_exponent = max(0, min(attempt_index, 20))
    pause_s = min(base_s * 2.0**capped_exponent, max_s)
    logger.info(
        "429 backoff %s: sleeping %.1fs (attempt %d)",
        label,
        pause_s,
        attempt_index + 1,
    )
    await asyncio.sleep(pause_s)
def _text_from_gemini_generate_response(resp_data: dict[str, Any]) -> str:
    """Extract the answer text from a Gemini ``generateContent`` response body.

    Reads ``candidates[0].content.parts`` and concatenates the ``text`` of every
    part. Parts flagged ``"thought": true`` (model reasoning) are skipped, as are
    parts without a string ``text`` (e.g. inline data or function calls). This
    matches how the official SDKs build ``response.text``.

    Returns ``""`` whenever the expected structure is missing or ``null`` at any
    level. A blocked or empty candidate is therefore treated like a blank reply,
    so the caller regenerates instead of raising.

    Pure dictionary access: no I/O, logging, or network. Called by
    :func:`generate_synthetic_queries` (Gemini branch), which passes the result
    to :func:`_parse_queries_from_response_text`.

    Args:
        resp_data: Parsed JSON body of a ``generateContent`` response.

    Returns:
        The concatenated answer text of the first candidate, or ``""``.
    """
    candidates = resp_data.get("candidates") if isinstance(resp_data, dict) else None
    if not isinstance(candidates, list) or not candidates:
        return ""
    first = candidates[0]
    # ``content`` / ``parts`` can be absent *or* explicitly null; guard both.
    content = first.get("content") if isinstance(first, dict) else None
    parts = content.get("parts") if isinstance(content, dict) else None
    if not isinstance(parts, list):
        return ""
    # One reply may be split across several text parts; reading only the first
    # would truncate the JSON payload. Thought parts are not answer text.
    return "".join(
        part["text"]
        for part in parts
        if isinstance(part, dict)
        and not part.get("thought")
        and isinstance(part.get("text"), str)
    )
async def _try_openrouter_query_gen(
    client: httpx.AsyncClient,
    *,
    tool_name: str,
    prompt: str,
    count: int,
    primary_model_name: str,
) -> list[str] | None:
    """Generate synthetic queries via OpenRouter chat completions as a Gemini fallback.

    Posts the already-formatted reverse-HyDE *prompt* to the OpenRouter chat
    endpoint as model ``google/<primary_model_name>`` and parses the reply into a
    query list. This is the cross-provider safety net used when Gemini
    ``generateContent`` is unavailable or exhausted.

    The API key comes from :func:`_openrouter_query_gen_api_key`. When no key is
    configured the call is skipped and ``None`` is returned.

    Each round POSTs via the shared ``httpx.AsyncClient`` and sleeps through up to
    ``_OPENROUTER_QUERY_GEN_429_MAX_RETRIES`` 429 responses with
    :func:`_sleep_429_backoff`. Any remaining HTTP status >= 400, including a 429
    once that budget is spent, ends the call with ``None``.

    A 2xx response triggers an immediate re-POST when its body:

    * is not JSON,
    * has no ``choices``,
    * has empty content, or
    * fails :func:`_parse_queries_from_response_text`.

    At most ``_JSON_PARSE_MAX_RETRIES_SAME_MODEL`` such regenerates happen in
    total. Progress and failures are logged; there are no Redis or filesystem
    side effects. Called by :func:`generate_synthetic_queries` and
    :func:`_generate_synthetic_queries_openrouter_only`.

    Args:
        client: Shared async HTTP client used for the POST.
        tool_name: Tool name, used only for log context (the caller has already
            formatted the prompt).
        prompt: The fully formatted reverse-HyDE prompt to send.
        count: Required number of queries, enforced by the parser.
        primary_model_name: Bare model id; sent as ``google/<primary_model_name>``.

    Returns:
        list[str] | None: The parsed queries on success, or ``None`` when the key
        is missing, an HTTP error status is returned, or no valid response is
        obtained within the regenerate budget.

    Raises:
        Exception: Transport errors from ``client.post`` are not caught here,
            nor are unexpected response shapes (e.g. a JSON body that is not an
            object). Both callers catch and log them.
    """
    api_key = _openrouter_query_gen_api_key()
    if not api_key:
        logger.info(
            "Skipping OpenRouter query-gen fallback for %s: "
            "set OPENROUTER_QUERY_GEN_API_KEY, OPENROUTER_API_KEY, or API_KEY",
            tool_name,
        )
        return None
    or_model = f"google/{primary_model_name}"
    payload: dict[str, Any] = {
        "model": or_model,
        "messages": [
            {
                "role": "system",
                "content": ("You are a helpful assistant that outputs strict JSON."),
            },
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.8,
        # Same output budget as the Gemini path, clamped to 65_536 tokens.
        "max_tokens": min(_QUERY_GEN_MAX_OUTPUT_TOKENS, 65_536),
    }
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    logger.info(
        "OpenRouter query-gen fallback: model=%s tool=%s",
        or_model,
        tool_name,
    )
    # Counts only 2xx-but-unusable responses; HTTP errors return immediately.
    # NOTE(review): if _JSON_PARSE_MAX_RETRIES_SAME_MODEL <= 0 no request is made.
    parse_attempt = 0
    while parse_attempt < _JSON_PARSE_MAX_RETRIES_SAME_MODEL:
        # Re-POST the identical request on 429 with backoff. On the last allowed
        # try a 429 falls through to ``break`` and is handled below as an HTTP
        # error. NOTE(review): a negative 429 retry setting makes this range empty
        # and leaves ``resp`` unbound.
        for or_429_try in range(_OPENROUTER_QUERY_GEN_429_MAX_RETRIES + 1):
            resp = await client.post(
                _OPENROUTER_CHAT_URL,
                json=payload,
                headers=headers,
            )
            if (
                resp.status_code == 429
                and or_429_try < _OPENROUTER_QUERY_GEN_429_MAX_RETRIES
            ):
                await _sleep_429_backoff(
                    or_429_try,
                    base_s=_OPENROUTER_QUERY_GEN_429_BACKOFF_BASE_S,
                    max_s=_OPENROUTER_QUERY_GEN_429_BACKOFF_MAX_S,
                    label=f"OpenRouter tool={tool_name}",
                )
                continue
            break
        if resp.status_code >= 400:
            logger.warning(
                "OpenRouter query-gen HTTP %s for %s: %s",
                resp.status_code,
                tool_name,
                resp.text[:300],
            )
            return None
        try:
            body = resp.json()
        except Exception:
            parse_attempt += 1
            logger.info(
                "OpenRouter response body not JSON for %s — immediate regenerate (%d/%d)",
                tool_name,
                parse_attempt,
                _JSON_PARSE_MAX_RETRIES_SAME_MODEL,
            )
            continue
        choices = body.get("choices") or []
        if not choices:
            parse_attempt += 1
            logger.info(
                "OpenRouter query-gen no choices for %s — immediate regenerate (%d/%d)",
                tool_name,
                parse_attempt,
                _JSON_PARSE_MAX_RETRIES_SAME_MODEL,
            )
            continue
        msg = choices[0].get("message") or {}
        raw = (msg.get("content") or "").strip()
        if not raw:
            parse_attempt += 1
            logger.info(
                "OpenRouter query-gen empty content for %s — immediate regenerate (%d/%d)",
                tool_name,
                parse_attempt,
                _JSON_PARSE_MAX_RETRIES_SAME_MODEL,
            )
            continue
        parsed = _parse_queries_from_response_text(raw, tool_name, count)
        if parsed is not None:
            return parsed
        parse_attempt += 1
        logger.info(
            "OpenRouter invalid or incomplete query JSON for %s — immediate regenerate (%d/%d)",
            tool_name,
            parse_attempt,
            _JSON_PARSE_MAX_RETRIES_SAME_MODEL,
        )
    return None
async def _generate_synthetic_queries_openrouter_only(
    client: httpx.AsyncClient,
    tool_name: str,
    prompt: str,
    count: int,
) -> list[str]:
    """Generate synthetic queries using OpenRouter alone; Gemini is never called.

    This backs ``generate_synthetic_queries(..., openrouter_only=True)`` and lets
    environments without Gemini access still build the tool index. An
    OpenRouter key is required up front. After that, up to
    ``_QUERY_GEN_MAX_ATTEMPTS`` rounds of :func:`_try_openrouter_query_gen` run
    with model ``google/<_OPENROUTER_ONLY_QUERY_MODEL>``. Every round after the
    first is preceded by an exponential pause
    (``_QUERY_GEN_BACKOFF_BASE_S * 2**(failures-1)``, capped at
    ``_QUERY_GEN_BACKOFF_MAX_S``). Exceptions from a round are logged with
    traceback and counted as a failed round. No Redis or filesystem side
    effects.

    Args:
        client: Shared async HTTP client passed through to the OpenRouter call.
        tool_name: Tool name for log context.
        prompt: The formatted reverse-HyDE prompt to send.
        count: Required number of queries.

    Returns:
        list[str]: The generated queries.

    Raises:
        RuntimeError: No OpenRouter key is configured, or no valid query list
            was obtained within ``_QUERY_GEN_MAX_ATTEMPTS`` rounds.
    """
    if not _openrouter_query_gen_api_key():
        raise RuntimeError(
            "openrouter_only synthetic query generation requires "
            "OPENROUTER_QUERY_GEN_API_KEY, OPENROUTER_API_KEY, or API_KEY",
        )

    max_attempts = _QUERY_GEN_MAX_ATTEMPTS
    failed_rounds = 0
    for round_no in range(max_attempts):
        if round_no:
            backoff_exp = max(0, min(failed_rounds - 1, 20))
            await asyncio.sleep(
                min(
                    _QUERY_GEN_BACKOFF_BASE_S * 2.0**backoff_exp,
                    _QUERY_GEN_BACKOFF_MAX_S,
                ),
            )
        try:
            queries = await _try_openrouter_query_gen(
                client,
                tool_name=tool_name,
                prompt=prompt,
                count=count,
                primary_model_name=_OPENROUTER_ONLY_QUERY_MODEL,
            )
        except Exception as exc:
            logger.warning(
                "OpenRouter-only query-gen error tool=%s: %s",
                tool_name,
                exc,
                exc_info=True,
            )
        else:
            if queries is not None:
                return queries
        failed_rounds += 1

    raise RuntimeError(
        f"Exhausted OpenRouter-only synthetic query generation for {tool_name!r}: "
        f"could not obtain {count} valid queries after {max_attempts} attempts.",
    )
async def generate_synthetic_queries(
    client: httpx.AsyncClient,
    base_url: str | None,
    api_key: str | None,
    tool_name: str,
    tool_description: str,
    count: int = SYNTHETIC_QUERY_COUNT,
    model: str = _DEFAULT_GENERATE_MODEL,
    *,
    openrouter_only: bool = False,
) -> list[str]:
    """Produce *count* synthetic queries via Gemini, with model fallbacks.

    When *openrouter_only* is True, skips Gemini and uses OpenRouter chat with
    ``google/{_OPENROUTER_ONLY_QUERY_MODEL}`` only (requires ``OPENROUTER_*`` or
    ``API_KEY``).

    Otherwise each outer round tries the primary *model* first, then the models
    in ``_GEMINI_QUERY_FALLBACK_MODELS`` (``gemini-2.5-flash``,
    ``gemini-3-flash-preview``), all with the same Gemini key. It moves to the
    next model when ``generateContent`` fails.

    If every Gemini model fails for the round, it falls back to OpenRouter chat
    completions with ``google/<primary model name>``. The OpenRouter key is
    taken from ``OPENROUTER_QUERY_GEN_API_KEY``, else ``OPENROUTER_API_KEY``,
    else ``API_KEY``. Embeddings are unaffected; this path is query generation
    only.

    The Gemini key is sent in the ``x-goog-api-key`` header, never in the URL.
    httpx logs every request URL at INFO (enabled by this module's logging
    setup), so a ``?key=`` query parameter would leak the key into the logs.

    **Invalid JSON, empty content, or too few queries:** the same model is
    called again immediately, up to
    ``GEMINI_QUERY_GEN_JSON_PARSE_MAX_RETRIES_SAME_MODEL`` times per model, and
    then the next model / outer round is tried. Partial JSON may be recovered by
    extracting the first ``{...}`` block.

    **429 responses:**

    * A daily-quota 429 (per ``gemini_embed_pool.is_daily_quota_429``) marks the
      key spent and moves straight to the next model, since waiting cannot clear
      a daily quota.
    * Other 429s wait with exponential backoff and retry the same model (see
      ``GEMINI_QUERY_GEN_429_*``).
    * OpenRouter 429s use ``OPENROUTER_QUERY_GEN_429_*``.

    A 5xx or any other 4xx status moves to the next model.

    Outer rounds use exponential backoff. Each round draws a key from
    :func:`gemini_embed_pool.next_gemini_flash_key`. After
    ``_QUERY_GEN_PAID_AFTER_FAILURES`` consecutive full-round failures, the key
    from :func:`gemini_embed_pool.get_paid_fallback_key` is used instead, if one
    is set. Env: ``GEMINI_QUERY_GEN_*`` (see module constants).

    Args:
        client: Shared async HTTP client.
        base_url: Unused; kept for call-site compatibility.
        api_key: Unused; Gemini keys come from ``gemini_embed_pool``.
        tool_name: Tool name substituted into the prompt and used in logs.
        tool_description: Tool description substituted into the prompt.
        count: Number of queries to produce.
        model: Primary Gemini model id (also the OpenRouter fallback model id).
        openrouter_only: Skip Gemini entirely and use OpenRouter only.

    Returns:
        list[str]: Exactly *count* query strings.

    Raises:
        RuntimeError: if valid queries cannot be produced after all attempts.
    """
    from gemini_embed_pool import (
        get_paid_fallback_key,
        is_daily_quota_429,
        mark_key_daily_spent,
        next_gemini_flash_key,
    )

    prompt = REVERSE_HYDE_PROMPT.format(
        tool_name=tool_name,
        tool_description=tool_description,
        count=count,
    )
    if openrouter_only:
        return await _generate_synthetic_queries_openrouter_only(
            client,
            tool_name,
            prompt,
            count,
        )

    payload = {
        "contents": [{"parts": [{"text": prompt}]}],
        "systemInstruction": {
            "parts": [
                {"text": "You are a helpful assistant that outputs strict JSON."}
            ],
        },
        "generationConfig": {
            "temperature": 0.8,
            "maxOutputTokens": _QUERY_GEN_MAX_OUTPUT_TOKENS,
            "responseMimeType": "application/json",
        },
    }
    paid_key = get_paid_fallback_key()
    max_attempts = _QUERY_GEN_MAX_ATTEMPTS
    consecutive_failures = 0
    logged_paid_fallback = False
    model_order = _gemini_query_model_order(model)

    for attempt in range(max_attempts):
        if attempt > 0:
            # Pause between full rounds: base * 2**(failures - 1), capped.
            exp = min(max(consecutive_failures - 1, 0), 20)
            delay = min(
                _QUERY_GEN_BACKOFF_BASE_S * (2.0**exp),
                _QUERY_GEN_BACKOFF_MAX_S,
            )
            await asyncio.sleep(delay)

        use_paid = bool(
            paid_key and consecutive_failures >= _QUERY_GEN_PAID_AFTER_FAILURES,
        )
        gemini_key = paid_key if use_paid else next_gemini_flash_key()
        if use_paid and not logged_paid_fallback:
            logger.info(
                "Using paid Gemini key for %s after %d consecutive failures",
                tool_name,
                consecutive_failures,
            )
            logged_paid_fallback = True

        # Authenticate via header, not ``?key=`` in the URL: httpx logs each
        # request URL at INFO, so a URL-embedded key would be printed per call.
        gemini_headers = {"x-goog-api-key": gemini_key} if gemini_key else {}

        for gm in model_order:
            url = f"{_GEMINI_GENERATE_BASE}/{gm}:generateContent"
            gemini_429_attempt = 0
            same_model_immediate_retry = 0
            while True:
                try:
                    resp = await client.post(
                        url,
                        json=payload,
                        headers=gemini_headers,
                    )
                    if resp.status_code == 429:
                        if is_daily_quota_429(resp):
                            # Backing off cannot clear a daily quota: record it
                            # and move on rather than sleeping on a spent key.
                            await mark_key_daily_spent(
                                gemini_key,
                                "generate",
                            )
                            logger.info(
                                "Gemini query-gen daily quota 429 model=%s "
                                "tool=%s — next",
                                gm,
                                tool_name,
                            )
                            break
                        if gemini_429_attempt < _QUERY_GEN_429_MAX_RETRIES:
                            await _sleep_429_backoff(
                                gemini_429_attempt,
                                base_s=_QUERY_GEN_429_BACKOFF_BASE_S,
                                max_s=_QUERY_GEN_429_BACKOFF_MAX_S,
                                label=f"Gemini model={gm} tool={tool_name}",
                            )
                            gemini_429_attempt += 1
                            continue
                        logger.info(
                            "Gemini query-gen 429 exhausted model=%s tool=%s — next",
                            gm,
                            tool_name,
                        )
                        break
                    if resp.status_code in (500, 502, 503, 504):
                        logger.info(
                            "Gemini query-gen HTTP %s model=%s tool=%s — next",
                            resp.status_code,
                            gm,
                            tool_name,
                        )
                        break
                    if resp.status_code >= 400:
                        logger.warning(
                            "Gemini query-gen HTTP %s model=%s tool=%s",
                            resp.status_code,
                            gm,
                            tool_name,
                        )
                        break

                    raw_text = _text_from_gemini_generate_response(resp.json())
                    if not raw_text.strip():
                        same_model_immediate_retry += 1
                        if (
                            same_model_immediate_retry
                            >= _JSON_PARSE_MAX_RETRIES_SAME_MODEL
                        ):
                            logger.warning(
                                "Empty Gemini content model=%s tool=%s after %d "
                                "immediate regenerates — next model",
                                gm,
                                tool_name,
                                same_model_immediate_retry,
                            )
                            break
                        logger.info(
                            "Empty Gemini content model=%s tool=%s — immediate "
                            "regenerate (%d/%d)",
                            gm,
                            tool_name,
                            same_model_immediate_retry,
                            _JSON_PARSE_MAX_RETRIES_SAME_MODEL,
                        )
                        continue

                    parsed = _parse_queries_from_response_text(
                        raw_text,
                        tool_name,
                        count,
                    )
                    if parsed is not None:
                        return parsed
                    same_model_immediate_retry += 1
                    if same_model_immediate_retry >= _JSON_PARSE_MAX_RETRIES_SAME_MODEL:
                        logger.warning(
                            "Invalid or incomplete query JSON for %s model=%s after "
                            "%d immediate regenerates — next model",
                            tool_name,
                            gm,
                            same_model_immediate_retry,
                        )
                        break
                    logger.info(
                        "Invalid or incomplete query JSON for %s model=%s — "
                        "immediate regenerate (%d/%d)",
                        tool_name,
                        gm,
                        same_model_immediate_retry,
                        _JSON_PARSE_MAX_RETRIES_SAME_MODEL,
                    )
                    continue
                except Exception as exc:
                    logger.warning(
                        "Gemini query-gen error model=%s tool=%s: %s",
                        gm,
                        tool_name,
                        exc,
                    )
                    break

        # All Gemini models failed this round — OpenRouter (query gen only)
        try:
            or_out = await _try_openrouter_query_gen(
                client,
                tool_name=tool_name,
                prompt=prompt,
                count=count,
                primary_model_name=model,
            )
            if or_out is not None:
                return or_out
        except Exception as exc:
            logger.error(
                "OpenRouter query-gen fallback failed for %s: %s",
                tool_name,
                exc,
            )
        consecutive_failures += 1
        logger.debug(
            "Query-gen round failed for %s (failures=%d)",
            tool_name,
            consecutive_failures,
        )

    raise RuntimeError(
        f"Exhausted synthetic query generation for tool {tool_name!r}: "
        f"could not obtain {count} valid queries after {max_attempts} outer "
        f"attempt(s) (each attempt tries all Gemini models with up to "
        f"{_JSON_PARSE_MAX_RETRIES_SAME_MODEL} immediate regenerates per model, "
        f"then OpenRouter).",
    )
async def build_index(tools_dir: str = "tools") -> None:
    """Discover every tool and generate its synthetic queries into the index file.

    Top-level driver for the reverse-HyDE index build. It auto-discovers all
    registered tools, generates ``SYNTHETIC_QUERY_COUNT`` synthetic user queries
    per tool, and writes the merged result to ``tool_index_data.json``. That file
    is the input for the later embedding stage of the tool-routing pipeline.

    Steps:

    1. Build a :class:`tools.ToolRegistry` and populate it with
       :func:`tool_loader.load_tools`.
    2. Read any existing ``OUTPUT_FILE`` so tools that already have enough
       queries are skipped (resumable). An unreadable or non-object file is
       logged and ignored.
    3. Fan out generation across an ``asyncio.Semaphore(3)``. Each worker calls
       :func:`generate_synthetic_queries` over one shared ``httpx.AsyncClient``,
       which is closed when generation finishes.

    Every tool is attempted even if another fails. The merged dict is then
    written atomically (temp file + ``os.replace``), so successful results are
    kept and an interrupted write cannot corrupt an existing index. Afterwards
    the first failure, if any, is re-raised. Invoked from this module's
    ``__main__`` guard via ``asyncio.run``.

    Args:
        tools_dir: Directory scanned for tool modules. Defaults to ``"tools"``.

    Returns:
        None. Side effect: writes ``tool_index_data.json``.

    Raises:
        Exception: The first per-tool generation error (typically
            ``RuntimeError`` from :func:`generate_synthetic_queries`), raised
            after the successful results have been saved.
    """
    logger.info("--- Auto-discovering tools from %s ---", tools_dir)
    registry = ToolRegistry()
    load_tools(tools_dir, registry)
    all_tools = registry.list_tools()
    logger.info("Found %d tools.", len(all_tools))

    index_data: dict[str, Any] = {}
    if os.path.exists(OUTPUT_FILE):
        try:
            with open(OUTPUT_FILE, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except Exception as exc:
            # Resume is best-effort: fall back to a full rebuild, but say so.
            logger.warning(
                "Could not read existing %s (%s); regenerating every tool.",
                OUTPUT_FILE,
                exc,
            )
        else:
            if isinstance(loaded, dict):
                index_data = loaded
                logger.info(
                    "Loaded existing data with %d entries.",
                    len(index_data),
                )
            else:
                logger.warning(
                    "Ignoring existing %s: expected a JSON object, got %s.",
                    OUTPUT_FILE,
                    type(loaded).__name__,
                )

    logger.info("--- Generating %d queries per tool ---", SYNTHETIC_QUERY_COUNT)
    sem = asyncio.Semaphore(3)

    # ``async with`` guarantees the connection pool is closed, even on error.
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(600.0, connect=30.0),
    ) as client:

        async def process_tool(tool: Any) -> None:
            """Generate and record synthetic queries for one discovered tool.

            Skips the tool when ``index_data`` already holds at least
            ``SYNTHETIC_QUERY_COUNT`` queries for it. Otherwise it acquires the
            shared semaphore and calls :func:`generate_synthetic_queries` over the
            enclosing ``client``. The result is stored in ``index_data`` under
            ``tool.name``; the disk write happens once, back in
            :func:`build_index`.

            Args:
                tool: Discovered tool object exposing ``name`` and ``description``.
            """
            if tool.name in index_data:
                entry = index_data[tool.name]
                # Tolerate malformed entries (non-dict or null query list).
                existing = (
                    entry.get("synthetic_queries") if isinstance(entry, dict) else None
                ) or []
                if len(existing) >= SYNTHETIC_QUERY_COUNT:
                    logger.info("Skipping %s (%d queries)", tool.name, len(existing))
                    return
                logger.info("Regenerating %s (%d queries)", tool.name, len(existing))
            async with sem:
                logger.info("Generating for: %s", tool.name)
                queries = await generate_synthetic_queries(
                    client,
                    None,
                    None,
                    tool.name,
                    tool.description or "",
                )
                index_data[tool.name] = {
                    "name": tool.name,
                    "description": tool.description,
                    "synthetic_queries": queries,
                }
                logger.info("Finished %s (%d queries)", tool.name, len(queries))

        # return_exceptions=True lets every tool finish even if one exhausts its
        # retries, so its siblings' results are not lost.
        results = await asyncio.gather(
            *(process_tool(t) for t in all_tools),
            return_exceptions=True,
        )

    failures = [
        (tool, outcome)
        for tool, outcome in zip(all_tools, results)
        if isinstance(outcome, BaseException)
    ]

    logger.info("--- Saving to %s ---", OUTPUT_FILE)
    tmp_path = f"{OUTPUT_FILE}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(index_data, f, indent=2)
    # Atomic swap: an interrupted write never truncates the previous index.
    os.replace(tmp_path, OUTPUT_FILE)

    if failures:
        for tool, exc in failures:
            logger.error("Query generation failed for %s: %s", tool.name, exc)
        raise failures[0][1]
    logger.info("Done! %d tools indexed.", len(index_data))
if __name__ == "__main__":
    # CLI entry point: parse --tools-dir, then run the async index build.
    cli = argparse.ArgumentParser(
        description="Build tool index (synthetic queries) for vector classifier",
    )
    cli.add_argument(
        "--tools-dir",
        default="tools",
        help="Directory containing tool scripts",
    )
    cli_args = cli.parse_args()
    asyncio.run(build_index(tools_dir=cli_args.tools_dir))