# Source code for url_utils.fetch_social

"""Social / thread content fetchers (Twitter, Reddit, Bluesky, Wikipedia)."""

from __future__ import annotations

import asyncio
import logging
import re
from typing import Any, Dict, Optional

import aiohttp
from redd import AsyncRedd

from .cache import get_url_cache
from .fetch_common import _HEADERS, _LONG_TIMEOUT, _TIMEOUT

logger = logging.getLogger(__name__)


async def get_tweet_content(tweet_url: str) -> Optional[Dict[str, Any]]:
    """Fetch tweet metadata through the fxtwitter JSON API, with caching.

    Args:
        tweet_url: A twitter.com / x.com status URL.

    Returns:
        A dict with author info, text, a media summary and thread data,
        or ``None`` when the fetch fails or the response has no tweet.
    """
    cache = get_url_cache()
    cache_key = f"tweet:{tweet_url}"
    hit = cache.get(cache_key)
    if hit is not None:
        return hit

    try:
        # Rewrite the twitter.com / x.com host to the fxtwitter JSON API.
        api_url = re.sub(
            r"(https?://)?(www\.)?(x|twitter)\.com/",
            "https://api.fxtwitter.com/",
            tweet_url,
        )
        async with aiohttp.ClientSession() as session:
            async with session.get(api_url, timeout=_TIMEOUT) as resp:
                if resp.status != 200:
                    return None
                payload = await resp.json()

        tweet = payload.get("tweet")
        if tweet is None:
            return None

        author = tweet.get("author", {})
        result: Dict[str, Any] = {
            "author_name": author.get("name", "Unknown"),
            "author_handle": author.get("screen_name", "unknown"),
            "text": tweet.get("text", ""),
            "media_count": 0,
            "media_types": [],
            "is_thread": False,
            "thread_tweets": [],
        }

        media = tweet.get("media")
        if isinstance(media, dict):
            photos = media.get("photos")
            if isinstance(photos, list) and photos:
                result["media_count"] += len(photos)
                result["media_types"].append(f"{len(photos)} photo(s)")
            videos = media.get("videos")
            if isinstance(videos, list) and videos:
                result["media_count"] += len(videos)
                result["media_types"].append(f"{len(videos)} video(s)")
            if media.get("external"):
                result["media_types"].append("external link")

        # A positive reply count alone is treated as evidence of a thread.
        replies = tweet.get("replies")
        if isinstance(replies, int) and replies > 0:
            result["is_thread"] = True

        thread = tweet.get("thread")
        if isinstance(thread, list) and thread:
            result["is_thread"] = True
            result["thread_tweets"] = [
                {
                    "author": item.get("author", {}).get("screen_name", "unknown"),
                    "text": item.get("text", ""),
                }
                for item in thread
            ]

        cache.set(cache_key, result)
        return result
    except asyncio.TimeoutError:
        logger.error("Timeout fetching tweet from %s", tweet_url)
    except Exception:
        logger.exception("Error fetching tweet from %s", tweet_url)
    return None
