"""Discover features from an unknown codebase.
When no canonical_features are defined (or --discover-features is passed),
this module uses the Gemini Flash swarm to autonomously discover and propose
canonical features from the repo symbol index.
This is the "inhale protocol" -- point at ANY repo and let the swarm
identify organs from scratch.
Usage:
    python -m tools.feature_atlas.discover_features
# skull fire spider -- THE SWARM DISCOVERS UNKNOWN ANATOMY
"""
from __future__ import annotations
import asyncio
import json
import logging
import sys
import time
from pathlib import Path
from typing import Any
import yaml
# Directory that holds this module; every atlas input/output path hangs off it.
_ATLAS_DIR = Path(__file__).resolve().parent

# The project root sits two directories above the atlas directory. It is put at
# the front of sys.path (once) so that absolute ``tools.*`` imports resolve
# even when this file is executed directly.
_PROJECT_ROOT = _ATLAS_DIR.parent.parent
if not any(entry == str(_PROJECT_ROOT) for entry in sys.path):
    sys.path.insert(0, str(_PROJECT_ROOT))

logger = logging.getLogger(__name__)

# Atlas configuration plus the symbol-index input and discovery output files.
_CONFIG_PATH = _ATLAS_DIR.joinpath("config.yaml")
_SYMBOLS_PATH = _ATLAS_DIR.joinpath("outputs", "repo_symbols.json")
_DISCOVERED_PATH = _ATLAS_DIR.joinpath("outputs", "discovered_features.json")
[docs]
def load_config() -> dict[str, Any]:
    """Load the Feature Atlas configuration from ``config.yaml``.

    Opens the module-level ``_CONFIG_PATH`` as UTF-8 text and parses it with
    ``yaml.safe_load``. An empty (or comment-only) file parses to ``None``;
    that case is normalised to an empty dict so callers can always use
    ``config.get(...)``.

    Called by :func:`async_main` in this module.

    Returns:
        The parsed configuration mapping, or ``{}`` when the file is empty.

    Raises:
        FileNotFoundError: If ``config.yaml`` does not exist at the expected
            path.
        yaml.YAMLError: If the file is present but cannot be parsed as YAML.
    """
    with open(_CONFIG_PATH, "r", encoding="utf-8") as f:
        loaded = yaml.safe_load(f)
    # safe_load yields None for an empty document; keep the declared dict
    # contract instead of handing None to code that calls ``.get`` on it.
    return {} if loaded is None else loaded
[docs]
def load_symbols() -> list[dict[str, Any]]:
    """Read the repository symbol index from ``_SYMBOLS_PATH``.

    The index is the JSON file at ``outputs/repo_symbols.json`` next to this
    module. When it is present the decoded JSON is returned unchanged; when it
    is absent a ``FileNotFoundError`` is raised whose message tells the
    operator to run ``extract_repo_symbols.py`` first.

    Called by :func:`discover_features` in this module.

    Returns:
        The decoded contents of the symbol index (a list of per-file records,
        as consumed by :func:`discover_features`).

    Raises:
        FileNotFoundError: If the symbol index file does not exist.
        json.JSONDecodeError: If the file contents are not valid JSON.
    """
    if _SYMBOLS_PATH.exists():
        with _SYMBOLS_PATH.open("r", encoding="utf-8") as handle:
            return json.load(handle)

    # Nothing to read: fail loudly with a hint about the prerequisite step.
    raise FileNotFoundError(
        f"Repo symbols not found at {_SYMBOLS_PATH}. "
        "Run extract_repo_symbols.py first."
    )
