Source code for tools.youtube_describe

"""Analyse a video via the Gemini API — YouTube (native ingestion), Rumble,
Twitch, direct MP4, and 1000+ other sites via yt-dlp."""

import asyncio
import jsonutil as json
import logging
import mimetypes
import os
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Optional
from urllib.parse import unquote, urlparse

import aiofiles
import httpx

from google import genai
from google.genai import types

from gemini_embed_pool import mark_key_daily_spent, next_gemini_flash_key
from tools._safe_http import safe_http_stream, safe_httpx_client
from url_utils import YTDLP_METADATA_NETWORK_ARGS, parse_ytdlp_dump_json_stdout

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Upper bound on video length, in seconds (one hour). Per the helper
# docstrings in this module it is enforced in run() against the duration
# reported by yt-dlp.
MAX_DURATION_SECONDS = 3600

# Gemini model tried first, followed (in order) by the fallbacks when the
# generation helper has to move on to another model.
DEFAULT_MODEL = "gemini-3.1-flash-lite"
FALLBACK_MODELS = ["gemini-3-flash-preview", "gemini-3.1-pro-preview"]

# Byte cap for downloaded media; checked by both download helpers.
MAX_DOWNLOAD_SIZE = 200 * 1024 * 1024  # 200 MB
# Seconds allowed for a download (yt-dlp subprocess wait / httpx timeout).
DOWNLOAD_TIMEOUT = 300
# Cookie jar handed to yt-dlp through ``--cookies``.
YTDLP_COOKIES = "/root/cookies.txt"
# yt-dlp ``-f`` selector: prefer streams of at most 720p, else the best available.
YTDLP_FORMAT_SELECTOR = "bestvideo[height<=720]+bestaudio/best[height<=720]/best"
# Delay between Gemini File API status polls, and the overall polling budget.
GEMINI_UPLOAD_POLL_INTERVAL = 10  # seconds
GEMINI_UPLOAD_TIMEOUT = 300  # 5 minutes

# Lower-case file extensions (leading dot included) that mark a URL path as a
# direct media file; consulted by _is_direct_video_url().
VIDEO_EXTENSIONS = frozenset(
    {
        ".mp4",
        ".webm",
        ".mkv",
        ".mov",
        ".avi",
        ".flv",
        ".wmv",
        ".m4v",
        ".ts",
        ".mpeg",
        ".mpg",
        ".3gp",
    }
)

# Persona / behaviour text for the model. Presumably supplied as the system
# instruction of the Gemini request (the call site is outside this excerpt).
# NOTE(review): the "65,000 token output budget" quoted in the text should stay
# in sync with the output-token limit configured for the generation call.
SYSTEM_INSTRUCTION = """You are an expert video analyst, content researcher, and transcriptionist. You provide both an executive summary for quick understanding AND exhaustive, highly detailed descriptions that capture everything happening in a video, along with deeper insights that most viewers would miss.

You watch videos with the eye of a film critic, the curiosity of a researcher, and the attention to detail of an investigative journalist. You notice subtle details: background elements, editing choices, body language, tone shifts, implied meanings, cultural references, and connections to broader topics.

Your output should serve two purposes: (1) let someone quickly grasp the essence of a video, and (2) provide enough depth that they could understand not just WHAT happened, but HOW it was presented, WHY certain choices were made, and WHAT deeper meanings or implications exist.

When dialogue or narration is present, you transcribe it VERBATIM whenever possible. You capture not just the gist, but the exact words spoken. Your transcripts preserve filler words, false starts, and natural speech patterns to give an authentic record of what was said.

LENGTH IS NOT A CONSTRAINT. You have a 65,000 token output budget. Use as much space as needed to be thorough. Never truncate, abbreviate, or skip details to save space. More detail is always better. Err on the side of being too comprehensive rather than too brief.

You dig deep. You connect dots. You provide value beyond what's obvious."""

# Analysis request template listing the Markdown sections the model must
# produce. Per the docstring of _format_metadata_for_prompt(), run() places the
# formatted metadata block immediately before this template when metadata is
# available.
ANALYSIS_PROMPT = """Provide an exhaustive, deeply detailed analysis of this video.

## EXECUTIVE SUMMARY

Start with a summary (5-8 sentences) that captures:
- What this video is about and its main topic/purpose
- The key takeaway or central message
- Who made it and who it's for
- Why it matters or what makes it notable

---

## FULL VIDEO DESCRIPTION

Walk through the ENTIRE video chronologically with granular detail:
- Describe every segment, scene, and transition
- Note exactly what is shown visually at each moment (settings, objects, people, text overlays, graphics)
- Capture what is said, including notable phrasing, tone, and delivery
- Include timestamps (MM:SS) throughout to anchor your description
- Don't skip anything—even "minor" moments often contain important context

## VISUAL & PRODUCTION ANALYSIS

- Camera work: shots, angles, movements, framing choices
- Editing style: pacing, cuts, transitions, rhythm
- Graphics, animations, text overlays, and their timing
- Color grading, lighting, visual mood
- B-roll footage and how it's used
- Thumbnail and title analysis (if visible/relevant)

## TRANSCRIPT / DIALOGUE

Provide a full verbatim transcript of all spoken content in the video, organized chronologically with timestamps:
- Transcribe ALL dialogue, narration, and spoken content word-for-word
- Include speaker identification where multiple speakers are present
- Note filler words, false starts, laughter, and other vocal elements in [brackets]
- Use timestamps (MM:SS) to anchor each segment of speech
- If the video is very long, prioritize completeness over brevity — capture everything said
- For non-English content, provide the original language plus an English translation

## AUDIO & PRODUCTION ANALYSIS

- Speaking style, tone, energy, and how it shifts throughout
- Background music/sound design and its emotional effect
- Pauses, emphasis, and rhetorical techniques
- Sound mixing choices and audio quality

## DEEPER INSIGHTS & NON-OBVIOUS OBSERVATIONS

This is crucial—provide analysis that goes beyond what's surface-level:
- What is the creator's underlying message or agenda (stated or unstated)?
- What persuasion techniques or narrative structures are being used?
- What assumptions does the video make about its audience?
- What context (cultural, historical, industry-specific) helps understand this content?
- What biases or perspectives are present?
- What questions does this video raise but not answer?
- How does this connect to broader trends, debates, or topics?
- What might a casual viewer miss that's actually significant?

## CONTENT STRUCTURE & STRATEGY

- How is the video structured? What's the narrative arc?
- How does it hook viewers and maintain attention?
- What calls-to-action exist (explicit or implicit)?
- How does it compare to typical content in this genre/niche?

## CREATOR & CONTEXT

- Who made this and what's their background/credibility?
- What's the apparent purpose (educate, entertain, persuade, sell)?
- Who is the target audience and how can you tell?

Be extremely thorough. Length is not a concern—you have a large output budget, so use it. Your analysis should be comprehensive enough that someone could understand this video in rich detail without ever watching it. Include specific examples, timestamps, and direct observations rather than vague generalizations. Do not truncate or abbreviate any section."""


