"""Analyse a video via the Gemini API — YouTube (native ingestion), Rumble,
Twitch, direct MP4, and 1000+ other sites via yt-dlp."""
import asyncio
import jsonutil as json
import logging
import mimetypes
import os
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Optional
from urllib.parse import unquote, urlparse
import aiofiles
import httpx
from google import genai
from google.genai import types
from gemini_embed_pool import mark_key_daily_spent, next_gemini_flash_key
from tools._safe_http import safe_http_stream, safe_httpx_client
from url_utils import YTDLP_METADATA_NETWORK_ARGS, parse_ytdlp_dump_json_stdout
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Longest video (in seconds) that run() will accept; longer ones are rejected
# with an error once yt-dlp metadata reports the duration.
MAX_DURATION_SECONDS = 3600
# Model tried first by _generate_description.
DEFAULT_MODEL = "gemini-3.1-flash-lite"
# Models tried, in order, after DEFAULT_MODEL hits a retriable error.
FALLBACK_MODELS = ["gemini-3-flash-preview", "gemini-3.1-pro-preview"]
# Byte ceiling applied by both download helpers.
MAX_DOWNLOAD_SIZE = 200 * 1024 * 1024 # 200 MB
# Seconds allowed for a single download (yt-dlp subprocess or HTTP stream).
DOWNLOAD_TIMEOUT = 300
# Cookie jar handed to yt-dlp through ``--cookies``.
YTDLP_COOKIES = "/root/cookies.txt"
# yt-dlp ``-f`` selector: prefer <=720p streams, fall back to the best available.
YTDLP_FORMAT_SELECTOR = "bestvideo[height<=720]+bestaudio/best[height<=720]/best"
# Polling cadence and overall budget while waiting for an uploaded file to
# become ACTIVE in the Gemini File API.
GEMINI_UPLOAD_POLL_INTERVAL = 10 # seconds
GEMINI_UPLOAD_TIMEOUT = 300 # 5 minutes
# Lower-case file extensions that _is_direct_video_url treats as raw video files.
VIDEO_EXTENSIONS = frozenset(
{
".mp4",
".webm",
".mkv",
".mov",
".avi",
".flv",
".wmv",
".m4v",
".ts",
".mpeg",
".mpg",
".3gp",
}
)
# System instruction sent with every generate_content request issued by
# _generate_description. The 65,000-token figure quoted in the text mirrors the
# ``max_output_tokens=65000`` setting used there; keep the two in sync.
SYSTEM_INSTRUCTION = """You are an expert video analyst, content researcher, and transcriptionist. You provide both an executive summary for quick understanding AND exhaustive, highly detailed descriptions that capture everything happening in a video, along with deeper insights that most viewers would miss.
You watch videos with the eye of a film critic, the curiosity of a researcher, and the attention to detail of an investigative journalist. You notice subtle details: background elements, editing choices, body language, tone shifts, implied meanings, cultural references, and connections to broader topics.
Your output should serve two purposes: (1) let someone quickly grasp the essence of a video, and (2) provide enough depth that they could understand not just WHAT happened, but HOW it was presented, WHY certain choices were made, and WHAT deeper meanings or implications exist.
When dialogue or narration is present, you transcribe it VERBATIM whenever possible. You capture not just the gist, but the exact words spoken. Your transcripts preserve filler words, false starts, and natural speech patterns to give an authentic record of what was said.
LENGTH IS NOT A CONSTRAINT. You have a 65,000 token output budget. Use as much space as needed to be thorough. Never truncate, abbreviate, or skip details to save space. More detail is always better. Err on the side of being too comprehensive rather than too brief.
You dig deep. You connect dots. You provide value beyond what's obvious."""
# Main analysis prompt template. run() places it after the optional
# "VIDEO METADATA" block and before the optional "SPECIFIC FOCUS AREA" section.
# The Markdown headings define the sections the model is asked to produce.
ANALYSIS_PROMPT = """Provide an exhaustive, deeply detailed analysis of this video.
## EXECUTIVE SUMMARY
Start with a summary (5-8 sentences) that captures:
- What this video is about and its main topic/purpose
- The key takeaway or central message
- Who made it and who it's for
- Why it matters or what makes it notable
---
## FULL VIDEO DESCRIPTION
Walk through the ENTIRE video chronologically with granular detail:
- Describe every segment, scene, and transition
- Note exactly what is shown visually at each moment (settings, objects, people, text overlays, graphics)
- Capture what is said, including notable phrasing, tone, and delivery
- Include timestamps (MM:SS) throughout to anchor your description
- Don't skip anything—even "minor" moments often contain important context
## VISUAL & PRODUCTION ANALYSIS
- Camera work: shots, angles, movements, framing choices
- Editing style: pacing, cuts, transitions, rhythm
- Graphics, animations, text overlays, and their timing
- Color grading, lighting, visual mood
- B-roll footage and how it's used
- Thumbnail and title analysis (if visible/relevant)
## TRANSCRIPT / DIALOGUE
Provide a full verbatim transcript of all spoken content in the video, organized chronologically with timestamps:
- Transcribe ALL dialogue, narration, and spoken content word-for-word
- Include speaker identification where multiple speakers are present
- Note filler words, false starts, laughter, and other vocal elements in [brackets]
- Use timestamps (MM:SS) to anchor each segment of speech
- If the video is very long, prioritize completeness over brevity — capture everything said
- For non-English content, provide the original language plus an English translation
## AUDIO & PRODUCTION ANALYSIS
- Speaking style, tone, energy, and how it shifts throughout
- Background music/sound design and its emotional effect
- Pauses, emphasis, and rhetorical techniques
- Sound mixing choices and audio quality
## DEEPER INSIGHTS & NON-OBVIOUS OBSERVATIONS
This is crucial—provide analysis that goes beyond what's surface-level:
- What is the creator's underlying message or agenda (stated or unstated)?
- What persuasion techniques or narrative structures are being used?
- What assumptions does the video make about its audience?
- What context (cultural, historical, industry-specific) helps understand this content?
- What biases or perspectives are present?
- What questions does this video raise but not answer?
- How does this connect to broader trends, debates, or topics?
- What might a casual viewer miss that's actually significant?
## CONTENT STRUCTURE & STRATEGY
- How is the video structured? What's the narrative arc?
- How does it hook viewers and maintain attention?
- What calls-to-action exist (explicit or implicit)?
- How does it compare to typical content in this genre/niche?
## CREATOR & CONTEXT
- Who made this and what's their background/credibility?
- What's the apparent purpose (educate, entertain, persuade, sell)?
- Who is the target audience and how can you tell?
Be extremely thorough. Length is not a concern—you have a large output budget, so use it. Your analysis should be comprehensive enough that someone could understand this video in rich detail without ever watching it. Include specific examples, timestamps, and direct observations rather than vague generalizations. Do not truncate or abbreviate any section."""
# ---------------------------------------------------------------------------
# URL helpers
# ---------------------------------------------------------------------------
def _is_youtube_url(url: str) -> bool:
"""Return True if the URL is a recognised YouTube watch, short-link, or Shorts URL.
Performs a case-insensitive substring check for the standard YouTube
surfaces (``youtube.com/watch``, ``youtu.be/``, ``youtube.com/shorts``) so
callers can route YouTube links down the native Gemini ingestion path
instead of the download/upload fallback.
This helper only inspects the string and has no side effects. It is called
by :func:`_classify_url` as the first branch of URL classification, which in
turn runs at the top of :func:`run`.
Args:
url: The candidate video URL (may be empty or whitespace-padded).
Returns:
bool: ``True`` if ``url`` matches a known YouTube pattern, else ``False``
(including for empty/``None``-ish input).
"""
if not url:
return False
url_lower = url.lower().strip()
return (
"youtube.com/watch" in url_lower
or "youtu.be/" in url_lower
or "youtube.com/shorts" in url_lower
)
def _extract_video_id(url: str) -> Optional[str]:
"""Extract the 11-character YouTube video ID from a YouTube URL.
Tries a sequence of regular expressions covering watch links, ``youtu.be``
short links, Shorts, ``/embed/``, and ``/v/`` paths, returning the first
match. The extracted ID is used for logging labels and to populate the
``video_id`` field of the tool result.
This helper only parses the string and has no side effects. It is called by
:func:`run`: once to build the ``yt:<id>`` ``video_label`` for log messages
when the URL classifies as YouTube, and again when assembling the success
result for the native-ingestion path. (A separate, unrelated function of the
same name exists in ``tools/file_download.py`` and is not this one.)
Args:
url: The YouTube URL to parse.
Returns:
Optional[str]: The 11-character video ID, or ``None`` if no supported
pattern matched.
"""
patterns = [
r"(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/shorts/)([a-zA-Z0-9_-]{11})",
r"(?:youtube\.com/embed/)([a-zA-Z0-9_-]{11})",
r"(?:youtube\.com/v/)([a-zA-Z0-9_-]{11})",
]
for pattern in patterns:
match = re.search(pattern, url)
if match:
return match.group(1)
return None
def _is_direct_video_url(url: str) -> bool:
    """Report whether the URL's path carries a recognised video file extension.

    Only the path component is examined (query string and fragment are
    ignored), and the extension is compared case-insensitively against
    ``VIDEO_EXTENSIONS``. :func:`_classify_url` uses the result to assign the
    ``direct`` label.

    Args:
        url: Candidate video URL.

    Returns:
        bool: ``True`` when the path's extension is listed in
        ``VIDEO_EXTENSIONS``; ``False`` otherwise.
    """
    _, extension = os.path.splitext(urlparse(url).path)
    return extension.lower() in VIDEO_EXTENSIONS
