"""Download files from URLs (incl. YouTube via yt-dlp)."""
from __future__ import annotations
import aiofiles
import asyncio
import json
import logging
import os
import re
import shutil
from pathlib import Path
from typing import Any
from urllib.parse import unquote, urlparse
from tools._safe_http import assert_safe_http_url
logger = logging.getLogger(__name__)
# Sandbox root -- every download is written somewhere beneath this directory.
BASE_PATH = Path("/home/star/large_files")

# yt-dlp settings
YTDLP_COOKIES = "/root/cookies.txt"
YTDLP_JS_RUNTIMES = "node"
# Format alternatives joined with "/" and listed in preference order:
# size-bounded single files first, then size-bounded video+audio pairs,
# and "worst" as the last resort.
YTDLP_FORMAT_SELECTOR = "/".join((
    "best[filesize<20M]",
    "best[filesize_approx<20M]",
    "bestvideo[filesize<15M]+bestaudio[filesize<5M]",
    "bestvideo[filesize_approx<15M]+bestaudio[filesize_approx<5M]",
    "worst",
))

# Limits applied to every download.
MAX_DOWNLOAD_SIZE = 20 * 1024 ** 2  # bytes (20 MB)
DOWNLOAD_TIMEOUT = 300  # seconds
# Tool registration metadata: name, human-readable summary, and the
# JSON-schema description of the arguments accepted by ``run``.
TOOL_NAME = "file_download"
TOOL_DESCRIPTION = " ".join((
    "Download a file from a URL (HTTPS only) to the server.",
    "Supports YouTube URLs via yt-dlp. Max 20 MB.",
))

# Parameter name -> description; every parameter is a JSON string.
_PARAM_DESCRIPTIONS = {
    "url": (
        "URL to download from (HTTPS only). "
        "YouTube URLs are handled via yt-dlp."
    ),
    "filename": (
        "Custom filename. For YouTube, only the base "
        "name (extension added automatically)."
    ),
    "subdirectory": "Sub-directory within /home/star/large_files.",
}

TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        param: {"type": "string", "description": text}
        for param, text in _PARAM_DESCRIPTIONS.items()
    },
    "required": ["url"],
}
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _resolve(file_path: str) -> tuple[Path | None, str | None]:
    """Resolve *file_path* to an absolute path inside ``BASE_PATH``.

    The sandbox root is created if it does not exist yet.  Absolute inputs
    are re-rooted beneath the sandbox instead of being rejected.

    Args:
        file_path (str): Relative or absolute path supplied by the caller.

    Returns:
        tuple[Path | None, str | None]: ``(resolved_path, None)`` on
        success, or ``(None, error_message)`` when the path is invalid or
        would end up outside the sandbox.
    """
    try:
        base = BASE_PATH.resolve()
        base.mkdir(parents=True, exist_ok=True)
        # Joining an absolute path onto ``base`` would discard ``base``
        # entirely, so drop the leading slashes and treat it as relative.
        if os.path.isabs(file_path):
            relative = file_path.lstrip("/")
        else:
            relative = file_path
        resolved = (base / relative).resolve()
    except Exception as exc:
        return None, f"Invalid path: {exc}"

    # Compare path components, not string prefixes: a plain ``startswith``
    # on the string form would accept a sibling such as ``<base>_evil``.
    if not resolved.is_relative_to(base):
        return None, "Path traversal detected."
    return resolved, None
def _is_youtube(url: str) -> bool:
    """Return whether *url* points at a YouTube video page.

    The decision is made on the parsed host and path rather than on a
    substring search, so an unrelated URL that merely contains
    ``youtube.com/watch`` (for example in its query string) is not routed
    to yt-dlp.  ``/embed/`` links are accepted as well, matching the URL
    shapes that ``_extract_video_id`` recognises.

    Args:
        url (str): URL to classify; surrounding whitespace and letter case
            are ignored, and a missing scheme is tolerated.

    Returns:
        bool: True for ``youtu.be/...`` links and for ``/watch``,
        ``/shorts`` or ``/embed/`` paths on ``youtube.com`` or one of its
        sub-domains; False otherwise.
    """
    candidate = url.strip().lower()
    try:
        parts = urlparse(candidate)
        if not parts.netloc:
            # Scheme-less input such as "youtube.com/watch?v=...": prefix
            # "//" so urlparse treats the first segment as the host.
            parts = urlparse("//" + candidate.lstrip("/"))
        host = parts.hostname or ""
    except ValueError:
        # urlparse rejects some malformed network locations.
        return False

    if host == "youtu.be":
        return parts.path.startswith("/")
    if host == "youtube.com" or host.endswith(".youtube.com"):
        return parts.path.startswith(("/watch", "/shorts", "/embed/"))
    return False
