Source code for url_utils.fetch_social

"""Social / thread content fetchers (Twitter, Reddit, Bluesky, Wikipedia)."""

from __future__ import annotations

import asyncio
import html as _html
import logging
import os
import re
import time
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse
from xml.etree import ElementTree as ET

import aiohttp

from .cache import get_url_cache
from .fetch_common import _HEADERS, _LONG_TIMEOUT, _TIMEOUT

logger = logging.getLogger(__name__)


# Scheme + host of a twitter.com / x.com URL, with any number of subdomains
# (www., mobile., ...). Anchored at the start so look-alike hosts such as
# "netflix.com/" are never rewritten in the middle of the string.
_TWEET_HOST_RE = re.compile(
    r"^(?:https?://)?(?:[a-z0-9-]+\.)*(?:x|twitter)\.com/",
    re.IGNORECASE,
)


def _fxtwitter_api_url(tweet_url: str) -> Optional[str]:
    """Rewrite a twitter.com / x.com URL to its ``api.fxtwitter.com`` form.

    Everything after the host (path, query, fragment) is kept verbatim.
    Returns ``None`` when *tweet_url* does not start with a twitter.com or
    x.com host, so callers never fetch an unrelated URL.
    """
    rewritten, hits = _TWEET_HOST_RE.subn(
        "https://api.fxtwitter.com/", tweet_url.strip(), count=1
    )
    return rewritten if hits else None


def _summarize_tweet(tweet: Dict[str, Any]) -> Dict[str, Any]:
    """Reduce the ``tweet`` object of an fxtwitter response to a flat summary.

    Missing or JSON-null fields fall back to the defaults used below instead
    of raising.
    """

    def _author_of(node: Dict[str, Any]) -> Dict[str, Any]:
        # The author object may be absent or null in the payload.
        author = node.get("author")
        return author if isinstance(author, dict) else {}

    author = _author_of(tweet)
    summary: Dict[str, Any] = {
        "author_name": author.get("name", "Unknown"),
        "author_handle": author.get("screen_name", "unknown"),
        "text": tweet.get("text", ""),
        "media_count": 0,
        "media_types": [],
        "is_thread": False,
        "thread_tweets": [],
    }

    media = tweet.get("media")
    if isinstance(media, dict):
        for field, label in (("photos", "photo(s)"), ("videos", "video(s)")):
            items = media.get(field)
            if isinstance(items, list) and items:
                summary["media_count"] += len(items)
                summary["media_types"].append(f"{len(items)} {label}")
        # External links are listed as a media type but not counted.
        if media.get("external"):
            summary["media_types"].append("external link")

    # A positive reply count is treated as a thread indicator.
    replies = tweet.get("replies")
    if isinstance(replies, int) and replies > 0:
        summary["is_thread"] = True

    thread = tweet.get("thread")
    if isinstance(thread, list) and thread:
        summary["is_thread"] = True
        summary["thread_tweets"] = [
            {
                "author": _author_of(entry).get("screen_name", "unknown"),
                "text": entry.get("text", ""),
            }
            for entry in thread
            if isinstance(entry, dict)
        ]
    return summary


async def get_tweet_content(tweet_url: str) -> Optional[Dict[str, Any]]:
    """Fetch a tweet's author, text, media and thread info via fxtwitter.

    Results are stored in the URL cache under ``tweet:<tweet_url>``.

    Returns a dict with the keys ``author_name``, ``author_handle``, ``text``,
    ``media_count``, ``media_types``, ``is_thread`` and ``thread_tweets``, or
    ``None`` when the URL is not a twitter.com / x.com URL, the API does not
    answer with HTTP 200, the response has no ``tweet`` object, or any error
    occurs (errors are logged, not raised).
    """
    cache = get_url_cache()
    key = f"tweet:{tweet_url}"
    cached = cache.get(key)
    if cached is not None:
        return cached

    try:
        api_url = _fxtwitter_api_url(tweet_url)
        if api_url is None:
            logger.warning("Not a twitter.com/x.com URL: %s", tweet_url)
            return None

        async with aiohttp.ClientSession() as s:
            async with s.get(api_url, timeout=_TIMEOUT) as r:
                if r.status != 200:
                    return None
                data = await r.json()

        tweet = data.get("tweet") if isinstance(data, dict) else None
        if not isinstance(tweet, dict):
            return None

        result = _summarize_tweet(tweet)
        cache.set(key, result)
        return result
    except asyncio.TimeoutError:
        logger.error("Timeout fetching tweet from %s", tweet_url)
    except Exception:
        logger.exception("Error fetching tweet from %s", tweet_url)
    return None
