# Source code for tools.feature_atlas.extract_repo_symbols

"""Step 1a: Multi-language repository symbol extraction.

Walks the stargazer-v3 repo and extracts symbols from:
- Python (.py): classes, functions, constants, imports, env vars via ``ast``
- TypeScript/JavaScript (.ts, .tsx, .js, .jsx): exports, classes, functions,
  interfaces, types, constants, imports, React components via regex
- CSS (.css): class selectors, custom properties, keyframes
- HTML (.html): component references, element IDs
- YAML (.yaml, .yml): top-level keys
- Jinja2 (.j2): template variables, block names
- JSON (.json): top-level keys

Outputs ``outputs/repo_symbols.json`` which the feature extraction swarm
uses as its evidence base.

Usage:
    python -m tools.feature_atlas.extract_repo_symbols

# fire skull spider -- CATALOGING ALL THE BONES IN EVERY LANGUAGE
"""

from __future__ import annotations

import ast
import json
import logging
import os
import re
import sys
import time
from pathlib import Path
from typing import Any

import yaml

# Three directory levels above this file. Given the module path
# ``tools.feature_atlas.extract_repo_symbols`` this is presumably the
# repository root -- confirm if the file is ever moved.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
# Make that directory importable (once) so absolute project imports resolve
# even when this file is executed directly.
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

logger = logging.getLogger(__name__)

# Directory holding this module; the config file and the ``outputs`` folder
# are addressed relative to it.
_ATLAS_DIR = Path(__file__).resolve().parent
_CONFIG_PATH = _ATLAS_DIR / "config.yaml"
_OUTPUT_PATH = _ATLAS_DIR / "outputs" / "repo_symbols.json"

# Regex for CONSTANT_NAME pattern (UPPER_SNAKE_CASE, at least 2 chars)
_CONSTANT_RE = re.compile(r"^[A-Z][A-Z0-9_]{1,}$")

# Regex for os.environ references. Three alternatives, one capture group
# each: ``os.environ.get("X"``, ``os.environ["X"`` and ``os.getenv("X"``.
# Only upper-case names (letters, digits, underscore) are captured.
_ENV_VAR_RE = re.compile(
    r"""os\.environ\.get\(\s*['\"]([A-Z_][A-Z0-9_]*)['\"]"""
    r"""|os\.environ\[['\"]([A-Z_][A-Z0-9_]*)['\"]"""
    r"""|os\.getenv\(\s*['\"]([A-Z_][A-Z0-9_]*)['\"]""",
)


