# Source code for tools.file_download

"""Download files from URLs (incl. YouTube via yt-dlp)."""

from __future__ import annotations

import aiofiles
import asyncio
import json
import logging
import os
import re
import shutil
from pathlib import Path
from typing import Any
from urllib.parse import unquote, urlparse

from tools._safe_http import assert_safe_http_url

# Module-level logger; run() uses it to record the start of each download.
logger = logging.getLogger(__name__)

# Base directory -- all downloads are sandboxed here.
# _resolve() checks every requested path against this root.
BASE_PATH = Path("/home/star/large_files")

# yt-dlp config
# Cookie file passed to yt-dlp with --cookies.
YTDLP_COOKIES = "/root/cookies.txt"
# Value passed to yt-dlp with --js-runtimes.
YTDLP_JS_RUNTIMES = "node"
# Format selector passed to yt-dlp with -f. Alternatives are separated by "/"
# and listed in order of preference: a single file under 20M (exact, then
# approximate size), a video-under-15M plus audio-under-5M pair (exact, then
# approximate), and finally "worst" with no size filter at all. Because of
# that last alternative run() still checks the size of the finished file.
YTDLP_FORMAT_SELECTOR = (
    "best[filesize<20M]/best[filesize_approx<20M]/"
    "bestvideo[filesize<15M]+bestaudio[filesize<5M]/"
    "bestvideo[filesize_approx<15M]+bestaudio[filesize_approx<5M]/"
    "worst"
)

# Largest accepted download in bytes. The "20 MB" wording in TOOL_DESCRIPTION
# and in the error messages is hard-coded separately; keep them in sync.
MAX_DOWNLOAD_SIZE = 20 * 1024 * 1024  # 20 MB
# Time budget for one download: used as the aiohttp total timeout and as the
# wait_for() timeout around the yt-dlp subprocess.
DOWNLOAD_TIMEOUT = 300  # seconds

# Tool metadata. Nothing in this module reads these three names; presumably
# they are consumed by whatever registers the tool -- confirm with the loader.
TOOL_NAME = "file_download"
# Human-readable summary. The "20 MB" here mirrors MAX_DOWNLOAD_SIZE by hand.
TOOL_DESCRIPTION = (
    "Download a file from a URL (HTTPS only) to the server. "
    "Supports YouTube URLs via yt-dlp. Max 20 MB."
)
# JSON-Schema-style description of the keyword arguments accepted by run():
# "url" is the only required one; "filename" and "subdirectory" are optional.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "url": {
            "type": "string",
            "description": (
                "URL to download from (HTTPS only). "
                "YouTube URLs are handled via yt-dlp."
            ),
        },
        "filename": {
            "type": "string",
            "description": (
                "Custom filename. For YouTube, only the base "
                "name (extension added automatically)."
            ),
        },
        # The path below repeats BASE_PATH as plain text; keep it in sync.
        "subdirectory": {
            "type": "string",
            "description": (
                "Sub-directory within /home/star/large_files."
            ),
        },
    },
    "required": ["url"],
}


# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------

def _resolve(file_path: str) -> tuple[Path | None, str | None]:
    """Internal helper: resolve.

        Args:
            file_path (str): The file path value.
        """
    try:
        base = BASE_PATH.resolve()
        base.mkdir(parents=True, exist_ok=True)
        if os.path.isabs(file_path):
            combined = base / file_path.lstrip("/")
        else:
            combined = base / file_path
        resolved = combined.resolve()
        if not str(resolved).startswith(str(base)):
            return None, "Path traversal detected."
        return resolved, None
    except Exception as exc:
        return None, f"Invalid path: {exc}"


def _is_youtube(url: str) -> bool:
    """Tell whether *url* contains one of the known YouTube URL fragments.

    The comparison is a case-insensitive substring test against the watch,
    short-link (``youtu.be``) and Shorts forms; the URL is not parsed.

    Args:
        url: URL string to inspect.

    Returns:
        True if any fragment occurs in the URL, False otherwise.
    """
    fragments = ("youtube.com/watch", "youtu.be/", "youtube.com/shorts")
    normalized = url.lower().strip()
    for fragment in fragments:
        if fragment in normalized:
            return True
    return False


