# Source code for url_utils.paste

"""Paste-site URL detection and raw text fetching."""

from __future__ import annotations

import asyncio
import logging
import re
from typing import Any, Dict, Optional

import aiohttp

from .cache import get_url_cache

logger = logging.getLogger(__name__)

# Registry of supported paste services.  Each entry is a 3-tuple of
# (compiled URL pattern, human-readable site name, callable mapping a
# regex match to that site's raw-text URL).  By convention, group 2 of
# every pattern captures the paste identifier.
_PASTE_SITES: list[tuple[re.Pattern, str, Any]] = [
    (
        re.compile(r"(https?://)?(?:www\.)?pastebin\.com/(?:raw/)?([a-zA-Z0-9]+)"),
        "Pastebin",
        lambda match: f"https://pastebin.com/raw/{match.group(2)}",
    ),
    (
        re.compile(r"(https?://)?(?:www\.)?hastebin\.com/(?:raw/)?([a-zA-Z0-9]+)"),
        "Hastebin",
        lambda match: f"https://hastebin.com/raw/{match.group(2)}",
    ),
    (
        re.compile(r"(https?://)?rentry\.co/([a-zA-Z0-9_-]+)(?:/raw)?"),
        "Rentry",
        lambda match: f"https://rentry.co/{match.group(2)}/raw",
    ),
    (
        re.compile(r"(https?://)?dpaste\.org/([a-zA-Z0-9]+)(?:/raw)?"),
        "dpaste",
        lambda match: f"https://dpaste.org/{match.group(2)}/raw",
    ),
    (
        re.compile(
            r"(https?://)?(?:www\.)?ghostbin\.com/paste/([a-zA-Z0-9]+)(?:/raw)?",
        ),
        "Ghostbin",
        lambda match: f"https://ghostbin.com/paste/{match.group(2)}/raw",
    ),
    (
        re.compile(r"(https?://)?paste\.ee/p/([a-zA-Z0-9]+)"),
        "Paste.ee",
        lambda match: f"https://paste.ee/r/{match.group(2)}",
    ),
]


def is_paste_url(url: str) -> bool:
    """Return True when *url* matches one of the known paste services."""
    for pattern, _site, _builder in _PASTE_SITES:
        if pattern.search(url):
            return True
    return False
async def get_paste_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch the raw text of a paste-site URL.

    Results are memoized in the shared URL cache under ``paste:<url>``.
    On success returns a dict with ``site``, ``paste_id`` and ``content``
    keys (content truncated to 15 000 characters); returns ``None`` when
    the URL matches no known service or the fetch fails.
    """
    cache = get_url_cache()
    cache_key = f"paste:{url}"
    hit = cache.get(cache_key)
    if hit is not None:
        return hit

    for pattern, site_name, raw_builder in _PASTE_SITES:
        match = pattern.search(url)
        if not match:
            continue
        raw_url = raw_builder(match)
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    raw_url,
                    timeout=aiohttp.ClientTimeout(total=10),
                    headers={"User-Agent": "StargazerBot/1.0"},
                ) as resp:
                    if resp.status != 200:
                        return None
                    body = await resp.text()
                    # Cap the payload so huge pastes don't blow up callers.
                    if len(body) > 15_000:
                        body = body[:15_000] + "\n... (truncated)"
                    entry = {
                        "site": site_name,
                        "paste_id": match.group(2),
                        "content": body,
                    }
                    cache.set(cache_key, entry)
                    return entry
        except asyncio.TimeoutError:
            logger.error("Timeout fetching paste from %s", raw_url)
        except Exception:
            logger.exception("Error fetching paste from %s", raw_url)
        # Only the first matching service is attempted; a failed fetch
        # is not retried against other patterns.
        return None
    return None