def load_config() -> dict[str, Any]:
    """Load the Feature Atlas configuration from ``config.yaml``.

    Opens the module-level ``_CONFIG_PATH`` as UTF-8 text and parses it with
    ``yaml.safe_load``. The result is what :func:`main` hands to
    :func:`scan_repository`, which reads its ``scan`` section.

    ``yaml.safe_load`` returns ``None`` for an empty (or comment-only)
    document; that case is normalised to an empty dict so callers can always
    use ``config.get(...)``.

    Returns:
        The parsed configuration, or ``{}`` when the file is empty.

    Raises:
        FileNotFoundError: If ``config.yaml`` does not exist at
            ``_CONFIG_PATH``.
        yaml.YAMLError: If the file is present but cannot be parsed as YAML.
    """
    with open(_CONFIG_PATH, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
    # An empty document parses to None, which would break ``config.get``.
    return config if config is not None else {}
def _should_exclude(path: Path, config: dict[str, Any]) -> bool:
    """Decide whether a path should be skipped during the repo walk.

    A path is excluded when any of its components equals an entry of
    ``scan.exclude_dirs``, or when it matches one of the ``scan.exclude_files``
    glob patterns via :meth:`pathlib.Path.match`. Pure in-memory matching; no
    I/O. :func:`scan_repository` calls it for each visited directory and for
    each file.

    Args:
        path: A repo-relative path (directory or file) to test.
        config: The parsed atlas config; its ``scan`` section supplies
            ``exclude_dirs`` and ``exclude_files``. Missing or null entries
            are treated as empty.

    Returns:
        ``True`` if the path matches an exclusion rule, ``False`` otherwise.
    """
    # ``or`` guards: a YAML key with no value parses to None, not a list.
    scan = config.get("scan") or {}
    exclude_dirs = set(scan.get("exclude_dirs") or ())
    exclude_files = scan.get("exclude_files") or ()

    # Directory exclusions: any component naming an excluded directory.
    if any(part in exclude_dirs for part in path.parts):
        return True

    # File exclusions: glob patterns.
    return any(path.match(pattern) for pattern in exclude_files)


def _unique_first_groups(pattern: "re.Pattern[str]", text: str) -> list[str]:
    """Return group 1 of every match of ``pattern``, first-seen order, no repeats."""
    return list(dict.fromkeys(m.group(1) for m in pattern.finditer(text)))


def _extract_python_symbols(filepath: Path, repo_root: Path) -> dict[str, Any]:
    """Extract the symbol record for one Python file via AST parsing.

    Reads the source, collects environment-variable names with
    ``_ENV_VAR_RE`` first (so they survive a parse failure), then walks the
    :mod:`ast` tree to collect classes (with bases and method names),
    column-0 functions, column-0 UPPER_SNAKE_CASE assignments, imports, and
    the module docstring truncated to 500 characters. Read and parse failures
    are logged as warnings and yield the partial record instead of raising.
    Called by :func:`scan_repository` for each ``.py`` file.

    Args:
        filepath: Absolute path to the Python file to analyze.
        repo_root: Repository root, used to compute the stored relative path.

    Returns:
        A dict with the relative ``file`` path, ``type`` of ``"python"``, and
        ``classes``, ``functions``, ``constants``, ``imports``, ``env_vars``
        and ``docstring`` (empty lists/string for whatever was not reached).
    """
    rel_path = str(filepath.relative_to(repo_root)).replace("\\", "/")
    result: dict[str, Any] = {
        "file": rel_path,
        "type": "python",
        "classes": [],
        "functions": [],
        "constants": [],
        "imports": [],
        "env_vars": [],
        "docstring": "",
    }

    try:
        source = filepath.read_text(encoding="utf-8", errors="replace")
    except Exception as e:
        logger.warning("Failed to read %s: %s", filepath, e)
        return result

    # Env var references via regex (before the AST parse, which may fail).
    for match in _ENV_VAR_RE.finditer(source):
        env_var = match.group(1) or match.group(2) or match.group(3)
        if env_var and env_var not in result["env_vars"]:
            result["env_vars"].append(env_var)

    try:
        tree = ast.parse(source, filename=str(filepath))
    except SyntaxError as e:
        logger.warning("SyntaxError parsing %s: %s", filepath, e)
        return result
    except ValueError as e:
        # Before Python 3.12, source containing NUL bytes raises ValueError
        # rather than SyntaxError; one bad file must not abort the scan.
        logger.warning("Failed to parse %s: %s", filepath, e)
        return result

    # Module docstring. ``ast.get_docstring`` replaces the former explicit
    # ``ast.Str`` check: that node class is deprecated and removed in
    # Python 3.14, where merely referencing it raises AttributeError.
    # ``clean=False`` keeps the raw text so truncation behaves as before.
    doc = ast.get_docstring(tree, clean=False)
    if doc is not None:
        result["docstring"] = doc[:500].strip()

    for node in ast.walk(tree):
        if isinstance(node, ast.ClassDef):
            bases: list[str] = []
            for base in node.bases:
                if isinstance(base, ast.Name):
                    bases.append(base.id)
                elif isinstance(base, ast.Attribute):
                    # Dotted bases are stored as the raw AST dump string.
                    bases.append(ast.dump(base))
            methods = [
                item.name
                for item in node.body
                if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef))
            ]
            result["classes"].append(
                {
                    "name": node.name,
                    "line": node.lineno,
                    "bases": bases,
                    "methods": methods,
                }
            )

        elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            # ast.walk also yields methods and nested functions; column 0 is
            # used as the "defined at module level" test.
            if getattr(node, "col_offset", None) == 0:
                result["functions"].append(
                    {
                        "name": node.name,
                        "line": node.lineno,
                        "is_async": isinstance(node, ast.AsyncFunctionDef),
                    }
                )

        elif isinstance(node, ast.Assign):
            # Same column-0 test for constants.
            if getattr(node, "col_offset", None) == 0:
                for target in node.targets:
                    if isinstance(target, ast.Name) and _CONSTANT_RE.match(
                        target.id
                    ):
                        result["constants"].append(target.id)

        elif isinstance(node, ast.Import):
            for alias in node.names:
                result["imports"].append(alias.name)

        elif isinstance(node, ast.ImportFrom):
            # ``from . import x`` has no module name and is skipped.
            if node.module:
                result["imports"].append(node.module)

    return result


