"""Web search via the Brave Search API with rate limiting and key rotation."""
import asyncio
import logging
import os
import time
from dataclasses import dataclass
from typing import Optional

import aiohttp

import jsonutil as json
from tools.alter_privileges import has_privilege, PRIVILEGES
logger = logging.getLogger(__name__)

# Endpoints of the primary (Brave) and fallback (Tavily) search services.
BRAVE_SEARCH_URL = "https://api.search.brave.com/res/v1/web/search"
TAVILY_SEARCH_URL = "https://api.tavily.com/search"

# SECURITY(review): a Tavily credential is committed to source control below.
# It is kept only as a backward-compatible fallback; deployments should supply
# the key through the TAVILY_API_KEY environment variable, and the committed
# value should be rotated and removed.
TAVILY_API_KEY = (
    os.environ.get("TAVILY_API_KEY", "").strip()
    or "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"
)

# Brave requests are attempted at most MAX_RETRIES times; the delay before
# retry number n (0-based) is INITIAL_BACKOFF_SECONDS * 2**n.
MAX_RETRIES = 3
INITIAL_BACKOFF_SECONDS = 1.0
[docs]
class BraveAPIKeyManager:
    """Round-robin manager for the pool of shared Brave Search API keys.

    Holds the list of default/pool Brave subscription tokens and hands them
    out in rotation so that usage is spread across all configured keys. The
    cursor is advanced on every normal request (via :meth:`get_next_key`) and
    on demand (via :meth:`rotate_key`) when
    :class:`BraveSearchRateLimiter` sees an HTTP 429 or a network error and
    wants to retry on another key. A single module-level instance
    (``_key_manager``) is shared by the rate limiter and the public helpers;
    state is purely in-memory and not persisted.
    """

    def __init__(self):
        """Construct an empty key manager with no keys loaded.

        Keys are populated later by :meth:`load_keys` once a config object is
        available; the round-robin cursor starts at zero and the asyncio lock
        guarding :meth:`get_next_key` is created lazily on first use.
        """
        self.keys: list = []
        self.current_index: int = 0
        self._lock: Optional[asyncio.Lock] = None

    def load_keys(self, config) -> None:
        """Populate the key pool from the bot configuration.

        Reads ``config.API_KEYS["brave"]`` (a single string, or a list/tuple
        of strings), strips surrounding whitespace from every entry, discards
        blank and non-string entries, and stores the result on ``self.keys``.
        The round-robin cursor is reset to zero so it can never point past
        the end of a freshly loaded (possibly shorter) pool. The outcome is
        logged at INFO when keys are found and WARNING when none are. Any
        exception while reading the config is logged and leaves the pool
        empty, so a misconfiguration degrades into "no shared key".

        Args:
            config: Bot configuration object exposing an ``API_KEYS`` mapping.
        """
        try:
            raw = getattr(config, "API_KEYS", {}).get("brave")
            if isinstance(raw, (list, tuple)):
                candidates = raw
            elif raw:
                candidates = [raw]
            else:
                candidates = []
            # Strip every entry (not just the single-string form) so a key
            # with stray whitespace/newlines is never sent as a header value.
            cleaned = [
                key.strip()
                for key in candidates
                if isinstance(key, str) and key.strip()
            ]
        except Exception as exc:
            logger.error("Failed to load Brave API keys: %s", exc)
            cleaned = None

        self.keys = cleaned or []
        self.current_index = 0
        if cleaned is None:
            return
        if self.keys:
            logger.info("Loaded %d Brave Search API key(s)", len(self.keys))
        else:
            logger.warning("No Brave Search API keys configured")

    async def _ensure_lock(self):
        """Create the asyncio lock that serialises key handout, if missing.

        Creation is deferred to first use so that constructing the manager at
        import time does not depend on an event loop. Called by
        :meth:`get_next_key` immediately before acquiring the lock; a no-op
        once the lock exists.
        """
        if self._lock is None:
            self._lock = asyncio.Lock()

    async def get_next_key(self) -> Optional[str]:
        """Return the next pool key and advance the round-robin cursor.

        Hands back the key at the current index, then moves the cursor forward
        (wrapping modulo the pool size) so consecutive callers spread load
        across keys. The read-and-advance is performed under the lock from
        :meth:`_ensure_lock`.

        Returns:
            Optional[str]: The selected pool key, or ``None`` if the pool is
            empty (no Brave keys configured).
        """
        if not self.keys:
            return None
        await self._ensure_lock()
        async with self._lock:
            if not self.keys:
                # The pool was emptied while we waited for the lock.
                return None
            index = self.current_index % len(self.keys)
            self.current_index = (index + 1) % len(self.keys)
            return self.keys[index]

    def rotate_key(self) -> Optional[str]:
        """Advance the cursor by one and return the key it now points at.

        Unlike :meth:`get_next_key`, this advances the cursor *before* reading
        and logs the new position at INFO. It is synchronous and takes no
        lock.

        NOTE: :meth:`get_next_key` has already moved the cursor past the key
        it returned, so the key returned here is not guaranteed to differ
        from the one the caller is currently using (with a two-key pool it is
        the same key). Callers that need a *different* key must compare.

        Returns:
            Optional[str]: The newly current key, or ``None`` if the pool is
            empty.
        """
        if not self.keys:
            return None
        self.current_index = (self.current_index + 1) % len(self.keys)
        logger.info(
            "Rotated to Brave API key %d/%d",
            self.current_index + 1,
            len(self.keys),
        )
        return self.keys[self.current_index]

    def get_current_key(self) -> Optional[str]:
        """Return the key at the current cursor without advancing it.

        Returns:
            Optional[str]: The current key, or ``None`` if the pool is empty.
        """
        if not self.keys:
            return None
        # Modulo keeps the peek safe even if the cursor was set externally.
        return self.keys[self.current_index % len(self.keys)]

    def get_key_count(self) -> int:
        """Return how many keys are in the pool.

        Returns:
            int: The number of configured pool keys.
        """
        return len(self.keys)