def _extract_video_id(url: str) -> str | None:
    """Extract the 11-character YouTube video id from *url*.

    Recognises ``watch?v=``, ``youtu.be/``, ``/shorts/`` and ``/embed/``
    links.  For watch URLs the ``v`` parameter may appear anywhere in the
    query string, not only first.

    Args:
        url (str): URL to inspect.

    Returns:
        str | None: The video id, or None when no supported pattern
        matches.
    """
    patterns = (
        # ``(?:[^#]*?&)?`` lets other query parameters precede ``v=``
        # (e.g. ``watch?feature=share&v=...``) without reading past the
        # fragment marker.
        r'(?:youtube\.com/watch\?(?:[^#]*?&)?v=|youtu\.be/|'
        r'youtube\.com/shorts/)([a-zA-Z0-9_-]{11})',
        r'(?:youtube\.com/embed/)([a-zA-Z0-9_-]{11})',
    )
    for pattern in patterns:
        # Host names are case-insensitive; the captured id keeps its case.
        match = re.search(pattern, url, flags=re.IGNORECASE)
        if match:
            return match.group(1)
    return None
def _sanitize(name: str) -> str:
    """Turn *name* into a safe single-component filename.

    The characters ``< > : " / \\ | ? *`` are replaced with ``_``, leading
    and trailing dots and spaces are stripped, and the result is capped at
    200 characters, keeping the extension where possible.

    Args:
        name (str): Proposed filename.

    Returns:
        str: The cleaned name, or ``"downloaded_file"`` when nothing
        usable remains.
    """
    max_len = 200
    cleaned = re.sub(r'[<>:"/\\|?*]', '_', name).strip('. ')
    if len(cleaned) > max_len:
        stem, ext = os.path.splitext(cleaned)
        if len(ext) < max_len:
            # Shorten the stem so stem + extension is exactly max_len.
            cleaned = stem[:max_len - len(ext)] + ext
        else:
            # The "extension" alone is over the cap; slicing the stem with
            # a negative bound would not shorten anything, so hard-cut.
            cleaned = cleaned[:max_len]
    return cleaned or "downloaded_file"
def _filename_from_url(url: str) -> str:
    """Derive a filename from the last path segment of *url*.

    The URL path is percent-decoded before its final component is taken
    and passed through ``_sanitize``.

    Args:
        url (str): URL whose path supplies the name.

    Returns:
        str: The sanitized final path component, or ``"downloaded_file"``
        when the path has no final component (e.g. it ends in ``/``).
    """
    decoded_path = unquote(urlparse(url).path)
    last_segment = os.path.basename(decoded_path)
    if not last_segment:
        return "downloaded_file"
    return _sanitize(last_segment)
# ------------------------------------------------------------------
# Download backends
# ------------------------------------------------------------------
async def _download_youtube(
    url: str, output_dir: Path, filename: str | None,
) -> tuple[Path | None, str | None]:
    """Download a video with the ``yt-dlp`` command-line tool.

    Args:
        url (str): Video URL handed to yt-dlp.
        output_dir (Path): Directory the file is written to.
        filename (str | None): Optional base name; any extension is
            dropped because yt-dlp appends the real one.  When omitted
            the video title is used.

    Returns:
        tuple[Path | None, str | None]: ``(path, None)`` on success or
        ``(None, error_message)`` on failure.  The process is killed if it
        is still running after ``DOWNLOAD_TIMEOUT`` seconds.
    """
    if not await asyncio.to_thread(shutil.which, "yt-dlp"):
        return None, "yt-dlp is not installed."

    def literal(text: str) -> str:
        # "%" introduces a field in yt-dlp output templates; doubling it
        # makes caller-supplied text be taken verbatim.
        return text.replace("%", "%%")

    if filename:
        stem = os.path.splitext(_sanitize(filename))[0]
        name_template = f"{literal(stem)}.%(ext)s"
    else:
        name_template = "%(title)s.%(ext)s"
    template = os.path.join(literal(str(output_dir)), name_template)

    cmd = [
        "yt-dlp",
        "--cookies", YTDLP_COOKIES,
        "--js-runtimes", YTDLP_JS_RUNTIMES,
        "-f", YTDLP_FORMAT_SELECTOR,
        "-o", template,
        "--no-playlist",
        "--no-overwrites",
        "--restrict-filenames",
        "--print", "after_move:filepath",
        # End of options: the URL can never be parsed as a flag.
        "--",
        url,
    ]
    media_exts = {".mp4", ".webm", ".mkv", ".m4a", ".mp3"}
    proc = None
    try:
        # Remember what is already in the directory so the fallback scan
        # below can only return a file created by this run.  Returning an
        # older file would be wrong, and the caller may delete the
        # returned file when it is over the size limit.
        if output_dir.is_dir():
            preexisting = set(output_dir.iterdir())
        else:
            preexisting = set()

        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(), timeout=DOWNLOAD_TIMEOUT,
        )
        out = stdout.decode("utf-8", errors="replace").strip()
        err = stderr.decode("utf-8", errors="replace").strip()
        if proc.returncode != 0:
            msg = err or out or f"yt-dlp exit {proc.returncode}"
            return None, f"YouTube download failed: {msg}"

        if out:
            # The last printed line is the final file path.
            reported = Path(out.splitlines()[-1].strip())
            if reported.exists():
                return reported, None

        # Fallback: newest media file that appeared during this run.
        fresh = [
            entry for entry in output_dir.iterdir()
            if entry not in preexisting
            and entry.is_file()
            and entry.suffix in media_exts
        ]
        if fresh:
            return max(fresh, key=lambda p: p.stat().st_mtime), None
        return None, "Download completed but file not found."
    except asyncio.TimeoutError:
        return None, (
            f"YouTube download timed out after {DOWNLOAD_TIMEOUT}s."
        )
    except Exception as exc:
        return None, f"YouTube download error: {exc}"
    finally:
        # wait_for() only cancels the wait; without this the yt-dlp
        # process would keep running after a timeout or cancellation.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
            await proc.wait()