def _build_directory_summary(
    dir_name: str,
    files: list[dict[str, Any]],
    *,
    max_files: int = 25,
    max_symbols: int = 5,
    max_doc_chars: int = 80,
) -> str:
    """Render a directory and its files into a compact text summary.

    The summary has a header line (directory name and total file count), a
    line tallying the ``type`` field of every record, and one line per listed
    file. Python records show their first class and function names;
    TypeScript/JavaScript records show their first exports and components.
    A truncated ``docstring`` is appended when the record has one. If the
    directory holds more files than ``max_files``, a trailing line reports how
    many were omitted.

    Symbol lists that are missing *or* explicitly null are treated as empty,
    and symbol names are coerced to ``str`` before joining, so a sparse or
    slightly malformed index record no longer aborts the whole summary.

    Called by :func:`discover_features` in this module.

    Args:
        dir_name: The directory name being summarized.
        files: The symbol records for the files in that directory.
        max_files: Maximum number of files listed individually (expected to
            be non-negative).
        max_symbols: Maximum number of names shown per symbol list.
        max_doc_chars: Maximum number of docstring characters shown per file.

    Returns:
        A newline-joined plain-text summary of the directory and its files.
    """
    from collections import Counter

    def _head(record: dict[str, Any], key: str) -> list[Any]:
        # ``or []`` covers both a missing key and an explicit JSON null.
        return (record.get(key) or [])[:max_symbols]

    lines = [f"Directory: {dir_name}/ ({len(files)} files)"]

    type_counts = Counter(rec.get("type", "?") for rec in files)
    lines.append(f" File types: {dict(type_counts)}")

    for rec in files[:max_files]:
        ftype = rec.get("type", "?")
        line = f" - {rec.get('file', '?')} ({ftype})"

        # Pick the (label, names) pairs that apply to this file type.
        if ftype == "python":
            labelled = (
                ("classes", [c.get("name", "") for c in _head(rec, "classes")]),
                ("funcs", [fn.get("name", "") for fn in _head(rec, "functions")]),
            )
        elif ftype in ("typescript", "javascript"):
            labelled = (
                ("exports", _head(rec, "exports")),
                ("components", _head(rec, "components")),
            )
        else:
            labelled = ()

        for label, names in labelled:
            if names:
                line += f" {label}=[{', '.join(map(str, names))}]"

        doc = rec.get("docstring", "")
        if doc:
            line += f' doc="{doc[:max_doc_chars]}"'
        lines.append(line)

    omitted = len(files) - max_files
    if omitted > 0:
        lines.append(f" ... and {omitted} more files")
    return "\n".join(lines)
async def _discover_features_for_batch(
    dir_summaries: str,
    config: dict[str, Any],
    semaphore: asyncio.Semaphore,
) -> list[dict[str, Any]]:
    """Ask the LLM to propose candidate features for one batch of directories.

    Builds an architecture-analysis prompt around ``dir_summaries``, sends it
    through ``_gemini_generate`` (imported from
    ``tools.feature_atlas.extract_features_swarm``) while holding
    ``semaphore``, cleans the reply with ``_scrub_llm_json`` from the same
    module, and parses it as JSON.

    The call is best-effort: an empty reply, a reply that is not a JSON
    array, a JSON decode error, or any other exception raised by the LLM call
    yields an empty list (failures are logged as warnings) so one bad batch
    never aborts the whole run. Array entries that are not JSON objects are
    dropped, because downstream code calls ``.get`` on every feature.

    Called by :func:`discover_features`, once per directory batch.

    Args:
        dir_summaries: The concatenated directory summary text for this batch.
        config: The atlas configuration, forwarded to ``_gemini_generate``.
        semaphore: Concurrency limiter shared across all batches.

    Returns:
        The feature dictionaries proposed for this batch, or an empty list
        when nothing usable came back.
    """
    # 💀 Import from swarm -- reuse the battle-tested LLM caller
    from tools.feature_atlas.extract_features_swarm import (
        _gemini_generate,
        _scrub_llm_json,
    )

    prompt = f"""You are analyzing an unknown codebase to discover its major subsystems and features.
Below are directory summaries with their files and symbols:
{dir_summaries}
Your job: identify the major SUBSYSTEMS (organs/features) this codebase has.
Rules:
- A feature/organ is a MEANINGFUL SUBSYSTEM, not individual files
- Group related files into coherent features
- Each feature should have a clear responsibility
- Don't create features for test files, config files, or utility helpers
- Don't create features that only have 1-2 small files unless they're clearly standalone
- Focus on: core engines, data layers, API surfaces, UI systems, processing pipelines,
integration layers, tool suites, background workers, authentication, etc.
For each discovered feature, output:
- "id": PascalCase feature ID (e.g. "CoreMemory", "PaymentEngine")
- "human_name": readable name
- "category": one of [memory, engine, routing, platform, infra, tools, persona, game, ncm, api, data, ui]
- "description": 1-2 sentence description of what this subsystem does
- "hint_files": list of key files that belong to this feature
- "evidence": brief explanation of why you identified this as a feature
- "confidence": 0.0-1.0
Return ONLY a JSON array of feature objects. No markdown, no explanation."""
    system = (
        "You are a precise software architecture analysis agent. "
        "You identify meaningful subsystems in codebases. "
        "You output only valid JSON arrays. "
        "You never hallucinate file names or symbols."
    )

    # Only the LLM round-trip needs to be rate-limited by the semaphore.
    async with semaphore:
        try:
            raw = await _gemini_generate(prompt, config, system)
            if not raw or not raw.strip():
                return []
            raw = _scrub_llm_json(raw)
            if not raw:
                return []
            features = json.loads(raw)
        except json.JSONDecodeError as e:
            logger.warning("Failed to parse discovered features JSON: %s", e)
            return []
        except Exception as e:
            # Deliberate catch-all: a failing batch must not sink the others.
            logger.warning("Feature discovery failed for batch: %s", e)
            return []

    if not isinstance(features, list):
        return []
    # Keep only JSON objects; stray strings/numbers/nulls in the array would
    # break the ``feat.get(...)`` calls made while merging and reporting.
    return [feat for feat in features if isinstance(feat, dict)]