def _get_reddit_permalink(url: str) -> str:
    """Normalize a reddit URL or path to a bare permalink path.

    The fetchers in this module build their request URLs as
    ``<host><permalink>.json`` / ``<host><permalink>/.rss``, so the result
    must be a path such as ``/r/Python/comments/abc123/title``: leading
    slash, no host, no trailing slash, no query string or fragment.

    Accepts full ``http(s)://`` URLs, scheme-less ``reddit.com/r/...`` URLs
    (optionally prefixed with ``www.``, ``old.`` or ``new.``) and bare paths
    with or without a leading slash. Returns ``"/"`` when nothing is left.
    """
    candidate = url.strip()
    has_scheme = candidate.lower().startswith(("http://", "https://"))
    if not has_scheme and re.match(
        r"^(?:www\.|old\.|new\.)?reddit\.com/r/",
        candidate,
        re.IGNORECASE,
    ):
        candidate = "https://" + candidate
        has_scheme = True

    if has_scheme:
        path = urlparse(candidate).path
    else:
        # Bare path: a query string or fragment would otherwise end up in
        # front of the ".json" / "/.rss" suffix appended by the fetchers.
        path = re.split(r"[?#]", candidate, maxsplit=1)[0]
        if not path.startswith("/"):
            path = "/" + path
    return path.rstrip("/") or "/"


# User-Agent sent to Reddit's token and OAuth API endpoints.
_REDDIT_API_UA = "script:stargazer-v3-bot:v1.0 (internal)"


async def _get_reddit_oauth_token() -> Optional[str]:
    """Return a Reddit application-only OAuth token, or ``None``.

    A token stored in the URL cache is reused while it has more than 60
    seconds left. Otherwise a new one is requested with the
    ``client_credentials`` grant, using ``REDDIT_CLIENT_ID`` and
    ``REDDIT_CLIENT_SECRET`` from the environment, and cached together with
    its expiry time. Returns ``None`` (after logging) when the credentials
    are missing or the request fails.
    """
    cache = get_url_cache()
    logger.debug("Checking cache for Reddit OAuth token")
    cached = cache.get("reddit_access_token")
    if (
        isinstance(cached, dict)
        and cached.get("token")
        and cached.get("expires_at", 0) > time.time() + 60
    ):
        logger.debug("Using cached Reddit OAuth token")
        return cached["token"]

    client_id = os.environ.get("REDDIT_CLIENT_ID")
    client_secret = os.environ.get("REDDIT_CLIENT_SECRET")
    if not client_id or not client_secret:
        logger.error("Missing REDDIT_CLIENT_ID or REDDIT_CLIENT_SECRET environment variables")
        return None

    logger.info("Fetching new Reddit OAuth token")
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://www.reddit.com/api/v1/access_token",
                auth=aiohttp.BasicAuth(client_id, client_secret),
                data={"grant_type": "client_credentials"},
                headers={"User-Agent": _REDDIT_API_UA},
                # aiohttp deprecates bare-float timeouts in favour of
                # ClientTimeout.
                timeout=aiohttp.ClientTimeout(total=10),
            ) as response:
                if response.status != 200:
                    body = await response.text()
                    logger.error(
                        "Failed to fetch Reddit OAuth token. HTTP %d: %s",
                        response.status,
                        body,
                    )
                    return None
                res_data = await response.json()

        token = res_data.get("access_token")
        if not token:
            logger.error("Reddit token response missing access_token: %s", res_data)
            return None

        try:
            lifetime = float(res_data.get("expires_in", 3600))
        except (TypeError, ValueError):
            # Unusable expiry in the response: fall back to the default.
            lifetime = 3600.0

        cache.set("reddit_access_token", {
            "token": token,
            "expires_at": time.time() + lifetime,
        })
        logger.info("Successfully fetched and cached new Reddit OAuth token")
        return token
    except Exception:
        logger.exception("Exception while fetching Reddit OAuth token")
        return None