# ---------------------------------------------------------------------------
# URL helpers
# ---------------------------------------------------------------------------


def _is_youtube_url(url: str) -> bool:
    """Return True if the URL is a recognised YouTube watch, short-link, or Shorts URL.

    Performs a case-insensitive substring check for the standard YouTube
    surfaces (``youtube.com/watch``, ``youtu.be/``, ``youtube.com/shorts``) so
    callers can route YouTube links down the native Gemini ingestion path
    instead of the download/upload fallback.

    This helper only inspects the string and has no side effects. It is called
    by :func:`_classify_url` as the first branch of URL classification, which in
    turn runs at the top of :func:`run`.

    Args:
        url: The candidate video URL (may be empty or whitespace-padded).

    Returns:
        bool: ``True`` if ``url`` matches a known YouTube pattern, else ``False``
        (including for empty/``None``-ish input).
    """
    if not url:
        return False
    url_lower = url.lower().strip()
    return (
        "youtube.com/watch" in url_lower
        or "youtu.be/" in url_lower
        or "youtube.com/shorts" in url_lower
    )


def _extract_video_id(url: str) -> Optional[str]:
    """Extract the 11-character YouTube video ID from a YouTube URL.

    Tries a sequence of regular expressions covering watch links, ``youtu.be``
    short links, Shorts, ``/embed/``, and ``/v/`` paths, returning the first
    match. The extracted ID is used for logging labels and to populate the
    ``video_id`` field of the tool result.

    This helper only parses the string and has no side effects. It is called by
    :func:`run`: once to build the ``yt:<id>`` ``video_label`` for log messages
    when the URL classifies as YouTube, and again when assembling the success
    result for the native-ingestion path. (A separate, unrelated function of the
    same name exists in ``tools/file_download.py`` and is not this one.)

    Args:
        url: The YouTube URL to parse.

    Returns:
        Optional[str]: The 11-character video ID, or ``None`` if no supported
        pattern matched.
    """
    patterns = [
        r"(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/shorts/)([a-zA-Z0-9_-]{11})",
        r"(?:youtube\.com/embed/)([a-zA-Z0-9_-]{11})",
        r"(?:youtube\.com/v/)([a-zA-Z0-9_-]{11})",
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return None


def _is_direct_video_url(url: str) -> bool:
    """Tell whether the URL's path carries a known video file extension.

    Only the path component of the URL is considered (query string and
    fragment are ignored). Its extension is lower-cased and looked up in
    ``VIDEO_EXTENSIONS``. A match means the link can be fetched as a raw media
    file over plain HTTP instead of going through yt-dlp.

    The function inspects the string only and has no side effects. It is used
    by :func:`_classify_url` to assign the ``direct`` class.

    Args:
        url: Candidate video URL.

    Returns:
        bool: ``True`` when the path extension is in ``VIDEO_EXTENSIONS``,
        ``False`` otherwise.
    """
    _, extension = os.path.splitext(urlparse(url).path)
    return extension.lower() in VIDEO_EXTENSIONS


def _classify_url(url: str) -> str:
    """Map a video URL to the label of the ingestion strategy to use.

    YouTube detection (:func:`_is_youtube_url`) takes precedence, then the
    direct-file check (:func:`_is_direct_video_url`); anything else is left to
    the generic yt-dlp downloader. Per the surrounding docstrings the label
    drives the branching in :func:`run`: ``youtube`` means Gemini native
    ingestion, ``direct`` a plain HTTP download, ``ytdlp`` a yt-dlp download.

    The function inspects the string only and has no side effects.

    Args:
        url: Candidate video URL.

    Returns:
        str: One of ``youtube``, ``direct``, or ``ytdlp``.
    """
    if _is_youtube_url(url):
        return "youtube"
    return "direct" if _is_direct_video_url(url) else "ytdlp"


# ---------------------------------------------------------------------------
# Metadata & FPS (shared — yt-dlp supports most sites)
# ---------------------------------------------------------------------------


def _calculate_fps(duration_seconds: int) -> float:
    """Calculate appropriate FPS based on video duration.

    Shorter videos get higher FPS for more detail.
    Longer videos get lower FPS to manage token usage.
    """
    if duration_seconds <= 60:
        return 4
    elif duration_seconds <= 300:
        return 2
    elif duration_seconds <= 900:
        return 1.5
    elif duration_seconds <= 1800:
        return 1
    else:
        return 0.5


def _get_video_metadata(url: str) -> Optional[dict]:
    """Fetch title, channel, duration, and other metadata for a video via yt-dlp.

    Runs ``yt-dlp --dump-json --skip-download`` (with the shared cookie jar and
    ``YTDLP_METADATA_NETWORK_ARGS``), parses stdout with
    :func:`parse_ytdlp_dump_json_stdout`, and flattens the info dict into a
    small metadata mapping plus a derived ``upload_date_formatted``.

    yt-dlp frequently emits keys whose value is JSON ``null``. Every field is
    therefore normalised with an explicit fallback, so one ``null`` (for
    example ``duration`` on a live stream, or ``channel`` on extractors that
    only fill ``uploader``) no longer throws away the whole mapping or hides
    the ``uploader`` fallback.

    The URL is passed after a ``--`` terminator so a value beginning with ``-``
    cannot be interpreted by yt-dlp as a command-line option.

    The call is best-effort: every failure is logged and reported as ``None``.
    It blocks on a subprocess for up to 48 seconds.

    Args:
        url: The video URL to inspect.

    Returns:
        Optional[dict]: Mapping with ``title``, ``channel``, ``channel_id``,
        ``duration`` (int seconds, 0 when unknown), ``upload_date``,
        ``view_count``, ``like_count``, ``description``, ``tags``,
        ``categories``, ``extractor`` and ``upload_date_formatted``; or
        ``None`` on a non-zero yt-dlp exit, parse failure, timeout, or any
        other exception.
    """
    try:
        result = subprocess.run(
            [
                "yt-dlp",
                "--cookies",
                YTDLP_COOKIES,
                "--dump-json",
                "--skip-download",
                "--no-warnings",
                "--no-playlist",
                *YTDLP_METADATA_NETWORK_ARGS,
                # End of options: the URL is untrusted input and must never be
                # parsed as a yt-dlp flag.
                "--",
                url,
            ],
            capture_output=True,
            text=True,
            timeout=48,
        )
        if result.returncode != 0:
            logger.warning(
                "yt-dlp metadata failed (rc=%d): %s",
                result.returncode,
                result.stderr[:200],
            )
            return None

        info = parse_ytdlp_dump_json_stdout(result.stdout)
        if not info:
            logger.warning("yt-dlp metadata parse failed for %s", url)
            return None

        # ``duration`` may be null, a float, or (rarely) unparsable; treat
        # anything unusable as "unknown" (0) instead of failing the lookup.
        raw_duration = info.get("duration")
        try:
            duration = int(float(raw_duration)) if raw_duration else 0
        except (TypeError, ValueError, OverflowError):
            duration = 0

        # ``x or default`` (rather than ``get(key, default)``) so that keys
        # present with a null value still receive the fallback.
        metadata = {
            "title": info.get("title") or "Unknown",
            "channel": info.get("channel") or info.get("uploader") or "Unknown",
            "channel_id": info.get("channel_id") or "",
            "duration": duration,
            "upload_date": info.get("upload_date") or "",
            "view_count": info.get("view_count") or 0,
            "like_count": info.get("like_count") or 0,
            "description": info.get("description") or "",
            "tags": info.get("tags") or [],
            "categories": info.get("categories") or [],
            "extractor": info.get("extractor") or "",
        }

        # yt-dlp reports dates as YYYYMMDD; present them as YYYY-MM-DD.
        upload_date = metadata["upload_date"]
        if upload_date and len(upload_date) == 8:
            metadata["upload_date_formatted"] = (
                f"{upload_date[:4]}-{upload_date[4:6]}-{upload_date[6:8]}"
            )
        else:
            metadata["upload_date_formatted"] = upload_date

        return metadata

    except subprocess.TimeoutExpired as exc:
        # ``exc.stderr`` can be bytes even with text=True, so handle both.
        tail = ""
        if exc.stderr:
            tail = (
                exc.stderr[-500:]
                if isinstance(exc.stderr, str)
                else exc.stderr.decode("utf-8", errors="replace")[-500:]
            )
        logger.warning(
            "yt-dlp metadata timed out for %s (partial stderr: %r)",
            url,
            tail.replace("\n", " ")[:350] if tail else "<empty>",
        )
        return None
    except Exception as e:
        # Deliberate catch-all: metadata is optional for the caller.
        logger.warning("yt-dlp metadata fetch failed: %s", e)
        return None


def _format_metadata_for_prompt(metadata: dict, source: str = "video platform") -> str:
    """Render a metadata mapping into a human-readable block for the Gemini prompt.

    Builds a newline-joined summary (title, channel, duration, and any
    optional upload date, view/like counts, tags, categories, and description)
    so the model receives context about the video alongside the frames. Only
    keys that are present and truthy are emitted, keeping the prompt compact.

    This helper is pure string formatting with no side effects. It is called
    by :func:`run` while assembling the prompt, immediately before the
    ``ANALYSIS_PROMPT`` template, whenever yt-dlp metadata was obtained.

    Args:
        metadata: A metadata mapping as produced by :func:`_get_video_metadata`
            (``duration`` is required; the rest are optional).
        source: Display label for the originating platform, woven into the
            prompt header by the caller.

    Returns:
        str: The formatted, newline-joined metadata block.
    """
    duration = metadata["duration"]
    duration_str = f"{duration // 60}:{duration % 60:02d}"

    lines = [
        f"Title: {metadata['title']}",
        f"Channel: {metadata['channel']}",
        f"Duration: {duration_str}",
    ]

    if metadata.get("upload_date_formatted"):
        lines.append(f"Upload Date: {metadata['upload_date_formatted']}")
    if metadata.get("view_count"):
        lines.append(f"Views: {metadata['view_count']:,}")
    if metadata.get("like_count"):
        lines.append(f"Likes: {metadata['like_count']:,}")
    if metadata.get("tags"):
        lines.append(f"Tags: {', '.join(metadata['tags'][:15])}")
    if metadata.get("categories"):
        lines.append(f"Categories: {', '.join(metadata['categories'])}")
    if metadata.get("description"):
        lines.append(f"\nVideo Description:\n{metadata['description']}")

    return "\n".join(lines)


# ---------------------------------------------------------------------------
# Non-YouTube: download helpers
# ---------------------------------------------------------------------------


async def _download_with_ytdlp(
    url: str,
    temp_dir: str,
) -> tuple[Optional[str], Optional[str]]:
    """Download a video into a temp directory via yt-dlp.

    Confirms ``yt-dlp`` is on ``PATH``, then spawns it as an async subprocess
    with the shared cookie jar and ``YTDLP_FORMAT_SELECTOR``, writing into
    ``temp_dir``. The resolved output path is read from the last line printed
    by ``--print after_move:filepath``; if that path is unusable, ``temp_dir``
    is scanned for a file with a known media extension. ``MAX_DOWNLOAD_SIZE``
    is enforced through ``--max-filesize`` and again on whichever file is
    selected, including the fallback-scan result.

    The wait is bounded by ``DOWNLOAD_TIMEOUT``. If the wait times out, fails,
    or the surrounding task is cancelled, the yt-dlp child process is killed
    and reaped so it cannot keep downloading in the background.

    The URL is passed after a ``--`` terminator so a value beginning with ``-``
    cannot be interpreted by yt-dlp as a command-line option.

    Args:
        url: The video URL to download.
        temp_dir: An existing temporary directory to download into.

    Returns:
        tuple[Optional[str], Optional[str]]: ``(local_path, None)`` on success,
        or ``(None, error_message)`` on a missing binary, non-zero exit,
        oversized file, timeout, or any other failure.
    """
    if not await asyncio.to_thread(shutil.which, "yt-dlp"):
        return None, "yt-dlp is not installed."

    template = os.path.join(temp_dir, "%(title).100s.%(ext)s")
    cmd = [
        "yt-dlp",
        "--cookies",
        YTDLP_COOKIES,
        "-f",
        YTDLP_FORMAT_SELECTOR,
        "-o",
        template,
        "--no-playlist",
        "--no-overwrites",
        "--restrict-filenames",
        "--max-filesize",
        str(MAX_DOWNLOAD_SIZE),
        "--print",
        "after_move:filepath",
        # End of options: the URL is untrusted input and must never be parsed
        # as a yt-dlp flag.
        "--",
        url,
    ]

    def _accept_within_limit(candidate: Path) -> tuple[Optional[str], Optional[str]]:
        """Return the candidate path, or delete it and report an error if too big."""
        size = candidate.stat().st_size
        if size > MAX_DOWNLOAD_SIZE:
            candidate.unlink(missing_ok=True)
            return None, (
                f"Downloaded file too large "
                f"({size / 1024 / 1024:.0f} MB > "
                f"{MAX_DOWNLOAD_SIZE // 1024 // 1024} MB limit)."
            )
        return str(candidate), None

    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(),
            timeout=DOWNLOAD_TIMEOUT,
        )
        out = stdout.decode("utf-8", errors="replace").strip()
        err = stderr.decode("utf-8", errors="replace").strip()

        if proc.returncode != 0:
            msg = err or out or f"yt-dlp exit {proc.returncode}"
            return None, msg

        if out:
            # ``--print after_move:filepath`` writes the final path last.
            printed = Path(out.split("\n")[-1].strip())
            if printed.is_file():
                return _accept_within_limit(printed)

        # Fallback: find any media file in temp_dir
        media_exts = {".mp4", ".webm", ".mkv", ".m4a", ".mp3", ".flv", ".avi"}
        for entry in Path(temp_dir).iterdir():
            if entry.is_file() and entry.suffix.lower() in media_exts:
                return _accept_within_limit(entry)

        return None, "Download completed but output file not found."

    except asyncio.TimeoutError:
        return None, f"Download timed out after {DOWNLOAD_TIMEOUT}s."
    except Exception as exc:
        return None, f"yt-dlp download error: {exc}"
    finally:
        # wait_for() only cancels communicate(); the child itself keeps running
        # unless it is killed explicitly. This also covers task cancellation.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited between the returncode check and kill()
            await proc.wait()


