# Source code for url_utils.fetch_dev

"""Developer / documentation content fetchers (GitHub, arXiv, SO, NVD, Gist)."""

from __future__ import annotations

import asyncio
import logging
import re
from typing import Any, Dict, Optional

import aiohttp

from .fetch_common import _HEADERS, _LONG_TIMEOUT, _TIMEOUT

logger = logging.getLogger(__name__)


async def get_github_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch metadata for a GitHub repository, issue, or pull request.

    Args:
        url: A github.com URL pointing at a repo, an issue, or a PR.

    Returns:
        A dict describing the target — for issues/PRs: type, number,
        title, state and a 500-char body preview; for repos: owner,
        description, language, stars, forks and a README preview —
        or ``None`` if the URL doesn't match or any fetch fails.
    """
    try:
        import base64

        m = re.search(
            r"github\.com/([^/]+)/([^/]+)(?:/(issues|pull)/(\d+))?", url
        )
        if not m:
            return None
        owner, repo, rtype, number = m.groups()
        # Strip trailing ".git", query strings, fragments and slashes
        # so the bare repo name is used in API paths.
        repo = repo.split(".git")[0].split("?")[0].split("#")[0].rstrip("/")
        # Consistency fix: merge the module-wide _HEADERS (User-Agent
        # etc.) like the other fetchers in this module do, instead of
        # sending only the Accept header.
        gh_headers = {**_HEADERS, "Accept": "application/vnd.github+json"}
        async with aiohttp.ClientSession() as s:
            if rtype in ("issues", "pull"):
                # GitHub's REST API serves both issues and PRs through
                # the /issues endpoint.
                api = (
                    f"https://api.github.com/repos/{owner}/{repo}"
                    f"/issues/{number}"
                )
                async with s.get(
                    api, timeout=_TIMEOUT, headers=gh_headers
                ) as r:
                    if r.status != 200:
                        return None
                    d = await r.json()
                    return {
                        # Only PRs carry a "pull_request" key.
                        "type": "pr" if "pull_request" in d else "issue",
                        "number": d.get("number"),
                        "title": d.get("title", "No title"),
                        "state": d.get("state", "unknown"),
                        "body_preview": (d.get("body") or "")[:500],
                    }
            repo_api = f"https://api.github.com/repos/{owner}/{repo}"
            readme_api = f"{repo_api}/readme"
            async with s.get(
                repo_api, timeout=_TIMEOUT, headers=gh_headers
            ) as r:
                if r.status != 200:
                    return None
                rd = await r.json()
            result: Dict[str, Any] = {
                "type": "repo",
                "owner": owner,
                "repo": repo,
                "description": rd.get("description") or "No description",
                "language": rd.get("language") or "Unknown",
                "stars": rd.get("stargazers_count", 0),
                "forks": rd.get("forks_count", 0),
                "readme_preview": "",
            }
            # README fetch is best-effort: failures keep repo metadata.
            try:
                async with s.get(
                    readme_api, timeout=_TIMEOUT, headers=gh_headers
                ) as r:
                    if r.status == 200:
                        rdata = await r.json()
                        if (
                            rdata.get("encoding") == "base64"
                            and "content" in rdata
                        ):
                            result["readme_preview"] = base64.b64decode(
                                rdata["content"],
                            ).decode("utf-8", errors="ignore")[:10_000]
            except Exception:
                pass
            return result
    except asyncio.TimeoutError:
        logger.error("Timeout fetching GitHub content from %s", url)
    except Exception:
        logger.exception("Error fetching GitHub content from %s", url)
    return None
async def get_arxiv_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch title, authors, abstract and category for an arXiv paper.

    Accepts both ``/abs/`` and ``/pdf/`` URLs and queries the arXiv
    export API (Atom feed).

    Args:
        url: An arxiv.org abstract or PDF URL.

    Returns:
        A dict with arxiv_id, title, up to three author names, total
        author count, a 600-char abstract, and the primary category —
        or ``None`` on any failure.
    """
    try:
        import xml.etree.ElementTree as ET

        m = re.search(r"arxiv\.org/(?:abs|pdf)/([\d\.]+v?\d*)", url)
        if not m:
            return None
        # Bug fix: for ".../pdf/<id>.pdf" URLs, [\d\.]+ also consumes
        # the dot before "pdf", leaving a trailing "." on the captured
        # ID.  The old ``.replace(".pdf", "")`` could never fire (the
        # regex cannot capture the letters "pdf"), so strip trailing
        # dots explicitly.
        aid = m.group(1).replace(".pdf", "").rstrip(".")
        api = f"http://export.arxiv.org/api/query?id_list={aid}"
        ns = {
            "atom": "http://www.w3.org/2005/Atom",
            "arxiv": "http://arxiv.org/schemas/atom",
        }
        async with aiohttp.ClientSession() as s:
            async with s.get(api, timeout=_TIMEOUT) as r:
                if r.status != 200:
                    return None
                xml = await r.text()
        root = ET.fromstring(xml)
        entry = root.find("atom:entry", ns)
        if entry is None:
            return None
        title_el = entry.find("atom:title", ns)
        title = (
            title_el.text.strip().replace("\n", " ")
            if title_el is not None
            else "No title"
        )
        # Collect at most three author names, but report the full count.
        authors = []
        for a in entry.findall("atom:author", ns)[:3]:
            n = a.find("atom:name", ns)
            if n is not None:
                authors.append(n.text)
        total = len(entry.findall("atom:author", ns))
        summ = entry.find("atom:summary", ns)
        abstract = (
            summ.text.strip().replace("\n", " ")[:600]
            if summ is not None
            else "No abstract"
        )
        cat = entry.find("arxiv:primary_category", ns)
        return {
            "arxiv_id": aid,
            "title": title,
            "authors": authors,
            "author_count": total,
            "abstract": abstract,
            "category": (
                cat.get("term") if cat is not None else "Unknown"
            ),
        }
    except asyncio.TimeoutError:
        logger.error("Timeout fetching arXiv from %s", url)
    except Exception:
        logger.exception("Error fetching arXiv from %s", url)
    return None