def _extract_yaml_symbols(filepath: Path, repo_root: Path) -> dict[str, Any]:
    """Extract the top-level mapping keys from one YAML file.

    Reads at most the first 50000 characters, parses them with
    ``yaml.safe_load``, and records up to the first 50 top-level keys when the
    document is a mapping. Any read or parse error (including one caused by
    the truncation) is logged at debug level and leaves the key list empty.
    Called by :func:`scan_repository` for ``.yaml`` / ``.yml`` files.

    Args:
        filepath: Absolute path to the YAML file to analyze.
        repo_root: Repository root, used to compute the stored relative path.

    Returns:
        A dict with the relative ``file`` path, ``type`` of ``"yaml"``, and a
        ``top_level_keys`` list (empty if the document is not a mapping or
        fails to parse).
    """
    rel_path = str(filepath.relative_to(repo_root)).replace("\\", "/")
    result: dict[str, Any] = {
        "file": rel_path,
        "type": "yaml",
        "top_level_keys": [],
    }

    try:
        with open(filepath, "r", encoding="utf-8", errors="replace") as f:
            # Only read a bounded prefix to avoid massive YAML files.
            content = f.read(50000)
        data = yaml.safe_load(content)
        if isinstance(data, dict):
            result["top_level_keys"] = list(data.keys())[:50]
    except Exception as e:
        logger.debug("Failed to parse YAML %s: %s", filepath, e)

    return result


# Jinja2 patterns: the start of a ``{{ name... }}`` expression, and
# ``{% block name %}`` (optionally written ``{%- block``).
_JINJA_VAR_RE = re.compile(r"\{\{\s*([a-zA-Z_][a-zA-Z0-9_.]*)")
_JINJA_BLOCK_RE = re.compile(r"\{%[-\s]*block\s+(\w+)")
# Literal words that the variable pattern would otherwise report as names.
_JINJA_LITERALS = ("true", "false", "none")


def _extract_jinja2_symbols(filepath: Path, repo_root: Path) -> dict[str, Any]:
    """Extract variable references and block names from a Jinja2 template.

    Scans the raw template text with two regexes; the template is never
    compiled or rendered. For ``{{ a.b.c }}`` only the leading segment ``a``
    is kept, and the literals ``true``, ``false`` and ``none`` are dropped.
    Variables are de-duplicated; block names are recorded once per
    occurrence. A read failure is logged and yields an empty record.
    Called by :func:`scan_repository` for each ``.j2`` file.

    Args:
        filepath: Absolute path to the Jinja2 template to analyze.
        repo_root: Repository root, used to compute the stored relative path.

    Returns:
        A dict with the relative ``file`` path, ``type`` of ``"jinja2"``, and
        the ``variables`` and ``blocks`` lists.
    """
    rel_path = str(filepath.relative_to(repo_root)).replace("\\", "/")
    result: dict[str, Any] = {
        "file": rel_path,
        "type": "jinja2",
        "variables": [],
        "blocks": [],
    }

    try:
        content = filepath.read_text(encoding="utf-8", errors="replace")
    except Exception as e:
        logger.warning("Failed to read %s: %s", filepath, e)
        return result

    seen_vars: set[str] = set()
    for match in _JINJA_VAR_RE.finditer(content):
        name = match.group(1).split(".")[0]  # top-level variable only
        if name in seen_vars or name in _JINJA_LITERALS:
            continue
        seen_vars.add(name)
        result["variables"].append(name)

    result["blocks"] = [m.group(1) for m in _JINJA_BLOCK_RE.finditer(content)]

    return result