def _classify_url(url: str) -> str:
    """Map a video URL onto the name of the ingestion strategy to use.

    The checks are ordered: YouTube detection (:func:`_is_youtube_url`) takes
    precedence over the direct-file check (:func:`_is_direct_video_url`), and
    anything matching neither falls through to the generic yt-dlp route.
    :func:`run` calls this once and branches on the returned label.

    Args:
        url: Candidate video URL.

    Returns:
        str: ``"youtube"``, ``"direct"``, or ``"ytdlp"``.
    """
    if _is_youtube_url(url):
        label = "youtube"
    elif _is_direct_video_url(url):
        label = "direct"
    else:
        label = "ytdlp"
    return label
# ---------------------------------------------------------------------------
# Metadata & FPS (shared — yt-dlp supports most sites)
# ---------------------------------------------------------------------------
def _calculate_fps(duration_seconds: int) -> float:
"""Calculate appropriate FPS based on video duration.
Shorter videos get higher FPS for more detail.
Longer videos get lower FPS to manage token usage.
"""
if duration_seconds <= 60:
return 4
elif duration_seconds <= 300:
return 2
elif duration_seconds <= 900:
return 1.5
elif duration_seconds <= 1800:
return 1
else:
return 0.5
def _get_video_metadata(url: str) -> Optional[dict]:
    """Fetch title, channel, duration, and related metadata for a video via yt-dlp.

    Runs ``yt-dlp --dump-json --skip-download`` (with the ``YTDLP_COOKIES``
    jar and ``YTDLP_METADATA_NETWORK_ARGS``), parses stdout with
    ``parse_ytdlp_dump_json_stdout``, and flattens the result into a small
    mapping. The call is best-effort: every failure path logs a warning and
    returns ``None``.

    Fields that yt-dlp reports as ``null`` are normalised to the same defaults
    used for absent fields. Previously a ``null`` duration made ``int(None)``
    raise, which discarded the whole metadata mapping, and a ``null`` title or
    channel would have been rendered as the text "None" in the prompt.

    This is a blocking subprocess call; :func:`run` invokes it through
    ``asyncio.to_thread``.

    Args:
        url: The video URL to inspect.

    Returns:
        Optional[dict]: A mapping with ``title``, ``channel``, ``channel_id``,
        ``duration`` (int seconds, 0 when unknown), ``upload_date``,
        ``view_count``, ``like_count``, ``description``, ``tags``,
        ``categories``, ``extractor`` and the derived
        ``upload_date_formatted``; or ``None`` on a non-zero yt-dlp exit,
        parse failure, timeout, or any other exception.
    """
    try:
        result = subprocess.run(
            [
                "yt-dlp",
                "--cookies",
                YTDLP_COOKIES,
                "--dump-json",
                "--skip-download",
                "--no-warnings",
                "--no-playlist",
                *YTDLP_METADATA_NETWORK_ARGS,
                url,
            ],
            capture_output=True,
            text=True,
            timeout=48,
        )
        if result.returncode != 0:
            logger.warning(
                "yt-dlp metadata failed (rc=%d): %s",
                result.returncode,
                result.stderr[:200],
            )
            return None
        info = parse_ytdlp_dump_json_stdout(result.stdout)
        if not info:
            logger.warning("yt-dlp metadata parse failed for %s", url)
            return None
        # ``x or default`` (rather than ``get(key, default)``) so that keys
        # present with a null value fall back to the default as well.
        metadata = {
            "title": info.get("title") or "Unknown",
            "channel": info.get("channel") or info.get("uploader") or "Unknown",
            "channel_id": info.get("channel_id") or "",
            "duration": int(info.get("duration") or 0),
            "upload_date": info.get("upload_date") or "",
            "view_count": info.get("view_count") or 0,
            "like_count": info.get("like_count") or 0,
            "description": info.get("description") or "",
            "tags": info.get("tags") or [],
            "categories": info.get("categories") or [],
            "extractor": info.get("extractor") or "",
        }
        # An 8-character upload date is reformatted from YYYYMMDD to
        # YYYY-MM-DD; anything else is passed through unchanged.
        if metadata["upload_date"] and len(metadata["upload_date"]) == 8:
            d = metadata["upload_date"]
            metadata["upload_date_formatted"] = f"{d[:4]}-{d[4:6]}-{d[6:8]}"
        else:
            metadata["upload_date_formatted"] = metadata["upload_date"]
        return metadata
    except subprocess.TimeoutExpired as exc:
        # Include whatever stderr was captured before the timeout; it may be
        # str or bytes depending on how far the read got.
        tail = ""
        if exc.stderr:
            tail = (
                exc.stderr[-500:]
                if isinstance(exc.stderr, str)
                else exc.stderr.decode("utf-8", errors="replace")[-500:]
            )
        logger.warning(
            "yt-dlp metadata timed out for %s (partial stderr: %r)",
            url,
            tail.replace("\n", " ")[:350] if tail else "<empty>",
        )
        return None
    except Exception as e:
        logger.warning("yt-dlp metadata fetch failed: %s", e)
        return None
