# Source code for tools.feature_atlas.extract_features_swarm

"""Step 1b: Gemini Flash swarm for feature mapping.

Takes the repo symbol index from Step 1a (``outputs/repo_symbols.json``) and
uses a Gemini Flash swarm to map actual code evidence onto the canonical
features defined in ``config.yaml``, writing the merged result to
``outputs/feature_registry.json``.

Architecture (the roles are plain functions, not classes):
  - Orchestrator (``run_extraction``): groups files by top-level directory
    and coordinates the run
  - FeatureExtractor agents (``_extract_features_for_group``): LLM calls per
    directory group that map evidence onto features, run in parallel under a
    shared concurrency semaphore
  - Curator (``_aggregate_mappings``): deterministically merges and
    deduplicates the extractor output per canonical feature (no LLM call)
  - Describer (``_generate_descriptions``): one LLM call per feature to write
    a short technical description

Talks to a local OpenAI-compatible proxy (default
``http://localhost:3000/openai/chat/completions``, overridable via
``swarm.proxy_url`` in config.yaml); the proxy handles auth, so no API keys
are needed here.

Usage:
    python -m tools.feature_atlas.extract_features_swarm

# skull fire -- THE SWARM NAMES THE ORGANS
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
import sys
import time
from pathlib import Path
from typing import Any

import yaml

# Directory that holds this script, config.yaml and the outputs/ folder.
_ATLAS_DIR = Path(__file__).resolve().parent

# Repository root: three levels above this file. It is put at the front of
# sys.path, presumably so project-local imports resolve when this file is run
# directly as a script.
_PROJECT_ROOT = _ATLAS_DIR.parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

logger = logging.getLogger(__name__)

# Inputs and outputs of this step (all relative to the atlas directory).
_OUTPUTS_DIR = _ATLAS_DIR / "outputs"
_CONFIG_PATH = _ATLAS_DIR / "config.yaml"
_SYMBOLS_PATH = _OUTPUTS_DIR / "repo_symbols.json"
_OUTPUT_PATH = _OUTPUTS_DIR / "feature_registry.json"

# 💀 Regex to strip LLM wrapper garbage before JSON parsing
import re

# Matches <thinking>...</thinking> blocks (greedy, dotall)
_THINKING_RE = re.compile(r"<thinking>.*?</thinking>", re.DOTALL | re.IGNORECASE)
# Matches <thought>...</thought> too (some models use this)
_THOUGHT_RE = re.compile(r"<thought>.*?</thought>", re.DOTALL | re.IGNORECASE)
# Matches any remaining XML-ish tags that aren't part of JSON
_XML_TAG_RE = re.compile(r"</?(?:thinking|thought|output|response|result|answer)[^>]*>", re.IGNORECASE)


def _scrub_llm_json(raw: str) -> str:
    """Strip LLM wrapper garbage to extract clean JSON.

    Handles:
    - <thinking>...</thinking> blocks (Gemini loves these)
    - <thought>...</thought> variants
    - ```json ... ``` markdown code fences (with or without language tag)
    - Leading/trailing whitespace and newlines
    - Stray XML tags around the JSON

    Returns the cleaned string (should be valid JSON if the LLM cooperated).
    """
    if not raw:
        return ""

    text = raw.strip()

    # 1. Strip <thinking>...</thinking> and <thought>...</thought> blocks
    text = _THINKING_RE.sub("", text)
    text = _THOUGHT_RE.sub("", text)

    # 2. Strip any remaining stray XML tags
    text = _XML_TAG_RE.sub("", text)

    text = text.strip()

    # 3. Strip markdown code fences: ```json ... ``` or ``` ... ```
    if text.startswith("```"):
        # Remove opening fence (with optional language tag)
        first_newline = text.find("\n")
        if first_newline != -1:
            text = text[first_newline + 1:]
        else:
            text = text[3:]
        # Remove closing fence
        if text.rstrip().endswith("```"):
            text = text.rstrip()
            text = text[:-3]
        text = text.strip()

    # 4. If there are MULTIPLE code fence blocks, extract the last JSON one
    if "```" in text:
        # Find all fenced blocks
        blocks = re.findall(r"```(?:json)?\s*\n(.*?)```", text, re.DOTALL)
        if blocks:
            # Use the last block that looks like JSON
            for block in reversed(blocks):
                block = block.strip()
                if block.startswith("[") or block.startswith("{"):
                    text = block
                    break

    # 5. Final strip
    text = text.strip()

    # 6. If text still doesn't start with [ or {, try to find JSON in it
    if text and not text.startswith(("[", "{")):
        # Look for first [ or { and extract from there
        bracket_idx = -1
        for i, ch in enumerate(text):
            if ch in ("[", "{"):
                bracket_idx = i
                break
        if bracket_idx >= 0:
            text = text[bracket_idx:]

    return text

def load_config() -> dict[str, Any]:
    """Load the Feature Atlas configuration from ``config.yaml``.

    Reads and parses the atlas config that drives the whole swarm: the
    ``swarm`` block (model name, proxy URL, temperature, concurrency, retry
    budget) and the ``canonical_features`` list that every extractor agent
    maps code evidence onto. This is a pure filesystem read of
    ``_CONFIG_PATH`` parsed with ``yaml.safe_load``.

    Called by :func:`run_extraction` (when no config is passed) and
    :func:`async_main`.

    Returns:
        The parsed config as a dictionary, typically carrying ``swarm`` and
        ``canonical_features`` keys. An empty file yields ``{}``.

    Raises:
        FileNotFoundError: If ``config.yaml`` does not exist.
        ValueError: If the file's top level is not a YAML mapping.
    """
    with open(_CONFIG_PATH, encoding="utf-8") as f:
        data = yaml.safe_load(f)
    if data is None:
        # An empty config.yaml parses to None. Return {} so callers'
        # ``config.get(...)`` works and run_extraction raises its own clear
        # "No canonical_features" error instead of an AttributeError.
        return {}
    if not isinstance(data, dict):
        raise ValueError(
            f"{_CONFIG_PATH} must contain a YAML mapping at the top level, "
            f"got {type(data).__name__}"
        )
    return data
def load_symbols() -> list[dict[str, Any]]:
    """Load the repo symbol index produced by Step 1a.

    Returns the per-file symbol records (classes, functions, constants,
    imports, template/CSS/HTML metadata) that ``extract_repo_symbols.py``
    writes to ``outputs/repo_symbols.json``. These records are the raw
    evidence the swarm maps onto canonical features. Pure filesystem read,
    decoded as UTF-8 JSON.

    Returns:
        A list of per-file symbol record dictionaries.

    Raises:
        FileNotFoundError: If ``outputs/repo_symbols.json`` is missing, which
            means ``extract_repo_symbols.py`` has not been run yet.
    """
    symbols_path = _SYMBOLS_PATH
    if symbols_path.exists():
        return json.loads(symbols_path.read_text(encoding="utf-8"))
    raise FileNotFoundError(
        f"Repo symbols not found at {symbols_path}. "
        "Run extract_repo_symbols.py first."
    )
async def _gemini_generate( prompt: str, config: dict[str, Any], system_instruction: str | None = None, ) -> str: """Call Gemini Flash via local OpenAI-compatible proxy. Uses the proxy at localhost:3000 (same as flash_dyadic_mirror.py). OpenAI chat completions format, no API keys needed -- proxy handles auth. Error handling modeled after anamnesis_engine.py: - Exponential backoff: 5s base, doubling each retry (5, 10, 20, 40) - Empty body / empty choices defense (proxy likes to 200-with-nothing) - JSON decode defense - Never raises -- returns empty string on total failure """ import httpx swarm_cfg = config.get("swarm", {}) model = swarm_cfg.get("model", "gemini-3.1-flash-lite") proxy_url = swarm_cfg.get("proxy_url", "http://localhost:3000/openai/chat/completions") temperature = swarm_cfg.get("temperature", 0.2) max_tokens = swarm_cfg.get("max_output_tokens", 8192) # Exponential backoff config (matches anamnesis_engine pattern) max_retries = swarm_cfg.get("retry_attempts", 5) base_delay = 5.0 # seconds; doubles each retry: 5, 10, 20, 40 # Build messages in OpenAI chat format messages: list[dict[str, str]] = [] if system_instruction: messages.append({"role": "system", "content": system_instruction}) messages.append({"role": "user", "content": prompt}) payload = { "model": model, "messages": messages, "temperature": temperature, "max_tokens": max_tokens, } async with httpx.AsyncClient(timeout=120.0) as client: for attempt in range(max_retries): delay = base_delay * (2 ** attempt) try: resp = await client.post( proxy_url, json=payload, headers={"Content-Type": "application/json"}, ) # -- 429 rate limit -- if resp.status_code == 429: logger.warning( "Flash proxy 429, retrying in %.0fs (attempt %d/%d)", delay, attempt + 1, max_retries, ) await asyncio.sleep(delay) continue # -- Non-200 HTTP errors -- if resp.status_code != 200: logger.warning( "Flash proxy HTTP %d, retrying in %.0fs (attempt %d/%d)", resp.status_code, delay, attempt + 1, max_retries, ) if attempt < 
max_retries - 1: await asyncio.sleep(delay) continue return "" # -- Empty body defense (the exact bug that killed the run) -- raw_body = resp.text.strip() if not raw_body: logger.warning( "Flash proxy returned 200 with EMPTY BODY " "(attempt %d/%d, retrying in %.0fs)", attempt + 1, max_retries, delay, ) if attempt < max_retries - 1: await asyncio.sleep(delay) continue return "" # -- JSON decode defense -- try: data = resp.json() except Exception as json_err: logger.warning( "Flash proxy returned non-JSON body: %.200s " "(attempt %d/%d, retrying in %.0fs)", raw_body[:200], attempt + 1, max_retries, delay, ) if attempt < max_retries - 1: await asyncio.sleep(delay) continue return "" # -- Error field in response -- if "error" in data: logger.warning( "Flash proxy error field: %s (attempt %d/%d)", data["error"], attempt + 1, max_retries, ) if attempt < max_retries - 1: await asyncio.sleep(delay) continue return "" # -- Empty choices defense -- choices = data.get("choices", []) if not choices: logger.warning( "Flash proxy returned empty choices: %.500s " "(attempt %d/%d, retrying in %.0fs)", json.dumps(data)[:500], attempt + 1, max_retries, delay, ) if attempt < max_retries - 1: await asyncio.sleep(delay) continue return "" text = choices[0].get("message", {}).get("content", "") if not text: logger.warning( "Flash proxy returned empty content " "(attempt %d/%d, retrying in %.0fs)", attempt + 1, max_retries, delay, ) if attempt < max_retries - 1: await asyncio.sleep(delay) continue return "" return text except httpx.ConnectError: logger.warning( "Flash proxy unreachable at %s " "(attempt %d/%d, retrying in %.0fs)", proxy_url, attempt + 1, max_retries, delay, ) if attempt < max_retries - 1: await asyncio.sleep(delay) else: return "" except httpx.TimeoutException: logger.warning( "Flash proxy timeout (attempt %d/%d, retrying in %.0fs)", attempt + 1, max_retries, delay, ) if attempt < max_retries - 1: await asyncio.sleep(delay) else: return "" except Exception as e: 
logger.warning( "Flash proxy unexpected error: %s " "(attempt %d/%d, retrying in %.0fs)", e, attempt + 1, max_retries, delay, ) if attempt < max_retries - 1: await asyncio.sleep(delay) else: return "" return "" def _build_file_summary(symbol_record: dict[str, Any]) -> str: """Render one file's symbol record into a compact text summary for the LLM. Flattens a single :func:`load_symbols` record into a few human-readable lines so a Gemini Flash extractor can reason about the file without seeing its source. It selectively pulls and truncates the fields that matter for feature mapping: the file path and docstring, classes (with bases and methods), functions, constants, project-internal imports (filtering out stdlib and common third-party packages), env vars, and language-specific metadata for YAML/JSON, Jinja2, TS/JS, CSS, and HTML files. This is pure string assembly with no side effects. Called only by :func:`_extract_features_for_group`, which joins the summaries for every file in a directory group into one prompt. Args: symbol_record: A single per-file symbol record from :func:`load_symbols`. Returns: A newline-joined summary string describing the file's notable symbols. 
""" parts = [f"File: {symbol_record['file']}"] doc = symbol_record.get("docstring", "") if doc: parts.append(f" Docstring: {doc[:300]}") classes = symbol_record.get("classes", []) if classes: for cls in classes[:10]: methods = ", ".join(cls.get("methods", [])[:15]) bases = ", ".join(cls.get("bases", [])[:5]) parts.append( f" Class: {cls['name']}" f"{f' (extends {bases})' if bases else ''}" f"{f' methods=[{methods}]' if methods else ''}" ) functions = symbol_record.get("functions", []) if functions: func_names = ", ".join(f.get("name", "") for f in functions[:20]) parts.append(f" Functions: {func_names}") constants = symbol_record.get("constants", []) if constants: parts.append(f" Constants: {', '.join(constants[:20])}") imports = symbol_record.get("imports", []) if imports: # Only show project-internal imports internal = [ i for i in imports if not i.startswith(("os", "sys", "json", "re", "time", "logging", "asyncio", "typing", "pathlib", "datetime", "collections", "functools", "itertools", "dataclasses", "abc", "enum", "copy", "hashlib", "base64", "uuid", "textwrap", "io", "struct", "math", "random", "threading", "concurrent", "multiprocessing", "http", "socket", "ssl", "email", "urllib", "html", "xml", "httpx", "aiohttp", "redis", "discord", "numpy", "yaml", "pydantic", "fastapi", "jinja2", "PIL", "openai")) ] if internal: parts.append(f" Internal imports: {', '.join(internal[:20])}") env_vars = symbol_record.get("env_vars", []) if env_vars: parts.append(f" Env vars: {', '.join(env_vars[:10])}") # YAML/JSON files keys = symbol_record.get("top_level_keys", []) if keys: parts.append(f" Top-level keys: {', '.join(str(k) for k in keys[:20])}") # Jinja2 templates variables = symbol_record.get("variables", []) if variables: parts.append(f" Template vars: {', '.join(variables[:30])}") blocks = symbol_record.get("blocks", []) if blocks: parts.append(f" Template blocks: {', '.join(blocks[:20])}") # TS/JS files exports = symbol_record.get("exports", []) if exports: 
parts.append(f" Exports: {', '.join(exports[:20])}") interfaces = symbol_record.get("interfaces", []) if interfaces: parts.append(f" Interfaces: {', '.join(interfaces[:15])}") types = symbol_record.get("types", []) if types: parts.append(f" Types: {', '.join(types[:15])}") components = symbol_record.get("components", []) if components: parts.append(f" React Components: {', '.join(components[:15])}") # CSS files css_classes = symbol_record.get("classes", []) if symbol_record.get("type") == "css" else [] if css_classes: parts.append(f" CSS Classes: {', '.join(css_classes[:30])}") custom_props = symbol_record.get("custom_properties", []) if custom_props: parts.append(f" CSS Custom Properties: {', '.join(custom_props[:15])}") keyframes = symbol_record.get("keyframes", []) if keyframes: parts.append(f" CSS Keyframes: {', '.join(keyframes[:10])}") # HTML files html_components = symbol_record.get("components", []) if symbol_record.get("type") == "html" else [] if html_components: parts.append(f" HTML Components: {', '.join(html_components[:20])}") ids = symbol_record.get("ids", []) if ids: parts.append(f" Element IDs: {', '.join(ids[:15])}") return "\n".join(parts) def _group_files_by_directory( symbols: list[dict[str, Any]], ) -> dict[str, list[dict[str, Any]]]: """Bucket symbol records by their top-level directory. Partitions the flat symbol list into per-directory groups so the swarm can fan out one extractor agent per group, keeping each prompt scoped to a coherent slice of the repo. Each record's ``file`` path is split on ``/`` and assigned to its first path segment; top-level files, and any first segment that looks like a code file (matching the ``_CODE_EXTS`` suffixes) rather than a directory, fall into the ``"root"`` bucket. This is pure in-memory bucketing with no I/O. Called once by :func:`run_extraction`, just before it spawns the per-group extractor tasks. Args: symbols: The full list of per-file symbol records from :func:`load_symbols`. 
Returns: A mapping of directory name (or ``"root"``) to the list of symbol records that live under it. """ # Extensions that indicate a file, not a directory _CODE_EXTS = {".py", ".ts", ".tsx", ".js", ".jsx", ".css", ".html", ".yaml", ".yml", ".j2", ".json"} groups: dict[str, list[dict[str, Any]]] = {} for record in symbols: filepath = record.get("file", "") parts = filepath.split("/") # Group by first directory, or "root" for top-level files if len(parts) > 1: first = parts[0] # Check it's actually a directory, not a dotfile if not any(first.endswith(ext) for ext in _CODE_EXTS): group = first else: group = "root" else: group = "root" groups.setdefault(group, []).append(record) return groups async def _extract_features_for_group( group_name: str, group_files: list[dict[str, Any]], canonical_features: list[dict[str, Any]], config: dict[str, Any], semaphore: asyncio.Semaphore, ) -> list[dict[str, Any]]: """Run one FeatureExtractor agent over a directory group of files. This is the per-group worker of the swarm. Under the shared concurrency ``semaphore`` it builds a prompt from the group's file summaries (via :func:`_build_file_summary`, capped at 40 files) and the canonical feature list, then calls :func:`_gemini_generate` to invoke Gemini Flash through the local OpenAI-compatible proxy (an HTTP/LLM side effect; no Redis or knowledge-graph access). The model is asked to emit a JSON array of feature-to-file mappings, which is scrubbed of wrapper text by :func:`_scrub_llm_json` and parsed. The function is defensive end to end: empty responses, non-JSON output, and non-list results are all logged and coerced to an empty list so a single bad group never aborts the run. Called by :func:`run_extraction`, which schedules one task per directory group and gathers them concurrently. Args: group_name: The directory bucket name being processed (e.g. ``tools`` or ``root``). group_files: The symbol records for files in this group. 
canonical_features: The canonical feature definitions from config that files may be mapped onto. config: The atlas config, forwarded to :func:`_gemini_generate` for model and proxy settings. semaphore: The shared concurrency limiter bounding parallel proxy load. Returns: A list of mapping dictionaries (feature id, file, symbols, data stores, evidence, confidence), or an empty list on any failure. """ async with semaphore: # Build file summaries file_summaries = "\n\n".join( _build_file_summary(f) for f in group_files[:40] ) # Build canonical feature list for the prompt feature_list = "\n".join( f"- {feat['id']}: {feat['human_name']} " f"(category: {feat.get('category', '?')}, " f"hint files: {', '.join(feat.get('hint_files', [])[:3])})" for feat in canonical_features ) prompt = f"""You are analyzing the Stargazer codebase to map code evidence onto canonical features. Below are the files in the "{group_name}" directory/group with their extracted symbols: {file_summaries} Here are the {len(canonical_features)} canonical Stargazer features you need to map evidence onto: {feature_list} For each file that belongs to one or more of these features, output a JSON array of mappings. Each mapping should have: - "feature_id": the canonical feature ID - "file": the file path - "classes": list of class names in this file that belong to this feature - "functions": list of function names that belong to this feature - "constants": list of constants that belong to this feature - "data_stores": list of data stores this file accesses (e.g. 
"falkordb:knowledge", "redis:0", "chromadb:spiral_goddess") - "evidence": a brief sentence explaining why this file belongs to this feature - "confidence": 0.0-1.0 confidence score Rules: - Only map files to features they genuinely belong to based on code evidence - A file can map to multiple features - Do NOT invent features not in the canonical list - Do NOT map files that have no real connection to a feature - Focus on classes, functions, imports, and data access patterns - confidence should be HIGH (>0.8) when the file is clearly part of the feature - confidence should be LOW (<0.5) when the connection is indirect Return ONLY a JSON array of mapping objects. No markdown, no explanation.""" system_instruction = ( "You are a precise code analysis agent. " "You output only valid JSON arrays. " "You never hallucinate code symbols or file names." ) try: raw = await _gemini_generate(prompt, config, system_instruction) # -- Empty response defense -- if not raw or not raw.strip(): logger.warning( "Group '%s': LLM returned empty response for %d files " "(proxy may be flaky, skipping group)", group_name, len(group_files), ) return [] # Parse JSON from response -- scrub LLM wrapper garbage first raw = _scrub_llm_json(raw) if not raw: logger.warning( "Group '%s': LLM response was only wrapper tags/fences, no JSON content", group_name, ) return [] mappings = json.loads(raw) if not isinstance(mappings, list): logger.warning( "Group '%s': LLM returned %s instead of list, skipping", group_name, type(mappings).__name__, ) mappings = [] logger.info( "Group '%s': %d mappings extracted from %d files", group_name, len(mappings), len(group_files), ) return mappings except json.JSONDecodeError as e: logger.warning( "Group '%s': JSON parse failed (skipping): %s raw[:200]=%s", group_name, e, repr(raw[:200]) if raw else "(empty)", ) return [] except Exception as e: logger.warning( "Group '%s': unexpected error (skipping): %s", group_name, e, ) return [] def _aggregate_mappings( 
all_mappings: list[dict[str, Any]], canonical_features: list[dict[str, Any]], ) -> list[dict[str, Any]]: """Curate the raw extractor mappings into a deduplicated feature registry. This is the Curator stage of the swarm. It seeds one registry shell per canonical feature (carrying id, human name, category, and hint files), then folds every mapping emitted by the :func:`_extract_features_for_group` agents into the matching shell: union-ing files, symbols (classes, functions, constants), and data stores; collecting distinct evidence sentences; recording tool files (any path under ``tools/``) as external tools; and keeping the maximum reported confidence per feature. Mappings that reference an unknown feature id are dropped. This is pure in-memory aggregation with no I/O. Called once by :func:`run_extraction`, after all extractor groups have been gathered. Args: all_mappings: The flattened list of mapping dictionaries from every extractor group. canonical_features: The canonical feature definitions used to seed the registry shells. Returns: A list of feature registry dictionaries, one per canonical feature. 
""" # Build feature shells from config feature_map: dict[str, dict[str, Any]] = {} for feat in canonical_features: feature_map[feat["id"]] = { "id": feat["id"], "human_name": feat["human_name"], "category": feat.get("category", ""), "description": "", "files": list(feat.get("hint_files", [])), "symbols": [], "entrypoints": [], "data_stores": [], "external_tools": [], "confidence": 0.0, "evidence": [], } # Merge mappings into features for mapping in all_mappings: fid = mapping.get("feature_id", "") if fid not in feature_map: continue feat = feature_map[fid] filepath = mapping.get("file", "") if filepath and filepath not in feat["files"]: feat["files"].append(filepath) for cls in mapping.get("classes", []): if cls not in feat["symbols"]: feat["symbols"].append(cls) for func in mapping.get("functions", []): if func not in feat["symbols"]: feat["symbols"].append(func) for const in mapping.get("constants", []): if const not in feat["symbols"]: feat["symbols"].append(const) for ds in mapping.get("data_stores", []): if ds not in feat["data_stores"]: feat["data_stores"].append(ds) evidence = mapping.get("evidence", "") if evidence and evidence not in feat["evidence"]: feat["evidence"].append(evidence) # Track tool files if filepath.startswith("tools/") and filepath not in feat["external_tools"]: feat["external_tools"].append(filepath) # Update confidence (take max) conf = float(mapping.get("confidence", 0.0)) if conf > feat["confidence"]: feat["confidence"] = conf return list(feature_map.values()) async def _generate_descriptions( features: list[dict[str, Any]], config: dict[str, Any], semaphore: asyncio.Semaphore, ) -> list[dict[str, Any]]: """Generate a technical description for every feature, concurrently. The final enrichment stage of the swarm. 
For each curated feature it schedules the nested :func:`_describe_one` coroutine, which acquires the shared ``semaphore`` and calls Gemini Flash via :func:`_gemini_generate` (the local OpenAI-compatible proxy) to write a 2-3 sentence description onto the feature's ``description`` field in place. All per-feature tasks run together under ``asyncio.gather``, so descriptions are produced in parallel up to the concurrency limit; the only side effects are HTTP/LLM calls and mutation of the passed-in feature dictionaries. Called once by :func:`run_extraction`, after :func:`_aggregate_mappings` has built the registry. Args: features: The curated feature registry records to describe (mutated in place with a populated ``description`` field). config: The atlas config, forwarded to :func:`_gemini_generate`. semaphore: The shared concurrency limiter bounding parallel proxy load. Returns: The same list of feature dictionaries, each now carrying a generated ``description``. """ async def _describe_one(feat: dict[str, Any]) -> dict[str, Any]: """Generate and attach a technical description for one feature. Closure over the enclosing ``config`` and ``semaphore`` from :func:`_generate_descriptions`. Acquires the shared concurrency ``semaphore`` to bound parallel proxy load, builds a prompt from the feature's id, files, symbols, data stores, and accumulated evidence, then calls :func:`_gemini_generate` to invoke Gemini Flash through the local OpenAI-compatible proxy (an HTTP/LLM side effect; no Redis or knowledge-graph access). The returned text is stripped and written back onto the feature's ``description`` key in place. On any failure it logs a warning and falls back to a ``"<human_name> subsystem."`` placeholder so the pipeline never aborts. This nested helper is invoked only by its enclosing function :func:`_generate_descriptions` (one task per feature, gathered with ``asyncio.gather``); no other internal callers were found. Args: feat: The feature registry record to describe. 
Mutated in place: its ``description`` field is set to the generated text (or the fallback placeholder on error). Returns: The same ``feat`` dictionary, now carrying a populated ``description`` field. """ async with semaphore: prompt = f"""Generate a concise 2-3 sentence technical description for this Stargazer subsystem. Feature: {feat['id']} ({feat['human_name']}) Category: {feat['category']} Files: {', '.join(feat['files'][:10])} Key symbols: {', '.join(feat['symbols'][:15])} Data stores: {', '.join(feat['data_stores'][:5])} Evidence: {' | '.join(feat['evidence'][:5])} Write a precise technical description of what this subsystem does. Focus on its actual function, not marketing language. Return ONLY the description text, no JSON, no markdown.""" try: desc = await _gemini_generate(prompt, config) feat["description"] = desc.strip() except Exception as e: logger.warning("Failed to generate description for %s: %s", feat["id"], e) feat["description"] = f"{feat['human_name']} subsystem." return feat tasks = [_describe_one(f) for f in features] return await asyncio.gather(*tasks)
async def run_extraction(config: dict[str, Any] | None = None) -> list[dict[str, Any]]:
    """Run the full Step 1b feature-extraction swarm end to end.

    Stages, in order:
      1. Load config (via :func:`load_config` when none is passed) and the
         symbol index (:func:`load_symbols`).
      2. Bucket files by directory (:func:`_group_files_by_directory`).
      3. Fan out one :func:`_extract_features_for_group` task per group under
         a shared ``asyncio.Semaphore``, gathered with
         ``return_exceptions=True``. A failed group is logged by name rather
         than being fatal, and non-dict entries are dropped so malformed LLM
         output cannot crash the curator.
      4. Fold the mappings into a registry with :func:`_aggregate_mappings`.
      5. Enrich it via :func:`_generate_descriptions`.

    Args:
        config: Optional pre-loaded atlas config; when ``None`` it is loaded
            from ``config.yaml``.

    Returns:
        The feature registry as a list of feature dictionaries.

    Raises:
        ValueError: If the config defines no ``canonical_features``.
    """
    if config is None:
        config = load_config()
    symbols = load_symbols()

    canonical_features = config.get("canonical_features", [])
    if not canonical_features:
        raise ValueError("No canonical_features defined in config.yaml")

    logger.info(
        "Starting feature extraction: %d files, %d canonical features",
        len(symbols), len(canonical_features),
    )

    swarm_cfg = config.get("swarm") or {}
    semaphore = asyncio.Semaphore(swarm_cfg.get("max_concurrency", 10))

    groups = _group_files_by_directory(symbols)
    logger.info("File groups: %s", {k: len(v) for k, v in groups.items()})

    # FeatureExtractor agents, one task per group, run in parallel.
    results = await asyncio.gather(
        *(
            _extract_features_for_group(
                group_name, group_files, canonical_features, config, semaphore
            )
            for group_name, group_files in groups.items()
        ),
        return_exceptions=True,
    )

    all_mappings: list[dict[str, Any]] = []
    dropped = 0
    # gather preserves task order, which matches groups' iteration order.
    for group_name, result in zip(groups, results):
        if isinstance(result, BaseException):
            logger.warning("Feature extraction group '%s' failed: %s", group_name, result)
        elif isinstance(result, list):
            valid = [m for m in result if isinstance(m, dict)]
            dropped += len(result) - len(valid)
            all_mappings.extend(valid)
    if dropped:
        logger.warning("Dropped %d non-object mappings before curation", dropped)
    logger.info("Total mappings collected: %d", len(all_mappings))

    # Curator: aggregate into the feature registry, then describe each feature.
    features = _aggregate_mappings(all_mappings, canonical_features)
    return await _generate_descriptions(features, config, semaphore)
async def async_main() -> None:
    """Async entry point: run the swarm and persist the feature registry.

    Loads the config and builds the registry with :func:`run_extraction`. The
    result is written as pretty-printed UTF-8 JSON to ``_OUTPUT_PATH``,
    creating the parent directory if needed. A summary banner with feature,
    confidence, file, symbol and timing counts is then printed.

    The JSON is written to a sibling ``.tmp`` file first and moved into place
    with ``os.replace``. A crash mid-write therefore cannot truncate the
    previous registry. Elapsed time uses the monotonic ``time.perf_counter``.
    """
    started = time.perf_counter()
    config = load_config()
    features = await run_extraction(config)

    # Write output atomically: temp file, then replace.
    _OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = _OUTPUT_PATH.with_name(_OUTPUT_PATH.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(features, f, indent=2, ensure_ascii=False)
    os.replace(tmp_path, _OUTPUT_PATH)

    elapsed = time.perf_counter() - started

    # Summary
    high_conf = sum(1 for f in features if f.get("confidence", 0) >= 0.7)
    low_conf = sum(1 for f in features if f.get("confidence", 0) < 0.5)
    total_files = sum(len(f.get("files", [])) for f in features)
    total_symbols = sum(len(f.get("symbols", [])) for f in features)

    print(f"\n{'=' * 60}")
    print(" FEATURE EXTRACTION COMPLETE")
    print(f"{'=' * 60}")
    print(f" Features mapped: {len(features)}")
    print(f" High confidence: {high_conf}")
    print(f" Low confidence: {low_conf}")
    print(f" Total files mapped: {total_files}")
    print(f" Total symbols found: {total_symbols}")
    print(f" Time elapsed: {elapsed:.1f}s")
    print(f" Output: {_OUTPUT_PATH}")
    print(f"{'=' * 60}\n")
def main() -> None:
    """Console entry point for ``python -m tools.feature_atlas.extract_features_swarm``.

    Configures root logging at INFO with a timestamped format, then runs
    :func:`async_main` to completion on a fresh event loop.
    """
    log_format = "%(asctime)s [%(levelname)s] %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO)
    asyncio.run(async_main())
# Script entry point.
if __name__ == "__main__":
    main()