def _extract_json_symbols(filepath: Path, repo_root: Path) -> dict[str, Any]:
    """Extract the top-level object keys from one JSON file.

    Reads at most the first 50000 characters, parses them with
    ``json.loads``, and records up to the first 50 keys when the document is
    an object. Any read or decode error (including one caused by the
    truncation) is logged at debug level and leaves the key list empty.
    Called by :func:`scan_repository` for each ``.json`` file.

    Args:
        filepath: Absolute path to the JSON file to analyze.
        repo_root: Repository root, used to compute the stored relative path.

    Returns:
        A dict with the relative ``file`` path, ``type`` of ``"json"``, and a
        ``top_level_keys`` list (empty if the document is not an object or
        cannot be parsed).
    """
    rel_path = str(filepath.relative_to(repo_root)).replace("\\", "/")
    result: dict[str, Any] = {
        "file": rel_path,
        "type": "json",
        "top_level_keys": [],
    }

    try:
        with open(filepath, "r", encoding="utf-8", errors="replace") as f:
            content = f.read(50000)
        data = json.loads(content)
        if isinstance(data, dict):
            result["top_level_keys"] = list(data.keys())[:50]
    except Exception as e:
        # Best-effort, but leave a trace like the YAML extractor does.
        logger.debug("Failed to parse JSON %s: %s", filepath, e)

    return result


# TS/JS regex patterns -- no real parser, so these are line-oriented
# approximations of the declarations they look for.
_TS_EXPORT_RE = re.compile(
    r"export\s+(?:default\s+)?(?:async\s+)?"
    r"(?:function|class|const|let|var|interface|type|enum)\s+"
    r"([A-Za-z_$][A-Za-z0-9_$]*)",
)
_TS_FUNCTION_RE = re.compile(
    r"(?:^|\n)\s*(?:export\s+)?(?:async\s+)?function\s+"
    r"([A-Za-z_$][A-Za-z0-9_$]*)",
)
_TS_CLASS_RE = re.compile(
    r"(?:^|\n)\s*(?:export\s+)?(?:abstract\s+)?class\s+"
    r"([A-Za-z_$][A-Za-z0-9_$]*)",
)
_TS_INTERFACE_RE = re.compile(
    r"(?:^|\n)\s*(?:export\s+)?interface\s+"
    r"([A-Za-z_$][A-Za-z0-9_$]*)",
)
_TS_TYPE_RE = re.compile(
    r"(?:^|\n)\s*(?:export\s+)?type\s+"
    r"([A-Za-z_$][A-Za-z0-9_$]*)\s*=",
)
_TS_CONST_RE = re.compile(
    r"(?:^|\n)\s*(?:export\s+)?const\s+"
    r"([A-Z][A-Z0-9_]+)\s*[=:]",
)
_TS_IMPORT_RE = re.compile(
    r"(?:import|from)\s+['\"]([^'\"]+)['\"]",
)
_TS_COMPONENT_RE = re.compile(
    r"(?:^|\n)\s*(?:export\s+)?(?:default\s+)?(?:function|const)\s+"
    r"([A-Z][A-Za-z0-9]+)\s*(?::\s*React\.FC|[=(])",
)