# Process-wide key pool shared by the rate limiter and the public entry points.
_key_manager = BraveAPIKeyManager()
# Flipped to True by run()/search_with_key() after _key_manager.load_keys()
# has been called, so the pool is only loaded once.
_keys_loaded = False
# ---------------------------------------------------------------------------
# Authorization
# ---------------------------------------------------------------------------
async def _check_web_search_access(ctx) -> str | None:
    """Authorization guard: require the WEB_SEARCH privilege.

    Reads ``user_id``, ``redis`` and ``config`` from the tool context
    (tolerating missing attributes) and asks
    :func:`tools.alter_privileges.has_privilege` whether the caller holds
    ``PRIVILEGES["WEB_SEARCH"]``.

    Args:
        ctx: Tool execution context exposing ``user_id``, ``redis``, and
            ``config``.

    Returns:
        str | None: ``None`` when access is granted; otherwise a JSON-encoded
        ``{"success": False, "error": ...}`` payload the caller can return
        as-is.
    """
    caller_id = getattr(ctx, "user_id", "") or ""
    redis_handle = getattr(ctx, "redis", None)
    bot_config = getattr(ctx, "config", None)

    granted = await has_privilege(
        redis_handle, caller_id, PRIVILEGES["WEB_SEARCH"], bot_config
    )
    if granted:
        return None

    denial = {
        "success": False,
        "error": "The user does not have the WEB_SEARCH privilege. Ask an admin to grant it.",
    }
    return json.dumps(denial)
# ---------------------------------------------------------------------------
# Tavily fallback
# ---------------------------------------------------------------------------
async def _tavily_fallback(query: str, count: int) -> Optional[str]:
    """Run the same query against the Tavily Search API as a fallback.

    Issues an authenticated HTTP POST to ``TAVILY_SEARCH_URL`` via aiohttp and
    reshapes Tavily's ``results`` into the same normalised result schema the
    Brave path produces (tagged with ``"source": "tavily"``). Any non-200
    status or exception is logged and reported to observability via
    :func:`_report_tavily_error`, after which ``None`` is returned so the
    caller can fall back to the original Brave error.

    Args:
        query (str): The search query to run on Tavily.
        count (int): Desired result count. Coerced with ``int()`` (falling
            back to 10 when that fails) and clamped to the 1-20 range, the
            same normalisation the Brave request applies.

    Returns:
        Optional[str]: A JSON string of normalised results on success, or
        ``None`` if the fallback request failed.
    """
    # Normalise the count up front: a non-integer value (e.g. "5" or None)
    # used to raise TypeError inside min()/max(), which was then misreported
    # as a Tavily network failure.
    try:
        wanted = int(count)
    except (TypeError, ValueError, OverflowError):
        wanted = 10
    wanted = max(1, min(20, wanted))

    try:
        payload = {
            "query": query.strip(),
            "max_results": wanted,
            "search_depth": "basic",
        }
        headers = {
            "Authorization": f"Bearer {TAVILY_API_KEY}",
            "Content-Type": "application/json",
        }
        async with aiohttp.ClientSession() as session:
            logger.info("Tavily fallback search: '%s'", query)
            async with session.post(
                TAVILY_SEARCH_URL,
                json=payload,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=30),
            ) as resp:
                if resp.status != 200:
                    body = await resp.text()
                    logger.warning(
                        "Tavily fallback HTTP %d: %s", resp.status, body[:300]
                    )
                    _report_tavily_error(resp.status, body)
                    return None
                data = await resp.json()

        hits = data.get("results") or []
        formatted = {
            "query": query,
            "source": "tavily",
            "total_results": len(hits),
            "results": [
                {
                    "title": hit.get("title") or "",
                    # Tavily calls the snippet "content"; the shared schema
                    # calls it "description".
                    "description": hit.get("content") or "",
                    "url": hit.get("url") or "",
                    # Fields Tavily does not provide are filled with the
                    # schema's neutral defaults.
                    "display_url": "",
                    "age": "",
                    "page_age": "",
                    "language": "",
                    "family_friendly": True,
                }
                for hit in hits
            ],
        }
        logger.info("Tavily fallback OK: %d results", len(formatted["results"]))
        return json.dumps(formatted, indent=2, ensure_ascii=False)
    except Exception as exc:
        logger.warning("Tavily fallback failed: %s", exc, exc_info=True)
        _report_tavily_error(0, str(exc), error_kind="network")
        return None