# A normal browser UA — Reddit blocks empty/script UAs harder, and serves the
# unauthenticated .rss endpoint to browser-shaped clients.
_REDDIT_RSS_UA = (
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/124.0 Safari/537.36"
)
_ATOM = "{http://www.w3.org/2005/Atom}"
_TAG_RE = re.compile(r"<[^>]+>")
_WS_RE = re.compile(r"\s+")


def _html_to_text(raw: str) -> str:
    """Collapse an HTML fragment to plain text.

    Tags are replaced by spaces, HTML entities are unescaped, and runs of
    whitespace are collapsed to a single space. Empty input yields ``""``.
    """
    if not raw:
        return ""
    return _WS_RE.sub(" ", _html.unescape(_TAG_RE.sub(" ", raw))).strip()


def _listing_children(listing: Any) -> List[Any]:
    """Return ``listing["data"]["children"]`` or ``[]`` if the shape is off."""
    if not isinstance(listing, dict):
        return []
    inner = listing.get("data")
    children = inner.get("children") if isinstance(inner, dict) else None
    return children if isinstance(children, list) else []


def _summarize_reddit_thread(
    post_listing: Any, comment_listing: Any
) -> Optional[Dict[str, Any]]:
    """Build the result dict from the two listings of a thread ``.json`` reply.

    *post_listing* carries the submission, *comment_listing* its comments.
    Only ``t1`` (comment) children are used, at most three of them; comment
    bodies are cut to 200 characters and the selftext to 800. Returns
    ``None`` when the post listing has no usable child.
    """
    posts = _listing_children(post_listing)
    if not posts or not isinstance(posts[0], dict):
        return None
    post_data = posts[0].get("data") or {}

    top_comments: List[Dict[str, Any]] = []
    for child in _listing_children(comment_listing):
        if not isinstance(child, dict) or child.get("kind") != "t1":
            continue
        c_data = child.get("data") or {}
        top_comments.append({
            "author": c_data.get("author") or "unknown",
            "body": (c_data.get("body") or "")[:200],
            "score": c_data.get("score") or 0,
        })
        if len(top_comments) >= 3:
            break

    return {
        "subreddit": post_data.get("subreddit") or "unknown",
        "title": post_data.get("title") or "No title",
        "selftext": (post_data.get("selftext") or "")[:800],
        "author": post_data.get("author") or "unknown",
        "score": post_data.get("score") or 0,
        "top_comments": top_comments,
    }


async def _fetch_reddit_via_oauth(
    permalink: str, token: str
) -> Optional[Dict[str, Any]]:
    """Rich fetch via the authenticated ``oauth.reddit.com`` .json API.

    Returns the summarized thread, or ``None`` on a non-200 status, an
    unexpected response shape, or any network/decoding error. Errors are
    logged rather than raised so the caller can fall back to another source.
    """
    api_url = f"https://oauth.reddit.com{permalink}.json?raw_json=1"
    headers = {
        "Authorization": f"Bearer {token}",
        "User-Agent": _REDDIT_API_UA,
    }
    logger.info("Fetching Reddit content via OAuth: %s", api_url)
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(api_url, headers=headers, timeout=_TIMEOUT) as r:
                logger.info("Reddit fetch HTTP status: %d", r.status)
                if r.status != 200:
                    body = await r.text()
                    logger.error(
                        "Failed to fetch Reddit thread. HTTP %d: %s",
                        r.status,
                        body[:300],
                    )
                    return None
                data = await r.json()
    except Exception:
        logger.exception("Error fetching Reddit thread from %s", api_url)
        return None

    if not isinstance(data, list) or len(data) < 2:
        logger.error("Unexpected Reddit JSON response structure")
        return None
    return _summarize_reddit_thread(data[0], data[1])


