"""Tenor GIF Search tool.
Searches for GIFs and stickers using the Tenor API v2.
"""
from __future__ import annotations
import asyncio
import jsonutil as json
import logging
from typing import TYPE_CHECKING
import aiohttp
from tools.alter_privileges import has_privilege, PRIVILEGES
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)
TENOR_API_BASE_URL = "https://tenor.googleapis.com/v2"
TENOR_API_KEY = "AIzaSyAAgd2qiLChWy8gaT6N4VGPleXKaStaS6c"
TOOL_NAME = "search_tenor_gifs"
TOOL_DESCRIPTION = (
"Search for GIFs or stickers on Tenor. Returns share URLs. "
"Supports content filtering, sticker search, and random ordering."
)
TOOL_PARAMETERS = {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query string."},
"limit": {
"type": "integer",
"description": "Number of results (1-50).",
"default": 10,
},
"content_filter": {
"type": "string",
"description": "Safety filter: off, low, medium, high.",
"default": "medium",
},
"media_filter": {
"type": "string",
"description": "Comma-separated GIF formats: gif,mp4,tinygif,tinymp4.",
},
"search_filter": {
"type": "string",
"description": "Use 'sticker' for stickers instead of GIFs.",
},
"random": {
"type": "boolean",
"description": "Random order instead of relevance.",
"default": False,
},
},
"required": ["query"],
}
# ---------------------------------------------------------------------------
# Authorization
# ---------------------------------------------------------------------------
async def _check_web_search_access(ctx) -> str | None:
"""Gate Tenor access behind the WEB_SEARCH privilege.
Pulls ``user_id``, ``redis``, and ``config`` off the ToolContext and asks
:func:`tools.alter_privileges.has_privilege` whether the caller holds the
``WEB_SEARCH`` privilege; that check reads the user's privilege set from
Redis. This is the shared authorization gate used across the search tools
so GIF search obeys the same access policy as web search.
Called by :func:`run` at the top of every Tenor lookup; a sibling
same-named helper exists in the other search tool modules
(``brave_search``, ``prowlarr_search``, etc.).
Args:
ctx: Tool execution context exposing ``user_id``, ``redis``, and
``config``.
Returns:
str | None: A JSON error string when the privilege is missing, or
``None`` when the caller is authorized to proceed.
"""
user_id = getattr(ctx, "user_id", "") or ""
redis = getattr(ctx, "redis", None)
config = getattr(ctx, "config", None)
if not await has_privilege(redis, user_id, PRIVILEGES["WEB_SEARCH"], config):
return json.dumps(
{
"success": False,
"error": "The user does not have the WEB_SEARCH privilege. Ask an admin to grant it.",
}
)
return None
[docs]
async def run(
query: str,
limit: int = 10,
content_filter: str = "medium",
media_filter: str = None,
search_filter: str = None,
random: bool = False,
ctx: ToolContext | None = None,
) -> str:
"""Execute this tool and return the result.
Args:
query (str): Search query or input string.
limit (int): Maximum number of items.
content_filter (str): The content filter value.
media_filter (str): The media filter value.
search_filter (str): The search filter value.
random (bool): The random value.
ctx (ToolContext | None): Tool execution context providing access to bot internals.
Returns:
str: Result string.
"""
auth_err = await _check_web_search_access(ctx)
if auth_err:
return auth_err
if not query or not query.strip():
return json.dumps({"error": "Search query cannot be empty"})
limit = max(1, min(50, limit or 10))
params = {
"key": TENOR_API_KEY,
"q": query.strip(),
"limit": limit,
"contentfilter": content_filter or "medium",
"country": "US",
"locale": "en_US",
"client_key": "stargazer-bot",
}
if media_filter:
params["media_filter"] = media_filter
if search_filter:
params["searchfilter"] = search_filter
if random:
params["random"] = "true"
try:
async with aiohttp.ClientSession() as session:
async with session.get(
f"{TENOR_API_BASE_URL}/search",
params=params,
timeout=aiohttp.ClientTimeout(total=30),
) as response:
if response.status == 200:
data = await response.json()
urls = [
r.get("url", "")
for r in data.get("results", [])
if r.get("url")
]
return json.dumps(
{"query": query, "count": len(urls), "urls": urls},
indent=2,
ensure_ascii=False,
)
elif response.status == 429:
return json.dumps({"error": "Rate limit exceeded"})
else:
error_text = await response.text()
return json.dumps(
{"error": f"API error: HTTP {response.status}. {error_text}"}
)
except asyncio.TimeoutError:
return json.dumps({"error": "Request timed out"})
except Exception as e:
return json.dumps({"error": f"Search failed: {str(e)}"})