# Strong references to in-flight report tasks: the event loop only keeps weak
# references to tasks, so an unreferenced fire-and-forget task may be
# garbage-collected before it finishes.
_tavily_report_tasks: set = set()


def _report_tavily_error(status: int, detail: str, **extra) -> None:
    """Schedule an observability event for a failed Tavily call.

    Fire-and-forget and best-effort: a failure to import or schedule the
    reporter is logged at DEBUG and never propagates to the search path.

    Args:
        status (int): HTTP status of the failed call (0 for non-HTTP errors).
        detail (str): Error detail; truncated to 500 characters.
        **extra: Additional keyword arguments forwarded to
            ``publish_http_error_event`` (e.g. ``error_kind="network"``).
    """
    try:
        from observability import publish_http_error_event

        report = asyncio.create_task(
            publish_http_error_event(
                http_service="tavily_search",
                http_status=status,
                endpoint="api.tavily.com/search",
                detail=detail[:500],
                **extra,
            )
        )
    except Exception as exc:
        logger.debug("Could not schedule Tavily error report: %s", exc)
        return
    _tavily_report_tasks.add(report)
    report.add_done_callback(_tavily_report_tasks.discard)
[docs]
@dataclass
class BraveSearchTask:
"""A single queued Brave search request awaiting rate-limited execution.
Bundles everything :class:`BraveSearchRateLimiter` needs to run one search
(the query plus its locale/safesearch options, the resolved key and caller
identity for quota accounting, and a Redis handle and config for privilege
checks) together with an :class:`asyncio.Future` the worker resolves with
the result. Instances are created by
:meth:`BraveSearchRateLimiter.search`, placed on the limiter's internal
queue, and consumed by its background worker.
"""
query: str
count: Optional[int]
country: Optional[str]
search_lang: Optional[str]
ui_lang: Optional[str]
safesearch: Optional[str]
future: asyncio.Future
user_api_key: Optional[str] = None
user_id: Optional[str] = None
redis_client: Optional[object] = None
config: Optional[object] = None
[docs]
class BraveSearchRateLimiter:
    """Process-wide serialiser that throttles all Brave API traffic.

    Every search is enqueued as a :class:`BraveSearchTask` and executed by a
    single background worker coroutine, which guarantees the configured
    minimum gap between the start of consecutive searches regardless of how
    many callers are waiting. The worker performs the Brave request (with
    retries and pool-key failover), applies the Tavily fallback on failure,
    and resolves each task's future with the result. A single module-level
    instance (``_rate_limiter``) backs both :func:`run` and
    :func:`search_with_key`.
    """

    # Endpoint label attached to observability events.
    _BRAVE_ENDPOINT = "api.search.brave.com/res/v1/web/search"

    # Non-retryable statuses with fixed messages:
    # status -> (detail sent to observability, "error" text returned).
    _FIXED_STATUS_ERRORS = {
        400: (
            "Bad request - check query parameters",
            "Bad request - check query parameters",
        ),
        401: (
            "Invalid API key or unauthorized",
            "Invalid API key or unauthorized access",
        ),
        403: (
            "Forbidden - check API key permissions",
            "Forbidden - check API key permissions",
        ),
        # Only used once the retry budget for 429 responses is spent.
        429: (
            "Rate limit exceeded - all retries exhausted",
            "Rate limit exceeded - all retries exhausted",
        ),
    }

    # Guidance returned alongside the "payment_required" error (HTTP 402).
    _PAYMENT_REQUIRED_MESSAGE = (
        "The Brave Search API key has exceeded its free tier quota. "
        "The user needs to provide their own Brave Search API key. "
        "They can obtain one for free at https://brave.com/search/api/ "
        "(the free plan allows 2,000 queries/month). "
        "To add their key, they should send you a DM with something like: "
        "'Save my Brave search API key: BSA_xxxxxxxxx' "
        "and you will store it securely for their future searches."
    )

    def __init__(self, calls_per_interval: float = 0.5, interval_seconds: float = 1.0):
        """Configure the throttle rate; defer all async setup until first use.

        The queue, lock and worker task are built lazily by
        :meth:`_ensure_initialized`. The minimum spacing between searches is
        ``interval_seconds / calls_per_interval`` (two seconds with the
        defaults).

        Args:
            calls_per_interval (float): Permitted number of calls within each
                interval window. Must be positive.
            interval_seconds (float): Length of the interval window in
                seconds. Must not be negative.

        Raises:
            ValueError: If the rate parameters cannot yield a valid spacing.
        """
        # Fail fast: a zero rate would otherwise surface as a
        # ZeroDivisionError on every single search.
        if calls_per_interval <= 0:
            raise ValueError("calls_per_interval must be positive")
        if interval_seconds < 0:
            raise ValueError("interval_seconds must not be negative")
        self.calls_per_interval = calls_per_interval
        self.interval_seconds = interval_seconds
        self.task_queue: Optional[asyncio.Queue] = None
        # Wall-clock stamp of the last search (kept for compatibility); the
        # throttle itself is computed from the monotonic stamp below.
        self.last_call_time: float = 0.0
        self._last_call_monotonic: Optional[float] = None
        self._lock: Optional[asyncio.Lock] = None
        self._worker_task: Optional[asyncio.Task] = None
        self._initialized = False
        # Strong references to fire-and-forget observability tasks; the event
        # loop itself only holds weak references to tasks.
        self._report_tasks: set = set()

    async def _ensure_initialized(self):
        """Build the queue, lock and background worker if none is running.

        Runs on the first search and again whenever the previous worker task
        has finished (for example because the event loop it belonged to was
        shut down), in which case a fresh queue, lock and worker are created.
        Called at the top of :meth:`search`.
        """
        worker = self._worker_task
        if self._initialized and worker is not None and not worker.done():
            return
        self.task_queue = asyncio.Queue()
        self._lock = asyncio.Lock()
        self._worker_task = asyncio.create_task(self._worker())
        self._initialized = True

    async def _worker(self):
        """Background loop that drains the queue one task at a time.

        For each queued task it waits out the rate limit, runs
        :meth:`_execute_search`, and resolves the task's future with the
        result or the exception raised. Tasks whose future is already done
        (the caller was cancelled or timed out) are skipped without
        spending a request. On cancellation the in-flight task's future is
        failed so its caller does not wait forever, and the loop exits.
        """
        while True:
            task = None
            try:
                task = await self.task_queue.get()
                try:
                    if task.future.done():
                        # Nobody is waiting for this result any more.
                        continue
                    await self._wait_for_rate_limit()
                    try:
                        result = await self._execute_search(task)
                    except Exception as exc:
                        if not task.future.done():
                            task.future.set_exception(exc)
                    else:
                        # The caller may have been cancelled while the search
                        # ran; setting a result on a done future would raise.
                        if not task.future.done():
                            task.future.set_result(result)
                finally:
                    self.task_queue.task_done()
            except asyncio.CancelledError:
                if task is not None and not task.future.done():
                    task.future.set_exception(
                        RuntimeError("Brave search worker was cancelled")
                    )
                break
            except Exception as exc:
                logger.error(f"Error in Brave search rate limiter worker: {exc}")
                if task is not None and not task.future.done():
                    task.future.set_exception(exc)

    async def _wait_for_rate_limit(self):
        """Sleep just long enough to honour the minimum inter-call spacing.

        Under the limiter lock, computes the minimum interval
        (``interval_seconds / calls_per_interval``) and, if the previous call
        was more recent than that, awaits the remaining time. Elapsed time is
        measured with :func:`time.monotonic` so that wall-clock adjustments
        cannot stretch or skip the wait.
        """
        async with self._lock:
            min_interval = self.interval_seconds / self.calls_per_interval
            previous = self._last_call_monotonic
            if previous is not None:
                elapsed = time.monotonic() - previous
                if elapsed < min_interval:
                    await asyncio.sleep(min_interval - elapsed)
            self._last_call_monotonic = time.monotonic()
            self.last_call_time = time.time()

    @staticmethod
    def _normalize_count(count) -> int:
        """Coerce a requested result count to an int in the 1-20 range.

        ``None`` and values that cannot be converted with ``int()`` become 10.
        """
        try:
            value = 10 if count is None else int(count)
        except (TypeError, ValueError, OverflowError):
            value = 10
        return max(1, min(20, value))

    async def _execute_search(self, task: "BraveSearchTask") -> str:
        """Run the Brave request and fall back to Tavily when it fails.

        Delegates to :meth:`_brave_search` and parses the JSON it returns. A
        payload without an ``"error"`` key is tagged with
        ``"source": "brave"`` and re-serialised. Otherwise a warning is
        logged and :func:`_tavily_fallback` is tried; its result is returned
        when available, else the original Brave error string is passed
        through unchanged.

        Args:
            task (BraveSearchTask): The queued search request to execute.

        Returns:
            str: A JSON string of results (from Brave or Tavily) or a JSON
            error payload.
        """
        brave_result = await self._brave_search(task)
        try:
            parsed = json.loads(brave_result)
        except (ValueError, TypeError):
            parsed = None
        if not isinstance(parsed, dict):
            parsed = {"error": "unparseable response"}

        if "error" not in parsed:
            parsed["source"] = "brave"
            return json.dumps(parsed, indent=2, ensure_ascii=False)

        # str(): the error value comes from helpers whose return type is not
        # visible here; slicing a non-string would raise.
        logger.warning(
            "Brave search failed (%s), trying Tavily fallback",
            str(parsed.get("error", ""))[:120],
        )
        # NOTE(review): the fallback also runs when the error is the per-user
        # daily pool-key limit or a missing key, so those limits do not stop
        # a search from being served by Tavily - confirm this is intended.
        fallback = await _tavily_fallback(
            task.query, self._normalize_count(task.count)
        )
        if fallback is not None:
            return fallback
        return brave_result

    async def _quota_applies(self, task: "BraveSearchTask", using_user_key: bool) -> bool:
        """Decide whether the per-user daily pool-key limit applies.

        The limit is skipped when the caller uses their own key, when no user
        id or Redis client is available, when the user id is listed in the
        config's ``admin_user_ids``, or when the user holds the
        ``BYPASS_RATELIMIT`` privilege.
        """
        if using_user_key or not task.user_id or not task.redis_client:
            return False
        task_config = getattr(task, "config", None)
        admin_ids = getattr(task_config, "admin_user_ids", None) or []
        if str(task.user_id) in admin_ids:
            return False
        bypass = await has_privilege(
            task.redis_client,
            task.user_id,
            PRIVILEGES["BYPASS_RATELIMIT"],
            task_config,
        )
        return not bypass

    @staticmethod
    def _failover_key(failed_key: str) -> Optional[str]:
        """Rotate the shared pool until a key other than ``failed_key`` turns up.

        ``get_next_key`` has already advanced the pool cursor past the key it
        handed out, so a single ``rotate_key`` call can land on the very key
        that just failed (always, with a two-key pool). Rotating at most once
        per pool entry and comparing avoids retrying on the same key.

        Returns:
            Optional[str]: A different pool key, or ``None`` when the pool
            has no alternative.
        """
        for _ in range(_key_manager.get_key_count()):
            candidate = _key_manager.rotate_key()
            if candidate and candidate != failed_key:
                return candidate
        return None

    @staticmethod
    def _describe_key(api_key: str, using_user_key: bool) -> str:
        """Return a log label for the key in use that never reveals the key."""
        if using_user_key:
            return "user key"
        total = _key_manager.get_key_count()
        try:
            position = _key_manager.keys.index(api_key) + 1
        except ValueError:
            return f"key ?/{total}"
        return f"key {position}/{total}"

    def _report_error(self, status: int, detail: str, **extra) -> None:
        """Schedule an observability event for a failed Brave call.

        Fire-and-forget and best-effort: the task is retained until it
        finishes, and a failure to import or schedule the reporter is logged
        at DEBUG instead of propagating into the search path.

        Args:
            status (int): HTTP status (0 for non-HTTP failures).
            detail (str): Error detail; truncated to 500 characters.
            **extra: Extra keyword arguments for ``publish_http_error_event``
                (e.g. ``error_kind="network"``).
        """
        try:
            from observability import publish_http_error_event

            report = asyncio.create_task(
                publish_http_error_event(
                    http_service="brave_search",
                    http_status=status,
                    endpoint=self._BRAVE_ENDPOINT,
                    detail=detail[:500],
                    **extra,
                )
            )
        except Exception as exc:
            logger.debug("Could not schedule Brave error report: %s", exc)
            return
        self._report_tasks.add(report)
        report.add_done_callback(self._report_tasks.discard)

    def _status_error(self, status: int) -> str:
        """Report a terminal HTTP error status and return its JSON payload."""
        if status == 402:
            # NOTE(review): a 402 ends the request without trying the other
            # pool keys - confirm that is intended.
            self._report_error(402, "Payment required / free tier exhausted")
            return json.dumps(
                {
                    "error": "payment_required",
                    "message": self._PAYMENT_REQUIRED_MESSAGE,
                }
            )
        generic = f"API error: HTTP {status}"
        detail, error = self._FIXED_STATUS_ERRORS.get(status, (generic, generic))
        self._report_error(status, detail)
        return json.dumps({"error": error})

    @staticmethod
    def _format_brave_results(query: str, data, count: int) -> dict:
        """Reshape a Brave response body into the normalised result schema.

        ``total_results`` counts every web result Brave returned, while
        ``results`` holds at most ``count`` of them. Missing or malformed
        ``web``/``results``/``profile`` sections are treated as absent.
        """
        web = data.get("web") if isinstance(data, dict) else None
        web_results = web.get("results") if isinstance(web, dict) else None
        if not isinstance(web_results, list):
            web_results = []

        items = []
        for result in web_results[:count]:
            item = {
                "title": result.get("title", ""),
                "description": result.get("description", ""),
                "url": result.get("url", ""),
                "display_url": result.get("display_url", ""),
                "age": result.get("age", ""),
                "page_age": result.get("page_age", ""),
                "language": result.get("language", ""),
                "family_friendly": result.get("family_friendly", True),
            }
            profile = result.get("profile")
            if isinstance(profile, dict):
                item["profile"] = {
                    "name": profile.get("name", ""),
                    "url": profile.get("url", ""),
                    "long_name": profile.get("long_name", ""),
                    "img": profile.get("img", ""),
                }
            items.append(item)
        return {
            "query": query,
            "total_results": len(web_results),
            "results": items,
        }

    async def _brave_search(self, task: "BraveSearchTask") -> str:
        """Issue the Brave Search HTTP request with quota checks and retries.

        Resolves which key to use (the caller's ``user_api_key`` when present,
        otherwise the next pool key from ``_key_manager``). For pool-key usage
        by a non-exempt user (see :meth:`_quota_applies`) it enforces a
        50/day cap via :func:`tools.manage_api_keys.check_default_key_limit`
        and, after a successful search, records the usage with
        :func:`tools.manage_api_keys.increment_default_key_usage`.

        Up to ``MAX_RETRIES`` GET requests are made with exponential backoff.
        HTTP 429, aiohttp client errors and timeouts are retried, switching to
        a different pool key when one is available; every other non-200
        status ends the request immediately with a descriptive error. Failures
        are reported to observability via :meth:`_report_error`.

        Args:
            task (BraveSearchTask): The request to execute, including query,
                options, resolved key, and caller identity.

        Returns:
            str: A JSON string of normalised results on success, or a JSON
            error payload describing the failure.
        """
        using_user_key = bool(task.user_api_key)
        if using_user_key:
            api_key = task.user_api_key
        else:
            api_key = await _key_manager.get_next_key()
        if not api_key:
            from tools.manage_api_keys import missing_api_key_error

            return json.dumps({"error": missing_api_key_error("brave")})

        # Daily cap on shared-key usage (50/day for search).
        rate_limit_applies = await self._quota_applies(task, using_user_key)
        if rate_limit_applies:
            from tools.manage_api_keys import (
                check_default_key_limit,
                default_key_limit_error,
            )

            allowed, current, limit = await check_default_key_limit(
                task.user_id,
                "brave_web_search",
                task.redis_client,
                daily_limit=50,
            )
            if not allowed:
                return json.dumps(
                    {
                        "error": default_key_limit_error(
                            "brave_web_search", current, limit
                        )
                    }
                )

        count = self._normalize_count(task.count)
        params = {
            "q": task.query.strip(),
            "count": count,
            "safesearch": task.safesearch or "moderate",
        }
        if task.country:
            params["country"] = task.country.upper()
        if task.search_lang:
            params["search_lang"] = task.search_lang.lower()
        if task.ui_lang:
            params["ui_lang"] = task.ui_lang.lower()

        last_error = None
        for attempt in range(MAX_RETRIES):
            retries_left = attempt < MAX_RETRIES - 1
            headers = {
                "Accept": "application/json",
                "Accept-Encoding": "gzip",
                "X-Subscription-Token": api_key,
            }
            data = None
            succeeded = False
            try:
                async with aiohttp.ClientSession() as session:
                    logger.info(
                        f"Brave search: '{task.query}' "
                        f"(attempt {attempt + 1}/{MAX_RETRIES}, "
                        f"{self._describe_key(api_key, using_user_key)})"
                    )
                    async with session.get(
                        BRAVE_SEARCH_URL,
                        headers=headers,
                        params=params,
                        timeout=aiohttp.ClientTimeout(total=30),
                    ) as response:
                        status = response.status
                        if status == 200:
                            data = await response.json()
                            succeeded = True
                        elif status == 429 and retries_left:
                            last_error = "Rate limit exceeded"
                        else:
                            return self._status_error(status)
            except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
                # The total-timeout expiry surfaces as asyncio.TimeoutError,
                # which is not an aiohttp.ClientError; treat it as a
                # retryable network failure too. Its str() is empty, so fall
                # back to the exception type name.
                last_error = str(exc) or type(exc).__name__
                logger.error(f"Network error on attempt {attempt + 1}: {last_error}")

            if succeeded:
                formatted_results = self._format_brave_results(
                    task.query, data, count
                )
                logger.info(
                    f"Brave search OK: {len(formatted_results['results'])} results"
                )
                if rate_limit_applies:
                    from tools.manage_api_keys import increment_default_key_usage

                    await increment_default_key_usage(
                        task.user_id, "brave_web_search", task.redis_client
                    )
                return json.dumps(formatted_results, indent=2, ensure_ascii=False)

            if retries_left:
                # The connection has been released by now, so the backoff
                # does not hold a socket open.
                if not using_user_key:
                    api_key = self._failover_key(api_key) or api_key
                await asyncio.sleep(INITIAL_BACKOFF_SECONDS * (2**attempt))

        failure = f"Search failed after {MAX_RETRIES} attempts: {last_error}"
        self._report_error(0, failure, error_kind="network")
        return json.dumps({"error": failure})

    async def search(
        self,
        query: str,
        count: Optional[int] = 10,
        country: Optional[str] = None,
        search_lang: Optional[str] = None,
        ui_lang: Optional[str] = None,
        safesearch: Optional[str] = "moderate",
        user_api_key: Optional[str] = None,
        user_id: Optional[str] = None,
        redis_client: Optional[object] = None,
        config: Optional[object] = None,
    ) -> str:
        """Submit a search to the rate-limited worker and await its result.

        Rejects an empty query up front, makes sure the worker is running,
        wraps the arguments in a :class:`BraveSearchTask` carrying a fresh
        future, enqueues it, and awaits the future. Any exception surfaced
        through the future is logged and converted into a JSON error payload
        so callers always get a string.

        Args:
            query (str): Search query; must contain non-whitespace text.
            count (Optional[int]): Number of results wanted (normalised to
                1-20 by the worker; ``None`` means 10).
            country (Optional[str]): Country code, sent upper-cased.
            search_lang (Optional[str]): Result language code, sent
                lower-cased.
            ui_lang (Optional[str]): UI language code, sent lower-cased.
            safesearch (Optional[str]): Safe-search level; a falsy value is
                sent as ``"moderate"``.
            user_api_key (Optional[str]): Caller-owned Brave key; when given
                it is used instead of the shared pool.
            user_id (Optional[str]): Caller id used for pool-key quota
                accounting.
            redis_client (Optional[object]): Redis client used for the quota
                and privilege checks.
            config (Optional[object]): Bot config for admin/privilege checks.

        Returns:
            str: JSON string with the results or an ``"error"`` payload.
        """
        if not query or not query.strip():
            return json.dumps({"error": "Search query cannot be empty"})

        await self._ensure_initialized()
        future = asyncio.get_running_loop().create_future()
        task = BraveSearchTask(
            query=query,
            count=count,
            country=country,
            search_lang=search_lang,
            ui_lang=ui_lang,
            safesearch=safesearch,
            future=future,
            user_api_key=user_api_key,
            user_id=user_id,
            redis_client=redis_client,
            config=config,
        )
        await self.task_queue.put(task)
        try:
            return await future
        except Exception as exc:
            logger.error(f"Error in Brave search: {exc}")
            return json.dumps({"error": f"Search failed: {str(exc)}"})
