# Source code for url_utils.fetch_media

"""oEmbed and HTML-scraping fetchers for video/audio/GIF platforms."""

from __future__ import annotations

import asyncio
import logging
import re
from typing import Any, Dict, Optional

import aiohttp

from .cache import get_url_cache
from .fetch_common import _TIMEOUT

logger = logging.getLogger(__name__)


async def get_youtube_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch YouTube video metadata via the public oEmbed endpoint.

    Results are memoized in the URL cache under ``youtube:<url>``.
    Returns a dict with title/channel/thumbnail info (plus an
    ``is_shorts`` flag derived from the URL path), or ``None`` on a
    non-200 response, timeout, or unexpected error.
    """
    cache = get_url_cache()
    cache_key = f"youtube:{url}"
    hit = cache.get(cache_key)
    if hit is not None:
        return hit
    try:
        from urllib.parse import quote

        normalized = url.strip()
        if not normalized.startswith("http"):
            normalized = "https://" + normalized
        shorts = "/shorts/" in normalized.lower()
        endpoint = (
            f"https://www.youtube.com/oembed?url={quote(normalized, safe='')}"
            "&format=json"
        )
        async with aiohttp.ClientSession() as session:
            async with session.get(endpoint, timeout=_TIMEOUT) as resp:
                if resp.status != 200:
                    return None
                payload = await resp.json()
                info = {
                    "title": payload.get("title", "Unknown Title"),
                    "channel_name": payload.get("author_name", "Unknown Channel"),
                    "channel_url": payload.get("author_url", ""),
                    "is_shorts": shorts,
                    "thumbnail_url": payload.get("thumbnail_url", ""),
                }
                cache.set(cache_key, info)
                return info
    except asyncio.TimeoutError:
        logger.error("Timeout fetching YouTube metadata from %s", url)
    except Exception:
        logger.exception("Error fetching YouTube metadata from %s", url)
    return None
async def get_spotify_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch Spotify item metadata via the open.spotify.com oEmbed API.

    The content type (track/album/playlist/episode/artist/show) is
    inferred from the URL path, defaulting to "track". Results are
    cached under ``spotify:<url>``; ``None`` is returned on any failure.
    """
    cache = get_url_cache()
    cache_key = f"spotify:{url}"
    hit = cache.get(cache_key)
    if hit is not None:
        return hit
    try:
        from urllib.parse import quote

        normalized = url.strip()
        if not normalized.startswith("http"):
            normalized = "https://" + normalized
        content_type = "track"
        for candidate in ("track", "album", "playlist", "episode", "artist", "show"):
            if f"/{candidate}/" in normalized:
                content_type = candidate
                break
        endpoint = f"https://open.spotify.com/oembed?url={quote(normalized, safe='')}"
        async with aiohttp.ClientSession() as session:
            async with session.get(endpoint, timeout=_TIMEOUT) as resp:
                if resp.status != 200:
                    return None
                payload = await resp.json()
                info = {
                    "title": payload.get("title", "Unknown"),
                    "type": content_type,
                    "thumbnail_url": payload.get("thumbnail_url", ""),
                }
                cache.set(cache_key, info)
                return info
    except asyncio.TimeoutError:
        logger.error("Timeout fetching Spotify metadata from %s", url)
    except Exception:
        logger.exception("Error fetching Spotify metadata from %s", url)
    return None
async def get_soundcloud_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch SoundCloud track metadata via the soundcloud.com oEmbed API.

    Results are cached under ``soundcloud:<url>``. Returns a dict with
    title, artist, description, and thumbnail, or ``None`` on failure.
    """
    cache = get_url_cache()
    cache_key = f"soundcloud:{url}"
    hit = cache.get(cache_key)
    if hit is not None:
        return hit
    try:
        from urllib.parse import quote

        normalized = url.strip()
        if not normalized.startswith("http"):
            normalized = "https://" + normalized
        endpoint = (
            f"https://soundcloud.com/oembed?url={quote(normalized, safe='')}"
            "&format=json"
        )
        async with aiohttp.ClientSession() as session:
            async with session.get(endpoint, timeout=_TIMEOUT) as resp:
                if resp.status != 200:
                    return None
                payload = await resp.json()
                info = {
                    "title": payload.get("title", "Unknown"),
                    "author_name": payload.get("author_name", "Unknown Artist"),
                    "description": payload.get("description", ""),
                    "thumbnail_url": payload.get("thumbnail_url", ""),
                }
                cache.set(cache_key, info)
                return info
    except asyncio.TimeoutError:
        logger.error("Timeout fetching SoundCloud metadata from %s", url)
    except Exception:
        logger.exception("Error fetching SoundCloud metadata from %s", url)
    return None
async def get_tiktok_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch TikTok video metadata via the www.tiktok.com oEmbed API.

    Results are cached under ``tiktok:<url>``. Returns a dict with
    title, author, author URL, and thumbnail, or ``None`` on failure.
    """
    cache = get_url_cache()
    cache_key = f"tiktok:{url}"
    hit = cache.get(cache_key)
    if hit is not None:
        return hit
    try:
        from urllib.parse import quote

        normalized = url.strip()
        if not normalized.startswith("http"):
            normalized = "https://" + normalized
        endpoint = f"https://www.tiktok.com/oembed?url={quote(normalized, safe='')}"
        async with aiohttp.ClientSession() as session:
            async with session.get(endpoint, timeout=_TIMEOUT) as resp:
                if resp.status != 200:
                    return None
                payload = await resp.json()
                info = {
                    "title": payload.get("title", "TikTok Video"),
                    "author_name": payload.get("author_name", "Unknown"),
                    "author_url": payload.get("author_url", ""),
                    "thumbnail_url": payload.get("thumbnail_url", ""),
                }
                cache.set(cache_key, info)
                return info
    except asyncio.TimeoutError:
        logger.error("Timeout fetching TikTok metadata from %s", url)
    except Exception:
        logger.exception("Error fetching TikTok metadata from %s", url)
    return None
