"""Developer / documentation content fetchers (GitHub, arXiv, SO, NVD, Gist)."""
from __future__ import annotations
import asyncio
import logging
import re
from typing import Any, Dict, Optional
import aiohttp
from .fetch_common import _HEADERS, _LONG_TIMEOUT, _TIMEOUT
logger = logging.getLogger(__name__)
async def get_github_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch metadata for a GitHub repository, issue, or pull-request URL.

    Issue/PR URLs yield title, state, and a 500-char body preview; bare
    repo URLs yield repo metadata plus a README preview (up to 10 000
    chars). Returns ``None`` when the URL does not match, the API
    responds non-200, or any error occurs.
    """
    try:
        import base64

        m = re.search(
            r"github\.com/([^/]+)/([^/]+)(?:/(issues|pull)/(\d+))?", url
        )
        if not m:
            return None
        owner, repo, rtype, number = m.groups()
        # Strip query string / fragment / trailing slash first, then a
        # trailing ".git" suffix.  (The previous split(".git") would also
        # truncate repo names that merely *contain* ".git".)
        repo = repo.split("?")[0].split("#")[0].rstrip("/")
        if repo.endswith(".git"):
            repo = repo[: -len(".git")]
        gh_headers = {"Accept": "application/vnd.github+json"}
        async with aiohttp.ClientSession() as s:
            if rtype in ("issues", "pull"):
                # The /issues endpoint serves both issues and PRs.
                api = (
                    f"https://api.github.com/repos/{owner}/{repo}"
                    f"/issues/{number}"
                )
                async with s.get(
                    api, timeout=_TIMEOUT, headers=gh_headers
                ) as r:
                    if r.status != 200:
                        return None
                    d = await r.json()
                    return {
                        # Only PR payloads carry a "pull_request" key.
                        "type": "pr" if "pull_request" in d else "issue",
                        "number": d.get("number"),
                        "title": d.get("title", "No title"),
                        "state": d.get("state", "unknown"),
                        "body_preview": (d.get("body") or "")[:500],
                    }
            repo_api = f"https://api.github.com/repos/{owner}/{repo}"
            readme_api = f"{repo_api}/readme"
            async with s.get(
                repo_api, timeout=_TIMEOUT, headers=gh_headers
            ) as r:
                if r.status != 200:
                    return None
                rd = await r.json()
            result: Dict[str, Any] = {
                "type": "repo",
                "owner": owner,
                "repo": repo,
                "description": rd.get("description") or "No description",
                "language": rd.get("language") or "Unknown",
                "stars": rd.get("stargazers_count", 0),
                "forks": rd.get("forks_count", 0),
                "readme_preview": "",
            }
            # README fetch is best-effort: any failure leaves the
            # preview empty rather than discarding the repo metadata.
            try:
                async with s.get(
                    readme_api, timeout=_TIMEOUT, headers=gh_headers
                ) as r:
                    if r.status == 200:
                        rdata = await r.json()
                        if (
                            rdata.get("encoding") == "base64"
                            and "content" in rdata
                        ):
                            result["readme_preview"] = base64.b64decode(
                                rdata["content"],
                            ).decode("utf-8", errors="ignore")[:10_000]
            except Exception:
                pass
            return result
    except asyncio.TimeoutError:
        logger.error("Timeout fetching GitHub content from %s", url)
    except Exception:
        logger.exception("Error fetching GitHub content from %s", url)
    return None
async def get_arxiv_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch title, authors, and abstract for an arXiv abs/pdf URL.

    Queries the arXiv export API (Atom feed). Returns ``None`` when the
    URL does not match, the API call fails, or no entry is returned.
    """
    try:
        import xml.etree.ElementTree as ET

        m = re.search(r"arxiv\.org/(?:abs|pdf)/([\d\.]+v?\d*)", url)
        if not m:
            return None
        # For ".../pdf/1234.56789.pdf" the [\d.]+ class captures the dot
        # preceding "pdf" (it cannot capture letters, so the previous
        # replace(".pdf", "") was a no-op), leaving a trailing "." that
        # broke the id_list query.  Strip any trailing dots instead.
        aid = m.group(1).rstrip(".")
        api = f"http://export.arxiv.org/api/query?id_list={aid}"
        ns = {
            "atom": "http://www.w3.org/2005/Atom",
            "arxiv": "http://arxiv.org/schemas/atom",
        }
        async with aiohttp.ClientSession() as s:
            async with s.get(api, timeout=_TIMEOUT) as r:
                if r.status != 200:
                    return None
                xml = await r.text()
        root = ET.fromstring(xml)
        entry = root.find("atom:entry", ns)
        if entry is None:
            return None
        title_el = entry.find("atom:title", ns)
        # Element .text may be None for empty elements; guard before strip().
        title = (
            title_el.text.strip().replace("\n", " ")
            if title_el is not None and title_el.text
            else "No title"
        )
        author_els = entry.findall("atom:author", ns)
        authors = []
        for a in author_els[:3]:
            n = a.find("atom:name", ns)
            if n is not None and n.text:
                authors.append(n.text)
        total = len(author_els)
        summ = entry.find("atom:summary", ns)
        abstract = (
            summ.text.strip().replace("\n", " ")[:600]
            if summ is not None and summ.text
            else "No abstract"
        )
        cat = entry.find("arxiv:primary_category", ns)
        return {
            "arxiv_id": aid,
            "title": title,
            "authors": authors,
            "author_count": total,
            "abstract": abstract,
            "category": (
                cat.get("term") if cat is not None else "Unknown"
            ),
        }
    except asyncio.TimeoutError:
        logger.error("Timeout fetching arXiv from %s", url)
    except Exception:
        logger.exception("Error fetching arXiv from %s", url)
    return None