def _extract_typescript_symbols(filepath: Path, repo_root: Path) -> dict[str, Any]:
    """Extract symbols from a TypeScript/JavaScript file via regex.

    Applies the module-level ``_TS_*`` patterns to the file text and records,
    each de-duplicated in first-seen order: exported names, classes,
    functions, UPPER_SNAKE_CASE ``const`` names and import specifiers.
    Interfaces and type aliases are collected only for ``.ts`` / ``.tsx``;
    PascalCase components only for ``.tsx`` / ``.jsx``. A read failure is
    logged as a warning and yields an empty record.
    Called by :func:`scan_repository` for ``.ts``, ``.tsx``, ``.js``, ``.jsx``.

    Args:
        filepath: Absolute path to the source file to analyze.
        repo_root: Repository root, used to compute the stored relative path.

    Returns:
        A dict with the relative ``file`` path, ``type`` of ``"typescript"``
        (``.ts`` / ``.tsx``) or ``"javascript"``, and the ``exports``,
        ``classes``, ``functions``, ``interfaces``, ``types``, ``constants``,
        ``imports`` and ``components`` lists.
    """
    rel_path = str(filepath.relative_to(repo_root)).replace("\\", "/")
    ext = filepath.suffix.lower()
    is_typescript = ext in (".ts", ".tsx")
    result: dict[str, Any] = {
        "file": rel_path,
        "type": "typescript" if is_typescript else "javascript",
        "exports": [],
        "classes": [],
        "functions": [],
        "interfaces": [],
        "types": [],
        "constants": [],
        "imports": [],
        "components": [],
    }

    try:
        content = filepath.read_text(encoding="utf-8", errors="replace")
    except Exception as e:
        logger.warning("Failed to read %s: %s", filepath, e)
        return result

    result["exports"] = _unique_first_groups(_TS_EXPORT_RE, content)
    result["classes"] = _unique_first_groups(_TS_CLASS_RE, content)
    result["functions"] = _unique_first_groups(_TS_FUNCTION_RE, content)

    # Interfaces and type aliases exist only in TypeScript sources.
    if is_typescript:
        result["interfaces"] = _unique_first_groups(_TS_INTERFACE_RE, content)
        result["types"] = _unique_first_groups(_TS_TYPE_RE, content)

    # Constants (UPPER_SNAKE_CASE) and import specifiers.
    result["constants"] = _unique_first_groups(_TS_CONST_RE, content)
    result["imports"] = _unique_first_groups(_TS_IMPORT_RE, content)

    # React components (PascalCase function/const) only in JSX-capable files.
    if ext in (".tsx", ".jsx"):
        result["components"] = _unique_first_groups(_TS_COMPONENT_RE, content)

    return result


# CSS patterns: ``.class {`` selectors, ``--custom-property:`` declarations,
# and ``@keyframes name``.
_CSS_CLASS_RE = re.compile(r"\.([-a-zA-Z_][-a-zA-Z0-9_]*)\s*\{")
_CSS_VAR_RE = re.compile(r"(--[-a-zA-Z0-9_]+)\s*:")
_CSS_KEYFRAME_RE = re.compile(r"@keyframes\s+([-a-zA-Z0-9_]+)")


def _extract_css_symbols(filepath: Path, repo_root: Path) -> dict[str, Any]:
    """Extract class selectors, custom properties, and keyframes from CSS.

    Runs three module-level regexes over the stylesheet. Class selectors and
    custom properties are de-duplicated in first-seen order; the class list is
    then capped at 100 entries. Keyframe names are recorded once per
    occurrence. A read failure is logged and yields an empty record.
    Called by :func:`scan_repository` for each ``.css`` file.

    Args:
        filepath: Absolute path to the stylesheet to analyze.
        repo_root: Repository root, used to compute the stored relative path.

    Returns:
        A dict with the relative ``file`` path, ``type`` of ``"css"``, and the
        ``classes`` (capped at 100), ``custom_properties`` and ``keyframes``
        lists.
    """
    rel_path = str(filepath.relative_to(repo_root)).replace("\\", "/")
    result: dict[str, Any] = {
        "file": rel_path,
        "type": "css",
        "classes": [],
        "custom_properties": [],
        "keyframes": [],
    }

    try:
        content = filepath.read_text(encoding="utf-8", errors="replace")
    except Exception as e:
        logger.warning("Failed to read %s: %s", filepath, e)
        return result

    # Cap at 100 to avoid noise from utility-class stylesheets.
    result["classes"] = _unique_first_groups(_CSS_CLASS_RE, content)[:100]
    result["custom_properties"] = _unique_first_groups(_CSS_VAR_RE, content)
    result["keyframes"] = [m.group(1) for m in _CSS_KEYFRAME_RE.finditer(content)]

    return result


