Source code for tools.brave_search

"""Web search via the Brave Search API with rate limiting and key rotation."""

import asyncio
import logging
import os
import time
from dataclasses import dataclass
from typing import Optional

import aiohttp
import jsonutil as json

from tools.alter_privileges import has_privilege, PRIVILEGES

logger = logging.getLogger(__name__)

# Upstream search endpoints: Brave is the primary provider, Tavily the fallback.
BRAVE_SEARCH_URL = "https://api.search.brave.com/res/v1/web/search"
TAVILY_SEARCH_URL = "https://api.tavily.com/search"

# SECURITY: a credential is committed in source below. The TAVILY_API_KEY
# environment variable now takes precedence so deployments can supply their
# own secret; the literal is kept only as a backward-compatible fallback and
# should be rotated and removed from the repository.
TAVILY_API_KEY = (
    os.environ.get("TAVILY_API_KEY")
    or "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"
)

# Retry policy for Brave requests: up to MAX_RETRIES attempts, with the
# backoff doubling from INITIAL_BACKOFF_SECONDS on each retry.
MAX_RETRIES = 3
INITIAL_BACKOFF_SECONDS = 1.0


class BraveAPIKeyManager:
    """Round-robin manager for the pool of shared Brave Search API keys.

    Holds the configured subscription tokens and hands them out in rotation.
    The cursor advances on every :meth:`get_next_key` call and on demand via
    :meth:`rotate_key`, which lets a caller switch to another key after a
    failure. State is purely in-memory.
    """

    def __init__(self):
        """Create an empty manager; keys are supplied later by :meth:`load_keys`."""
        self.keys: list = []
        self.current_index: int = 0
        # Created lazily by _ensure_lock so the manager can be instantiated
        # before any event loop exists.
        self._lock: Optional[asyncio.Lock] = None

    def load_keys(self, config):
        """Populate the key pool from ``config.API_KEYS["brave"]``.

        The value may be a single string or a list/tuple of strings. Every
        entry is stripped of surrounding whitespace; blank and non-string
        entries are discarded. Any error while reading the config leaves the
        pool empty. The round-robin cursor is reset so that it is always a
        valid index into the freshly loaded pool.

        Args:
            config: Object exposing an ``API_KEYS`` mapping (may be absent).
        """
        try:
            api_keys = getattr(config, "API_KEYS", None) or {}
            brave_keys = api_keys.get("brave")
            if isinstance(brave_keys, str):
                candidates = [brave_keys]
            elif isinstance(brave_keys, (list, tuple)):
                candidates = list(brave_keys)
            else:
                candidates = []

            # Strip list entries as well as the single-string form: a padded
            # key would otherwise be sent verbatim in the request header.
            self.keys = [
                key.strip()
                for key in candidates
                if isinstance(key, str) and key.strip()
            ]

            if self.keys:
                logger.info(f"Loaded {len(self.keys)} Brave Search API key(s)")
            else:
                logger.warning("No Brave Search API keys configured")
        except Exception as e:
            logger.error(f"Failed to load Brave API keys: {e}")
            self.keys = []
        # A stale cursor from a previous, larger pool would index out of range.
        self.current_index = 0

    async def _ensure_lock(self):
        """Create the asyncio lock on first use (no-op afterwards)."""
        if self._lock is None:
            self._lock = asyncio.Lock()

    async def get_next_key(self) -> Optional[str]:
        """Return the key at the cursor, then advance the cursor (wrapping).

        The read-and-advance runs under the manager's lock.

        Returns:
            Optional[str]: The selected key, or ``None`` if the pool is empty.
        """
        if not self.keys:
            return None
        await self._ensure_lock()
        async with self._lock:
            key = self.keys[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.keys)
            return key

    def rotate_key(self) -> Optional[str]:
        """Advance the cursor first, then return the key it now points at.

        Unlike :meth:`get_next_key` this is synchronous, takes no lock, and
        logs the new position at INFO.

        Returns:
            Optional[str]: The newly current key, or ``None`` if the pool is
            empty.
        """
        if not self.keys:
            return None
        self.current_index = (self.current_index + 1) % len(self.keys)
        logger.info(
            f"Rotated to Brave API key {self.current_index + 1}/{len(self.keys)}"
        )
        return self.keys[self.current_index]

    def get_current_key(self) -> Optional[str]:
        """Return the key at the cursor without advancing it.

        Returns:
            Optional[str]: The current key, or ``None`` if the pool is empty.
        """
        if not self.keys:
            return None
        return self.keys[self.current_index]

    def get_key_count(self) -> int:
        """Return the number of keys currently in the pool."""
        return len(self.keys)