# Shared limiter behind run() and search_with_key(): 0.5 calls per 1.0 s,
# i.e. a minimum spacing of two seconds between Brave searches.
_rate_limiter = BraveSearchRateLimiter(calls_per_interval=0.5, interval_seconds=1.0)
# ---------------------------------------------------------------------------
# Public API for programmatic (non-tool) callers
# ---------------------------------------------------------------------------
[docs]
async def search_with_key(
    query: str,
    count: int = 3,
    api_key: Optional[str] = None,
) -> str:
    """Execute a Brave search through the shared rate limiter.

    This is the entry point for internal callers (e.g.
    :class:`~web_search_context.WebSearchContextManager`) that already
    have a resolved API key and don't need the tool-context ceremony. On
    first use it tries to load the shared key pool from ``Config.load()``;
    that step is best-effort, and a failure is logged and retried on the
    next call. No user id or Redis client is forwarded, so the per-user
    pool-key quota is not applied to these searches.

    Args:
        query (str): The search query.
        count (int): Number of results wanted (default 3).
        api_key (Optional[str]): Brave key to use; when omitted the shared
            pool is used.

    Returns:
        str: JSON string with the results or an ``"error"`` payload.
    """
    global _keys_loaded
    if not _keys_loaded:
        try:
            from config import Config

            _key_manager.load_keys(Config.load())
            _keys_loaded = True
        except Exception as exc:
            # Best-effort: the search can still proceed with an explicit
            # api_key. Without one the missing pool matters, so log louder.
            log = logger.debug if api_key else logger.warning
            log("Could not load the shared Brave key pool from Config: %s", exc)

    return await _rate_limiter.search(
        query=query,
        count=count,
        user_api_key=api_key,
    )