async def get_vimeo_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch Vimeo video metadata via the vimeo.com oEmbed API.

    Results are cached under ``vimeo:<url>``. Returns a dict with
    title, author, description, duration (seconds, 0 if absent), and
    thumbnail, or ``None`` on failure.
    """
    cache = get_url_cache()
    cache_key = f"vimeo:{url}"
    hit = cache.get(cache_key)
    if hit is not None:
        return hit
    try:
        from urllib.parse import quote

        normalized = url.strip()
        if not normalized.startswith("http"):
            normalized = "https://" + normalized
        endpoint = (
            f"https://vimeo.com/api/oembed.json?url={quote(normalized, safe='')}"
        )
        async with aiohttp.ClientSession() as session:
            async with session.get(endpoint, timeout=_TIMEOUT) as resp:
                if resp.status != 200:
                    return None
                payload = await resp.json()
                info = {
                    "title": payload.get("title", "Unknown"),
                    "author_name": payload.get("author_name", "Unknown"),
                    "description": payload.get("description", ""),
                    "duration": payload.get("duration", 0),
                    "thumbnail_url": payload.get("thumbnail_url", ""),
                }
                cache.set(cache_key, info)
                return info
    except asyncio.TimeoutError:
        logger.error("Timeout fetching Vimeo metadata from %s", url)
    except Exception:
        logger.exception("Error fetching Vimeo metadata from %s", url)
    return None
async def get_tenor_media_url(tenor_url: str) -> Optional[str]:
    """Resolve a Tenor page URL to a direct media (mp4/webm/gif) URL.

    A URL that already points at media.tenor.com with a media extension
    is returned unchanged. Otherwise the page HTML is fetched and
    scraped for the first direct link, preferring mp4, then webm, then
    gif. Returns ``None`` when nothing can be extracted.
    """
    try:
        # Short-circuit: already a direct media link.
        if "media.tenor.com" in tenor_url and tenor_url.endswith(
            (".mp4", ".webm", ".gif")
        ):
            return tenor_url
        async with aiohttp.ClientSession() as session:
            async with session.get(tenor_url, timeout=_TIMEOUT) as resp:
                if resp.status != 200:
                    return None
                page = await resp.text()
                # Preference order: mp4 > webm > gif.
                for pattern in (
                    r"(https://media\.tenor\.com/[^\s\"'<>]+\.mp4)",
                    r"(https://media\.tenor\.com/[^\s\"'<>]+\.webm)",
                    r"(https://media\.tenor\.com/[^\s\"'<>]+\.gif)",
                ):
                    found = re.search(pattern, page)
                    if found:
                        return found.group(1)
    except Exception:
        logger.exception("Error extracting Tenor media from %s", tenor_url)
    return None
async def get_giphy_media_url(giphy_url: str) -> Optional[str]:
    """Resolve a Giphy page, embed, or short URL to a direct media URL.

    Resolution order:
      1. A direct media*.giphy.com URL with a media extension is
         returned as-is.
      2. A media id parsed from /gifs/, /embed/, or /media/ paths is
         turned into a canonical ``giphy.mp4`` URL (gph.is short links
         are expanded first and resolved recursively).
      3. As a last resort the page HTML is scraped for an mp4, then a
         gif link.
    Returns ``None`` when nothing can be extracted.
    """
    try:
        # Case 1: already a direct media link.
        if "media" in giphy_url and ".giphy.com" in giphy_url:
            if giphy_url.endswith((".mp4", ".webm", ".gif", ".webp")):
                return giphy_url

        # Case 2: extract a media id from the URL. Later patterns take
        # precedence, so an embed/media path overrides a /gifs/ slug.
        media_id: Optional[str] = None
        found = re.search(r"giphy\.com/gifs/([a-zA-Z0-9-]+)", giphy_url)
        if found:
            # Slug format is words-separated-by-dashes-<id>; id is last.
            media_id = found.group(1).split("-")[-1]
        found = re.search(r"giphy\.com/embed/([a-zA-Z0-9]+)", giphy_url)
        if found:
            media_id = found.group(1)
        found = re.search(r"media\d*\.giphy\.com/media/([a-zA-Z0-9]+)/", giphy_url)
        if found:
            media_id = found.group(1)

        # Short links: follow redirects, then resolve the expanded URL.
        if "gph.is/" in giphy_url:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    giphy_url, timeout=_TIMEOUT, allow_redirects=True
                ) as resp:
                    if resp.status == 200:
                        return await get_giphy_media_url(str(resp.url))
                    return None

        if media_id:
            return f"https://media.giphy.com/media/{media_id}/giphy.mp4"

        # Case 3: scrape the page HTML for a direct link (mp4 first).
        async with aiohttp.ClientSession() as session:
            async with session.get(giphy_url, timeout=_TIMEOUT) as resp:
                if resp.status != 200:
                    return None
                page = await resp.text()
                for pattern in (
                    r"(https://media\d*\.giphy\.com/media/"
                    r"[a-zA-Z0-9]+/[a-zA-Z0-9_]+\.mp4)",
                    r"(https://media\d*\.giphy\.com/media/"
                    r"[a-zA-Z0-9]+/giphy\.gif)",
                ):
                    found = re.search(pattern, page)
                    if found:
                        return found.group(1)
    except Exception:
        logger.exception("Error extracting Giphy media from %s", giphy_url)
    return None