def _union_hint_files(first: Any, second: Any) -> list[Any]:
    """Return the order-preserving union of two ``hint_files`` values."""
    combined: list[Any] = []
    for source in (first, second):
        if not isinstance(source, list):
            continue  # missing / null / malformed hint lists contribute nothing
        for item in source:
            # Linear membership keeps this safe for unhashable entries; hint
            # lists are short.
            if item not in combined:
                combined.append(item)
    return combined


async def _merge_and_deduplicate(
    all_features: list[dict[str, Any]],
    config: dict[str, Any],
) -> list[dict[str, Any]]:
    """Collapse overlapping discovered features into a smaller, cleaner set.

    Stage 1 is deterministic: candidates are deduplicated by their ``id``
    (first occurrence wins) and the ``hint_files`` of colliding entries are
    unioned in first-seen order. Entries that are not dicts, or whose ``id``
    is empty or not a string, are skipped. Input dicts are copied, never
    mutated. If 80 or fewer features remain they are returned as-is.

    Stage 2 is a best-effort LLM pass through ``_gemini_generate`` (reply
    cleaned by ``_scrub_llm_json``; both from
    ``tools.feature_atlas.extract_features_swarm``, imported only when this
    stage runs). Features are packed into the prompt as whole JSON objects
    until a 15000-character budget is reached, so the model never sees JSON
    cut off mid-object. Features that did not fit are not sent and are
    appended unchanged to the merged result (unless the model already
    returned the same ``id``), so nothing is silently lost.

    If the LLM call raises, returns nothing, or its reply does not parse into
    a list of more than five feature objects, the stage-1 list is returned.

    Called by :func:`discover_features` in this module.

    Args:
        all_features: The concatenated raw feature candidates from all batches.
        config: The atlas configuration forwarded to ``_gemini_generate``.

    Returns:
        The merged, deduplicated list of feature dictionaries.
    """
    max_direct = 80       # at or below this many features, skip the LLM pass
    dump_budget = 15000   # character budget for feature JSON in the prompt

    # First pass: simple dedup by ID.
    seen: dict[str, dict[str, Any]] = {}
    for feat in all_features:
        if not isinstance(feat, dict):
            continue
        fid = feat.get("id", "")
        if not fid or not isinstance(fid, str):
            continue
        if fid not in seen:
            seen[fid] = dict(feat)  # shallow copy: leave the caller's data alone
        else:
            existing = seen[fid]
            existing["hint_files"] = _union_hint_files(
                existing.get("hint_files"), feat.get("hint_files")
            )
    deduped = list(seen.values())

    # If small enough, return as-is.
    if len(deduped) <= max_direct:
        return deduped

    # Pack whole features into the prompt; never slice the JSON text itself.
    chunks: list[str] = []
    used = 0
    for feat in deduped:
        encoded = json.dumps(feat)
        if chunks and used + len(encoded) > dump_budget:
            break
        chunks.append(encoded)
        used += len(encoded) + 2  # account for the ",\n" separator
    held_back = deduped[len(chunks):]
    feature_dump = "[\n" + ",\n".join(chunks) + "\n]"

    prompt = f"""You have {len(chunks)} discovered features from a codebase analysis.
Some may be duplicates, overlaps, or too granular.
Merge them into a clean, non-overlapping set of major subsystems.
Combine features that clearly belong together.
Remove features that are too small or trivial.
Keep the total between 30-100 features.
Current features:
{feature_dump}
Return ONLY a JSON array of the merged feature objects with the same schema.
Keep the best ID, human_name, and merge hint_files."""

    from tools.feature_atlas.extract_features_swarm import (
        _gemini_generate,
        _scrub_llm_json,
    )

    try:
        raw = await _gemini_generate(
            prompt, config,
            "You merge software features. Output only valid JSON arrays.",
        )
        cleaned = _scrub_llm_json(raw) if raw else ""
        merged = json.loads(cleaned) if cleaned else None
    except json.JSONDecodeError as e:
        logger.warning("Failed to parse merged features JSON: %s", e)
        return deduped
    except Exception as e:
        # Best-effort stage: keep the deterministic result rather than lose
        # everything the discovery batches produced.
        logger.warning("LLM merge pass failed, keeping deduplicated features: %s", e)
        return deduped

    if not isinstance(merged, list):
        return deduped
    merged = [feat for feat in merged if isinstance(feat, dict)]
    if len(merged) <= 5:
        # Suspiciously small reply for 80+ inputs; trust the stage-1 result.
        return deduped

    # Re-attach the features the model never saw.
    merged_ids = {feat["id"] for feat in merged if isinstance(feat.get("id"), str)}
    merged.extend(feat for feat in held_back if feat["id"] not in merged_ids)
    return merged