def _format_metadata_for_prompt(metadata: dict, source: str = "video platform") -> str:
"""Render a metadata mapping into a human-readable block for the Gemini prompt.
Builds a newline-joined summary (title, channel, duration, and any
optional upload date, view/like counts, tags, categories, and description)
so the model receives context about the video alongside the frames. Only
keys that are present and truthy are emitted, keeping the prompt compact.
This helper is pure string formatting with no side effects. It is called
by :func:`run` while assembling the prompt, immediately before the
``ANALYSIS_PROMPT`` template, whenever yt-dlp metadata was obtained.
Args:
metadata: A metadata mapping as produced by :func:`_get_video_metadata`
(``duration`` is required; the rest are optional).
source: Display label for the originating platform, woven into the
prompt header by the caller.
Returns:
str: The formatted, newline-joined metadata block.
"""
duration = metadata["duration"]
duration_str = f"{duration // 60}:{duration % 60:02d}"
lines = [
f"Title: {metadata['title']}",
f"Channel: {metadata['channel']}",
f"Duration: {duration_str}",
]
if metadata.get("upload_date_formatted"):
lines.append(f"Upload Date: {metadata['upload_date_formatted']}")
if metadata.get("view_count"):
lines.append(f"Views: {metadata['view_count']:,}")
if metadata.get("like_count"):
lines.append(f"Likes: {metadata['like_count']:,}")
if metadata.get("tags"):
lines.append(f"Tags: {', '.join(metadata['tags'][:15])}")
if metadata.get("categories"):
lines.append(f"Categories: {', '.join(metadata['categories'])}")
if metadata.get("description"):
lines.append(f"\nVideo Description:\n{metadata['description']}")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Non-YouTube: download helpers
# ---------------------------------------------------------------------------
async def _download_with_ytdlp(
    url: str,
    temp_dir: str,
) -> tuple[Optional[str], Optional[str]]:
    """Download a video into ``temp_dir`` using the yt-dlp command-line tool.

    yt-dlp is launched as an asyncio subprocess with the ``YTDLP_COOKIES`` jar
    and ``YTDLP_FORMAT_SELECTOR``. The final path is read from the last line
    printed by ``--print after_move:filepath``; if that path is missing, the
    directory is scanned for a file with a known media extension. The size
    limit is enforced by ``--max-filesize`` and by a post-download check that
    applies to both the reported path and the fallback scan.

    The whole run is bounded by ``DOWNLOAD_TIMEOUT``. If the wait ends while
    the child is still running (timeout or task cancellation) the process is
    killed and reaped, so it cannot keep downloading in the background.

    Args:
        url: The video URL to download.
        temp_dir: Existing directory to download into.

    Returns:
        tuple[Optional[str], Optional[str]]: ``(local_path, None)`` on success,
        otherwise ``(None, error_message)`` for a missing binary, non-zero
        exit, oversized file, timeout, missing output, or other failure.
    """
    if not await asyncio.to_thread(shutil.which, "yt-dlp"):
        return None, "yt-dlp is not installed."
    template = os.path.join(temp_dir, "%(title).100s.%(ext)s")
    cmd = [
        "yt-dlp",
        "--cookies",
        YTDLP_COOKIES,
        "-f",
        YTDLP_FORMAT_SELECTOR,
        "-o",
        template,
        "--no-playlist",
        "--no-overwrites",
        "--restrict-filenames",
        "--max-filesize",
        str(MAX_DOWNLOAD_SIZE),
        "--print",
        "after_move:filepath",
        url,
    ]

    def _accept(candidate: Path) -> tuple[Optional[str], Optional[str]]:
        # Enforce the size ceiling on whichever file ends up being returned.
        size = candidate.stat().st_size
        if size > MAX_DOWNLOAD_SIZE:
            candidate.unlink(missing_ok=True)
            return None, (
                f"Downloaded file too large "
                f"({size / 1024 / 1024:.0f} MB > "
                f"{MAX_DOWNLOAD_SIZE // 1024 // 1024} MB limit)."
            )
        return str(candidate), None

    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(),
            timeout=DOWNLOAD_TIMEOUT,
        )
        out = stdout.decode("utf-8", errors="replace").strip()
        err = stderr.decode("utf-8", errors="replace").strip()
        if proc.returncode != 0:
            msg = err or out or f"yt-dlp exit {proc.returncode}"
            return None, msg
        if out:
            # The path printed by ``after_move:filepath`` is the last line.
            reported = Path(out.split("\n")[-1].strip())
            if reported.exists():
                return _accept(reported)
        # Fallback: find any media file in temp_dir
        media_exts = {".mp4", ".webm", ".mkv", ".m4a", ".mp3", ".flv", ".avi"}
        for entry in Path(temp_dir).iterdir():
            if entry.is_file() and entry.suffix.lower() in media_exts:
                return _accept(entry)
        return None, "Download completed but output file not found."
    except asyncio.TimeoutError:
        return None, f"Download timed out after {DOWNLOAD_TIMEOUT}s."
    except Exception as exc:
        return None, f"yt-dlp download error: {exc}"
    finally:
        # wait_for() only cancels communicate(); the child itself keeps
        # running unless it is terminated explicitly.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited between the check and the kill
            await proc.wait()