def _rss_author(el: "ET.Element") -> str:
    """Return the author name of an Atom element without its ``/u/`` prefix.

    Falls back to ``"unknown"`` when there is no author or the name is empty.
    """
    author_el = el.find(f"{_ATOM}author")
    name = (author_el.findtext(f"{_ATOM}name") if author_el is not None else "") or ""
    return name.lstrip("/").removeprefix("u/") or "unknown"


def _summarize_reddit_feed(
    root: "ET.Element", permalink: str
) -> Optional[Dict[str, Any]]:
    """Build the result dict from a parsed Reddit Atom feed.

    The first entry is treated as the submission. For a post permalink
    (contains ``/comments/``) the following entries are treated as comments;
    for any other feed the first three entry titles are listed instead.
    Returns ``None`` when the feed has no entries.
    """
    entries = root.findall(f"{_ATOM}entry")
    if not entries:
        return None
    post = entries[0]

    sub_match = re.search(r"/r/([^/]+)/", permalink + "/")
    subreddit = sub_match.group(1) if sub_match else "unknown"

    title = (
        root.findtext(f"{_ATOM}title")
        or post.findtext(f"{_ATOM}title")
        or "No title"
    ).strip()
    # Reddit RSS titles are "Post Title : subreddit" — drop the trailing subreddit.
    suffix = f" : {subreddit}"
    if title.endswith(suffix):
        title = title[: -len(suffix)].strip()

    top_comments: List[Dict[str, Any]] = []
    if "/comments/" in permalink:
        # Entries after the submission are comments.
        for entry in entries[1:]:
            text = _html_to_text(entry.findtext(f"{_ATOM}content") or "")
            if not text:
                continue
            top_comments.append(
                {"author": _rss_author(entry), "body": text[:200], "score": None}
            )
            if len(top_comments) >= 3:
                break
    else:
        # Subreddit / user feed: list recent post titles instead of comments.
        top_comments = [
            {
                "author": _rss_author(entry),
                "body": (entry.findtext(f"{_ATOM}title") or "").strip()[:200],
                "score": None,
            }
            for entry in entries[:3]
        ]

    return {
        "subreddit": subreddit,
        "title": title,
        "selftext": _html_to_text(post.findtext(f"{_ATOM}content") or "")[:800],
        "author": _rss_author(post),
        "score": None,  # not exposed via RSS
        "top_comments": top_comments,
        "source": "rss",
    }


async def _fetch_reddit_via_rss(permalink: str) -> Optional[Dict[str, Any]]:
    """Unauthenticated fallback via Reddit's public ``.rss`` Atom feed.

    Reddit 403-blocks the ``.json`` endpoints from datacenter/Tor IPs without
    OAuth, but still serves ``.rss`` to a normal browser UA. For a post
    permalink, ``{permalink}/.rss`` returns the submission as the first entry
    followed by its comments. Scores are not exposed via RSS (returned as
    ``None``).

    Returns ``None`` on a non-200 status, a network error, unparsable XML,
    or an empty feed.
    """
    feed_url = f"https://www.reddit.com{permalink}/.rss"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                feed_url, headers={"User-Agent": _REDDIT_RSS_UA}, timeout=_TIMEOUT
            ) as r:
                if r.status != 200:
                    logger.warning("Reddit RSS fetch failed HTTP %d for %s", r.status, feed_url)
                    return None
                body = await r.text()
    except Exception:
        logger.exception("Error fetching Reddit RSS from %s", feed_url)
        return None

    try:
        # NOTE(review): xml.etree is not hardened against maliciously
        # constructed XML (see the Python "XML vulnerabilities" notes). The
        # feed comes from reddit.com over HTTPS; revisit if other hosts are
        # ever fetched here.
        root = ET.fromstring(body)
    except ET.ParseError:
        logger.warning("Reddit RSS parse error for %s", feed_url)
        return None
    return _summarize_reddit_feed(root, permalink)