[docs]
async def discover_features(config: dict[str, Any]) -> list[dict[str, Any]]:
    """Run the full discovery pipeline over the repo symbol index.

    Steps:

    1. Load the symbol records via :func:`load_symbols`.
    2. Group records by the first ``/``-separated component of their ``file``
       path; paths without a separator (or with no ``file`` value) go into a
       ``"root"`` group.
    3. Render each group with :func:`_build_directory_summary` (groups in
       sorted name order) and join the summaries four at a time into batches.
    4. Run :func:`_discover_features_for_batch` for every batch concurrently,
       bounded by an ``asyncio.Semaphore`` sized from
       ``config["swarm"]["max_concurrent"]`` (default 5).
    5. Collect the results, logging any batch that raised, and reconcile them
       with :func:`_merge_and_deduplicate`.

    The concurrency setting is sanitised: a missing or null ``swarm`` section,
    or a non-numeric value, falls back to the default, and values below 1 are
    raised to 1. A limit of 0 would otherwise create a semaphore that can
    never be acquired and hang the run.

    Called by :func:`async_main` in this module.

    Args:
        config: The atlas configuration; supplies ``swarm.max_concurrent`` and
            is forwarded unchanged to the batch and merge helpers.

    Returns:
        The final, merged list of discovered feature dictionaries.

    Raises:
        FileNotFoundError: Propagated from :func:`load_symbols` when the
            symbol index is missing.
    """
    symbols = load_symbols()

    # Group by top-level directory.
    groups: dict[str, list[dict[str, Any]]] = {}
    for record in symbols:
        filepath = record.get("file") or ""  # tolerate a null "file" field
        top, sep, _ = filepath.partition("/")
        group = top if sep else "root"
        groups.setdefault(group, []).append(record)

    logger.info(
        "Discovering features from %d files across %d directories",
        len(symbols), len(groups),
    )

    # Build directory summaries in a stable (sorted) order.
    dir_summaries = [
        _build_directory_summary(dir_name, files)
        for dir_name, files in sorted(groups.items())
    ]

    # Batch directories into chunks (3-5 dirs per LLM call for context).
    batch_size = 4
    batches = [
        "\n\n---\n\n".join(dir_summaries[start:start + batch_size])
        for start in range(0, len(dir_summaries), batch_size)
    ]

    # Sanitise the concurrency limit before building the semaphore.
    default_concurrency = 5
    swarm_cfg = config.get("swarm")
    if not isinstance(swarm_cfg, dict):
        swarm_cfg = {}
    try:
        max_concurrent = int(swarm_cfg.get("max_concurrent", default_concurrency))
    except (TypeError, ValueError):
        max_concurrent = default_concurrency
    max_concurrent = max(1, max_concurrent)
    semaphore = asyncio.Semaphore(max_concurrent)

    # Run discovery swarm.
    tasks = [
        _discover_features_for_batch(batch, config, semaphore)
        for batch in batches
    ]
    all_features: list[dict[str, Any]] = []
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for result in results:
        if isinstance(result, list):
            all_features.extend(result)
        elif isinstance(result, BaseException):
            logger.warning("Batch discovery failed: %s", result)

    logger.info("Raw discovery found %d features, deduplicating...", len(all_features))

    # Merge and deduplicate.
    final_features = await _merge_and_deduplicate(all_features, config)
    logger.info("Final discovered features: %d", len(final_features))
    return final_features