# ---------------------------------------------------------------------------
# v3 tool interface
# ---------------------------------------------------------------------------
# Name under which this module's run() is exposed as a tool.
TOOL_NAME = "brave_web_search"
# Exempt from consecutive-round repetitive tool-call detection (openrouter_client).
TOOL_ALLOW_REPEAT = True
# Human/LLM-readable summary of what the tool does.
TOOL_DESCRIPTION = (
    "Search the web using the Brave Search API. Returns titles, "
    "descriptions, URLs, and metadata for matching results. Supports search "
    "operators in the query (e.g. site:, filetype:, intitle:, lang:) for "
    "precise filtering."
)
# JSON-Schema description of run()'s arguments; only "query" is mandatory.
# The documented defaults (count 10, safesearch "off") mirror run()'s own
# parameter defaults.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "query": {
            "type": "string",
            "description": (
                "Search query. You can use Brave search operators for precise results. "
                "Examples: site:example.com (limit to a domain); filetype:pdf or ext:pdf "
                '(file type); intitle:keyword (term in title); inbody:"exact phrase" '
                '(term in body); lang:es or loc:ca (language/region); "exact phrase"; '
                "-term (exclude); +term (require); AND, OR, NOT for logic. "
                'E.g. "climate change filetype:pdf site:edu" or "python asyncio site:docs.python.org".'
            ),
        },
        "count": {
            "type": "integer",
            "description": "Number of results to return (1-20, default 10).",
        },
        "country": {
            "type": "string",
            "description": "Country code for localised results (e.g. US, GB, CA).",
        },
        "search_lang": {
            "type": "string",
            "description": "Language code for search results (e.g. en, es, fr).",
        },
        "ui_lang": {
            "type": "string",
            "description": "Language code for UI elements.",
        },
        "safesearch": {
            "type": "string",
            "enum": ["off", "moderate", "strict"],
            "description": "Safe-search level (default: off).",
        },
    },
    "required": ["query"],
}
[docs]
async def run(
    query: str,
    count: int = 10,
    country: Optional[str] = None,
    search_lang: Optional[str] = None,
    ui_lang: Optional[str] = None,
    safesearch: str = "off",
    ctx=None,
) -> str:
    """Entry point for the ``brave_web_search`` tool: authorize, then search.

    Enforces the WEB_SEARCH privilege via :func:`_check_web_search_access`,
    loads the shared key pool from ``ctx.config`` on first use (guarded by
    the module ``_keys_loaded`` flag), and looks up the caller's own stored
    Brave key through :func:`tools.manage_api_keys.get_user_api_key`. That
    lookup is best-effort: if it fails, the failure is logged and the search
    proceeds on the shared pool. Everything is then forwarded to
    :meth:`BraveSearchRateLimiter.search` on the shared ``_rate_limiter``.

    Args:
        query (str): Search query, optionally using Brave search operators.
        count (int): Number of results to return (clamped to 1-20).
        country (Optional[str]): Country code for localised results
            (e.g. US, GB).
        search_lang (Optional[str]): Language code for search results
            (e.g. en, es).
        ui_lang (Optional[str]): Language code for UI elements.
        safesearch (str): Safe-search level (off, moderate, or strict).
        ctx: Tool execution context; ``user_id``, ``redis``, ``channel_id``
            and ``config`` are read from it when present.

    Returns:
        str: A JSON string of search results, or a JSON error payload.
    """
    global _keys_loaded

    auth_err = await _check_web_search_access(ctx)
    if auth_err:
        return auth_err

    # Read the context defensively: a context object missing one of these
    # attributes should degrade to "not available", not raise.
    config = getattr(ctx, "config", None) if ctx else None
    redis = getattr(ctx, "redis", None) if ctx else None
    user_id = getattr(ctx, "user_id", None) if ctx else None

    if not _keys_loaded and config:
        _key_manager.load_keys(config)
        _keys_loaded = True

    user_api_key = None
    if redis and user_id:
        try:
            from tools.manage_api_keys import get_user_api_key

            user_api_key = await get_user_api_key(
                user_id,
                "brave",
                redis_client=redis,
                channel_id=getattr(ctx, "channel_id", None),
                config=config,
            )
        except Exception as exc:
            # Best-effort lookup: fall back to the shared pool, but leave a
            # trace so a broken key store does not go unnoticed.
            logger.warning(
                "Could not look up the caller's Brave API key; "
                "using the shared pool: %s",
                exc,
            )

    return await _rate_limiter.search(
        query=query,
        count=count,
        country=country,
        search_lang=search_lang,
        ui_lang=ui_lang,
        safesearch=safesearch,
        user_api_key=user_api_key,
        user_id=user_id,
        redis_client=redis,
        config=config,
    )