async def _download_direct(
    url: str,
    temp_dir: str,
) -> tuple[Optional[str], Optional[str]]:
    """Download a direct-link video file over HTTP into a temp directory.

    Streams the response body to a sanitised filename inside ``temp_dir`` in
    64 KiB chunks, enforcing ``MAX_DOWNLOAD_SIZE`` from the ``Content-Length``
    header (when it is a valid integer) and from the running byte total. The
    request goes through :func:`safe_httpx_client` and
    :func:`safe_http_stream`, the project's SSRF-guarded wrappers.

    On any failure after the output file has been opened, the partial file is
    removed. This keeps a truncated download from being picked up later by the
    media-file scan in :func:`_download_with_ytdlp`, which may be run on the
    same ``temp_dir``.

    Args:
        url: The direct media URL to download.
        temp_dir: An existing temporary directory to download into.

    Returns:
        tuple[Optional[str], Optional[str]]: ``(local_path, None)`` on success,
        or ``(None, error_message)`` on a blocked/invalid URL, non-200 status,
        oversized file, timeout, or any other failure.
    """
    local_path: Optional[str] = None
    # True from just before the output file is opened until the body has been
    # written completely; drives the partial-file cleanup in ``finally``.
    partial_on_disk = False

    try:
        parsed = urlparse(url)
        filename = os.path.basename(unquote(parsed.path)) or "video.mp4"
        filename = re.sub(r'[<>:"/\\|?*]', "_", filename)
        if filename in (".", ".."):
            # A path of only dots would resolve to a directory, not a file.
            filename = "video.mp4"
        local_path = os.path.join(temp_dir, filename)

        # NOTE: httpx.Timeout applies per phase (connect/read/write/pool); it
        # is not a deadline for the whole transfer.
        timeout = httpx.Timeout(DOWNLOAD_TIMEOUT)
        # SSRF guard: a model-supplied "direct video URL" must not be able to
        # reach internal hosts. safe_http_stream validates each hop, blocks
        # private ranges, and pins connects, so a redirect cannot smuggle the
        # download to e.g. 10.10.0.x:6379.
        async with safe_httpx_client(timeout=timeout) as client:
            async with safe_http_stream(
                client, "GET", url, max_redirects=5
            ) as resp:
                if resp.status_code != 200:
                    return None, f"HTTP {resp.status_code}: {resp.reason_phrase}"

                # A malformed Content-Length must not surface as a
                # "Blocked URL" ValueError; fall back to the streamed count.
                cl = resp.headers.get("Content-Length")
                try:
                    declared_size = int(cl) if cl else None
                except ValueError:
                    declared_size = None
                if declared_size is not None and declared_size > MAX_DOWNLOAD_SIZE:
                    return None, (
                        f"File too large "
                        f"({declared_size / 1024 / 1024:.0f} MB > "
                        f"{MAX_DOWNLOAD_SIZE // 1024 // 1024} MB limit)."
                    )

                total = 0
                partial_on_disk = True
                async with aiofiles.open(local_path, "wb") as f:
                    async for chunk in resp.aiter_bytes(65536):
                        total += len(chunk)
                        if total > MAX_DOWNLOAD_SIZE:
                            return None, (
                                f"File exceeded "
                                f"{MAX_DOWNLOAD_SIZE // 1024 // 1024} MB "
                                f"during download."
                            )
                        await f.write(chunk)

        partial_on_disk = False
        return local_path, None

    except ValueError as exc:
        return None, f"Blocked URL: {exc}"
    except (asyncio.TimeoutError, httpx.TimeoutException):
        # httpx signals timeouts with its own exception hierarchy.
        return None, f"Download timed out after {DOWNLOAD_TIMEOUT}s."
    except Exception as exc:
        return None, f"HTTP download error: {exc}"
    finally:
        if partial_on_disk and local_path:
            try:
                os.remove(local_path)
            except OSError:
                pass  # best-effort cleanup; the file may never have been created


