# Source code for url_utils.fetch_social
"""Social / thread content fetchers (Twitter, Reddit, Bluesky, Wikipedia)."""
from __future__ import annotations
import asyncio
import logging
import re
from typing import Any, Dict, Optional
import aiohttp
from redd import AsyncRedd
from .cache import get_url_cache
from .fetch_common import _HEADERS, _LONG_TIMEOUT, _TIMEOUT
logger = logging.getLogger(__name__)
async def get_tweet_content(tweet_url: str) -> Optional[Dict[str, Any]]:
    """Fetch tweet text, author and media info via the fxtwitter API.

    Results are cached per URL through the module's URL cache.
    Returns None on any failure (non-200 response, missing payload,
    timeout, or unexpected error).
    """
    cache = get_url_cache()
    cache_key = f"tweet:{tweet_url}"
    hit = cache.get(cache_key)
    if hit is not None:
        return hit
    try:
        # Rewrite the x.com / twitter.com host to the fxtwitter JSON API.
        api_url = re.sub(
            r"(https?://)?(www\.)?(x|twitter)\.com/",
            "https://api.fxtwitter.com/",
            tweet_url,
        )
        async with aiohttp.ClientSession() as session:
            async with session.get(api_url, timeout=_TIMEOUT) as resp:
                if resp.status != 200:
                    return None
                payload = await resp.json()
        tweet = payload.get("tweet")
        if tweet is None:
            return None
        author = tweet.get("author", {})
        info: Dict[str, Any] = {
            "author_name": author.get("name", "Unknown"),
            "author_handle": author.get("screen_name", "unknown"),
            "text": tweet.get("text", ""),
            "media_count": 0,
            "media_types": [],
            "is_thread": False,
            "thread_tweets": [],
        }
        media = tweet.get("media")
        if isinstance(media, dict):
            # Photos first, then videos — mirrors the API's grouping.
            for field, label in (
                ("photos", "photo(s)"),
                ("videos", "video(s)"),
            ):
                items = media.get(field)
                if isinstance(items, list) and items:
                    info["media_count"] += len(items)
                    info["media_types"].append(f"{len(items)} {label}")
            if media.get("external"):
                info["media_types"].append("external link")
        replies = tweet.get("replies")
        if isinstance(replies, int) and replies > 0:
            info["is_thread"] = True
        thread = tweet.get("thread")
        if isinstance(thread, list) and thread:
            info["is_thread"] = True
            info["thread_tweets"] = [
                {
                    "author": entry.get("author", {}).get(
                        "screen_name", "unknown"
                    ),
                    "text": entry.get("text", ""),
                }
                for entry in thread
            ]
        cache.set(cache_key, info)
        return info
    except asyncio.TimeoutError:
        logger.error("Timeout fetching tweet from %s", tweet_url)
    except Exception:
        logger.exception("Error fetching tweet from %s", tweet_url)
    return None