async def get_gist_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch a GitHub Gist's metadata and file contents.

    Args:
        url: A gist.github.com URL (with or without a username segment).

    Returns:
        A dict with the gist id, description, owner login, per-file
        name/language/content (truncated at 15k chars), and public
        flag — or ``None`` if the URL doesn't match or the fetch fails.
    """
    try:
        matched = re.search(r"gist\.github\.com/(?:[^/]+/)?([a-f0-9]+)", url)
        if not matched:
            return None
        gist_id = matched.group(1)
        endpoint = f"https://api.github.com/gists/{gist_id}"
        request_headers = {
            **_HEADERS,
            "Accept": "application/vnd.github+json",
        }
        async with aiohttp.ClientSession() as session:
            async with session.get(
                endpoint, timeout=_TIMEOUT, headers=request_headers
            ) as resp:
                if resp.status != 200:
                    return None
                payload = await resp.json()
        # Normalize each file entry, truncating oversized contents.
        collected = []
        for filename, meta in payload.get("files", {}).items():
            text = meta.get("content", "")
            if len(text) > 15_000:
                text = text[:15_000] + "\n... (truncated)"
            collected.append(
                {
                    "name": filename,
                    "language": meta.get("language", "Unknown"),
                    "content": text,
                }
            )
        return {
            "id": gist_id,
            "description": payload.get("description") or "No description",
            "owner": payload.get("owner", {}).get("login", "anonymous"),
            "files": collected,
            "public": payload.get("public", True),
        }
    except asyncio.TimeoutError:
        logger.error("Timeout fetching Gist from %s", url)
    except Exception:
        logger.exception("Error fetching Gist from %s", url)
    return None
async def get_stackoverflow_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch a Stack Exchange question and its top-voted answer.

    Supports stackoverflow.com, superuser.com, serverfault.com,
    askubuntu.com and ``*.stackexchange.com`` question URLs via the
    Stack Exchange 2.3 API.

    Args:
        url: A question URL on any supported Stack Exchange site.

    Returns:
        A dict with site, question_id, title, cleaned body (1500 chars),
        score, answer count, tags (max 5) and the top answer (1000-char
        body, score, accepted flag) — or ``None`` on any failure.
    """
    try:
        import html as html_mod

        matched = re.search(
            r"(stackoverflow\.com|superuser\.com|serverfault\.com|"
            r"askubuntu\.com|([a-z]+)\.stackexchange\.com)/questions/(\d+)",
            url,
        )
        if not matched:
            return None
        host = matched.group(1)
        sub = matched.group(2)
        question_id = matched.group(3)
        # Well-known hosts map to fixed API slugs; *.stackexchange.com
        # sites use their subdomain as the slug.
        known_sites = {
            "stackoverflow.com": "stackoverflow",
            "superuser.com": "superuser",
            "serverfault.com": "serverfault",
            "askubuntu.com": "askubuntu",
        }
        site = known_sites.get(host, sub or "stackoverflow")

        def _strip_html(raw: str, limit: int) -> str:
            # Drop tags, decode entities, collapse whitespace, truncate.
            text = re.sub(r"<[^>]+>", " ", raw)
            text = html_mod.unescape(text)
            return " ".join(text.split())[:limit]

        question_api = (
            f"https://api.stackexchange.com/2.3/questions/{question_id}"
            f"?order=desc&sort=votes&site={site}&filter=withbody"
        )
        async with aiohttp.ClientSession() as session:
            async with session.get(
                question_api, timeout=_TIMEOUT, headers=_HEADERS
            ) as resp:
                if resp.status != 200:
                    return None
                payload = await resp.json()
            found = payload.get("items", [])
            if not found:
                return None
            question = found[0]
            result: Dict[str, Any] = {
                "site": site,
                "question_id": int(question_id),
                "title": html_mod.unescape(question.get("title", "No title")),
                "body": _strip_html(question.get("body", ""), 1500),
                "score": question.get("score", 0),
                "answers": question.get("answer_count", 0),
                "top_answer": None,
                "tags": question.get("tags", [])[:5],
            }
            if result["answers"] > 0:
                answer_api = (
                    f"https://api.stackexchange.com/2.3/questions/"
                    f"{question_id}/answers?order=desc&sort=votes"
                    f"&site={site}&filter=withbody&pagesize=1"
                )
                async with session.get(
                    answer_api, timeout=_TIMEOUT, headers=_HEADERS
                ) as answer_resp:
                    if answer_resp.status == 200:
                        answer_payload = await answer_resp.json()
                        answer_items = answer_payload.get("items", [])
                        if answer_items:
                            top = answer_items[0]
                            result["top_answer"] = {
                                "body": _strip_html(top.get("body", ""), 1000),
                                "score": top.get("score", 0),
                                "is_accepted": top.get("is_accepted", False),
                            }
            return result
    except asyncio.TimeoutError:
        logger.error("Timeout fetching SO from %s", url)
    except Exception:
        logger.exception("Error fetching SO from %s", url)
    return None