# ---------------------------------------------------------------------------
# Non-YouTube: Gemini File API upload
# ---------------------------------------------------------------------------


def _upload_to_gemini_sync(
    client: genai.Client,
    local_path: str,
) -> tuple[Optional[object], Optional[str]]:
    """Upload a local video to the Gemini File API and poll until it is ACTIVE.

    Guesses the MIME type (defaulting to ``video/mp4``), uploads the file via
    ``client.files.upload``, then polls ``client.files.get`` every
    ``GEMINI_UPLOAD_POLL_INTERVAL`` seconds until the file state contains
    ``ACTIVE``.

    Every failure after a successful upload deletes the remote file before
    returning: a state that is neither ``ACTIVE`` nor ``PROCESSING``, more than
    ``GEMINI_UPLOAD_TIMEOUT`` seconds of wall-clock time spent waiting, or an
    exception raised by the status poll itself. A failed delete is logged
    rather than raised.

    The function blocks (network calls and ``time.sleep``), so it is intended
    to be run off the event loop.

    Args:
        client: An initialised genai ``Client`` used for the upload and polls.
        local_path: Filesystem path of the downloaded video.

    Returns:
        tuple[Optional[object], Optional[str]]: ``(gemini_file, None)`` once the
        upload is ACTIVE, or ``(None, error_message)`` on upload failure,
        a failed processing state, a status-check error, or a timeout.
    """
    import time

    mime_type = mimetypes.guess_type(local_path)[0] or "video/mp4"

    try:
        uploaded = client.files.upload(
            file=local_path,
            config={"mime_type": mime_type},
        )
        logger.info(
            "describe_video: uploaded %s as %s (state=%s)",
            local_path,
            uploaded.name,
            uploaded.state,
        )
    except Exception as exc:
        return None, f"File upload failed: {exc}"

    remote_name = uploaded.name

    def _discard_remote_file() -> None:
        """Best-effort delete of the uploaded file so failures do not leak storage."""
        try:
            client.files.delete(name=remote_name)
        except Exception as exc:
            logger.warning(
                "describe_video: failed to delete %s after error: %s",
                remote_name,
                exc,
            )

    # Wall-clock start, so time spent inside files.get() counts toward the
    # timeout as well as the sleeps.
    started = time.monotonic()
    while True:
        state_str = str(getattr(uploaded, "state", "")).upper()
        if "ACTIVE" in state_str:
            break
        if "PROCESSING" not in state_str:
            # Neither ACTIVE nor PROCESSING — something went wrong
            _discard_remote_file()
            return None, f"File processing failed (state={uploaded.state})."
        if time.monotonic() - started >= GEMINI_UPLOAD_TIMEOUT:
            _discard_remote_file()
            return None, (f"File processing timed out after {GEMINI_UPLOAD_TIMEOUT}s.")
        time.sleep(GEMINI_UPLOAD_POLL_INTERVAL)
        try:
            uploaded = client.files.get(name=remote_name)
        except Exception as exc:
            # The upload itself succeeded, so the remote copy must be removed
            # on this path too.
            _discard_remote_file()
            return None, f"Failed to check file status: {exc}"
        logger.info(
            "describe_video: file %s state=%s (elapsed=%ds)",
            uploaded.name,
            uploaded.state,
            int(time.monotonic() - started),
        )

    return uploaded, None