async def get_reddit_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch a Reddit post plus up to three top comments via AsyncRedd.

    Normalizes the URL host to www.reddit.com before fetching.
    Returns None when the post cannot be retrieved.
    """
    try:
        # Normalize every host variant to www.reddit.com.
        link = url.rstrip("/")
        link = link.replace("old.reddit.com", "www.reddit.com")
        link = link.replace("new.reddit.com", "www.reddit.com")
        if "//reddit.com" in link:
            link = link.replace("//reddit.com", "//www.reddit.com")
        async with AsyncRedd(timeout=10.0) as client:
            post = await client.get_post(link)
            # Truncate body/comment text to keep the payload compact.
            return {
                "subreddit": post.subreddit or "unknown",
                "title": post.title or "No title",
                "selftext": (post.body or "")[:800],
                "author": post.author or "unknown",
                "score": post.score or 0,
                "top_comments": [
                    {
                        "author": comment.author or "unknown",
                        "body": (comment.body or "")[:200],
                        "score": comment.score or 0,
                    }
                    for comment in (post.comments or [])[:3]
                ],
            }
    except Exception:
        logger.exception("Error fetching Reddit from %s", url)
    return None
async def get_wikipedia_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch the plain-text extract of a Wikipedia article.

    Supports any language subdomain (``en``, ``de``, ``simple``, ``zh-yue``,
    ...) and ignores the non-language subdomains ``www`` and mobile ``m``.
    Returns None when the URL is not a Wikipedia article URL, the page does
    not exist, or the fetch fails.
    """
    try:
        from urllib.parse import quote, unquote

        # Capture the full subdomain chain. The previous pattern only
        # allowed 2-3 letter codes, so re.search could backtrack on hosts
        # like simple.wikipedia.org and bind a partial fragment ("ple.")
        # as the language, producing a bogus API host.
        m = re.search(
            r"(?:https?://)?((?:[a-z0-9\-]+\.)*)wikipedia\.org/wiki/([^#\s]+)",
            url,
        )
        if not m:
            return None
        # Drop non-language labels ("www", mobile "m"); default to English.
        labels = [
            part
            for part in (m.group(1) or "").rstrip(".").split(".")
            if part and part not in ("www", "m")
        ]
        lang = labels[0] if labels else "en"
        title = unquote(m.group(2))
        api = (
            f"https://{lang}.wikipedia.org/w/api.php"
            f"?action=query&titles={quote(title)}&prop=extracts"
            "&explaintext=true&format=json"
        )
        async with aiohttp.ClientSession() as s:
            async with s.get(
                api, timeout=_LONG_TIMEOUT, headers=_HEADERS
            ) as r:
                if r.status != 200:
                    return None
                d = await r.json()
                pages = d.get("query", {}).get("pages", {})
                if not pages:
                    return None
                pid = next(iter(pages))
                if pid == "-1":  # MediaWiki marks missing pages with id -1
                    return None
                pg = pages[pid]
                return {
                    "title": pg.get("title", title),
                    "extract": pg.get("extract", ""),
                    "language": lang,
                }
    except asyncio.TimeoutError:
        logger.error("Timeout fetching Wikipedia from %s", url)
    except Exception:
        logger.exception("Error fetching Wikipedia from %s", url)
    return None
async def get_bluesky_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch a single Bluesky post via the public AT Protocol API.

    Resolves the profile handle to a DID when necessary, then reads the
    post record through getPostThread. Returns None on any failure.
    """
    try:
        from urllib.parse import quote

        match = re.search(r"bsky\.app/profile/([^/]+)/post/([a-z0-9]+)", url)
        if match is None:
            return None
        actor, rkey = match.groups()
        async with aiohttp.ClientSession() as session:
            if actor.startswith("did:"):
                did = actor
            else:
                # Handles must be resolved to a DID to build the AT URI.
                resolve_url = (
                    "https://public.api.bsky.app/xrpc/"
                    "com.atproto.identity.resolveHandle"
                    f"?handle={quote(actor)}"
                )
                async with session.get(
                    resolve_url, timeout=_TIMEOUT, headers=_HEADERS
                ) as resp:
                    if resp.status != 200:
                        return None
                    did = (await resp.json()).get("did")
                if not did:
                    return None
            post_uri = f"at://{did}/app.bsky.feed.post/{rkey}"
            thread_api = (
                "https://public.api.bsky.app/xrpc/"
                "app.bsky.feed.getPostThread"
                f"?uri={quote(post_uri)}&depth=0"
            )
            async with session.get(
                thread_api, timeout=_TIMEOUT, headers=_HEADERS
            ) as resp:
                if resp.status != 200:
                    return None
                payload = await resp.json()
            post = payload.get("thread", {}).get("post", {})
            record = post.get("record", {})
            author = post.get("author", {})
            embed = record.get("embed", {})
            return {
                "author_handle": author.get("handle", "unknown"),
                "author_name": author.get(
                    "displayName", author.get("handle", "Unknown")
                ),
                "text": record.get("text", ""),
                "likes": post.get("likeCount", 0),
                "reposts": post.get("repostCount", 0),
                "replies": post.get("replyCount", 0),
                "has_media": bool(
                    embed
                    and embed.get("$type", "").startswith("app.bsky.embed")
                ),
            }
    except asyncio.TimeoutError:
        logger.error("Timeout fetching Bluesky from %s", url)
    except Exception:
        logger.exception("Error fetching Bluesky from %s", url)
    return None