# HTML patterns: tags whose name starts with an upper-case letter, and
# quoted ``id`` attribute values.
_HTML_COMPONENT_RE = re.compile(r"<([A-Z][A-Za-z0-9]+)")
_HTML_ID_RE = re.compile(r'id=["\']([^"\']+)["\']')


def _extract_html_symbols(filepath: Path, repo_root: Path) -> dict[str, Any]:
    """Extract component references and element IDs from one HTML file.

    Tag names starting with an upper-case letter are treated as component
    references (``DOCTYPE`` and ``CDATA`` are filtered out) and de-duplicated
    in first-seen order. ``id="..."`` values are recorded once per
    occurrence. Both lists are capped at 100 entries. A read failure is
    logged and yields an empty record.
    Called by :func:`scan_repository` for each ``.html`` file.

    Args:
        filepath: Absolute path to the HTML file to analyze.
        repo_root: Repository root, used to compute the stored relative path.

    Returns:
        A dict with the relative ``file`` path, ``type`` of ``"html"``, and
        the ``components`` and ``ids`` lists (each capped at 100).
    """
    rel_path = str(filepath.relative_to(repo_root)).replace("\\", "/")
    result: dict[str, Any] = {
        "file": rel_path,
        "type": "html",
        "components": [],
        "ids": [],
    }

    try:
        content = filepath.read_text(encoding="utf-8", errors="replace")
    except Exception as e:
        logger.warning("Failed to read %s: %s", filepath, e)
        return result

    components = [
        name
        for name in _unique_first_groups(_HTML_COMPONENT_RE, content)
        if name not in ("DOCTYPE", "CDATA")
    ]
    ids = [m.group(1) for m in _HTML_ID_RE.finditer(content)]

    result["components"] = components[:100]
    result["ids"] = ids[:100]

    return result
def scan_repository(config: dict[str, Any]) -> list[dict[str, Any]]:
    """Scan the repository and extract a symbol record from every eligible file.

    Walks ``_PROJECT_ROOT`` with :func:`os.walk`. Directories named in
    ``scan.exclude_dirs`` are pruned; the remaining sub-directories and the
    file names are visited in sorted order so the output order does not
    depend on the filesystem. A file is skipped (and counted as skipped) when
    :func:`_should_exclude` matches it, its lower-cased suffix is not in
    ``scan.include_extensions`` (default ``[".py"]``), it cannot be
    ``stat``-ed, it is larger than 700000 bytes, or no extractor handles its
    suffix. Every other file is passed to the matching ``_extract_*_symbols``
    function. Progress is reported through the module ``logger``.

    Args:
        config: The parsed atlas config; only its ``scan`` section is read.

    Returns:
        One symbol record (dict) per scanned file, in visit order.
    """
    repo_root = _PROJECT_ROOT
    scan_cfg = config.get("scan") or {}
    include_exts = set(scan_cfg.get("include_extensions", [".py"]))
    # Loop-invariant: built once instead of once per visited directory.
    exclude_dirs = set(scan_cfg.get("exclude_dirs") or ())
    max_bytes = 700_000  # larger files are likely data, not code

    extractors = {
        ".py": _extract_python_symbols,
        ".yaml": _extract_yaml_symbols,
        ".yml": _extract_yaml_symbols,
        ".j2": _extract_jinja2_symbols,
        ".json": _extract_json_symbols,
        ".ts": _extract_typescript_symbols,
        ".tsx": _extract_typescript_symbols,
        ".js": _extract_typescript_symbols,
        ".jsx": _extract_typescript_symbols,
        ".css": _extract_css_symbols,
        ".html": _extract_html_symbols,
    }

    symbols: list[dict[str, Any]] = []
    files_scanned = 0
    files_skipped = 0

    logger.info("Scanning repository at %s", repo_root)

    for dirpath_str, dirnames, filenames in os.walk(repo_root):
        dirpath = Path(dirpath_str)
        rel_dirpath = dirpath.relative_to(repo_root)

        # Prune excluded directories.
        if _should_exclude(rel_dirpath, config):
            dirnames.clear()
            continue

        # Editing ``dirnames`` in place is how os.walk (top-down) is told not
        # to descend; sorting it also fixes the traversal order.
        dirnames[:] = sorted(d for d in dirnames if d not in exclude_dirs)

        for filename in sorted(filenames):
            filepath = dirpath / filename
            rel_filepath = filepath.relative_to(repo_root)

            if _should_exclude(rel_filepath, config):
                files_skipped += 1
                continue

            ext = filepath.suffix.lower()
            if ext not in include_exts:
                files_skipped += 1
                continue

            try:
                too_large = filepath.stat().st_size > max_bytes
            except OSError:
                # Unreadable entry (e.g. broken symlink): count it as skipped
                # so the logged totals account for every file seen.
                files_skipped += 1
                continue
            if too_large:
                files_skipped += 1
                continue

            extractor = extractors.get(ext)
            if extractor is None:
                # Extension is configured for inclusion but has no extractor.
                files_skipped += 1
                continue

            symbols.append(extractor(filepath, repo_root))
            files_scanned += 1

    logger.info(
        "Repository scan complete: %d files scanned, %d skipped",
        files_scanned,
        files_skipped,
    )

    return symbols