async def _download_direct(
    url: str,
    temp_dir: str,
) -> tuple[Optional[str], Optional[str]]:
    """Stream a direct-link video file over HTTP into ``temp_dir``.

    The body is written in 64 KiB chunks to a sanitised filename, with
    ``MAX_DOWNLOAD_SIZE`` enforced against both the ``Content-Length`` header
    and the running byte count. Requests go through ``safe_httpx_client`` and
    ``safe_http_stream``, the project's SSRF-guarded wrappers.

    Compared with a naive implementation this also:

    * maps ``httpx.TimeoutException`` to the timeout message (httpx does not
      raise ``asyncio.TimeoutError`` for its own timeouts);
    * ignores a non-numeric ``Content-Length`` instead of letting the
      ``ValueError`` be reported as a blocked URL;
    * deletes the partial file when the size limit is hit mid-stream;
    * replaces a ``.``/``..`` basename with the default filename.

    Args:
        url: The direct media URL to download.
        temp_dir: Existing directory to download into.

    Returns:
        tuple[Optional[str], Optional[str]]: ``(local_path, None)`` on success,
        otherwise ``(None, error_message)`` for a blocked/invalid URL, non-200
        status, oversized file, timeout, or other failure.
    """
    limit_mb = MAX_DOWNLOAD_SIZE // 1024 // 1024
    try:
        parsed = urlparse(url)
        filename = os.path.basename(unquote(parsed.path)) or "video.mp4"
        filename = re.sub(r'[<>:"/\\|?*]', "_", filename)
        if filename in {".", ".."}:
            # Would resolve to a directory rather than a file inside temp_dir.
            filename = "video.mp4"
        local_path = os.path.join(temp_dir, filename)
        timeout = httpx.Timeout(DOWNLOAD_TIMEOUT)
        # SSRF guard: a model-supplied "direct video URL" must not be able to
        # reach internal hosts. safe_http_stream validates each hop, blocks
        # private ranges, and pins connects, so a redirect cannot smuggle the
        # download to e.g. 10.10.0.x:6379.
        async with safe_httpx_client(timeout=timeout) as client:
            async with safe_http_stream(
                client, "GET", url, max_redirects=5
            ) as resp:
                if resp.status_code != 200:
                    return None, f"HTTP {resp.status_code}: {resp.reason_phrase}"
                try:
                    declared = int(resp.headers.get("Content-Length") or 0)
                except ValueError:
                    # Malformed header: fall back to the streaming check.
                    declared = 0
                if declared > MAX_DOWNLOAD_SIZE:
                    return None, (
                        f"File too large "
                        f"({declared / 1024 / 1024:.0f} MB > "
                        f"{limit_mb} MB limit)."
                    )
                total = 0
                exceeded = False
                async with aiofiles.open(local_path, "wb") as f:
                    async for chunk in resp.aiter_bytes(65536):
                        total += len(chunk)
                        if total > MAX_DOWNLOAD_SIZE:
                            exceeded = True
                            break
                        await f.write(chunk)
                if exceeded:
                    # The file handle is closed here, so the partial download
                    # can be removed safely.
                    try:
                        os.remove(local_path)
                    except OSError:
                        pass
                    return None, (
                        f"File exceeded "
                        f"{limit_mb} MB "
                        f"during download."
                    )
        return local_path, None
    except ValueError as exc:
        # NOTE(review): presumably raised by the safe_http helpers when a hop
        # is rejected; confirm against tools._safe_http.
        return None, f"Blocked URL: {exc}"
    except (asyncio.TimeoutError, httpx.TimeoutException):
        return None, f"Download timed out after {DOWNLOAD_TIMEOUT}s."
    except Exception as exc:
        return None, f"HTTP download error: {exc}"