# ---------------------------------------------------------------------------
# Shared: Gemini generation with model fallback
# ---------------------------------------------------------------------------


def _is_daily_quota_error(error_str: str) -> bool:
    """Detect Gemini daily-quota exhaustion from a genai SDK exception message.

    Distinguishes a per-day quota ceiling (which warrants rotating to a fresh
    API key) from ordinary rate limiting by requiring both a ``429`` and the
    phrase ``per day`` in the stringified error. This is a deliberately narrow
    check so transient 429s do not burn through keys.

    This helper only inspects the string and has no side effects. It is called
    by :func:`_generate_description` inside its exception handler to decide
    whether to mark the current key spent and rotate.

    Args:
        error_str: The stringified genai SDK exception.

    Returns:
        bool: ``True`` if the message indicates daily-quota exhaustion.
    """
    low = error_str.lower()
    return "429" in low and "per day" in low


async def _generate_description(
    client: genai.Client,
    video_part: types.Part,
    prompt: str,
    video_label: str,
) -> tuple[Optional[str], Optional[str], Optional[str]]:
    """Run the Gemini analysis call with model fallback and quota-aware key rotation.

    Walks ``DEFAULT_MODEL`` followed by ``FALLBACK_MODELS``, issuing each
    ``generate_content`` request (high media resolution, unbounded thinking
    budget, 65k output tokens) on a worker thread via ``asyncio.to_thread`` and
    logging token usage.

    Failure handling, in priority order:

    * Daily-quota 429 (per :func:`_is_daily_quota_error`): the spent key is
      reported via ``mark_key_daily_spent``, a fresh client is built from
      ``next_gemini_flash_key`` and the **same model** is retried. At most
      ``max_daily_retries`` rotations happen per call; after that the error
      is treated like any other retriable error.
    * Other retriable errors (503, generic 429, overloaded, rate limit,
      resource exhausted): advance to the next model, if there is one.
    * A response that carries no text: recorded as a failure and the next
      model is tried, so callers never receive ``None`` text on "success".
    * Anything else: stop immediately.

    Beyond the Gemini HTTP calls this touches the shared key pool in
    ``gemini_embed_pool`` for rotation accounting.

    Args:
        client: The initialised genai ``Client`` for the first attempt
            (replaced internally on key rotation).
        video_part: The prepared ``types.Part`` referencing the video.
        prompt: The fully assembled analysis prompt text.
        video_label: Short label (e.g. ``yt:<id>``) used only in log lines.

    Returns:
        tuple[Optional[str], Optional[str], Optional[str]]: On success
        ``(result_text, model_used, None)`` with non-empty ``result_text``;
        otherwise ``(None, None, error_message)``.
    """
    models_to_try = [DEFAULT_MODEL] + [m for m in FALLBACK_MODELS if m != DEFAULT_MODEL]
    retriable_markers = (
        "503",
        "429",
        "overloaded",
        "rate limit",
        "resource exhausted",
    )
    max_daily_retries = 3

    last_error: Optional[Exception] = None
    current_client = client
    daily_retries = 0
    model_index = 0

    # Index-driven loop: a key rotation must retry the *same* model, which a
    # plain ``for`` + ``continue`` cannot express (it would skip to the next).
    while model_index < len(models_to_try):
        current_model = models_to_try[model_index]
        has_fallback = model_index < len(models_to_try) - 1

        try:
            logger.info(
                "describe_video: trying model=%s for %s", current_model, video_label
            )

            response = await asyncio.to_thread(
                current_client.models.generate_content,
                model=current_model,
                contents=types.Content(
                    parts=[video_part, types.Part(text=prompt)],
                ),
                config=types.GenerateContentConfig(
                    system_instruction=SYSTEM_INSTRUCTION,
                    media_resolution=types.MediaResolution.MEDIA_RESOLUTION_HIGH,
                    thinking_config=types.ThinkingConfig(thinking_budget=-1),
                    max_output_tokens=65000,
                ),
            )

            if response.usage_metadata:
                logger.info(
                    "describe_video: %s model=%s prompt_tokens=%s "
                    "response_tokens=%s total_tokens=%s",
                    video_label,
                    current_model,
                    response.usage_metadata.prompt_token_count,
                    response.usage_metadata.candidates_token_count,
                    response.usage_metadata.total_token_count,
                )

            result_text = response.text

        except Exception as e:
            error_str = str(e)
            last_error = e

            if _is_daily_quota_error(error_str) and daily_retries < max_daily_retries:
                # The SDK exposes the key only through a private attribute;
                # tolerate its absence rather than failing the rotation.
                old_key = getattr(
                    getattr(current_client, "_api_client", None), "api_key", ""
                )
                if old_key:
                    await mark_key_daily_spent(old_key, "generate")
                current_client = genai.Client(api_key=next_gemini_flash_key())
                daily_retries += 1
                logger.warning(
                    "describe_video: daily quota hit, rotated to new key "
                    "(attempt %d/%d)",
                    daily_retries,
                    max_daily_retries,
                )
                # model_index is deliberately left unchanged: same model, new key.
                continue

            lowered = error_str.lower()
            is_retriable = any(kw in lowered for kw in retriable_markers)
            if is_retriable and has_fallback:
                logger.warning(
                    "describe_video: model=%s retriable error: %s — falling back",
                    current_model,
                    error_str[:200],
                )
                model_index += 1
                continue

            logger.error(
                "describe_video: model=%s failed: %s",
                current_model,
                error_str[:500],
            )
            break

        if result_text:
            return result_text, current_model, None

        # The call succeeded but produced no text; never report that as a
        # success, because callers measure and return the text.
        last_error = RuntimeError(f"model {current_model} returned an empty response")
        logger.warning(
            "describe_video: model=%s returned no text for %s",
            current_model,
            video_label,
        )
        model_index += 1

    return None, None, f"Failed to describe video: {last_error}"