def _extract_video_id(url: str) -> str | None:
    """Internal helper: extract video id.

        Args:
            url (str): URL string.
        """
    patterns = [
        r'(?:youtube\.com/watch\?v=|youtu\.be/|'
        r'youtube\.com/shorts/)([a-zA-Z0-9_-]{11})',
        r'(?:youtube\.com/embed/)([a-zA-Z0-9_-]{11})',
    ]
    for pat in patterns:
        m = re.search(pat, url)
        if m:
            return m.group(1)
    return None


def _sanitize(name: str) -> str:
    """Turn *name* into a single safe filename component.

    Characters that are reserved on common filesystems, and ASCII control
    characters (including NUL), are replaced with ``_``. Leading and
    trailing dots and spaces are removed. The result is capped at 200
    characters, keeping the extension whenever it fits.

    Args:
        name: Raw filename supplied by the caller or taken from a URL.

    Returns:
        The sanitized name, or ``"downloaded_file"`` if nothing is left.
    """
    max_len = 200
    cleaned = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', name).strip('. ')
    if len(cleaned) > max_len:
        stem, ext = os.path.splitext(cleaned)
        if len(ext) >= max_len:
            # The "extension" alone exceeds the cap, so it cannot be kept;
            # slicing the stem with a negative index would overshoot.
            cleaned = cleaned[:max_len]
        else:
            cleaned = stem[:max_len - len(ext)] + ext
        # Truncation may expose a trailing dot or space again.
        cleaned = cleaned.rstrip('. ')
    return cleaned or "downloaded_file"


def _filename_from_url(url: str) -> str:
    """Derive a filename from the last path segment of *url*.

    The URL path is percent-decoded before its final component is taken,
    and that component is passed through ``_sanitize``.

    Args:
        url: URL string to take the name from.

    Returns:
        The sanitized last path segment, or ``"downloaded_file"`` when the
        path has no final segment.
    """
    decoded_path = unquote(urlparse(url).path)
    candidate = os.path.basename(decoded_path)
    if not candidate:
        return "downloaded_file"
    return _sanitize(candidate)


# ------------------------------------------------------------------
# Download backends
# ------------------------------------------------------------------

async def _download_youtube(
    url: str, output_dir: Path, filename: str | None,
) -> tuple[Path | None, str | None]:
    """Download a video with the ``yt-dlp`` command-line tool.

    Args:
        url: Video URL handed to yt-dlp.
        output_dir: Directory the output template points into.
        filename: Optional base name; any extension is dropped because
            yt-dlp appends the real one.

    Returns:
        ``(path, None)`` on success, or ``(None, message)`` when yt-dlp is
        missing, exits non-zero, times out, or the output file cannot be
        located.
    """
    if not await asyncio.to_thread(shutil.which, "yt-dlp"):
        return None, "yt-dlp is not installed."

    if filename:
        stem = os.path.splitext(_sanitize(filename))[0]
        # "%" starts a field in a yt-dlp output template; double it so a
        # caller-supplied name is always taken literally.
        stem = stem.replace("%", "%%")
        template = str(output_dir / f"{stem}.%(ext)s")
    else:
        template = str(output_dir / "%(title)s.%(ext)s")

    cmd = [
        "yt-dlp",
        "--cookies", YTDLP_COOKIES,
        "--js-runtimes", YTDLP_JS_RUNTIMES,
        "-f", YTDLP_FORMAT_SELECTOR,
        "-o", template,
        "--no-playlist",
        "--no-overwrites",
        "--restrict-filenames",
        "--print", "after_move:filepath",
        "--",  # end of options: the URL can never be parsed as a flag
        url,
    ]

    try:
        # Remember what was already there so the fallback below can only
        # ever return a file produced by this invocation.
        existing = set(output_dir.iterdir()) if output_dir.is_dir() else set()

        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, stderr = await asyncio.wait_for(
                proc.communicate(), timeout=DOWNLOAD_TIMEOUT,
            )
        except asyncio.TimeoutError:
            # wait_for() only cancels communicate(); without an explicit
            # kill the yt-dlp process would keep running in the background.
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # already exited between the timeout and the kill
            await proc.wait()
            return None, (
                f"YouTube download timed out after {DOWNLOAD_TIMEOUT}s."
            )

        out = stdout.decode("utf-8", errors="replace").strip()
        err = stderr.decode("utf-8", errors="replace").strip()

        if proc.returncode != 0:
            msg = err or out or f"yt-dlp exit {proc.returncode}"
            return None, f"YouTube download failed: {msg}"

        if out:
            # --print after_move:filepath: the last stdout line is the path.
            reported = Path(out.splitlines()[-1].strip())
            if reported.is_file():
                return reported, None

        # Fallback: newest media file that did not exist before the run.
        # Pre-existing files are excluded on purpose -- the caller deletes
        # the returned file when it is over the size limit.
        exts = {".mp4", ".webm", ".mkv", ".m4a", ".mp3"}
        fresh = [
            f for f in output_dir.iterdir()
            if f.is_file() and f.suffix.lower() in exts and f not in existing
        ]
        if fresh:
            return max(fresh, key=lambda f: f.stat().st_mtime), None

        return None, "Download completed but file not found."

    except Exception as exc:
        return None, f"YouTube download error: {exc}"