async def get_reddit_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch a Reddit post's content + top comments.

    Prefers the authenticated ``oauth.reddit.com`` .json API when
    ``REDDIT_CLIENT_ID`` / ``REDDIT_CLIENT_SECRET`` are configured (richest
    data, including scores). Without credentials Reddit 403-blocks the .json
    endpoints from this host, so it falls back to the public, unauthenticated
    ``.rss`` feed (no scores, best-effort comments) so Reddit links still
    resolve. The RSS fallback is also used when the OAuth fetch returns
    nothing or raises.

    Successful results are stored in the URL cache under
    ``reddit:<permalink>``. Returns ``None`` when both sources fail; errors
    are logged, not raised.
    """
    try:
        permalink = url.rstrip("/")
        for old_host in ("old.reddit.com", "new.reddit.com"):
            permalink = permalink.replace(old_host, "www.reddit.com")
        if "//reddit.com" in permalink:
            permalink = permalink.replace("//reddit.com", "//www.reddit.com")
        permalink = _get_reddit_permalink(permalink)
        logger.debug("Normalized Reddit URL to permalink: %s", permalink)

        cache = get_url_cache()
        cache_key = f"reddit:{permalink}"
        cached = cache.get(cache_key)
        if cached is not None:
            return cached

        result: Optional[Dict[str, Any]] = None
        token = await _get_reddit_oauth_token()
        if token:
            try:
                result = await _fetch_reddit_via_oauth(permalink, token)
            except Exception:
                # A timeout or malformed response on the OAuth path must not
                # abort the whole lookup: the RSS fallback below can still
                # resolve the link.
                logger.exception("Reddit OAuth fetch raised for %s", permalink)
                result = None
            if result is None:
                logger.info("Reddit OAuth fetch failed; falling back to public RSS")
        else:
            logger.info("No Reddit OAuth credentials; using public RSS fallback")

        if result is None:
            result = await _fetch_reddit_via_rss(permalink)

        if result is not None:
            cache.set(cache_key, result)
        return result
    except Exception:
        logger.exception("Error fetching Reddit from %s", url)
        return None
# Host + article path of a Wikipedia URL. The lookbehind pins the match to
# the start of a host name, so "simple.wikipedia.org" cannot be matched from
# the middle of "simple" and "notwikipedia.org" is rejected. The optional
# "m." covers mobile hosts such as "en.m.wikipedia.org".
_WIKIPEDIA_URL_RE = re.compile(
    r"(?<![\w.-])(?:https?://)?"
    r"(?:(?P<lang>[a-z][a-z0-9-]*)\.)?(?:m\.)?"
    r"wikipedia\.org/wiki/(?P<title>[^#?\s]+)",
    re.IGNORECASE,
)


def _parse_wikipedia_url(url: str) -> Optional[tuple[str, str]]:
    """Extract ``(language, title)`` from a Wikipedia article URL.

    The language is the lower-cased host prefix (``en``, ``simple``,
    ``zh-yue``, ...); ``en`` is used when there is none or when the prefix is
    ``www`` or ``m``. The title is percent-decoded and excludes any query
    string or fragment. Returns ``None`` when *url* contains no Wikipedia
    article URL.
    """
    from urllib.parse import unquote

    m = _WIKIPEDIA_URL_RE.search(url)
    if not m:
        return None
    lang = (m.group("lang") or "en").lower()
    if lang in ("www", "m"):
        lang = "en"
    return lang, unquote(m.group("title"))


async def get_wikipedia_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch the plain-text extract of a Wikipedia article.

    Returns a dict with ``title``, ``extract`` and ``language``, or ``None``
    when *url* is not a Wikipedia article URL, the page does not exist, the
    API does not answer with HTTP 200, or an error occurs (errors are logged,
    not raised).
    """
    try:
        from urllib.parse import quote

        parsed = _parse_wikipedia_url(url)
        if parsed is None:
            return None
        lang, title = parsed

        api = (
            f"https://{lang}.wikipedia.org/w/api.php"
            f"?action=query&titles={quote(title)}&prop=extracts"
            # redirects=1 makes the API follow redirect titles to the
            # article they point at.
            "&explaintext=true&redirects=1&format=json"
        )
        async with aiohttp.ClientSession() as s:
            async with s.get(api, timeout=_LONG_TIMEOUT, headers=_HEADERS) as r:
                if r.status != 200:
                    return None
                d = await r.json()

        pages = d.get("query", {}).get("pages", {})
        if not pages:
            return None
        pid = next(iter(pages))
        pg = pages[pid]
        # Nonexistent titles come back under page id "-1" / with a
        # "missing" marker.
        if pid == "-1" or "missing" in pg:
            return None
        return {
            "title": pg.get("title", title),
            "extract": pg.get("extract", ""),
            "language": lang,
        }
    except asyncio.TimeoutError:
        logger.error("Timeout fetching Wikipedia from %s", url)
    except Exception:
        logger.exception("Error fetching Wikipedia from %s", url)
    return None