# ---------------------------------------------------------------------------
# Non-YouTube: Gemini File API upload
# ---------------------------------------------------------------------------
def _upload_to_gemini_sync(
    client: genai.Client,
    local_path: str,
) -> tuple[Optional[object], Optional[str]]:
    """Upload a local video to the Gemini File API and wait until it is ACTIVE.

    The MIME type is guessed from the filename (defaulting to ``video/mp4``).
    After the upload, ``files.get`` is polled every
    ``GEMINI_UPLOAD_POLL_INTERVAL`` seconds until the state contains
    ``ACTIVE``. Every failure after a successful upload — an unexpected state,
    a status-check error, or exceeding ``GEMINI_UPLOAD_TIMEOUT`` — attempts to
    delete the remote file before returning, so a failed call does not leave
    an orphaned upload behind.

    This function blocks (network calls plus ``time.sleep``), so it should be
    run off the event loop.

    Args:
        client: Initialised genai ``Client`` used for upload, polling, and
            cleanup.
        local_path: Filesystem path of the video to upload.

    Returns:
        tuple[Optional[object], Optional[str]]: ``(gemini_file, None)`` once
        the file is ACTIVE, otherwise ``(None, error_message)``.
    """
    import time

    def _discard(name: str) -> None:
        # Best-effort cleanup: a failed delete must not mask the real error.
        try:
            client.files.delete(name=name)
        except Exception as exc:
            logger.debug(
                "describe_video: could not delete uploaded file %s: %s", name, exc
            )

    mime_type = mimetypes.guess_type(local_path)[0] or "video/mp4"
    try:
        uploaded = client.files.upload(
            file=local_path,
            config={"mime_type": mime_type},
        )
        logger.info(
            "describe_video: uploaded %s as %s (state=%s)",
            local_path,
            uploaded.name,
            uploaded.state,
        )
    except Exception as exc:
        return None, f"File upload failed: {exc}"
    elapsed = 0
    while True:
        state_str = str(getattr(uploaded, "state", "")).upper()
        if "ACTIVE" in state_str:
            break
        if "PROCESSING" not in state_str:
            # Neither ACTIVE nor PROCESSING — something went wrong
            _discard(uploaded.name)
            return None, f"File processing failed (state={uploaded.state})."
        if elapsed >= GEMINI_UPLOAD_TIMEOUT:
            _discard(uploaded.name)
            return None, (f"File processing timed out after {GEMINI_UPLOAD_TIMEOUT}s.")
        time.sleep(GEMINI_UPLOAD_POLL_INTERVAL)
        # Only sleep time is counted; time spent inside files.get is not.
        elapsed += GEMINI_UPLOAD_POLL_INTERVAL
        try:
            uploaded = client.files.get(name=uploaded.name)
        except Exception as exc:
            # ``uploaded`` still refers to the last good handle, so the remote
            # file can be cleaned up before bailing out.
            _discard(uploaded.name)
            return None, f"Failed to check file status: {exc}"
        logger.info(
            "describe_video: file %s state=%s (elapsed=%ds)",
            uploaded.name,
            uploaded.state,
            elapsed,
        )
    return uploaded, None