_key_manager = BraveAPIKeyManager()
_keys_loaded = False

# Strong references to in-flight fire-and-forget observability tasks. The
# event loop only keeps weak references to tasks, so an unreferenced task may
# be garbage-collected before it finishes.
_tavily_background_tasks: set = set()


# ---------------------------------------------------------------------------
# Authorization
# ---------------------------------------------------------------------------


async def _check_web_search_access(ctx) -> str | None:
    """Gate the tool behind the WEB_SEARCH privilege.

    Reads ``user_id``, ``redis`` and ``config`` from ``ctx`` (each defaulting
    when absent) and asks :func:`has_privilege` whether the caller holds
    ``PRIVILEGES["WEB_SEARCH"]``.

    Args:
        ctx: Tool execution context; may be ``None`` or lack attributes.

    Returns:
        str | None: A JSON-encoded error payload when the privilege check
        fails, otherwise ``None``.
    """
    user_id = getattr(ctx, "user_id", "") or ""
    redis = getattr(ctx, "redis", None)
    config = getattr(ctx, "config", None)
    if not await has_privilege(redis, user_id, PRIVILEGES["WEB_SEARCH"], config):
        return json.dumps(
            {
                "success": False,
                "error": "The user does not have the WEB_SEARCH privilege. Ask an admin to grant it.",
            }
        )
    return None


# ---------------------------------------------------------------------------


def _report_tavily_error(http_status: int, detail: str, **extra) -> None:
    """Schedule a fire-and-forget observability event for a Tavily failure.

    Must be called from within a running event loop. The created task is kept
    in ``_tavily_background_tasks`` until it completes.

    Args:
        http_status (int): HTTP status to report (0 for non-HTTP failures).
        detail (str): Already-truncated description of the failure.
        **extra: Additional keyword arguments forwarded to
            ``publish_http_error_event`` (e.g. ``error_kind``).
    """
    from observability import publish_http_error_event

    background = asyncio.create_task(
        publish_http_error_event(
            http_service="tavily_search",
            http_status=http_status,
            endpoint="api.tavily.com/search",
            detail=detail,
            **extra,
        )
    )
    _tavily_background_tasks.add(background)
    background.add_done_callback(_tavily_background_tasks.discard)


async def _tavily_fallback(query: str, count: int) -> Optional[str]:
    """Run ``query`` against the Tavily Search API as a fallback.

    POSTs to ``TAVILY_SEARCH_URL`` with a bearer token and reshapes the
    response into the same result schema the Brave path produces, tagged with
    ``"source": "tavily"``. A non-200 status or any exception is logged,
    reported to observability, and results in ``None``.

    Args:
        query (str): The search query.
        count (int): Desired result count; coerced to ``int`` (falling back
            to 10 when not convertible) and clamped to 1-20.

    Returns:
        Optional[str]: JSON string of normalised results, or ``None`` if the
        fallback request failed.
    """
    # The caller forwards the raw tool argument, which may be a string; an
    # uncoerced value would raise TypeError inside min()/max() and be
    # misreported as a network failure.
    try:
        max_results = int(count)
    except (TypeError, ValueError):
        max_results = 10
    max_results = max(1, min(20, max_results))

    try:
        payload = {
            "query": query.strip(),
            "max_results": max_results,
            "search_depth": "basic",
        }
        headers = {
            "Authorization": f"Bearer {TAVILY_API_KEY}",
            "Content-Type": "application/json",
        }
        async with aiohttp.ClientSession() as session:
            logger.info("Tavily fallback search: '%s'", query)
            async with session.post(
                TAVILY_SEARCH_URL,
                json=payload,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=30),
            ) as resp:
                if resp.status != 200:
                    body = await resp.text()
                    logger.warning(
                        "Tavily fallback HTTP %d: %s", resp.status, body[:300]
                    )
                    _report_tavily_error(resp.status, body[:500])
                    return None
                data = await resp.json()

        results = data.get("results") or []
        formatted = {
            "query": query,
            "source": "tavily",
            "total_results": len(results),
            "results": [
                {
                    "title": r.get("title", ""),
                    "description": r.get("content", ""),
                    "url": r.get("url", ""),
                    # Fields Tavily does not provide are filled with neutral
                    # defaults so both providers share one schema.
                    "display_url": "",
                    "age": "",
                    "page_age": "",
                    "language": "",
                    "family_friendly": True,
                }
                for r in results
            ],
        }
        logger.info("Tavily fallback OK: %d results", len(formatted["results"]))
        return json.dumps(formatted, indent=2, ensure_ascii=False)
    except Exception as exc:
        logger.warning("Tavily fallback failed: %s", exc, exc_info=True)
        _report_tavily_error(0, str(exc)[:500], error_kind="network")
        return None