async def get_gist_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch a GitHub Gist's metadata and file contents.

    Each file's content is truncated to 15 000 characters. Returns
    ``None`` on a non-matching URL, a non-200 response, or any error.
    """
    try:
        m = re.search(r"gist\.github\.com/(?:[^/]+/)?([a-f0-9]+)", url)
        if not m:
            return None
        gid = m.group(1)
        api = f"https://api.github.com/gists/{gid}"
        async with aiohttp.ClientSession() as s:
            gist_headers = {
                **_HEADERS,
                "Accept": "application/vnd.github+json",
            }
            async with s.get(api, timeout=_TIMEOUT, headers=gist_headers) as r:
                if r.status != 200:
                    return None
                d = await r.json()
        files = []
        for fname, fdata in d.get("files", {}).items():
            content = fdata.get("content", "")
            if len(content) > 15_000:
                content = content[:15_000] + "\n... (truncated)"
            files.append({
                "name": fname,
                "language": fdata.get("language", "Unknown"),
                "content": content,
            })
        return {
            "id": gid,
            "description": d.get("description") or "No description",
            # Anonymous gists have "owner": null in the API payload;
            # .get("owner", {}) returns None then (the default applies
            # only when the key is absent), so coerce None to {}.
            "owner": (d.get("owner") or {}).get("login", "anonymous"),
            "files": files,
            "public": d.get("public", True),
        }
    except asyncio.TimeoutError:
        logger.error("Timeout fetching Gist from %s", url)
    except Exception:
        logger.exception("Error fetching Gist from %s", url)
    return None
async def get_stackoverflow_content(url: str) -> Optional[Dict[str, Any]]:
    """Fetch a Stack Exchange question (and its top-voted answer).

    Supports stackoverflow.com, superuser.com, serverfault.com,
    askubuntu.com, and ``*.stackexchange.com`` question URLs via the
    Stack Exchange 2.3 API. Returns ``None`` when the URL does not
    match or the API call fails.
    """
    try:
        import html as html_mod

        def _plain_text(raw_html: str, limit: int) -> str:
            # Drop tags, unescape entities, collapse whitespace, truncate.
            text = html_mod.unescape(re.sub(r"<[^>]+>", " ", raw_html))
            return " ".join(text.split())[:limit]

        match = re.search(
            r"(stackoverflow\.com|superuser\.com|serverfault\.com|"
            r"askubuntu\.com|([a-z]+)\.stackexchange\.com)/questions/(\d+)",
            url,
        )
        if not match:
            return None
        host, se_subdomain, qid = match.group(1), match.group(2), match.group(3)
        known_sites = {
            "stackoverflow.com": "stackoverflow",
            "superuser.com": "superuser",
            "serverfault.com": "serverfault",
            "askubuntu.com": "askubuntu",
        }
        api_site = known_sites.get(host, se_subdomain or "stackoverflow")
        question_api = (
            f"https://api.stackexchange.com/2.3/questions/{qid}"
            f"?order=desc&sort=votes&site={api_site}&filter=withbody"
        )
        async with aiohttp.ClientSession() as session:
            async with session.get(
                question_api, timeout=_TIMEOUT, headers=_HEADERS
            ) as resp:
                if resp.status != 200:
                    return None
                payload = await resp.json()
            questions = payload.get("items", [])
            if not questions:
                return None
            question = questions[0]
            result: Dict[str, Any] = {
                "site": api_site,
                "question_id": int(qid),
                "title": html_mod.unescape(question.get("title", "No title")),
                "body": _plain_text(question.get("body", ""), 1500),
                "score": question.get("score", 0),
                "answers": question.get("answer_count", 0),
                "top_answer": None,
                "tags": question.get("tags", [])[:5],
            }
            if result["answers"] > 0:
                answers_api = (
                    f"https://api.stackexchange.com/2.3/questions/"
                    f"{qid}/answers?order=desc&sort=votes"
                    f"&site={api_site}&filter=withbody&pagesize=1"
                )
                async with session.get(
                    answers_api, timeout=_TIMEOUT, headers=_HEADERS
                ) as resp:
                    if resp.status == 200:
                        answers = (await resp.json()).get("items", [])
                        if answers:
                            top = answers[0]
                            result["top_answer"] = {
                                "body": _plain_text(top.get("body", ""), 1000),
                                "score": top.get("score", 0),
                                "is_accepted": top.get("is_accepted", False),
                            }
            return result
    except asyncio.TimeoutError:
        logger.error("Timeout fetching SO from %s", url)
    except Exception:
        logger.exception("Error fetching SO from %s", url)
    return None
async def get_nvd_cve_content(url: str) -> Optional[Dict[str, Any]]:
try:
m = re.search(
r"nvd\.nist\.gov/vuln/detail/(CVE-\d{4}-\d{4,})",
url,
re.IGNORECASE,
)
if not m:
return None
cve_id = m.group(1).upper()
api = (
f"https://services.nvd.nist.gov/rest/json/cves/2.0"
f"?cveId={cve_id}"
)
async with aiohttp.ClientSession() as s:
async with s.get(
api, timeout=_LONG_TIMEOUT, headers=_HEADERS
) as r:
if r.status != 200:
return None
d = await r.json()
vulns = d.get("vulnerabilities", [])
if not vulns:
return None
cve = vulns[0].get("cve", {})
desc = ""
for dd in cve.get("descriptions", []):
if dd.get("lang") == "en":
desc = dd.get("value", "")
break
if not desc and cve.get("descriptions"):
desc = cve["descriptions"][0].get("value", "")
metrics = cve.get("metrics", {})
v3s, v3sev = None, None
for key in ("cvssMetricV31", "cvssMetricV30"):
entries = metrics.get(key, [])
if entries:
cd = entries[0].get("cvssData", {})
v3s = cd.get("baseScore")
v3sev = cd.get("baseSeverity")
break
v2s, v2sev = None, None
v2 = metrics.get("cvssMetricV2", [])
if v2:
cd = v2[0].get("cvssData", {})
v2s = cd.get("baseScore")
v2sev = v2[0].get("baseSeverity")
cwes: list[str] = []
for w in cve.get("weaknesses", []):
for wd in w.get("description", []):
if wd.get("lang") == "en":
v = wd.get("value", "")
if v and v not in cwes:
cwes.append(v)
refs = [
ref.get("url", "")
for ref in cve.get("references", [])[:5]
if ref.get("url")
]
products: list[str] = []
for cfg in cve.get("configurations", [])[:3]:
for node in cfg.get("nodes", [])[:3]:
for cm in node.get("cpeMatch", [])[:5]:
parts = cm.get("criteria", "").split(":")
if len(parts) >= 5:
ps = f"{parts[3]}:{parts[4]}"
if len(parts) > 5 and parts[5] != "*":
ps += f" ({parts[5]})"
if ps not in products:
products.append(ps)
if len(products) >= 5:
break
return {
"cve_id": cve_id,
"description": desc[:1500] or "No description available",
"published": cve.get("published", "Unknown"),
"last_modified": cve.get("lastModified", "Unknown"),
"cvss_v3_score": v3s,
"cvss_v3_severity": v3sev,
"cvss_v2_score": v2s,
"cvss_v2_severity": v2sev,
"cwe_ids": cwes[:5],
"references": refs,
"affected_products": products,
}
except asyncio.TimeoutError:
logger.error("Timeout fetching NVD CVE from %s", url)
except Exception:
logger.exception("Error fetching NVD CVE from %s", url)
return None