def _summarize_bluesky_post(payload: Any) -> Optional[Dict[str, Any]]:
    """Reduce a ``getPostThread`` response to a flat summary dict.

    Returns ``None`` when the response carries no ``thread.post`` object,
    so callers do not receive a placeholder made only of default values.
    Missing or JSON-null fields fall back to the defaults used below.
    """
    thread = payload.get("thread") if isinstance(payload, dict) else None
    post = thread.get("post") if isinstance(thread, dict) else None
    if not isinstance(post, dict) or not post:
        return None

    record = post.get("record") or {}
    author = post.get("author") or {}
    embed = record.get("embed") or {}
    handle = author.get("handle")

    return {
        "author_handle": handle or "unknown",
        # displayName may be present but empty; fall back to the handle.
        "author_name": author.get("displayName") or handle or "Unknown",
        "text": record.get("text") or "",
        "likes": post.get("likeCount") or 0,
        "reposts": post.get("repostCount") or 0,
        "replies": post.get("replyCount") or 0,
        # NOTE(review): any "app.bsky.embed*" type counts here, which
        # presumably includes link cards and quoted posts as well as
        # images/video — confirm that is what consumers expect.
        "has_media": bool(
            embed and str(embed.get("$type") or "").startswith("app.bsky.embed")
        ),
    }


async def get_bluesky_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch a Bluesky post's author, text and engagement counts.

    *url* must contain ``bsky.app/profile/<handle-or-did>/post/<rkey>``. A
    handle is first resolved to a DID, then the post is loaded through the
    public ``app.bsky.feed.getPostThread`` endpoint.

    Returns a dict with ``author_handle``, ``author_name``, ``text``,
    ``likes``, ``reposts``, ``replies`` and ``has_media``, or ``None`` when
    the URL does not match, a request does not answer with HTTP 200, the
    response contains no post, or an error occurs (errors are logged, not
    raised).
    """
    try:
        from urllib.parse import quote

        m = re.search(r"bsky\.app/profile/([^/]+)/post/([a-z0-9]+)", url)
        if not m:
            return None
        handle, rkey = m.group(1), m.group(2)

        async with aiohttp.ClientSession() as s:
            did = handle
            if not handle.startswith("did:"):
                resolve = (
                    "https://public.api.bsky.app/xrpc/"
                    "com.atproto.identity.resolveHandle"
                    f"?handle={quote(handle)}"
                )
                async with s.get(resolve, timeout=_TIMEOUT, headers=_HEADERS) as r:
                    if r.status != 200:
                        return None
                    did = (await r.json()).get("did")
                if not did:
                    return None

            at_uri = f"at://{did}/app.bsky.feed.post/{rkey}"
            thread_url = (
                "https://public.api.bsky.app/xrpc/"
                "app.bsky.feed.getPostThread"
                f"?uri={quote(at_uri)}&depth=0"
            )
            async with s.get(thread_url, timeout=_TIMEOUT, headers=_HEADERS) as r:
                if r.status != 200:
                    return None
                d = await r.json()

        return _summarize_bluesky_post(d)
    except asyncio.TimeoutError:
        logger.error("Timeout fetching Bluesky from %s", url)
    except Exception:
        logger.exception("Error fetching Bluesky from %s", url)
    return None