async def get_reddit_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch a Reddit post (plus up to three top comments) as a plain dict.

    Args:
        url: Any reddit.com / old.reddit.com / new.reddit.com post URL.

    Returns:
        A dict with subreddit, title, truncated selftext, author, score and
        top comments, or ``None`` on any failure.
    """
    try:
        # Normalize every Reddit host variant onto www.reddit.com.
        permalink = url.rstrip("/")
        for legacy_host in ("old.reddit.com", "new.reddit.com"):
            permalink = permalink.replace(legacy_host, "www.reddit.com")
        if "//reddit.com" in permalink:
            permalink = permalink.replace("//reddit.com", "//www.reddit.com")

        async with AsyncRedd(timeout=10.0) as client:
            post = await client.get_post(permalink)
            # Build the result while the client is still open, matching the
            # original's access pattern on the returned post object.
            return {
                "subreddit": post.subreddit or "unknown",
                "title": post.title or "No title",
                "selftext": (post.body or "")[:800],
                "author": post.author or "unknown",
                "score": post.score or 0,
                "top_comments": [
                    {
                        "author": comment.author or "unknown",
                        "body": (comment.body or "")[:200],
                        "score": comment.score or 0,
                    }
                    for comment in (post.comments or [])[:3]
                ],
            }
    except Exception:
        logger.exception("Error fetching Reddit from %s", url)
        return None
async def get_wikipedia_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch the plain-text extract of a Wikipedia article.

    Args:
        url: A wikipedia.org article URL (any language subdomain).

    Returns:
        A dict with title, plain-text extract and language code, or ``None``
        when the URL does not match, the page is missing, or the fetch fails.
    """
    try:
        from urllib.parse import quote, unquote

        # NOTE(review): a mobile host like en.m.wikipedia.org would match
        # with "m" captured as the language — confirm callers never pass it.
        match = re.search(
            r"(?:https?://)?([a-z]{2,3}\.)?wikipedia\.org/wiki/([^#\s]+)",
            url,
        )
        if not match:
            return None
        # Default to English when no language subdomain is present.
        lang = (match.group(1) or "en.").rstrip(".")
        title = unquote(match.group(2))
        api = (
            f"https://{lang}.wikipedia.org/w/api.php"
            f"?action=query&titles={quote(title)}&prop=extracts"
            "&explaintext=true&format=json"
        )
        async with aiohttp.ClientSession() as session:
            async with session.get(
                api, timeout=_LONG_TIMEOUT, headers=_HEADERS
            ) as resp:
                if resp.status != 200:
                    return None
                body = await resp.json()

        pages = body.get("query", {}).get("pages", {})
        if not pages:
            return None
        page_id = next(iter(pages))
        # The MediaWiki API reports a missing article under page id "-1".
        if page_id == "-1":
            return None
        page = pages[page_id]
        return {
            "title": page.get("title", title),
            "extract": page.get("extract", ""),
            "language": lang,
        }
    except asyncio.TimeoutError:
        logger.error("Timeout fetching Wikipedia from %s", url)
    except Exception:
        logger.exception("Error fetching Wikipedia from %s", url)
    return None
async def get_bluesky_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch a Bluesky post via the public AppView API.

    Resolves the profile handle to a DID when needed, then queries
    ``app.bsky.feed.getPostThread`` for the post itself.

    Args:
        url: A bsky.app post URL (``.../profile/<handle>/post/<rkey>``).

    Returns:
        A dict with author, text, engagement counts and a media flag, or
        ``None`` when the URL does not match or the fetch fails.
    """
    try:
        from urllib.parse import quote

        match = re.search(r"bsky\.app/profile/([^/]+)/post/([a-z0-9]+)", url)
        if not match:
            return None
        handle, rkey = match.groups()

        async with aiohttp.ClientSession() as session:
            # Resolve the handle to a DID unless the URL already carries one.
            did = handle
            if not handle.startswith("did:"):
                resolve_url = (
                    "https://public.api.bsky.app/xrpc/"
                    "com.atproto.identity.resolveHandle"
                    f"?handle={quote(handle)}"
                )
                async with session.get(
                    resolve_url, timeout=_TIMEOUT, headers=_HEADERS
                ) as resp:
                    if resp.status != 200:
                        return None
                    did = (await resp.json()).get("did")
                if not did:
                    return None

            at_uri = f"at://{did}/app.bsky.feed.post/{rkey}"
            thread_url = (
                "https://public.api.bsky.app/xrpc/"
                "app.bsky.feed.getPostThread"
                f"?uri={quote(at_uri)}&depth=0"
            )
            async with session.get(
                thread_url, timeout=_TIMEOUT, headers=_HEADERS
            ) as resp:
                if resp.status != 200:
                    return None
                payload = await resp.json()

        post = payload.get("thread", {}).get("post", {})
        record = post.get("record", {})
        author = post.get("author", {})
        embed = record.get("embed", {})
        return {
            "author_handle": author.get("handle", "unknown"),
            "author_name": author.get(
                "displayName", author.get("handle", "Unknown")
            ),
            "text": record.get("text", ""),
            "likes": post.get("likeCount", 0),
            "reposts": post.get("repostCount", 0),
            "replies": post.get("replyCount", 0),
            "has_media": bool(
                embed and embed.get("$type", "").startswith("app.bsky.embed")
            ),
        }
    except asyncio.TimeoutError:
        logger.error("Timeout fetching Bluesky from %s", url)
    except Exception:
        logger.exception("Error fetching Bluesky from %s", url)
    return None