[docs] @dataclass class BraveSearchTask: """A single queued Brave search request awaiting rate-limited execution. Bundles everything :class:`BraveSearchRateLimiter` needs to run one search (the query plus its locale/safesearch options, the resolved key and caller identity for quota accounting, and a Redis handle and config for privilege checks) together with an :class:`asyncio.Future` the worker resolves with the result. Instances are created by :meth:`BraveSearchRateLimiter.search`, placed on the limiter's internal queue, and consumed by its background worker. """ query: str count: Optional[int] country: Optional[str] search_lang: Optional[str] ui_lang: Optional[str] safesearch: Optional[str] future: asyncio.Future user_api_key: Optional[str] = None user_id: Optional[str] = None redis_client: Optional[object] = None config: Optional[object] = None
class BraveSearchRateLimiter:
    """Serialises and throttles all Brave API traffic through one worker.

    Each search is enqueued as a :class:`BraveSearchTask` and executed by a
    single background worker coroutine, which enforces a minimum gap between
    outbound calls, performs the Brave request (with retries and key
    rotation), tries the Tavily fallback on failure, and resolves the task's
    future with the result.
    """

    # Endpoint label attached to every observability event for Brave.
    _OBS_ENDPOINT = "api.search.brave.com/res/v1/web/search"

    # status -> (observability detail, error message returned to the caller)
    _STATUS_ERRORS = {
        401: (
            "Invalid API key or unauthorized",
            "Invalid API key or unauthorized access",
        ),
        403: (
            "Forbidden - check API key permissions",
            "Forbidden - check API key permissions",
        ),
        400: (
            "Bad request - check query parameters",
            "Bad request - check query parameters",
        ),
    }

    _PAYMENT_REQUIRED_MESSAGE = (
        "The Brave Search API key has exceeded its free tier quota. "
        "The user needs to provide their own Brave Search API key. "
        "They can obtain one for free at https://brave.com/search/api/ "
        "(the free plan allows 2,000 queries/month). "
        "To add their key, they should send you a DM with something like: "
        "'Save my Brave search API key: BSA_xxxxxxxxx' "
        "and you will store it securely for their future searches."
    )

    def __init__(self, calls_per_interval: float = 0.5, interval_seconds: float = 1.0):
        """Store the throttle rate; async resources are created on first use.

        The minimum spacing between calls is
        ``interval_seconds / calls_per_interval`` (2 seconds by default).

        Args:
            calls_per_interval (float): Permitted calls per interval window.
            interval_seconds (float): Length of the window in seconds.
        """
        self.calls_per_interval = calls_per_interval
        self.interval_seconds = interval_seconds
        self.task_queue: Optional[asyncio.Queue] = None
        # Timestamp of the last outbound call, on the time.monotonic() clock.
        self.last_call_time: float = 0.0
        self._lock: Optional[asyncio.Lock] = None
        self._worker_task: Optional[asyncio.Task] = None
        self._initialized = False
        # Strong references to fire-and-forget observability tasks; the event
        # loop itself only holds weak references to tasks.
        self._background_tasks: set = set()

    async def _ensure_initialized(self):
        """Create the queue, lock and worker task exactly once."""
        if not self._initialized:
            self.task_queue = asyncio.Queue()
            self._lock = asyncio.Lock()
            self._worker_task = asyncio.create_task(self._worker())
            self._initialized = True

    async def _worker(self):
        """Drain the queue one task at a time until cancelled.

        For each task: wait out the rate limit, run the search, and resolve
        the task's future with the result or the exception. A failure in one
        task is logged and does not stop the loop.
        """
        while True:
            # Reset per iteration so the error handler can never act on the
            # previous iteration's task.
            task = None
            try:
                task = await self.task_queue.get()
                try:
                    await self._wait_for_rate_limit()
                    result = await self._execute_search(task)
                    # The caller may have cancelled the future while we were
                    # working; resolving it then would raise InvalidStateError.
                    if not task.future.done():
                        task.future.set_result(result)
                finally:
                    self.task_queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in Brave search rate limiter worker: {e}")
                if task is not None and not task.future.done():
                    task.future.set_exception(e)

    async def _wait_for_rate_limit(self):
        """Sleep until the minimum inter-call spacing has elapsed.

        Uses the monotonic clock so wall-clock adjustments cannot stretch or
        skip the wait.
        """
        async with self._lock:
            min_interval = self.interval_seconds / self.calls_per_interval
            elapsed = time.monotonic() - self.last_call_time
            if elapsed < min_interval:
                await asyncio.sleep(min_interval - elapsed)
            self.last_call_time = time.monotonic()

    def _report_http_error(self, http_status: int, detail: str, **extra) -> None:
        """Schedule a fire-and-forget observability event for a Brave failure.

        Args:
            http_status (int): HTTP status to report (0 for network errors).
            detail (str): Description of the failure.
            **extra: Extra keyword arguments forwarded to
                ``publish_http_error_event`` (e.g. ``error_kind``).
        """
        from observability import publish_http_error_event

        background = asyncio.create_task(
            publish_http_error_event(
                http_service="brave_search",
                http_status=http_status,
                endpoint=self._OBS_ENDPOINT,
                detail=detail,
                **extra,
            )
        )
        self._background_tasks.add(background)
        background.add_done_callback(self._background_tasks.discard)

    @staticmethod
    def _format_results(query: str, data: dict, count: int) -> dict:
        """Reshape a Brave web-search response into the normalised schema.

        Args:
            query (str): The original query, echoed in the output.
            data (dict): Decoded Brave response body.
            count (int): Maximum number of results to include.

        Returns:
            dict: ``query``, ``total_results`` (size of the full Brave result
            list) and ``results`` (at most ``count`` normalised entries).
        """
        web_results = (data.get("web") or {}).get("results") or []
        items = []
        for result in web_results[:count]:
            item = {
                "title": result.get("title", ""),
                "description": result.get("description", ""),
                "url": result.get("url", ""),
                "display_url": result.get("display_url", ""),
                "age": result.get("age", ""),
                "page_age": result.get("page_age", ""),
                "language": result.get("language", ""),
                "family_friendly": result.get("family_friendly", True),
            }
            profile = result.get("profile")
            if isinstance(profile, dict):
                item["profile"] = {
                    "name": profile.get("name", ""),
                    "url": profile.get("url", ""),
                    "long_name": profile.get("long_name", ""),
                    "img": profile.get("img", ""),
                }
            items.append(item)
        return {
            "query": query,
            "total_results": len(web_results),
            "results": items,
        }

    async def _execute_search(self, task: "BraveSearchTask") -> str:
        """Run the Brave request and fall back to Tavily when it fails.

        Args:
            task (BraveSearchTask): The queued request.

        Returns:
            str: JSON results tagged ``"source": "brave"``, the Tavily
            fallback's JSON when Brave reported an error and the fallback
            succeeded, or otherwise the original Brave error string.
        """
        brave_result = await self._brave_search(task)
        try:
            parsed = json.loads(brave_result)
        except (json.JSONDecodeError, TypeError):
            parsed = {"error": "unparseable response"}

        if "error" not in parsed:
            parsed["source"] = "brave"
            return json.dumps(parsed, indent=2, ensure_ascii=False)

        logger.warning(
            "Brave search failed (%s), trying Tavily fallback",
            parsed.get("error", "")[:120],
        )
        count = task.count if task.count is not None else 10
        fallback = await _tavily_fallback(task.query, count)
        if fallback is not None:
            return fallback
        return brave_result

    async def _brave_search(self, task: "BraveSearchTask") -> str:
        """Issue the Brave Search HTTP request with quota checks and retries.

        Key selection: the task's ``user_api_key`` if present, otherwise the
        next key from ``_key_manager``. When a pool key is used and the task
        carries a ``user_id`` and ``redis_client``, a 50/day limit is checked
        unless the user is listed in ``config.admin_user_ids`` or holds the
        BYPASS_RATELIMIT privilege; the usage counter is incremented after a
        successful search.

        Up to ``MAX_RETRIES`` attempts are made. HTTP 429, network errors and
        timeouts are retried with exponential backoff (rotating the pool key
        when more than one key exists); other non-200 statuses return
        immediately. Every failure that ends the search is reported to
        observability.

        Args:
            task (BraveSearchTask): The request to execute.

        Returns:
            str: JSON string of normalised results, or a JSON error payload.
        """
        if task.user_api_key:
            api_key = task.user_api_key
            using_user_key = True
        else:
            api_key = await _key_manager.get_next_key()
            using_user_key = False

        if not api_key:
            from tools.manage_api_keys import missing_api_key_error

            return json.dumps({"error": missing_api_key_error("brave")})

        # Rate-limit default/pool key usage (50/day for search)
        # (exempt: admin, BYPASS_RATELIMIT privilege, own key)
        rate_limit_applies = False
        if not using_user_key and task.user_id and task.redis_client:
            task_config = getattr(task, "config", None)
            admin_ids = getattr(task_config, "admin_user_ids", None) or []
            is_admin = str(task.user_id) in admin_ids
            bypass = is_admin or await has_privilege(
                task.redis_client,
                task.user_id,
                PRIVILEGES["BYPASS_RATELIMIT"],
                task_config,
            )
            rate_limit_applies = not bypass

        if rate_limit_applies:
            from tools.manage_api_keys import (
                check_default_key_limit,
                default_key_limit_error,
            )

            allowed, current, limit = await check_default_key_limit(
                task.user_id,
                "brave_web_search",
                task.redis_client,
                daily_limit=50,
            )
            if not allowed:
                return json.dumps(
                    {
                        "error": default_key_limit_error(
                            "brave_web_search", current, limit
                        )
                    }
                )

        try:
            count = 10 if task.count is None else int(task.count)
        except (TypeError, ValueError):
            count = 10
        count = max(1, min(20, count))

        params = {
            "q": task.query.strip(),
            "count": count,
            "safesearch": task.safesearch or "moderate",
        }
        if task.country:
            params["country"] = task.country.upper()
        if task.search_lang:
            params["search_lang"] = task.search_lang.lower()
        if task.ui_lang:
            params["ui_lang"] = task.ui_lang.lower()

        # Rotating is only meaningful for pool keys, and only when the pool
        # actually has another key to switch to.
        can_rotate = not using_user_key and _key_manager.get_key_count() > 1

        last_error = None
        for attempt in range(MAX_RETRIES):
            is_last_attempt = attempt == MAX_RETRIES - 1
            backoff_time = INITIAL_BACKOFF_SECONDS * (2**attempt)
            headers = {
                "Accept": "application/json",
                "Accept-Encoding": "gzip",
                "X-Subscription-Token": api_key,
            }

            try:
                async with aiohttp.ClientSession() as session:
                    # NOTE: the logged key position is the manager's cursor,
                    # which get_next_key has already advanced past the key it
                    # returned, so it may not match the key actually in use.
                    logger.info(
                        f"Brave search: '{task.query}' (attempt {attempt + 1}/{MAX_RETRIES}, "
                        f"key {_key_manager.current_index + 1}/{_key_manager.get_key_count()})"
                    )
                    async with session.get(
                        BRAVE_SEARCH_URL,
                        headers=headers,
                        params=params,
                        timeout=aiohttp.ClientTimeout(total=30),
                    ) as response:
                        status = response.status
                        data = await response.json() if status == 200 else None
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                # The total timeout surfaces as asyncio.TimeoutError, which is
                # not a ClientError; without catching it a timeout would skip
                # both the retries and the Tavily fallback.
                last_error = str(e) or type(e).__name__
                logger.error(f"Network error on attempt {attempt + 1}: {e}")
                if not is_last_attempt:
                    await asyncio.sleep(backoff_time)
                    if can_rotate:
                        api_key = _key_manager.rotate_key() or api_key
                continue

            # The response and session are closed by this point, so the
            # backoff sleeps below do not hold a connection open.
            if status == 200:
                formatted_results = self._format_results(task.query, data, count)
                logger.info(
                    f"Brave search OK: {len(formatted_results['results'])} results"
                )
                # Increment default-key usage counter on success
                if rate_limit_applies:
                    from tools.manage_api_keys import increment_default_key_usage

                    await increment_default_key_usage(
                        task.user_id, "brave_web_search", task.redis_client
                    )
                return json.dumps(formatted_results, indent=2, ensure_ascii=False)

            if status == 429:
                last_error = "Rate limit exceeded"
                if not is_last_attempt:
                    if can_rotate:
                        api_key = _key_manager.rotate_key() or api_key
                    await asyncio.sleep(backoff_time)
                    continue
                self._report_http_error(
                    429, "Rate limit exceeded - all retries exhausted"
                )
                return json.dumps(
                    {"error": "Rate limit exceeded - all retries exhausted"}
                )

            if status == 402:
                self._report_http_error(
                    402, "Payment required / free tier exhausted"
                )
                return json.dumps(
                    {
                        "error": "payment_required",
                        "message": self._PAYMENT_REQUIRED_MESSAGE,
                    }
                )

            fallback_message = f"API error: HTTP {status}"
            detail, message = self._STATUS_ERRORS.get(
                status, (fallback_message, fallback_message)
            )
            self._report_http_error(status, detail)
            return json.dumps({"error": message})

        # Only reached when every attempt ended in a network error/timeout.
        failure = f"Search failed after {MAX_RETRIES} attempts: {last_error}"
        self._report_http_error(0, failure[:500], error_kind="network")
        return json.dumps({"error": failure})

    async def search(
        self,
        query: str,
        count: Optional[int] = 10,
        country: Optional[str] = None,
        search_lang: Optional[str] = None,
        ui_lang: Optional[str] = None,
        safesearch: Optional[str] = "moderate",
        user_api_key: Optional[str] = None,
        user_id: Optional[str] = None,
        redis_client: Optional[object] = None,
        config: Optional[object] = None,
    ) -> str:
        """Submit a search to the rate-limited worker and await its result.

        An empty or whitespace-only query is rejected immediately. Otherwise
        the arguments are wrapped in a :class:`BraveSearchTask`, enqueued, and
        the task's future is awaited. Exceptions surfaced through the future
        are logged and converted to a JSON error payload.

        Args:
            query (str): Search query.
            count (Optional[int]): Number of results to return.
            country (Optional[str]): Country code, sent upper-cased.
            search_lang (Optional[str]): Result language code, sent lower-cased.
            ui_lang (Optional[str]): UI language code, sent lower-cased.
            safesearch (Optional[str]): Safe-search level.
            user_api_key (Optional[str]): Caller-supplied Brave key; when set,
                the shared pool and its quota are not used.
            user_id (Optional[str]): Caller identity for quota accounting.
            redis_client (Optional[object]): Redis handle for quota and
                privilege checks.
            config (Optional[object]): Config for admin/privilege checks.

        Returns:
            str: JSON string of results or a JSON error payload.
        """
        if not query or not query.strip():
            return json.dumps({"error": "Search query cannot be empty"})

        await self._ensure_initialized()

        future = asyncio.get_running_loop().create_future()
        task = BraveSearchTask(
            query=query,
            count=count,
            country=country,
            search_lang=search_lang,
            ui_lang=ui_lang,
            safesearch=safesearch,
            future=future,
            user_api_key=user_api_key,
            user_id=user_id,
            redis_client=redis_client,
            config=config,
        )
        await self.task_queue.put(task)

        try:
            return await future
        except Exception as e:
            logger.error(f"Error in Brave search: {e}")
            return json.dumps({"error": f"Search failed: {str(e)}"})