def main() -> None:
    """Run the symbol-extraction pass and write ``repo_symbols.json``.

    Configures logging, loads the config with :func:`load_config`, scans the
    repo with :func:`scan_repository`, and writes the records as indented
    JSON to ``_OUTPUT_PATH`` (creating the parent directory if needed). The
    JSON text is round-tripped through UTF-8 with ``errors="replace"`` first
    so characters that cannot be encoded do not make the write fail. Finally
    prints a summary to stdout: file counts per type, symbol totals, the
    TS/JS totals when non-zero, elapsed time and the output path.
    """
    from collections import Counter

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    started = time.time()

    symbols = scan_repository(load_config())

    # Serialise, then replace anything UTF-8 cannot encode (lone surrogates).
    _OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(symbols, indent=2, ensure_ascii=False)
    payload = payload.encode("utf-8", errors="replace").decode("utf-8")
    with open(_OUTPUT_PATH, "w", encoding="utf-8") as out:
        out.write(payload)

    elapsed = time.time() - started

    def total(key: str) -> int:
        """Sum the length of list ``key`` over all records (missing -> 0)."""
        return sum(len(record.get(key, [])) for record in symbols)

    type_counts = Counter(record.get("type", "unknown") for record in symbols)
    banner = "=" * 60

    print(f"\n{banner}")
    print(" REPO SYMBOL EXTRACTION COMPLETE")
    print(f"{banner}")
    print(f" Files scanned: {len(symbols)}")
    for ftype, count in type_counts.most_common():
        print(f" {ftype:16s} {count}")
    print(f" Classes found: {total('classes')}")
    print(f" Functions found: {total('functions')}")
    print(f" Constants found: {total('constants')}")
    print(f" Imports found: {total('imports')}")
    print(f" Env vars found: {total('env_vars')}")

    # TS/JS-specific totals are shown only when there is something to show.
    total_exports = total("exports")
    total_interfaces = total("interfaces")
    total_types = total("types")
    total_components = total("components")
    if total_exports:
        print(f" TS/JS exports: {total_exports}")
    if total_interfaces:
        print(f" TS interfaces: {total_interfaces}")
    if total_types:
        print(f" TS types: {total_types}")
    if total_components:
        print(f" React components: {total_components}")

    print(f" Time elapsed: {elapsed:.1f}s")
    print(f" Output: {_OUTPUT_PATH}")
    print(f"{banner}\n")
# Script entry point: run the extraction only when this file is executed
# directly (e.g. ``python -m tools.feature_atlas.extract_repo_symbols``),
# not when it is imported.
if __name__ == "__main__":
    main()