# Redirect handling for _download_http: statuses treated as redirects and
# the maximum number of hops followed before giving up.
_REDIRECT_STATUSES = frozenset({301, 302, 303, 307, 308})
_MAX_REDIRECTS = 10


def _next_hop(
    current: str, location: str | None,
) -> tuple[str | None, str | None]:
    """Resolve and vet one redirect target.

    Returns ``(absolute_url, None)`` when the target passes
    ``assert_safe_http_url`` and uses HTTPS, else ``(None, error)``.
    """
    from urllib.parse import urljoin

    if not location:
        return None, "Redirect response without a Location header."
    try:
        target = assert_safe_http_url(urljoin(current, location))
    except ValueError as exc:
        return None, f"Unsafe redirect: {exc}"
    if urlparse(target).scheme != "https":
        return None, "Unsafe redirect: only HTTPS is allowed."
    return target, None


async def _fetch_to(url: str, partial: Path) -> str | None:
    """Stream *url* into *partial*, following only vetted redirects.

    Returns None on success or an error message.
    """
    import aiohttp

    timeout = aiohttp.ClientTimeout(total=DOWNLOAD_TIMEOUT)
    async with aiohttp.ClientSession(timeout=timeout) as sess:
        current = url
        for _ in range(_MAX_REDIRECTS + 1):
            # Redirects are followed by hand so each hop is checked before
            # any request is sent to it.
            async with sess.get(current, allow_redirects=False) as resp:
                if resp.status in _REDIRECT_STATUSES:
                    current, err = _next_hop(
                        current, resp.headers.get("Location"),
                    )
                    if err:
                        return err
                    continue
                if resp.status != 200:
                    return f"HTTP error {resp.status}: {resp.reason}"

                declared = resp.headers.get("Content-Length")
                # A malformed header is ignored; the streaming check
                # below still enforces the limit.
                if (
                    declared
                    and declared.isdigit()
                    and int(declared) > MAX_DOWNLOAD_SIZE
                ):
                    mb = int(declared) / 1024 / 1024
                    return (
                        f"File too large: {mb:.1f} MB exceeds "
                        f"20 MB limit."
                    )

                partial.parent.mkdir(parents=True, exist_ok=True)
                received = 0
                async with aiofiles.open(partial, "wb") as fh:
                    async for chunk in resp.content.iter_chunked(8192):
                        received += len(chunk)
                        if received > MAX_DOWNLOAD_SIZE:
                            return (
                                "File too large: exceeded 20 MB "
                                "during download."
                            )
                        await fh.write(chunk)
                return None
        return "Too many redirects."