# Shared limiter instance used by the module's public entry points. With
# 0.5 calls per 1.0 s the computed minimum spacing
# (interval_seconds / calls_per_interval) is 2 seconds between Brave requests.
_rate_limiter = BraveSearchRateLimiter(calls_per_interval=0.5, interval_seconds=1.0)


# ---------------------------------------------------------------------------
# Public API for programmatic (non-tool) callers
# ---------------------------------------------------------------------------
async def search_with_key(
    query: str,
    count: int = 3,
    api_key: Optional[str] = None,
) -> str:
    """Execute a Brave search through the shared rate limiter.

    Entry point for internal callers that already have a resolved API key
    and do not go through the tool context. On first use it attempts to load
    the shared key pool from ``Config.load()``; if that fails the failure is
    logged, the load is retried on the next call, and the search still
    proceeds (with ``api_key`` if one was given).

    Args:
        query (str): Search query.
        count (int): Number of results to request.
        api_key (Optional[str]): Brave key to use instead of the shared pool.

    Returns:
        str: JSON string of results or a JSON error payload.
    """
    global _keys_loaded
    if not _keys_loaded:
        try:
            from config import Config

            _key_manager.load_keys(Config.load())
            _keys_loaded = True
        except Exception as exc:
            # Best-effort: keep going, but make the misconfiguration visible
            # instead of silently retrying on every call.
            logger.warning("Could not load Brave API keys from config: %s", exc)

    return await _rate_limiter.search(
        query=query,
        count=count,
        user_api_key=api_key,
    )