[docs]
async def async_main() -> None:
    """Run feature discovery, save the result, and print a summary.

    Configures INFO logging, loads the configuration with :func:`load_config`,
    awaits :func:`discover_features`, and writes the resulting list as
    indented JSON to ``_DISCOVERED_PATH`` (creating the parent directory if
    needed). The JSON text is round-tripped through UTF-8 with
    ``errors="replace"`` so characters that cannot be encoded do not abort the
    write. It then prints the feature count, a per-category tally, the elapsed
    time, the output path, and a preview of up to 20 features.

    The printed summary tolerates sloppy model output: non-dict entries are
    ignored, categories are stringified before counting, and a null or
    non-string ``description`` is treated as text instead of crashing the
    preview after the real work has already finished.

    Called by :func:`main` in this module.
    NOTE(review): the previous documentation says the atlas runner
    (``tools.feature_atlas.run_atlas``) also imports this coroutine; keep it
    usable on its own, including its own logging setup.

    Returns:
        None.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )
    # Monotonic clock: the elapsed figure is immune to wall-clock adjustments.
    t0 = time.perf_counter()
    config = load_config()
    features = await discover_features(config)

    # Save output.
    _DISCOVERED_PATH.parent.mkdir(parents=True, exist_ok=True)
    raw_json = json.dumps(features, indent=2, ensure_ascii=False)
    raw_json = raw_json.encode("utf-8", errors="replace").decode("utf-8")
    with open(_DISCOVERED_PATH, "w", encoding="utf-8") as f:
        f.write(raw_json)
    elapsed = time.perf_counter() - t0

    # Only dict entries can be summarised; anything else is skipped here (the
    # saved file above still contains the raw result).
    records = [feat for feat in features if isinstance(feat, dict)]
    banner = "=" * 60

    # Summary.
    print(f"\n{banner}")
    print(" FEATURE DISCOVERY COMPLETE")
    print(banner)
    print(f" Features discovered: {len(features)}")
    if records:
        from collections import Counter

        # str() keeps Counter happy even if the model returned a list/dict.
        cats = Counter(str(feat.get("category", "?")) for feat in records)
        for cat, count in cats.most_common():
            print(f" {cat}: {count}")
    print(f" Time elapsed: {elapsed:.1f}s")
    print(f" Output: {_DISCOVERED_PATH}")
    print(f"{banner}\n")

    # Show top features.
    if records:
        print(" Top discovered features:")
        for feat in records[:20]:
            description = str(feat.get("description") or "")
            print(f" - {feat.get('id', '?')}: {feat.get('human_name', '?')}")
            print(f" {description[:100]}")
        if len(features) > 20:
            print(f" ... and {len(features) - 20} more")
[docs]
def main() -> None:
    """Synchronous entry point for the feature-discovery step.

    Sets up root logging at INFO level and then runs :func:`async_main` to
    completion on a fresh event loop via ``asyncio.run``. All discovery work,
    file output, and summary printing happen inside :func:`async_main`; this
    wrapper adds nothing beyond logging setup and starting the loop.

    Called from this module's ``if __name__ == "__main__"`` guard.

    Returns:
        None.
    """
    log_format = "%(asctime)s [%(levelname)s] %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO)

    pipeline = async_main()
    asyncio.run(pipeline)
# Script entry point: run discovery only when this module is executed directly
# (e.g. ``python -m tools.feature_atlas.discover_features``), not on import.
if __name__ == "__main__":
    main()