# ---------------------------------------------------------------------------
# Tool registration
# ---------------------------------------------------------------------------

# Name under which the tool is registered.
TOOL_NAME = "describe_video"

# Capability summary and practical limits, assembled sentence by sentence.
TOOL_DESCRIPTION = " ".join(
    [
        (
            "Provide an extremely detailed description and analysis of a "
            "video using the Gemini API."
        ),
        (
            "Supports YouTube (native ingestion), Rumble, Twitch VODs, "
            "direct MP4 files, and 1000+ other video sites via yt-dlp."
        ),
        "Processing time depends on video length (typically 30-120 seconds).",
        "Works best with videos under 1 hour.",
    ]
)

# Argument schema for the tool: ``video_url`` is the only required field.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "video_url": {
            "type": "string",
            "description": (
                "Full URL of the video to analyse (YouTube, Rumble, "
                "Twitch, direct MP4, etc.)."
            ),
        },
        "focus_area": {
            "type": "string",
            "description": (
                "Optional aspect to focus on "
                "(e.g. 'visual effects', 'speaker arguments', "
                "'music analysis', 'tutorial steps')."
            ),
        },
    },
    "required": ["video_url"],
}


async def _record_default_key_usage(ctx) -> None:
    """Count one successful ``describe_video`` call against the default-key quota.

    Does nothing when ``default_key_limit_applies(ctx)`` reports that the
    limit is not in force for this context.
    """
    # Imported at call time, matching how this module imports manage_api_keys.
    from tools.manage_api_keys import (
        default_key_limit_applies,
        increment_default_key_usage,
    )

    if await default_key_limit_applies(ctx):
        await increment_default_key_usage(
            ctx.user_id,
            "describe_video",
            ctx.redis,
        )