async def _download_http(
    url: str, output_path: Path,
) -> tuple[Path | None, str | None]:
    """Download *url* over HTTP(S) to *output_path*.

    The body is streamed into a sibling ``.part`` file and moved into
    place only after a complete download within ``MAX_DOWNLOAD_SIZE``, so
    a failed transfer neither truncates an existing file nor leaves a
    partial one.  The whole operation, redirects included, is bounded by
    ``DOWNLOAD_TIMEOUT`` seconds.

    Args:
        url (str): URL to fetch; the caller is expected to have validated
            it already.
        output_path (Path): Destination file; parent directories are
            created as needed.

    Returns:
        tuple[Path | None, str | None]: ``(output_path, None)`` on
        success or ``(None, error_message)`` on failure.
    """
    partial = output_path.parent / f"{output_path.name}.part"
    try:
        err = await asyncio.wait_for(
            _fetch_to(url, partial), timeout=DOWNLOAD_TIMEOUT,
        )
        if err:
            return None, err
        os.replace(partial, output_path)
        return output_path, None
    except asyncio.TimeoutError:
        return None, (
            f"Download timed out after {DOWNLOAD_TIMEOUT}s."
        )
    except Exception as exc:
        return None, f"Download error: {exc}"
    finally:
        # Best-effort cleanup; after a successful move the temporary file
        # no longer exists and this is a no-op.
        try:
            partial.unlink(missing_ok=True)
        except OSError:
            pass
# ------------------------------------------------------------------
# Entry point
# ------------------------------------------------------------------
# Public coroutine invoked to execute the tool.
async def run(
    url: str,
    filename: str | None = None,
    subdirectory: str | None = None,
    **_kwargs: Any,
) -> str:
    """Validate the request, download the file, and report the outcome.

    URLs classified by ``_is_youtube`` are fetched with yt-dlp and removed
    again if the result exceeds ``MAX_DOWNLOAD_SIZE``; every other URL is
    downloaded over HTTPS.

    Args:
        url (str): HTTPS URL to download.
        filename (str | None): Optional name for the saved file.
        subdirectory (str | None): Optional directory below ``BASE_PATH``.
        **_kwargs (Any): Extra keyword arguments; accepted and ignored.

    Returns:
        str: JSON object whose ``status`` is ``"success"`` or ``"error"``.
    """
    def failure(message: str, source: str | None = None) -> str:
        # Error payload; "source" is present only once a backend is chosen.
        payload = {"status": "error", "error": message}
        if source is not None:
            payload["source"] = source
        return json.dumps(payload)

    def success(
        message: str, saved: Path, n_bytes: int, source: str, **extra: Any,
    ) -> str:
        # Success payload shared by both backends; extras go last.
        return json.dumps({
            "status": "success",
            "message": message,
            "file_path": str(saved),
            "filename": saved.name,
            "size": n_bytes,
            "size_human": f"{n_bytes / 1024 / 1024:.2f} MB",
            "source": source,
            **extra,
        })

    # --- Request validation -------------------------------------------
    if not url:
        return failure("url is required.")
    try:
        url = assert_safe_http_url(url.strip())
    except ValueError as exc:
        return failure(str(exc))

    parts = urlparse(url)
    if parts.scheme != "https":
        return failure(
            f"Invalid URL scheme: '{parts.scheme or 'none'}'. "
            f"Only HTTPS is allowed."
        )
    if not parts.netloc:
        return failure("Invalid URL: no domain.")

    # --- Output directory ---------------------------------------------
    if subdirectory:
        output_dir, problem = _resolve(subdirectory)
        if problem:
            return failure(problem)
    else:
        output_dir = BASE_PATH.resolve()
    output_dir.mkdir(parents=True, exist_ok=True)

    # --- Plain HTTPS download -----------------------------------------
    if not _is_youtube(url):
        if filename:
            target_name = _sanitize(filename)
        else:
            target_name = _filename_from_url(url)
        destination, problem = _resolve(
            str(Path(subdirectory or "") / target_name),
        )
        if problem:
            return failure(problem)
        logger.info("Starting HTTP download: %s -> %s", url, destination)
        saved, problem = await _download_http(url, destination)
        if problem:
            return failure(problem, "http")
        return success(
            "File downloaded.", saved, saved.stat().st_size, "http",
        )

    # --- YouTube download via yt-dlp ----------------------------------
    vid = _extract_video_id(url)
    logger.info("Starting YouTube download: %s", vid or url)
    saved, problem = await _download_youtube(url, output_dir, filename)
    if problem:
        return failure(problem, "youtube")

    n_bytes = saved.stat().st_size
    if n_bytes > MAX_DOWNLOAD_SIZE:
        # The size is only known after yt-dlp has finished, so an
        # oversized result is deleted after the fact.
        saved.unlink(missing_ok=True)
        mb = n_bytes / 1024 / 1024
        return failure(f"File too large: {mb:.2f} MB. Removed.", "youtube")
    return success(
        "YouTube video downloaded.", saved, n_bytes, "youtube",
        video_id=vid,
    )