async def _download_http(
    url: str, output_path: Path,
) -> tuple[Path | None, str | None]:
    """Stream *url* to *output_path*, enforcing ``MAX_DOWNLOAD_SIZE``.

    Data is written to a sibling ``<name>.part`` file and moved into place
    only after the whole body has arrived. A failed, oversized or timed-out
    download therefore leaves no truncated file behind and does not destroy
    a file that already exists at *output_path*.

    Args:
        url: URL to fetch.
        output_path: Final location of the downloaded file.

    Returns:
        ``(output_path, None)`` on success, or ``(None, message)`` on a
        non-200 status, an oversized body, a timeout or any other error.
    """
    import aiohttp

    part_path = output_path.with_name(output_path.name + ".part")
    completed = False
    try:
        timeout = aiohttp.ClientTimeout(total=DOWNLOAD_TIMEOUT)
        async with aiohttp.ClientSession(timeout=timeout) as sess:
            # NOTE(review): aiohttp follows redirects by default, while the
            # URL checks in run() only cover the initial URL -- confirm that
            # redirect targets are acceptable here.
            async with sess.get(url) as resp:
                if resp.status != 200:
                    return None, (
                        f"HTTP error {resp.status}: {resp.reason}"
                    )

                # Content-Length is only a hint; a malformed value is
                # ignored because the streaming check below is authoritative.
                cl = resp.headers.get("Content-Length")
                try:
                    declared = int(cl) if cl else None
                except ValueError:
                    declared = None
                if declared is not None and declared > MAX_DOWNLOAD_SIZE:
                    mb = declared / 1024 / 1024
                    return None, (
                        f"File too large: {mb:.1f} MB exceeds "
                        f"20 MB limit."
                    )

                output_path.parent.mkdir(parents=True, exist_ok=True)
                total = 0
                async with aiofiles.open(part_path, "wb") as f:
                    async for chunk in resp.content.iter_chunked(8192):
                        total += len(chunk)
                        if total > MAX_DOWNLOAD_SIZE:
                            return None, (
                                "File too large: exceeded 20 MB "
                                "during download."
                            )
                        await f.write(chunk)

        # The temp file is closed at this point; the rename is atomic on
        # the same filesystem.
        os.replace(part_path, output_path)
        completed = True
        return output_path, None

    except asyncio.TimeoutError:
        return None, (
            f"Download timed out after {DOWNLOAD_TIMEOUT}s."
        )
    except Exception as exc:
        return None, f"Download error: {exc}"
    finally:
        if not completed:
            # Best-effort cleanup of the partial file on every failure path.
            try:
                part_path.unlink(missing_ok=True)
            except OSError as exc:
                logger.warning(
                    "Could not remove partial download %s: %s",
                    part_path, exc,
                )