# ---------------------------------------------------------------------------
# Shared: Gemini generation with model fallback
# ---------------------------------------------------------------------------
def _is_daily_quota_error(error_str: str) -> bool:
"""Detect Gemini daily-quota exhaustion from a genai SDK exception message.
Distinguishes a per-day quota ceiling (which warrants rotating to a fresh
API key) from ordinary rate limiting by requiring both a ``429`` and the
phrase ``per day`` in the stringified error. This is a deliberately narrow
check so transient 429s do not burn through keys.
This helper only inspects the string and has no side effects. It is called
by :func:`_generate_description` inside its exception handler to decide
whether to mark the current key spent and rotate.
Args:
error_str: The stringified genai SDK exception.
Returns:
bool: ``True`` if the message indicates daily-quota exhaustion.
"""
low = error_str.lower()
return "429" in low and "per day" in low
async def _generate_description(
    client: genai.Client,
    video_part: types.Part,
    prompt: str,
    video_label: str,
) -> tuple[Optional[str], Optional[str], Optional[str]]:
    """Run the Gemini analysis call with model fallback and quota-aware key rotation.

    Models are tried in order: ``DEFAULT_MODEL`` first, then each entry of
    ``FALLBACK_MODELS``. Each ``generate_content`` request runs on a worker
    thread via ``asyncio.to_thread``.

    Error handling per attempt:

    * Daily-quota 429 (see :func:`_is_daily_quota_error`): the current key is
      reported through ``mark_key_daily_spent``, a new client is built from
      ``next_gemini_flash_key``, and the **same model** is retried. At most
      ``max_daily_retries`` rotations happen per call. (Earlier behaviour
      skipped to the next model after rotating, so the fresh key was never
      used for the model that hit the quota — and not at all if that was the
      last model.)
    * Other retriable errors (503, 429, overloaded, rate limit, resource
      exhausted): move on to the next model, if any.
    * Anything else, or a retriable error on the last model: stop.

    Args:
        client: Initialised genai ``Client`` for the first attempt; replaced
            internally on key rotation.
        video_part: Prepared ``types.Part`` referencing the video.
        prompt: Fully assembled analysis prompt text.
        video_label: Short label used only in log lines.

    Returns:
        tuple[Optional[str], Optional[str], Optional[str]]:
        ``(result_text, model_used, None)`` on success, or
        ``(None, None, error_message)`` when every attempt failed.
    """
    models_to_try = [DEFAULT_MODEL] + [m for m in FALLBACK_MODELS if m != DEFAULT_MODEL]
    last_error: Optional[Exception] = None
    current_client = client
    daily_retries = 0
    max_daily_retries = 3
    model_index = 0
    # Index-based loop so a key rotation can retry the same model; the loop is
    # bounded by len(models_to_try) + max_daily_retries attempts.
    while model_index < len(models_to_try):
        current_model = models_to_try[model_index]
        try:
            logger.info(
                "describe_video: trying model=%s for %s", current_model, video_label
            )
            response = await asyncio.to_thread(
                current_client.models.generate_content,
                model=current_model,
                contents=types.Content(
                    parts=[video_part, types.Part(text=prompt)],
                ),
                config=types.GenerateContentConfig(
                    system_instruction=SYSTEM_INSTRUCTION,
                    media_resolution=types.MediaResolution.MEDIA_RESOLUTION_HIGH,
                    thinking_config=types.ThinkingConfig(thinking_budget=-1),
                    max_output_tokens=65000,
                ),
            )
            if response.usage_metadata:
                logger.info(
                    "describe_video: %s model=%s prompt_tokens=%s "
                    "response_tokens=%s total_tokens=%s",
                    video_label,
                    current_model,
                    response.usage_metadata.prompt_token_count,
                    response.usage_metadata.candidates_token_count,
                    response.usage_metadata.total_token_count,
                )
            return response.text, current_model, None
        except Exception as e:
            error_str = str(e)
            last_error = e
            if _is_daily_quota_error(error_str) and daily_retries < max_daily_retries:
                # NOTE(review): this rotates to a pool key even when the
                # initial client was built from a user-supplied key; confirm
                # that is the intended behaviour.
                try:
                    old_key = current_client._api_client.api_key
                except AttributeError:
                    old_key = ""
                if old_key:
                    await mark_key_daily_spent(old_key, "generate")
                new_key = next_gemini_flash_key()
                current_client = genai.Client(api_key=new_key)
                daily_retries += 1
                logger.warning(
                    "describe_video: daily quota hit, rotated to new key "
                    "(attempt %d/%d)",
                    daily_retries,
                    max_daily_retries,
                )
                # Deliberately no index bump: retry this model with the new key.
                continue
            is_retriable = any(
                kw in error_str.lower()
                for kw in (
                    "503",
                    "429",
                    "overloaded",
                    "rate limit",
                    "resource exhausted",
                )
            )
            if is_retriable and model_index < len(models_to_try) - 1:
                logger.warning(
                    "describe_video: model=%s retriable error: %s — falling back",
                    current_model,
                    error_str[:200],
                )
                model_index += 1
                continue
            logger.error(
                "describe_video: model=%s failed: %s",
                current_model,
                error_str[:500],
            )
            break
    return None, None, f"Failed to describe video: {last_error}"