async def run(
    video_url: str = "",
    youtube_url: str = "",
    focus_area: Optional[str] = None,
    ctx=None,
) -> str:
    """Tool entry point: produce an exhaustive Gemini analysis of a video URL.

    Orchestrates the whole pipeline: it classifies the URL via
    :func:`_classify_url`, resolves a Gemini key (preferring the user's stored
    key over the rotating default pool, with per-user default-key rate
    limiting via ``manage_api_keys``), fetches metadata with
    :func:`_get_video_metadata`, enforces ``MAX_DURATION_SECONDS``, picks an
    FPS with :func:`_calculate_fps`, and builds the prompt from
    :func:`_format_metadata_for_prompt` plus ``ANALYSIS_PROMPT`` and any
    ``focus_area``.

    YouTube URLs take a fast native Gemini ingestion path; everything else
    downloads (:func:`_download_direct` or :func:`_download_with_ytdlp`),
    uploads via :func:`_upload_to_gemini_sync`, and cleans up both the temp
    directory and the Gemini-hosted file in a ``finally`` block. The actual
    model call goes through :func:`_generate_description`.

    Registered under ``TOOL_NAME = "describe_video"`` and discovered by
    ``tool_loader.load_tools``; the inference worker dispatches it through the
    ``ToolRegistry`` with a populated ``ctx``. It reads/writes per-user
    API-key and usage state in Redis via ``ctx.redis``, touches the filesystem
    for temporary downloads, and makes outbound HTTP/Gemini calls.

    Args:
        video_url: The video URL to analyse (primary argument).
        youtube_url: Legacy alias accepted when ``video_url`` is absent.
        focus_area: Optional aspect to give extra depth in the analysis.
        ctx: Tool execution context supplying ``redis``, ``user_id``,
            ``channel_id``, and ``config`` for key resolution and limits.

    Returns:
        str: A JSON document. On success it carries ``success: true`` with the
        ``description``, ``model_used``, ``source``, and available metadata;
        on failure it carries an ``error`` message (missing URL, unextractable
        ID, over-length, rate-limit, download/upload, or generation error,
        including a model response with no text).
    """
    empty_response_error = (
        "Failed to describe video: the model returned an empty response."
    )

    url = video_url or youtube_url
    if not url:
        return json.dumps(
            {
                "error": "Missing required argument: video_url is required.",
            }
        )

    url_type = _classify_url(url)
    video_label = url
    video_id = None

    if url_type == "youtube":
        video_id = _extract_video_id(url)
        if not video_id:
            return json.dumps(
                {
                    "error": "Could not extract video ID from YouTube URL.",
                }
            )
        video_label = f"yt:{video_id}"

    logger.info("describe_video: url_type=%s label=%s", url_type, video_label)

    # ------------------------------------------------------------------
    # Resolve Gemini API key — prefer user key over default
    # ------------------------------------------------------------------
    user_gemini_key = None
    if ctx and getattr(ctx, "redis", None) and getattr(ctx, "user_id", None):
        try:
            from tools.manage_api_keys import get_user_api_key

            user_gemini_key = await get_user_api_key(
                ctx.user_id,
                "gemini",
                redis_client=ctx.redis,
                channel_id=getattr(ctx, "channel_id", None),
                config=getattr(ctx, "config", None),
            )
        except Exception:
            # Best-effort lookup: fall back to the default pool, but leave a
            # trace so a broken key store does not go unnoticed.
            logger.warning(
                "describe_video: user Gemini key lookup failed; "
                "falling back to default key",
                exc_info=True,
            )

    if user_gemini_key:
        client = genai.Client(api_key=user_gemini_key)
        logger.info("describe_video: using user-provided Gemini API key")
        using_default_key = False
    else:
        from tools.manage_api_keys import (
            check_default_key_limit,
            default_key_limit_applies,
            default_key_limit_error,
        )

        if await default_key_limit_applies(ctx):
            allowed, current, limit = await check_default_key_limit(
                ctx.user_id,
                "describe_video",
                ctx.redis,
                daily_limit=20,
            )
            if not allowed:
                return json.dumps(
                    {
                        "error": default_key_limit_error(
                            "describe_video",
                            current,
                            limit,
                        ),
                    }
                )
        client = genai.Client(api_key=next_gemini_flash_key())
        using_default_key = True

    # ------------------------------------------------------------------
    # Fetch metadata via yt-dlp (best-effort — tool works without it)
    # ------------------------------------------------------------------
    metadata = await asyncio.to_thread(_get_video_metadata, url)

    if metadata and metadata.get("duration"):
        duration = metadata["duration"]
        if duration > MAX_DURATION_SECONDS:
            return json.dumps(
                {
                    "error": (
                        # int(): a float duration would otherwise render
                        # as e.g. "75.0 minutes".
                        f"Video is too long ({int(duration // 60)} minutes). "
                        f"Maximum allowed duration is "
                        f"{MAX_DURATION_SECONDS // 60} minutes."
                    ),
                }
            )
        fps = _calculate_fps(duration)
        logger.info(
            "describe_video: %s title=%r duration=%ds fps=%.1f",
            video_label,
            metadata.get("title", "?"),
            duration,
            fps,
        )
    else:
        fps = 1.0
        logger.info("describe_video: no metadata, default fps=%.1f", fps)

    # ------------------------------------------------------------------
    # Build prompt with metadata + analysis template
    # ------------------------------------------------------------------
    if url_type == "youtube":
        source_name = "YouTube"
    elif metadata:
        source_name = (
            (metadata.get("extractor") or url_type).replace("_", " ").title()
        )
    else:
        source_name = url_type.title()

    prompt_parts: list[str] = []

    if metadata:
        metadata_str = _format_metadata_for_prompt(metadata, source=source_name)
        prompt_parts.append(
            f"## VIDEO METADATA (from {source_name})\n\n"
            f"{metadata_str}\n\n---\n"
        )

    prompt_parts.append(ANALYSIS_PROMPT)

    if focus_area:
        # NOTE(review): the line breaks inside this template were reconstructed
        # from a whitespace-collapsed copy of the file — confirm against VCS.
        prompt_parts.append(
            f"""

## SPECIFIC FOCUS AREA

The user has requested special attention to the following:
\"\"\"{focus_area}\"\"\"

While still providing comprehensive analysis, give EXTRA DEPTH AND DETAIL to this specific area.
If timestamps are mentioned, pay particular attention to those sections.
If topics are mentioned, explore them more thoroughly than other aspects."""
        )

    prompt = "\n".join(prompt_parts)

    # ------------------------------------------------------------------
    # YouTube: native Gemini ingestion (fast path — no download)
    # ------------------------------------------------------------------
    if url_type == "youtube":
        video_part = types.Part(
            file_data=types.FileData(file_uri=url),
            video_metadata=types.VideoMetadata(fps=fps),
        )

        result_text, model_used, err = await _generate_description(
            client,
            video_part,
            prompt,
            video_label,
        )
        if err:
            return json.dumps({"error": err})
        if not result_text:
            # Guard against a "successful" call that produced no text;
            # len(None) below would otherwise raise.
            return json.dumps({"error": empty_response_error})

        logger.info("describe_video: %s done (%d chars)", video_label, len(result_text))

        result = {
            "success": True,
            "video_id": video_id,
            "video_url": url,
            "source": "youtube",
            "model_used": model_used,
            "description": result_text,
        }
        if metadata:
            result["title"] = metadata.get("title")
            result["channel"] = metadata.get("channel")
            result["duration_seconds"] = metadata.get("duration")

        if using_default_key:
            await _record_default_key_usage(ctx)

        return json.dumps(result)

    # ------------------------------------------------------------------
    # Non-YouTube: download → upload → describe → cleanup
    # ------------------------------------------------------------------
    temp_dir = tempfile.mkdtemp(prefix="describe_video_")
    gemini_file = None

    try:
        # --- Download ------------------------------------------------
        if url_type == "direct":
            local_path, dl_err = await _download_direct(url, temp_dir)
            if dl_err:
                # Direct download failed — try yt-dlp as fallback
                local_path, dl_err = await _download_with_ytdlp(url, temp_dir)
        else:
            local_path, dl_err = await _download_with_ytdlp(url, temp_dir)
            if dl_err and _is_direct_video_url(url):
                local_path, dl_err = await _download_direct(url, temp_dir)

        if dl_err or not local_path:
            return json.dumps(
                {
                    "error": f"Failed to download video: {dl_err}",
                }
            )

        file_size = os.path.getsize(local_path)
        logger.info(
            "describe_video: downloaded %s (%.1f MB)",
            local_path,
            file_size / 1024 / 1024,
        )

        # --- Upload to Gemini File API -------------------------------
        gemini_file, up_err = await asyncio.to_thread(
            _upload_to_gemini_sync,
            client,
            local_path,
        )
        if up_err or not gemini_file:
            return json.dumps(
                {
                    "error": f"Failed to upload video to Gemini: {up_err}",
                }
            )

        # --- Generate description ------------------------------------
        video_part = types.Part(
            file_data=types.FileData(file_uri=gemini_file.uri),
            video_metadata=types.VideoMetadata(fps=fps),
        )

        result_text, model_used, gen_err = await _generate_description(
            client,
            video_part,
            prompt,
            video_label,
        )
        if gen_err:
            return json.dumps({"error": gen_err})
        if not result_text:
            return json.dumps({"error": empty_response_error})

        logger.info("describe_video: %s done (%d chars)", video_label, len(result_text))

        result = {
            "success": True,
            "video_url": url,
            "source": url_type,
            "model_used": model_used,
            "description": result_text,
        }
        if metadata:
            result["title"] = metadata.get("title")
            result["channel"] = metadata.get("channel")
            result["duration_seconds"] = metadata.get("duration")
            if metadata.get("extractor"):
                result["platform"] = metadata["extractor"]

        if using_default_key:
            await _record_default_key_usage(ctx)

        return json.dumps(result)

    finally:
        # Cleanup local temp files
        shutil.rmtree(temp_dir, ignore_errors=True)

        # Cleanup Gemini-hosted file
        if gemini_file:
            try:
                await asyncio.to_thread(
                    client.files.delete,
                    name=gemini_file.name,
                )
                logger.info("describe_video: deleted Gemini file %s", gemini_file.name)
            except Exception:
                logger.warning(
                    "describe_video: failed to delete Gemini file %s",
                    getattr(gemini_file, "name", "?"),
                )