# ------------------------------------------------------------------
# Entry point
# ------------------------------------------------------------------

def _error_response(message: str, source: str | None = None) -> str:
    """Serialize an error payload; ``source`` is included only when given."""
    payload = {"status": "error", "error": message}
    if source is not None:
        payload["source"] = source
    return json.dumps(payload)


async def run(
    url: str,
    filename: str | None = None,
    subdirectory: str | None = None,
    **_kwargs: Any,
) -> str:
    """Download *url* into the sandbox directory and report the outcome.

    URLs recognized by ``_is_youtube`` are fetched with yt-dlp; every other
    URL is streamed over HTTPS. Failures are reported in the returned JSON
    rather than raised.

    Args:
        url: HTTPS URL to download.
        filename: Optional target filename. For YouTube only the base name
            is used.
        subdirectory: Optional directory below ``BASE_PATH``.
        **_kwargs: Extra keyword arguments are accepted and ignored.

    Returns:
        A JSON string with ``"status"`` set to ``"success"`` or ``"error"``.
        Success payloads carry the file path, name, size and source; error
        payloads carry ``"error"`` and, once a backend was chosen,
        ``"source"``.
    """
    if not url:
        return _error_response("url is required.")

    try:
        url = assert_safe_http_url(url.strip())
    except ValueError as exc:
        return _error_response(str(exc))

    parsed = urlparse(url)
    if parsed.scheme != "https":
        return _error_response(
            f"Invalid URL scheme: '{parsed.scheme or 'none'}'. "
            f"Only HTTPS is allowed."
        )
    if not parsed.netloc:
        return _error_response("Invalid URL: no domain.")

    # Determine output directory
    if subdirectory:
        output_dir, err = _resolve(subdirectory)
        if err:
            return _error_response(err)
    else:
        output_dir = BASE_PATH.resolve()
    output_dir.mkdir(parents=True, exist_ok=True)

    if _is_youtube(url):
        vid = _extract_video_id(url)
        logger.info("Starting YouTube download: %s", vid or url)
        path, err = await _download_youtube(url, output_dir, filename)
        if err:
            return _error_response(err, "youtube")

        try:
            size = path.stat().st_size
        except OSError as exc:
            # Keep the JSON error contract even if the file has vanished.
            return _error_response(
                f"Cannot read downloaded file: {exc}", "youtube",
            )

        # yt-dlp's selector ends in an unfiltered "worst", so the size
        # limit has to be enforced on the finished file as well.
        if size > MAX_DOWNLOAD_SIZE:
            path.unlink(missing_ok=True)
            mb = size / 1024 / 1024
            return _error_response(
                f"File too large: {mb:.2f} MB. Removed.", "youtube",
            )

        return json.dumps({
            "status": "success",
            "message": "YouTube video downloaded.",
            "file_path": str(path),
            "filename": path.name,
            "size": size,
            "size_human": f"{size / 1024 / 1024:.2f} MB",
            "source": "youtube",
            "video_id": vid,
        })

    # Regular HTTPS download
    safe = _sanitize(filename) if filename else _filename_from_url(url)
    out_path, err = _resolve(str(Path(subdirectory or "") / safe))
    if err:
        return _error_response(err)

    logger.info("Starting HTTP download: %s -> %s", url, out_path)
    path, err = await _download_http(url, out_path)
    if err:
        return _error_response(err, "http")

    try:
        size = path.stat().st_size
    except OSError as exc:
        return _error_response(
            f"Cannot read downloaded file: {exc}", "http",
        )

    return json.dumps({
        "status": "success",
        "message": "File downloaded.",
        "file_path": str(path),
        "filename": path.name,
        "size": size,
        "size_human": f"{size / 1024 / 1024:.2f} MB",
        "source": "http",
    })