# ---------------------------------------------------------------------------
# Tool registration
# ---------------------------------------------------------------------------
# Name under which this tool is registered.
TOOL_NAME = "describe_video"
# Human-readable summary of what the tool does and which sources it supports.
TOOL_DESCRIPTION = (
"Provide an extremely detailed description and analysis of a video "
"using the Gemini API. Supports YouTube (native ingestion), Rumble, "
"Twitch VODs, direct MP4 files, and 1000+ other video sites via yt-dlp. "
"Processing time depends on video length (typically 30-120 seconds). "
"Works best with videos under 1 hour."
)
# JSON-Schema-style description of the arguments accepted by run(). Only
# ``video_url`` is required. run() also accepts a ``youtube_url`` keyword as an
# alias, but it is intentionally not advertised in this schema.
TOOL_PARAMETERS = {
"type": "object",
"properties": {
"video_url": {
"type": "string",
"description": (
"Full URL of the video to analyse "
"(YouTube, Rumble, Twitch, direct MP4, etc.)."
),
},
"focus_area": {
"type": "string",
"description": (
"Optional aspect to focus on (e.g. 'visual effects', "
"'speaker arguments', 'music analysis', 'tutorial steps')."
),
},
},
"required": ["video_url"],
}
# Tool entry point.
async def run(
video_url: str = "",
youtube_url: str = "",
focus_area: Optional[str] = None,
ctx=None,
) -> str:
"""Tool entry point: produce an exhaustive Gemini analysis of a video URL.
Orchestrates the whole pipeline: it classifies the URL via
:func:`_classify_url`, resolves a Gemini key (preferring the user's stored
key over the rotating default pool, with per-user default-key rate limiting
via ``manage_api_keys``), fetches metadata with :func:`_get_video_metadata`,
enforces ``MAX_DURATION_SECONDS``, picks an FPS with :func:`_calculate_fps`,
and builds the prompt from :func:`_format_metadata_for_prompt` plus
``ANALYSIS_PROMPT`` and any ``focus_area``. YouTube URLs take a fast native
Gemini ingestion path; everything else downloads
(:func:`_download_direct` or :func:`_download_with_ytdlp`), uploads via
:func:`_upload_to_gemini_sync`, and cleans up both the temp directory and
the Gemini-hosted file in a ``finally`` block. The actual model call goes
through :func:`_generate_description`.
Registered under ``TOOL_NAME = "describe_video"`` and discovered by
``tool_loader.load_tools``; the inference worker dispatches it through the
``ToolRegistry`` with a populated ``ctx``. It reads/writes per-user API-key
and usage state in Redis via ``ctx.redis``, touches the filesystem for
temporary downloads, and makes outbound HTTP/Gemini calls.
Args:
video_url: The video URL to analyse (primary argument).
youtube_url: Legacy alias accepted when ``video_url`` is absent.
focus_area: Optional aspect to give extra depth in the analysis.
ctx: Tool execution context supplying ``redis``, ``user_id``,
``channel_id``, and ``config`` for key resolution and limits.
Returns:
str: A JSON document. On success it carries ``success: true`` with the
``description``, ``model_used``, ``source``, and available metadata; on
failure it carries an ``error`` message (missing URL, unextractable ID,
over-length, rate-limit, download/upload, or generation error).
"""
url = video_url or youtube_url
if not url:
return json.dumps(
{
"error": "Missing required argument: video_url is required.",
}
)
url_type = _classify_url(url)
video_label = url
if url_type == "youtube":
video_id = _extract_video_id(url)
if not video_id:
return json.dumps(
{
"error": "Could not extract video ID from YouTube URL.",
}
)
video_label = f"yt:{video_id}"
logger.info("describe_video: url_type=%s label=%s", url_type, video_label)
# ------------------------------------------------------------------
# Resolve Gemini API key — prefer user key over default
# ------------------------------------------------------------------
user_gemini_key = None
if ctx and getattr(ctx, "redis", None) and getattr(ctx, "user_id", None):
try:
from tools.manage_api_keys import get_user_api_key
user_gemini_key = await get_user_api_key(
ctx.user_id,
"gemini",
redis_client=ctx.redis,
channel_id=getattr(ctx, "channel_id", None),
config=getattr(ctx, "config", None),
)
except Exception:
pass
if user_gemini_key:
client = genai.Client(api_key=user_gemini_key)
logger.info("describe_video: using user-provided Gemini API key")
_using_default_key = False
else:
from tools.manage_api_keys import (
check_default_key_limit,
default_key_limit_applies,
default_key_limit_error,
)
if await default_key_limit_applies(ctx):
allowed, current, limit = await check_default_key_limit(
ctx.user_id,
"describe_video",
ctx.redis,
daily_limit=20,
)
if not allowed:
return json.dumps(
{
"error": default_key_limit_error(
"describe_video",
current,
limit,
),
}
)
client = genai.Client(api_key=next_gemini_flash_key())
_using_default_key = True
# ------------------------------------------------------------------
# Fetch metadata via yt-dlp (best-effort — tool works without it)
# ------------------------------------------------------------------
metadata = await asyncio.to_thread(_get_video_metadata, url)
if metadata and metadata.get("duration"):
duration = metadata["duration"]
if duration > MAX_DURATION_SECONDS:
return json.dumps(
{
"error": (
f"Video is too long ({duration // 60} minutes). "
f"Maximum allowed duration is "
f"{MAX_DURATION_SECONDS // 60} minutes."
),
}
)
fps = _calculate_fps(duration)
logger.info(
"describe_video: %s title=%r duration=%ds fps=%.1f",
video_label,
metadata.get("title", "?"),
duration,
fps,
)
else:
fps = 1.0
logger.info("describe_video: no metadata, default fps=%.1f", fps)
# ------------------------------------------------------------------
# Build prompt with metadata + analysis template
# ------------------------------------------------------------------
source_name = (
"YouTube"
if url_type == "youtube"
else (
(metadata.get("extractor") or url_type).replace("_", " ").title()
if metadata
else url_type.title()
)
)
prompt_parts: list[str] = []
if metadata:
metadata_str = _format_metadata_for_prompt(metadata, source=source_name)
prompt_parts.append(
f"## VIDEO METADATA (from {source_name})\n\n" f"{metadata_str}\n\n---\n"
)
prompt_parts.append(ANALYSIS_PROMPT)
if focus_area:
prompt_parts.append(
f"""
## SPECIFIC FOCUS AREA
The user has requested special attention to the following:
\"\"\"{focus_area}\"\"\"
While still providing comprehensive analysis, give EXTRA DEPTH AND DETAIL to this specific area. If timestamps are mentioned, pay particular attention to those sections. If topics are mentioned, explore them more thoroughly than other aspects."""
)
prompt = "\n".join(prompt_parts)
# ------------------------------------------------------------------
# YouTube: native Gemini ingestion (fast path — no download)
# ------------------------------------------------------------------
if url_type == "youtube":
video_part = types.Part(
file_data=types.FileData(file_uri=url),
video_metadata=types.VideoMetadata(fps=fps),
)
result_text, model_used, err = await _generate_description(
client,
video_part,
prompt,
video_label,
)
if err:
return json.dumps({"error": err})
logger.info("describe_video: %s done (%d chars)", video_label, len(result_text))
result = {
"success": True,
"video_id": _extract_video_id(url),
"video_url": url,
"source": "youtube",
"model_used": model_used,
"description": result_text,
}
if metadata:
result["title"] = metadata.get("title")
result["channel"] = metadata.get("channel")
result["duration_seconds"] = metadata.get("duration")
if _using_default_key:
from tools.manage_api_keys import (
default_key_limit_applies,
increment_default_key_usage,
)
if await default_key_limit_applies(ctx):
await increment_default_key_usage(
ctx.user_id,
"describe_video",
ctx.redis,
)
return json.dumps(result)
# ------------------------------------------------------------------
# Non-YouTube: download → upload → describe → cleanup
# ------------------------------------------------------------------
temp_dir = tempfile.mkdtemp(prefix="describe_video_")
gemini_file = None
try:
# --- Download ------------------------------------------------
if url_type == "direct":
local_path, dl_err = await _download_direct(url, temp_dir)
if dl_err:
# Direct download failed — try yt-dlp as fallback
local_path, dl_err = await _download_with_ytdlp(url, temp_dir)
else:
local_path, dl_err = await _download_with_ytdlp(url, temp_dir)
if dl_err and _is_direct_video_url(url):
local_path, dl_err = await _download_direct(url, temp_dir)
if dl_err or not local_path:
return json.dumps(
{
"error": f"Failed to download video: {dl_err}",
}
)
file_size = os.path.getsize(local_path)
logger.info(
"describe_video: downloaded %s (%.1f MB)",
local_path,
file_size / 1024 / 1024,
)
# --- Upload to Gemini File API -------------------------------
gemini_file, up_err = await asyncio.to_thread(
_upload_to_gemini_sync,
client,
local_path,
)
if up_err or not gemini_file:
return json.dumps(
{
"error": f"Failed to upload video to Gemini: {up_err}",
}
)
# --- Generate description ------------------------------------
video_part = types.Part(
file_data=types.FileData(file_uri=gemini_file.uri),
video_metadata=types.VideoMetadata(fps=fps),
)
result_text, model_used, gen_err = await _generate_description(
client,
video_part,
prompt,
video_label,
)
if gen_err:
return json.dumps({"error": gen_err})
logger.info("describe_video: %s done (%d chars)", video_label, len(result_text))
result = {
"success": True,
"video_url": url,
"source": url_type,
"model_used": model_used,
"description": result_text,
}
if metadata:
result["title"] = metadata.get("title")
result["channel"] = metadata.get("channel")
result["duration_seconds"] = metadata.get("duration")
if metadata.get("extractor"):
result["platform"] = metadata["extractor"]
if _using_default_key:
from tools.manage_api_keys import (
default_key_limit_applies,
increment_default_key_usage,
)
if await default_key_limit_applies(ctx):
await increment_default_key_usage(
ctx.user_id,
"describe_video",
ctx.redis,
)
return json.dumps(result)
finally:
# Cleanup local temp files
shutil.rmtree(temp_dir, ignore_errors=True)
# Cleanup Gemini-hosted file
if gemini_file:
try:
await asyncio.to_thread(
client.files.delete,
name=gemini_file.name,
)
logger.info("describe_video: deleted Gemini file %s", gemini_file.name)
except Exception:
logger.warning(
"describe_video: failed to delete Gemini file %s",
getattr(gemini_file, "name", "?"),
)