async def get_nvd_cve_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch CVE details from the NVD REST API (CVE API 2.0).

    Args:
        url: An nvd.nist.gov vulnerability-detail URL containing a
            CVE identifier (case-insensitive).

    Returns:
        A dict with the CVE id, English description (1500 chars),
        publish/modify dates, CVSS v3.x and v2 scores/severities,
        up to five CWE ids, up to five reference URLs, and up to five
        affected vendor:product strings — or ``None`` on any failure.
    """
    try:
        m = re.search(
            r"nvd\.nist\.gov/vuln/detail/(CVE-\d{4}-\d{4,})",
            url,
            re.IGNORECASE,
        )
        if not m:
            return None
        cve_id = m.group(1).upper()
        api = (
            f"https://services.nvd.nist.gov/rest/json/cves/2.0"
            f"?cveId={cve_id}"
        )
        # NVD responses are slow; use the long timeout.
        async with aiohttp.ClientSession() as s:
            async with s.get(
                api, timeout=_LONG_TIMEOUT, headers=_HEADERS
            ) as r:
                if r.status != 200:
                    return None
                d = await r.json()
        vulns = d.get("vulnerabilities", [])
        if not vulns:
            return None
        cve = vulns[0].get("cve", {})
        # Prefer the English description; fall back to the first one.
        desc = ""
        for dd in cve.get("descriptions", []):
            if dd.get("lang") == "en":
                desc = dd.get("value", "")
                break
        if not desc and cve.get("descriptions"):
            desc = cve["descriptions"][0].get("value", "")
        metrics = cve.get("metrics", {})
        # CVSS v3: prefer 3.1, fall back to 3.0.
        v3s, v3sev = None, None
        for key in ("cvssMetricV31", "cvssMetricV30"):
            entries = metrics.get(key, [])
            if entries:
                cd = entries[0].get("cvssData", {})
                v3s = cd.get("baseScore")
                v3sev = cd.get("baseSeverity")
                break
        v2s, v2sev = None, None
        v2 = metrics.get("cvssMetricV2", [])
        if v2:
            cd = v2[0].get("cvssData", {})
            v2s = cd.get("baseScore")
            # For CVSS v2 the severity sits on the metric entry itself,
            # not inside cvssData.
            v2sev = v2[0].get("baseSeverity")
        cwes: list[str] = []
        for w in cve.get("weaknesses", []):
            for wd in w.get("description", []):
                if wd.get("lang") == "en":
                    v = wd.get("value", "")
                    if v and v not in cwes:
                        cwes.append(v)
        refs = [
            ref.get("url", "")
            for ref in cve.get("references", [])[:5]
            if ref.get("url")
        ]
        # Extract "vendor:product (version)" strings from CPE criteria.
        products: list[str] = []
        for cfg in cve.get("configurations", [])[:3]:
            for node in cfg.get("nodes", [])[:3]:
                for cm in node.get("cpeMatch", [])[:5]:
                    parts = cm.get("criteria", "").split(":")
                    if len(parts) >= 5:
                        ps = f"{parts[3]}:{parts[4]}"
                        if len(parts) > 5 and parts[5] != "*":
                            ps += f" ({parts[5]})"
                        if ps not in products:
                            products.append(ps)
                        if len(products) >= 5:
                            break
                # Bug fix: the original break only exited the innermost
                # cpeMatch loop, so later nodes/configs could push the
                # list past the intended cap of 5.  Propagate the stop
                # condition through all three loops.
                if len(products) >= 5:
                    break
            if len(products) >= 5:
                break
        return {
            "cve_id": cve_id,
            "description": desc[:1500] or "No description available",
            "published": cve.get("published", "Unknown"),
            "last_modified": cve.get("lastModified", "Unknown"),
            "cvss_v3_score": v3s,
            "cvss_v3_severity": v3sev,
            "cvss_v2_score": v2s,
            "cvss_v2_severity": v2sev,
            "cwe_ids": cwes[:5],
            "references": refs,
            "affected_products": products,
        }
    except asyncio.TimeoutError:
        logger.error("Timeout fetching NVD CVE from %s", url)
    except Exception:
        logger.exception("Error fetching NVD CVE from %s", url)
    return None