# --------------------------------------------------------------------------- # v3 tool interface # --------------------------------------------------------------------------- TOOL_NAME = "brave_web_search" # Exempt from consecutive-round repetitive tool-call detection (openrouter_client). TOOL_ALLOW_REPEAT = True TOOL_DESCRIPTION = ( "Search the web using the Brave Search API. Returns titles, " "descriptions, URLs, and metadata for matching results. Supports search " "operators in the query (e.g. site:, filetype:, intitle:, lang:) for " "precise filtering." ) TOOL_PARAMETERS = { "type": "object", "properties": { "query": { "type": "string", "description": ( "Search query. You can use Brave search operators for precise results. " "Examples: site:example.com (limit to a domain); filetype:pdf or ext:pdf " '(file type); intitle:keyword (term in title); inbody:"exact phrase" ' '(term in body); lang:es or loc:ca (language/region); "exact phrase"; ' "-term (exclude); +term (require); AND, OR, NOT for logic. " 'E.g. "climate change filetype:pdf site:edu" or "python asyncio site:docs.python.org".' ), }, "count": { "type": "integer", "description": "Number of results to return (1-20, default 10).", }, "country": { "type": "string", "description": "Country code for localised results (e.g. US, GB, CA).", }, "search_lang": { "type": "string", "description": "Language code for search results (e.g. en, es, fr).", }, "ui_lang": { "type": "string", "description": "Language code for UI elements.", }, "safesearch": { "type": "string", "enum": ["off", "moderate", "strict"], "description": "Safe-search level (default: off).", }, }, "required": ["query"], }
async def run(
    query: str,
    count: int = 10,
    country: Optional[str] = None,
    search_lang: Optional[str] = None,
    ui_lang: Optional[str] = None,
    safesearch: str = "off",
    ctx=None,
) -> str:
    """Entry point for the ``brave_web_search`` tool: authorize, then search.

    Steps:
      1. Enforce the WEB_SEARCH privilege via
         :func:`_check_web_search_access`; return its error payload on denial.
      2. Load the shared key pool from ``ctx.config`` once (guarded by the
         module-level ``_keys_loaded`` flag).
      3. Look up the caller's own stored Brave key; a lookup failure is
         logged and the search falls back to the shared pool.
      4. Forward everything to the shared rate limiter.

    Args:
        query (str): Search query, optionally using Brave search operators.
        count (int): Number of results to return.
        country (Optional[str]): Country code for localised results.
        search_lang (Optional[str]): Language code for search results.
        ui_lang (Optional[str]): Language code for UI elements.
        safesearch (str): Safe-search level (off, moderate, or strict).
        ctx: Tool execution context; ``user_id``, ``redis``, ``channel_id``
            and ``config`` are read if present. May be ``None``.

    Returns:
        str: A JSON string of search results, or a JSON error payload.
    """
    global _keys_loaded

    auth_err = await _check_web_search_access(ctx)
    if auth_err:
        return auth_err

    # Read the context defensively, matching _check_web_search_access, so a
    # context lacking one of these attributes does not raise AttributeError.
    config = getattr(ctx, "config", None) if ctx else None
    redis = getattr(ctx, "redis", None) if ctx else None
    user_id = getattr(ctx, "user_id", None) if ctx else None

    if not _keys_loaded and config:
        _key_manager.load_keys(config)
        _keys_loaded = True

    user_api_key = None
    if redis and user_id:
        try:
            from tools.manage_api_keys import get_user_api_key

            user_api_key = await get_user_api_key(
                user_id,
                "brave",
                redis_client=redis,
                channel_id=getattr(ctx, "channel_id", None),
                config=config,
            )
        except Exception as exc:
            # Best-effort: fall back to the shared pool, but leave a trace.
            logger.warning(
                "Brave user API key lookup failed; using shared key pool: %s", exc
            )

    return await _rate_limiter.search(
        query=query,
        count=count,
        country=country,
        search_lang=search_lang,
        ui_lang=ui_lang,
        safesearch=safesearch,
        user_api_key=user_api_key,
        user_id=user_id,
        redis_client=redis,
        config=config,
    )