Source code for tools.feature_atlas.discover_features

"""Discover features from an unknown codebase.

When no canonical_features are defined (or --discover-features is passed),
this module uses the Gemini Flash swarm to autonomously discover and propose
canonical features from the repo symbol index.

This is the "inhale protocol" -- point at ANY repo and let the swarm
identify organs from scratch.

Usage:
    python -m tools.feature_atlas.discover_features

# skull fire spider -- THE SWARM DISCOVERS UNKNOWN ANATOMY
"""

from __future__ import annotations

import asyncio
import json
import logging
import sys
import time
from pathlib import Path
from typing import Any

import yaml

# Three directory levels above this file. Given the module path
# ``tools.feature_atlas.discover_features`` that is the directory containing
# the ``tools`` package.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
# Put that directory on ``sys.path`` (once) so the absolute
# ``tools.feature_atlas...`` imports used inside the functions below resolve
# even when this file is executed directly.
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

# Module-level logger used for progress and failure reporting.
logger = logging.getLogger(__name__)

# Directory holding this module; config and outputs are located relative to it.
_ATLAS_DIR = Path(__file__).resolve().parent
# YAML configuration read by ``load_config``.
_CONFIG_PATH = _ATLAS_DIR / "config.yaml"
# Symbol index read by ``load_symbols`` (its error message points at
# ``extract_repo_symbols.py`` as the producer).
_SYMBOLS_PATH = _ATLAS_DIR / "outputs" / "repo_symbols.json"
# Destination file written by ``async_main`` with the discovered features.
_DISCOVERED_PATH = _ATLAS_DIR / "outputs" / "discovered_features.json"


def load_config() -> dict[str, Any]:
    """Load the Feature Atlas configuration from ``config.yaml``.

    Reads the module-level ``_CONFIG_PATH`` and parses it with
    ``yaml.safe_load``. The mapping is consumed by :func:`discover_features`
    (which reads ``swarm.max_concurrent``) and is forwarded to the LLM helper
    calls.

    An empty file, or one containing only comments or ``null``, parses to
    ``None``. That case is normalised to an empty dict so callers can always
    use ``config.get(...)`` and fall back to their defaults.

    Returns:
        The parsed configuration, or ``{}`` when the document is empty.

    Raises:
        FileNotFoundError: If ``config.yaml`` does not exist.
        yaml.YAMLError: If the file cannot be parsed as YAML.
    """
    with open(_CONFIG_PATH, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
    # safe_load yields None for an empty document; honour the dict return type.
    return {} if config is None else config
def load_symbols() -> list[dict[str, Any]]:
    """Return the per-file symbol records stored in the repo symbol index.

    The index lives at the module-level ``_SYMBOLS_PATH``
    (``outputs/repo_symbols.json``). When it is absent the function fails
    loudly with a message telling the operator which script to run first.

    Returns:
        The decoded JSON content of the symbol index.

    Raises:
        FileNotFoundError: If the symbol index file does not exist.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    if _SYMBOLS_PATH.exists():
        with _SYMBOLS_PATH.open("r", encoding="utf-8") as handle:
            return json.load(handle)

    message = (
        f"Repo symbols not found at {_SYMBOLS_PATH}. "
        "Run extract_repo_symbols.py first."
    )
    raise FileNotFoundError(message)
# Deduplicated feature counts at or below this are returned without an LLM merge.
_MERGE_THRESHOLD = 80
# Approximate character budget for the feature JSON embedded in the merge prompt.
_MERGE_PROMPT_BUDGET = 15000


def _build_directory_summary(
    dir_name: str,
    files: list[dict[str, Any]],
) -> str:
    """Render a directory and its files into a compact text summary.

    The summary has a header line (directory name and file count), a tally of
    the ``type`` values, and one line per file for at most the first 25 files.
    Python files list up to five class and function names; TypeScript and
    JavaScript files list up to five exports and components. A file's
    ``docstring`` is appended truncated to 80 characters. When more than 25
    files exist a trailing line reports how many were omitted.

    Symbol lists that are missing or ``None`` are treated as empty, and symbol
    names are coerced to ``str`` so an odd record cannot break the join.

    Args:
        dir_name: Name of the directory being summarised.
        files: Symbol records for the files in that directory.

    Returns:
        The newline-joined summary text.
    """
    from collections import Counter

    max_listed = 25
    max_symbols = 5

    lines = [f"Directory: {dir_name}/ ({len(files)} files)"]

    type_counts = Counter(entry.get("type", "?") for entry in files)
    lines.append(f" File types: {dict(type_counts)}")

    for entry in files[:max_listed]:
        name = entry.get("file", "?")
        ftype = entry.get("type", "?")
        line = f" - {name} ({ftype})"

        if ftype == "python":
            classes = [
                str(c.get("name", ""))
                for c in (entry.get("classes") or [])[:max_symbols]
            ]
            funcs = [
                str(fn.get("name", ""))
                for fn in (entry.get("functions") or [])[:max_symbols]
            ]
            if classes:
                line += f" classes=[{', '.join(classes)}]"
            if funcs:
                line += f" funcs=[{', '.join(funcs)}]"
        elif ftype in ("typescript", "javascript"):
            exports = [str(x) for x in (entry.get("exports") or [])[:max_symbols]]
            components = [
                str(x) for x in (entry.get("components") or [])[:max_symbols]
            ]
            if exports:
                line += f" exports=[{', '.join(exports)}]"
            if components:
                line += f" components=[{', '.join(components)}]"

        doc = entry.get("docstring", "")
        if doc:
            line += f' doc="{doc[:80]}"'

        lines.append(line)

    if len(files) > max_listed:
        lines.append(f" ... and {len(files) - max_listed} more files")

    return "\n".join(lines)


async def _discover_features_for_batch(
    dir_summaries: str,
    config: dict[str, Any],
    semaphore: asyncio.Semaphore,
) -> list[dict[str, Any]]:
    """Ask the LLM helper to propose candidate features for one batch.

    Builds an architecture-analysis prompt around ``dir_summaries``, sends it
    through ``_gemini_generate`` and cleans the reply with ``_scrub_llm_json``
    (both imported from ``tools.feature_atlas.extract_features_swarm``; their
    behaviour is defined there). The reply is then parsed as JSON. The call
    runs while holding ``semaphore`` so the caller can bound concurrency.

    Any failure is logged and yields an empty list, so one bad batch never
    aborts the others. This covers an empty reply, invalid JSON, a non-list
    payload and any exception from the helper. Items in the parsed list that
    are not JSON objects are discarded, because downstream code calls
    ``.get`` on every feature.

    Args:
        dir_summaries: Concatenated directory summaries for this batch.
        config: Atlas configuration, forwarded unchanged to the LLM helper.
        semaphore: Concurrency limiter shared by all batches.

    Returns:
        The feature dictionaries proposed for this batch (possibly empty).
    """
    # Imported lazily so this module can be loaded without the swarm module.
    from tools.feature_atlas.extract_features_swarm import (
        _gemini_generate,
        _scrub_llm_json,
    )

    prompt = f"""You are analyzing an unknown codebase to discover its major subsystems and features.

Below are directory summaries with their files and symbols:

{dir_summaries}

Your job: identify the major SUBSYSTEMS (organs/features) this codebase has.

Rules:
- A feature/organ is a MEANINGFUL SUBSYSTEM, not individual files
- Group related files into coherent features
- Each feature should have a clear responsibility
- Don't create features for test files, config files, or utility helpers
- Don't create features that only have 1-2 small files unless they're clearly standalone
- Focus on: core engines, data layers, API surfaces, UI systems, processing pipelines, integration layers, tool suites, background workers, authentication, etc.

For each discovered feature, output:
- "id": PascalCase feature ID (e.g. "CoreMemory", "PaymentEngine")
- "human_name": readable name
- "category": one of [memory, engine, routing, platform, infra, tools, persona, game, ncm, api, data, ui]
- "description": 1-2 sentence description of what this subsystem does
- "hint_files": list of key files that belong to this feature
- "evidence": brief explanation of why you identified this as a feature
- "confidence": 0.0-1.0

Return ONLY a JSON array of feature objects. No markdown, no explanation."""

    system = (
        "You are a precise software architecture analysis agent. "
        "You identify meaningful subsystems in codebases. "
        "You output only valid JSON arrays. "
        "You never hallucinate file names or symbols."
    )

    async with semaphore:
        try:
            raw = await _gemini_generate(prompt, config, system)
            if not raw or not raw.strip():
                return []
            raw = _scrub_llm_json(raw)
            if not raw:
                return []
            features = json.loads(raw)
        except json.JSONDecodeError as e:
            logger.warning("Failed to parse discovered features JSON: %s", e)
            return []
        except Exception as e:
            # Deliberate best-effort boundary: a failed batch must not sink the swarm.
            logger.warning("Feature discovery failed for batch: %s", e)
            return []

    if not isinstance(features, list):
        return []
    # LLM output is untrusted: keep only JSON objects.
    return [feat for feat in features if isinstance(feat, dict)]


def _dedupe_features_by_id(features: list[Any]) -> list[dict[str, Any]]:
    """Collapse features sharing an ``id``, unioning ``hint_files`` in first-seen order."""
    by_id: dict[Any, dict[str, Any]] = {}
    for feat in features:
        if not isinstance(feat, dict):
            continue
        fid = feat.get("id", "")
        if not fid:
            continue
        existing = by_id.get(fid)
        if existing is None:
            # Shallow copy so merging hints never mutates the caller's dicts.
            by_id[fid] = dict(feat)
            continue
        combined = [
            *(existing.get("hint_files") or []),
            *(feat.get("hint_files") or []),
        ]
        # dict.fromkeys de-duplicates while keeping a deterministic order.
        existing["hint_files"] = list(dict.fromkeys(combined))
    return list(by_id.values())


def _split_for_prompt(
    features: list[dict[str, Any]],
    budget: int,
) -> tuple[str, list[dict[str, Any]]]:
    """Pack whole features into a JSON array of about ``budget`` chars; return it and the overflow."""
    chosen: list[str] = []
    used = 4  # the enclosing "[\n" and "\n]"
    overflow: list[dict[str, Any]] = []
    for index, feat in enumerate(features):
        text = json.dumps(feat, indent=2)
        cost = len(text) + (2 if chosen else 0)  # ",\n" separator
        # Always include at least one feature so the prompt is never empty.
        if chosen and used + cost > budget:
            overflow = features[index:]
            break
        chosen.append(text)
        used += cost
    return "[\n" + ",\n".join(chosen) + "\n]", overflow


async def _merge_and_deduplicate(
    all_features: list[dict[str, Any]],
    config: dict[str, Any],
) -> list[dict[str, Any]]:
    """Collapse overlapping discovered features into a non-overlapping set.

    Stage one is deterministic: entries without an ``id`` (and non-dict
    entries) are dropped, and entries sharing an ``id`` are merged by unioning
    their ``hint_files`` in first-seen order. The input dictionaries are not
    modified. If at most 80 features remain, they are returned as-is and no
    LLM call is made.

    Stage two asks the LLM helper (``_gemini_generate`` plus
    ``_scrub_llm_json`` from ``tools.feature_atlas.extract_features_swarm``)
    to merge redundant features. Only whole features are placed in the prompt,
    up to a budget of about 15000 characters. Features that do not fit are
    appended to the result unchanged rather than being silently lost. The
    merge is best-effort: if the call raises, returns nothing, cannot be
    parsed, or yields five or fewer feature objects, the stage-one list is
    returned.

    Args:
        all_features: Concatenated raw candidates from all batches.
        config: Atlas configuration, forwarded unchanged to the LLM helper.

    Returns:
        The merged, deduplicated list of feature dictionaries.
    """
    deduped = _dedupe_features_by_id(all_features)

    if len(deduped) <= _MERGE_THRESHOLD:
        return deduped

    # Only needed on the LLM path, so import here rather than at function entry.
    from tools.feature_atlas.extract_features_swarm import (
        _gemini_generate,
        _scrub_llm_json,
    )

    feature_dump, overflow = _split_for_prompt(deduped, _MERGE_PROMPT_BUDGET)
    prompt_count = len(deduped) - len(overflow)

    prompt = f"""You have {prompt_count} discovered features from a codebase analysis.
Some may be duplicates, overlaps, or too granular.

Merge them into a clean, non-overlapping set of major subsystems.
Combine features that clearly belong together.
Remove features that are too small or trivial.
Keep the total between 30-100 features.

Current features:
{feature_dump}

Return ONLY a JSON array of the merged feature objects with the same schema.
Keep the best ID, human_name, and merge hint_files."""

    try:
        raw = await _gemini_generate(
            prompt,
            config,
            "You merge software features. Output only valid JSON arrays.",
        )
        if not raw:
            return deduped
        raw = _scrub_llm_json(raw)
        if not raw:
            return deduped
        merged = json.loads(raw)
    except json.JSONDecodeError as e:
        logger.warning("Failed to parse merged features JSON: %s", e)
        return deduped
    except Exception as e:
        # Best-effort: never lose the discovered features to a failed merge call.
        logger.warning("Feature merge pass failed, keeping deduplicated set: %s", e)
        return deduped

    if not isinstance(merged, list):
        return deduped
    merged = [feat for feat in merged if isinstance(feat, dict)]
    # Sanity floor kept from the original: a tiny answer is treated as a bad merge.
    if len(merged) <= 5:
        return deduped

    # Re-attach features that did not fit in the prompt, then re-dedupe by id.
    return _dedupe_features_by_id([*merged, *overflow])
async def discover_features(config: dict[str, Any]) -> list[dict[str, Any]]:
    """Discover candidate features for the repo described by the symbol index.

    Pipeline:

    1. Load the symbol records with :func:`load_symbols`.
    2. Group them by the first path component of each record's ``file``
       (records without a ``/`` go to ``"root"``).
    3. Render each group with :func:`_build_directory_summary` and join the
       summaries four at a time into batch prompts.
    4. Run :func:`_discover_features_for_batch` for every batch concurrently,
       bounded by an ``asyncio.Semaphore`` sized from
       ``config["swarm"]["max_concurrent"]`` (default 5).
    5. Reconcile all candidates with :func:`_merge_and_deduplicate`.

    The concurrency setting is validated. A missing or ``null`` ``swarm``
    section or a non-integer value falls back to 5. A value below 1 is
    clamped to 1, because ``Semaphore(0)`` would block every batch forever
    and a negative value raises ``ValueError``.

    Args:
        config: Atlas configuration. Only ``swarm.max_concurrent`` is read
            here; the mapping is also forwarded to the batch and merge calls.

    Returns:
        The final list of discovered feature dictionaries.

    Raises:
        FileNotFoundError: Propagated from :func:`load_symbols` when the
            symbol index is missing.
    """
    default_concurrency = 5
    batch_size = 4  # directories per LLM call

    symbols = load_symbols()

    # Group by top-level directory.
    groups: dict[str, list[dict[str, Any]]] = {}
    for record in symbols:
        filepath = record.get("file", "")
        head, sep, _rest = filepath.partition("/")
        group = head if sep else "root"
        groups.setdefault(group, []).append(record)

    logger.info(
        "Discovering features from %d files across %d directories",
        len(symbols), len(groups),
    )

    # Sorted so batch composition is stable from run to run.
    dir_summaries = [
        _build_directory_summary(dir_name, files)
        for dir_name, files in sorted(groups.items())
    ]
    batches = [
        "\n\n---\n\n".join(dir_summaries[i:i + batch_size])
        for i in range(0, len(dir_summaries), batch_size)
    ]

    # ``swarm: null`` in YAML parses to None, hence ``or {}``.
    configured = (config.get("swarm") or {}).get("max_concurrent", default_concurrency)
    if isinstance(configured, bool) or not isinstance(configured, int):
        logger.warning(
            "Invalid swarm.max_concurrent %r; using %d",
            configured, default_concurrency,
        )
        max_concurrent = default_concurrency
    elif configured < 1:
        logger.warning("swarm.max_concurrent %d is below 1; using 1", configured)
        max_concurrent = 1
    else:
        max_concurrent = configured
    semaphore = asyncio.Semaphore(max_concurrent)

    tasks = [
        _discover_features_for_batch(batch, config, semaphore)
        for batch in batches
    ]

    all_features: list[dict[str, Any]] = []
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for result in results:
        # BaseException also covers CancelledError, which is not an Exception.
        if isinstance(result, BaseException):
            logger.warning("Batch discovery failed: %s", result)
        elif isinstance(result, list):
            all_features.extend(result)

    logger.info("Raw discovery found %d features, deduplicating...", len(all_features))

    final_features = await _merge_and_deduplicate(all_features, config)

    logger.info("Final discovered features: %d", len(final_features))
    return final_features
def _print_discovery_summary(features: list[dict[str, Any]], elapsed: float) -> None:
    """Print the feature count, per-category tally, timing, output path and a top-20 preview."""
    from collections import Counter

    rule = "=" * 60
    print(f"\n{rule}")
    print(" FEATURE DISCOVERY COMPLETE")
    print(rule)
    print(f" Features discovered: {len(features)}")
    if features:
        cats = Counter(feat.get("category", "?") for feat in features)
        for cat, count in cats.most_common():
            print(f" {cat}: {count}")
    print(f" Time elapsed: {elapsed:.1f}s")
    print(f" Output: {_DISCOVERED_PATH}")
    print(f"{rule}\n")

    if features:
        print(" Top discovered features:")
        for feat in features[:20]:
            print(f" - {feat.get('id', '?')}: {feat.get('human_name', '?')}")
            # LLM output may carry ``description: null`` or a non-string value.
            description = str(feat.get("description") or "")
            print(f" {description[:100]}")
        if len(features) > 20:
            print(f" ... and {len(features) - 20} more")


async def async_main() -> None:
    """Run feature discovery and persist the result to disk.

    Configures INFO logging, loads the configuration with
    :func:`load_config`, runs :func:`discover_features`, and writes the
    features as indented JSON to the module-level ``_DISCOVERED_PATH``,
    creating the parent directory if needed. A summary is then printed to
    stdout.

    The JSON text is round-tripped through UTF-8 with ``errors="replace"``
    before writing, so characters that cannot be encoded (such as lone
    surrogates) become ``?`` instead of aborting the write.

    Returns:
        None.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    # perf_counter is monotonic, so the duration cannot go wrong on clock changes.
    started = time.perf_counter()
    config = load_config()

    features = await discover_features(config)

    _DISCOVERED_PATH.parent.mkdir(parents=True, exist_ok=True)
    raw_json = json.dumps(features, indent=2, ensure_ascii=False)
    raw_json = raw_json.encode("utf-8", errors="replace").decode("utf-8")
    with open(_DISCOVERED_PATH, "w", encoding="utf-8") as out_file:
        out_file.write(raw_json)

    elapsed = time.perf_counter() - started
    _print_discovery_summary(features, elapsed)
def main() -> None:
    """Synchronous entry point: set up logging and run :func:`async_main`.

    Configures root logging at INFO level, then drives the coroutine returned
    by :func:`async_main` to completion with ``asyncio.run``. All real work
    happens inside :func:`async_main`; this wrapper only prepares logging and
    starts the event loop. It is called from the module's ``__main__`` guard.

    Returns:
        None.
    """
    log_format = "%(asctime)s [%(levelname)s] %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO)

    pipeline = async_main()
    asyncio.run(pipeline)
# Run the discovery step only when this file is executed as a script
# (e.g. ``python -m tools.feature_atlas.discover_features``), not on import.
if __name__ == "__main__": main()