"""Step 1b: Gemini Flash swarm for feature mapping.
Takes the repo symbol index from Step 1a and uses a Gemini Flash swarm
to map actual code evidence onto the canonical features defined in
``config.yaml``.
Architecture:
- MasterOrchestrator: coordinates the run
- N FeatureExtractor agents: map evidence onto features (parallel)
- 1 Curator agent: deduplicates, validates, normalizes
Uses local OpenAI-compatible proxy at localhost:3000 (no API keys needed).
Usage:
python -m tools.feature_atlas.extract_features_swarm
# skull fire -- THE SWARM NAMES THE ORGANS
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import sys
import time
from pathlib import Path
from typing import Any
import yaml
logger = logging.getLogger(__name__)

# Directory containing this script plus its config.yaml and outputs/ folder.
_ATLAS_DIR = Path(__file__).resolve().parent

# Repository root (two directories above the atlas directory). Prepend it to
# sys.path so project-absolute imports work when this file is run directly.
_PROJECT_ROOT = _ATLAS_DIR.parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

# Swarm config, Step 1a symbol index (input) and feature registry (output).
_CONFIG_PATH = _ATLAS_DIR / "config.yaml"
_SYMBOLS_PATH = _ATLAS_DIR.joinpath("outputs", "repo_symbols.json")
_OUTPUT_PATH = _ATLAS_DIR.joinpath("outputs", "feature_registry.json")
# 💀 Regex to strip LLM wrapper garbage before JSON parsing
import re

# Matches <thinking>...</thinking> blocks. `.*?` is non-greedy, so each block is
# removed on its own; DOTALL lets a block span multiple lines.
_THINKING_RE = re.compile(r"<thinking>.*?</thinking>", re.DOTALL | re.IGNORECASE)
# Matches <thought>...</thought> too (some models use this)
_THOUGHT_RE = re.compile(r"<thought>.*?</thought>", re.DOTALL | re.IGNORECASE)
# Matches any remaining XML-ish wrapper tags (opening or closing) that aren't part of JSON
_XML_TAG_RE = re.compile(r"</?(?:thinking|thought|output|response|result|answer)[^>]*>", re.IGNORECASE)
# Matches one complete markdown fenced block and captures its body. The opening
# fence may carry any language tag (json, JSON, jsonc, ...) and must end its line.
_FENCE_BLOCK_RE = re.compile(r"```[\w-]*[ \t]*\n(.*?)```", re.DOTALL)


def _strip_outer_fence(text: str) -> str:
    """Drop a leading ```lang fence line and a trailing ``` from *text*, if present."""
    if text.startswith("```"):
        newline = text.find("\n")
        # No newline: the fence and content share one line, so drop only "```".
        text = text[newline + 1:] if newline != -1 else text[3:]
    text = text.rstrip()
    if text.endswith("```"):
        text = text[:-3]
    return text.strip()


def _scrub_llm_json(raw: str) -> str:
    """Strip LLM wrapper garbage to extract clean JSON.

    Handles:
    - <thinking>...</thinking> and <thought>...</thought> blocks (Gemini loves these)
    - stray wrapper tags such as <output>, <response>, <result>, <answer>
    - ```json ... ``` markdown code fences (any or no language tag); when several
      fenced blocks are present, the LAST one whose body starts with ``[`` or
      ``{`` is used
    - prose before the JSON (everything before the first ``[`` or ``{`` is dropped)
    - prose after the JSON (trimmed when the text from the first bracket starts
      with a complete, valid JSON value)

    Args:
        raw: The raw model output; may be empty.

    Returns:
        The cleaned string, which is valid JSON if the LLM produced any. Otherwise
        it is the best-effort remainder, which the caller's ``json.loads`` will
        reject with a meaningful error. Returns ``""`` for empty input.
    """
    if not raw:
        return ""

    # 1. Remove reasoning blocks first so brackets/fences inside them are ignored.
    text = _THINKING_RE.sub("", raw)
    text = _THOUGHT_RE.sub("", text)
    # 2. Remove leftover wrapper tags (including an unclosed <thinking>).
    text = _XML_TAG_RE.sub("", text).strip()

    # 3. Fenced blocks are searched BEFORE touching the outer fence. Stripping the
    #    first opening / last closing fence first would desynchronise the
    #    remaining fence pairs when the reply contains several blocks.
    for body in reversed(_FENCE_BLOCK_RE.findall(text)):
        candidate = body.strip()
        if candidate.startswith(("[", "{")):
            text = candidate
            break
    else:
        # No JSON-looking fenced block (or an unterminated one, e.g. output cut
        # off at max_tokens): just peel off any surrounding fence.
        text = _strip_outer_fence(text)

    # 4. Locate the JSON value itself.
    start = next((i for i, ch in enumerate(text) if ch in "[{"), None)
    if start is None:
        return text
    candidate = text[start:]
    try:
        _, end = json.JSONDecoder().raw_decode(candidate)
    except ValueError:
        # Not a complete JSON value; return the remainder so the caller's
        # json.loads reports the real parse error.
        return candidate
    # Drop trailing prose such as "Hope this helps!" after the JSON value.
    return candidate[:end]
[docs]
def load_config() -> dict[str, Any]:
    """Load the Feature Atlas configuration from ``config.yaml``.

    Reads and parses the atlas config that drives the whole swarm. The
    ``swarm`` block holds the Gemini model name, local proxy URL, temperature,
    concurrency and retry budget. The ``canonical_features`` list is what
    every extractor agent maps code evidence onto. This is a pure filesystem
    read of ``_CONFIG_PATH`` parsed with ``yaml.safe_load``; it touches no
    Redis, knowledge graph, or network.

    Called by this module's :func:`async_main` (and by ``run_extraction``
    when no config is passed).

    Returns:
        The parsed config as a dictionary, typically carrying ``swarm`` and
        ``canonical_features`` keys. An empty (or comment-only) file yields
        ``{}`` rather than ``None``, so callers can always use ``.get``.

    Raises:
        ValueError: If the YAML top level is not a mapping (e.g. a list).
    """
    with open(_CONFIG_PATH, "r", encoding="utf-8") as f:
        data = yaml.safe_load(f)
    if data is None:
        # safe_load returns None for an empty document; callers expect a dict.
        return {}
    if not isinstance(data, dict):
        raise ValueError(
            f"Expected a mapping at the top of {_CONFIG_PATH}, "
            f"got {type(data).__name__}"
        )
    return data
[docs]
def load_symbols() -> list[dict[str, Any]]:
    """Read the Step 1a repo symbol index from ``outputs/repo_symbols.json``.

    ``extract_repo_symbols.py`` writes one record per file there: classes,
    functions, constants, imports, and template/CSS/HTML metadata. Those
    records are the raw evidence the swarm maps onto canonical features. Only
    the local filesystem is touched: ``_SYMBOLS_PATH`` is read as UTF-8 text
    and decoded with ``json.loads``.

    Returns:
        The decoded JSON document, expected to be a list of per-file symbol
        record dictionaries.

    Raises:
        FileNotFoundError: When ``outputs/repo_symbols.json`` does not exist,
            meaning ``extract_repo_symbols.py`` has not been run yet.
    """
    symbols_path = _SYMBOLS_PATH
    if symbols_path.exists():
        return json.loads(symbols_path.read_text(encoding="utf-8"))
    raise FileNotFoundError(
        f"Repo symbols not found at {symbols_path}. "
        "Run extract_repo_symbols.py first."
    )
async def _gemini_generate(
    prompt: str,
    config: dict[str, Any],
    system_instruction: str | None = None,
) -> str:
    """Call Gemini Flash via the local OpenAI-compatible proxy.

    Uses the proxy at localhost:3000 by default (same as flash_dyadic_mirror.py;
    overridable via ``swarm.proxy_url``). Requests use the OpenAI chat
    completions format with no API keys -- the proxy handles auth.

    Error handling modeled after anamnesis_engine.py:
    - Up to ``swarm.retry_attempts`` attempts (default 5).
    - Exponential backoff: 5s base, doubling each attempt (5, 10, 20, 40, ...).
      No sleep follows the final attempt, including for 429 responses.
    - Empty body / empty choices defense (proxy likes to 200-with-nothing).
    - JSON decode defense and non-object JSON defense.
    - A truthy ``error`` field counts as a failure; ``"error": null`` is ignored.

    Args:
        prompt: The user message content.
        config: The atlas config; only its ``swarm`` sub-dict is read.
        system_instruction: Optional system message placed before the prompt.

    Returns:
        The first choice's message content, or ``""`` when every attempt
        fails. Request and response failures are logged, never raised.
    """
    import httpx

    swarm_cfg = config.get("swarm", {})
    model = swarm_cfg.get("model", "gemini-3.1-flash-lite")
    proxy_url = swarm_cfg.get("proxy_url", "http://localhost:3000/openai/chat/completions")
    temperature = swarm_cfg.get("temperature", 0.2)
    max_tokens = swarm_cfg.get("max_output_tokens", 8192)
    # Exponential backoff config (matches anamnesis_engine pattern)
    max_retries = swarm_cfg.get("retry_attempts", 5)
    base_delay = 5.0  # seconds; doubles each attempt: 5, 10, 20, 40, ...

    # Build messages in OpenAI chat format
    messages: list[dict[str, str]] = []
    if system_instruction:
        messages.append({"role": "system", "content": system_instruction})
    messages.append({"role": "user", "content": prompt})
    payload = {
        "model": model,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }

    async def _backoff(attempt: int, delay: float) -> bool:
        """Sleep before the next attempt; return False once attempts are exhausted."""
        if attempt < max_retries - 1:
            await asyncio.sleep(delay)
            return True
        return False

    async with httpx.AsyncClient(timeout=120.0) as client:
        for attempt in range(max_retries):
            delay = base_delay * (2 ** attempt)
            try:
                resp = await client.post(
                    proxy_url,
                    json=payload,
                    headers={"Content-Type": "application/json"},
                )

                # -- 429 rate limit --
                if resp.status_code == 429:
                    logger.warning(
                        "Flash proxy 429, retrying in %.0fs (attempt %d/%d)",
                        delay, attempt + 1, max_retries,
                    )
                    # Guarded like every other branch: don't sleep after the last try.
                    if await _backoff(attempt, delay):
                        continue
                    return ""

                # -- Non-200 HTTP errors --
                if resp.status_code != 200:
                    logger.warning(
                        "Flash proxy HTTP %d, retrying in %.0fs (attempt %d/%d)",
                        resp.status_code, delay, attempt + 1, max_retries,
                    )
                    if await _backoff(attempt, delay):
                        continue
                    return ""

                # -- Empty body defense (the exact bug that killed the run) --
                raw_body = resp.text.strip()
                if not raw_body:
                    logger.warning(
                        "Flash proxy returned 200 with EMPTY BODY "
                        "(attempt %d/%d, retrying in %.0fs)",
                        attempt + 1, max_retries, delay,
                    )
                    if await _backoff(attempt, delay):
                        continue
                    return ""

                # -- JSON decode defense --
                try:
                    data = resp.json()
                except ValueError:
                    logger.warning(
                        "Flash proxy returned non-JSON body: %.200s "
                        "(attempt %d/%d, retrying in %.0fs)",
                        raw_body[:200], attempt + 1, max_retries, delay,
                    )
                    if await _backoff(attempt, delay):
                        continue
                    return ""

                # -- Non-object JSON defense (e.g. a bare list or string) --
                if not isinstance(data, dict):
                    logger.warning(
                        "Flash proxy returned non-object JSON (%s) "
                        "(attempt %d/%d, retrying in %.0fs)",
                        type(data).__name__, attempt + 1, max_retries, delay,
                    )
                    if await _backoff(attempt, delay):
                        continue
                    return ""

                # -- Error field in response (a null/empty error is not a failure) --
                if data.get("error"):
                    logger.warning(
                        "Flash proxy error field: %s (attempt %d/%d)",
                        data["error"], attempt + 1, max_retries,
                    )
                    if await _backoff(attempt, delay):
                        continue
                    return ""

                # -- Empty choices defense --
                choices = data.get("choices") or []
                if not choices:
                    logger.warning(
                        "Flash proxy returned empty choices: %.500s "
                        "(attempt %d/%d, retrying in %.0fs)",
                        json.dumps(data)[:500],
                        attempt + 1, max_retries, delay,
                    )
                    if await _backoff(attempt, delay):
                        continue
                    return ""

                message = choices[0].get("message") or {}
                text = message.get("content") or ""
                if not isinstance(text, str):
                    # Callers expect a plain string; treat structured content as empty.
                    text = ""
                if not text:
                    logger.warning(
                        "Flash proxy returned empty content "
                        "(attempt %d/%d, retrying in %.0fs)",
                        attempt + 1, max_retries, delay,
                    )
                    if await _backoff(attempt, delay):
                        continue
                    return ""
                return text

            except httpx.ConnectError:
                logger.warning(
                    "Flash proxy unreachable at %s "
                    "(attempt %d/%d, retrying in %.0fs)",
                    proxy_url, attempt + 1, max_retries, delay,
                )
                if not await _backoff(attempt, delay):
                    return ""
            except httpx.TimeoutException:
                logger.warning(
                    "Flash proxy timeout (attempt %d/%d, retrying in %.0fs)",
                    attempt + 1, max_retries, delay,
                )
                if not await _backoff(attempt, delay):
                    return ""
            except Exception as e:
                logger.warning(
                    "Flash proxy unexpected error: %s "
                    "(attempt %d/%d, retrying in %.0fs)",
                    e, attempt + 1, max_retries, delay,
                )
                if not await _backoff(attempt, delay):
                    return ""
    return ""
# Top-level modules (stdlib + common third-party) hidden from "Internal imports".
# Matched against the first dotted component only, so e.g. "re" hides "re"
# and "re.sub" but NOT a project package such as "retrieval".
_EXTERNAL_IMPORT_ROOTS = frozenset({
    "__future__", "os", "sys", "json", "re", "time", "logging",
    "asyncio", "typing", "typing_extensions", "pathlib", "datetime",
    "collections", "functools", "itertools",
    "dataclasses", "abc", "enum", "copy",
    "hashlib", "base64", "uuid", "textwrap",
    "io", "struct", "math", "random",
    "threading", "concurrent", "multiprocessing",
    "http", "socket", "ssl", "email",
    "urllib", "html", "xml",
    "httpx", "aiohttp", "redis", "discord",
    "numpy", "yaml", "pydantic", "fastapi",
    "jinja2", "PIL", "openai",
})


def _symbol_name(item: Any) -> str:
    """Return a display name for a symbol entry that may be a dict or a bare value."""
    if isinstance(item, dict):
        return str(item.get("name", ""))
    return str(item)


def _is_internal_import(module: str) -> bool:
    """Return True when *module*'s top-level package is not a known external one."""
    return module.split(".", 1)[0] not in _EXTERNAL_IMPORT_ROOTS


def _build_file_summary(symbol_record: dict[str, Any]) -> str:
    """Render one file's symbol record into a compact text summary for the LLM.

    Flattens a single :func:`load_symbols` record into a few human-readable
    lines so a Gemini Flash extractor can reason about the file without seeing
    its source. It pulls and truncates the fields that matter for feature
    mapping:
    - the file path and docstring;
    - classes (with bases and methods), functions and constants;
    - project-internal imports (stdlib and common third-party packages are
      filtered out by exact top-level module name);
    - env vars;
    - language-specific metadata for YAML/JSON, Jinja2, TS/JS, CSS and HTML
      files.

    Symbol entries may be dicts (with a ``name`` key) or plain strings. CSS
    records reuse the ``classes`` key for class-name strings; those are listed
    only under "CSS Classes". HTML ``components`` are listed only under
    "HTML Components". Pure string assembly with no side effects.

    Called by :func:`_extract_features_for_group`, which joins the summaries
    for the files in a directory group into one prompt.

    Args:
        symbol_record: A single per-file symbol record from
            :func:`load_symbols`.

    Returns:
        A newline-joined summary string describing the file's notable symbols.
    """
    record_type = symbol_record.get("type")
    is_css = record_type == "css"
    is_html = record_type == "html"
    parts = [f"File: {symbol_record.get('file', '')}"]

    def _add(label: str, key: str, limit: int, *, when: bool = True) -> None:
        """Append ' label: a, b, ...' for the first *limit* entries of *key*."""
        values = symbol_record.get(key) or []
        if when and values:
            parts.append(f" {label}: " + ", ".join(_symbol_name(v) for v in values[:limit]))

    doc = symbol_record.get("docstring", "")
    if doc:
        parts.append(f" Docstring: {str(doc)[:300]}")

    # Structured classes (Python/TS). CSS records store plain class-name
    # strings under the same key; they are rendered in the CSS section below.
    if not is_css:
        for cls in (symbol_record.get("classes") or [])[:10]:
            methods = bases = ""
            if isinstance(cls, dict):
                methods = ", ".join(_symbol_name(m) for m in (cls.get("methods") or [])[:15])
                bases = ", ".join(_symbol_name(b) for b in (cls.get("bases") or [])[:5])
            parts.append(
                f" Class: {_symbol_name(cls)}"
                f"{f' (extends {bases})' if bases else ''}"
                f"{f' methods=[{methods}]' if methods else ''}"
            )

    _add("Functions", "functions", 20)
    _add("Constants", "constants", 20)

    # Only show project-internal imports
    internal = [
        str(module) for module in (symbol_record.get("imports") or [])
        if _is_internal_import(str(module))
    ]
    if internal:
        parts.append(f" Internal imports: {', '.join(internal[:20])}")

    _add("Env vars", "env_vars", 10)
    # YAML/JSON files
    _add("Top-level keys", "top_level_keys", 20)
    # Jinja2 templates
    _add("Template vars", "variables", 30)
    _add("Template blocks", "blocks", 20)
    # TS/JS files (HTML records also use "components"; shown separately below)
    _add("Exports", "exports", 20)
    _add("Interfaces", "interfaces", 15)
    _add("Types", "types", 15)
    _add("React Components", "components", 15, when=not is_html)
    # CSS files
    _add("CSS Classes", "classes", 30, when=is_css)
    _add("CSS Custom Properties", "custom_properties", 15)
    _add("CSS Keyframes", "keyframes", 10)
    # HTML files
    _add("HTML Components", "components", 20, when=is_html)
    _add("Element IDs", "ids", 15)

    return "\n".join(parts)
def _group_files_by_directory(
    symbols: list[dict[str, Any]],
) -> dict[str, list[dict[str, Any]]]:
    """Bucket symbol records by their top-level directory.

    Partitions the flat symbol list into per-directory groups so the swarm can
    fan out one extractor agent per group, keeping each prompt scoped to a
    coherent slice of the repo.

    Path normalisation: each record's ``file`` path has backslashes turned
    into ``/`` and leading ``./`` and ``/`` removed. The path is then assigned
    to its first segment. Files with no directory, records without a path, and
    a first segment that ends in a code-file suffix all fall into the
    ``"root"`` bucket. This is pure in-memory bucketing with no I/O.

    Called once by ``run_extraction``, just before it spawns the per-group
    extractor tasks.

    Args:
        symbols: The full list of per-file symbol records from
            :func:`load_symbols`.

    Returns:
        A mapping of directory name (or ``"root"``) to the list of symbol
        records that live under it, in input order.
    """
    # Suffixes that mark a path segment as a file rather than a directory.
    code_exts = (".py", ".ts", ".tsx", ".js", ".jsx", ".css", ".html",
                 ".yaml", ".yml", ".j2", ".json")
    groups: dict[str, list[dict[str, Any]]] = {}
    for record in symbols:
        path = str(record.get("file", "") or "").replace("\\", "/")
        # "./tools/x.py" and "/tools/x.py" should group with "tools/x.py".
        while path.startswith("./"):
            path = path[2:]
        path = path.lstrip("/")
        head, sep, _ = path.partition("/")
        group = head if sep and head and not head.endswith(code_exts) else "root"
        groups.setdefault(group, []).append(record)
    return groups
def _format_feature_list(canonical_features: list[dict[str, Any]]) -> str:
    """Render the canonical features as one ``- id: name (...)`` prompt line each."""
    return "\n".join(
        f"- {feat['id']}: {feat['human_name']} "
        f"(category: {feat.get('category', '?')}, "
        f"hint files: {', '.join(str(h) for h in (feat.get('hint_files') or [])[:3])})"
        for feat in canonical_features
    )


async def _extract_feature_batch(
    group_name: str,
    label: str,
    batch: list[dict[str, Any]],
    feature_list: str,
    feature_count: int,
    config: dict[str, Any],
    semaphore: asyncio.Semaphore,
) -> list[dict[str, Any]]:
    """Ask one extractor agent to map a single batch of files; never raises.

    ``group_name`` goes into the prompt, ``label`` into log lines (it adds the
    batch index when a group is split). Returns only dict mappings.
    """
    raw = ""
    async with semaphore:
        try:
            # Built inside the try so one malformed symbol record only skips
            # this batch instead of aborting the whole run.
            file_summaries = "\n\n".join(_build_file_summary(f) for f in batch)
            prompt = f"""You are analyzing the Stargazer codebase to map code evidence onto canonical features.
Below are the files in the "{group_name}" directory/group with their extracted symbols:
{file_summaries}
Here are the {feature_count} canonical Stargazer features you need to map evidence onto:
{feature_list}
For each file that belongs to one or more of these features, output a JSON array of mappings.
Each mapping should have:
- "feature_id": the canonical feature ID
- "file": the file path
- "classes": list of class names in this file that belong to this feature
- "functions": list of function names that belong to this feature
- "constants": list of constants that belong to this feature
- "data_stores": list of data stores this file accesses (e.g. "falkordb:knowledge", "redis:0", "chromadb:spiral_goddess")
- "evidence": a brief sentence explaining why this file belongs to this feature
- "confidence": 0.0-1.0 confidence score
Rules:
- Only map files to features they genuinely belong to based on code evidence
- A file can map to multiple features
- Do NOT invent features not in the canonical list
- Do NOT map files that have no real connection to a feature
- Focus on classes, functions, imports, and data access patterns
- confidence should be HIGH (>0.8) when the file is clearly part of the feature
- confidence should be LOW (<0.5) when the connection is indirect
Return ONLY a JSON array of mapping objects. No markdown, no explanation."""
            system_instruction = (
                "You are a precise code analysis agent. "
                "You output only valid JSON arrays. "
                "You never hallucinate code symbols or file names."
            )
            raw = await _gemini_generate(prompt, config, system_instruction)
            # -- Empty response defense --
            if not raw or not raw.strip():
                logger.warning(
                    "Group '%s': LLM returned empty response for %d files "
                    "(proxy may be flaky, skipping group)",
                    label,
                    len(batch),
                )
                return []
            # Parse JSON from response -- scrub LLM wrapper garbage first
            raw = _scrub_llm_json(raw)
            if not raw:
                logger.warning(
                    "Group '%s': LLM response was only wrapper tags/fences, no JSON content",
                    label,
                )
                return []
            mappings = json.loads(raw)
            if not isinstance(mappings, list):
                logger.warning(
                    "Group '%s': LLM returned %s instead of list, skipping",
                    label,
                    type(mappings).__name__,
                )
                mappings = []
            # The curator expects mapping objects; drop bare strings/numbers.
            objects = [m for m in mappings if isinstance(m, dict)]
            if len(objects) != len(mappings):
                logger.warning(
                    "Group '%s': dropped %d non-object entries from LLM output",
                    label,
                    len(mappings) - len(objects),
                )
            logger.info(
                "Group '%s': %d mappings extracted from %d files",
                label,
                len(objects),
                len(batch),
            )
            return objects
        except json.JSONDecodeError as e:
            logger.warning(
                "Group '%s': JSON parse failed (skipping): %s raw[:200]=%s",
                label,
                e,
                repr(raw[:200]) if raw else "(empty)",
            )
            return []
        except Exception as e:
            logger.warning(
                "Group '%s': unexpected error (skipping): %s",
                label,
                e,
            )
            return []


async def _extract_features_for_group(
    group_name: str,
    group_files: list[dict[str, Any]],
    canonical_features: list[dict[str, Any]],
    config: dict[str, Any],
    semaphore: asyncio.Semaphore,
    *,
    batch_size: int = 40,
) -> list[dict[str, Any]]:
    """Run FeatureExtractor agents over a directory group of files.

    This is the per-group worker of the swarm. The group is split into batches
    of at most ``batch_size`` files, so large directories are fully covered
    while each prompt stays bounded. Each batch gets one prompt built from its
    file summaries (via :func:`_build_file_summary`) and the canonical feature
    list, and :func:`_gemini_generate` invokes Gemini Flash through the local
    OpenAI-compatible proxy. Every call acquires the shared ``semaphore``;
    there is no Redis or knowledge-graph access.

    The model is asked for a JSON array of feature-to-file mappings. Each
    reply is scrubbed by :func:`_scrub_llm_json`, parsed, and filtered to dict
    entries. The function is defensive end to end: empty responses, non-JSON
    output, non-list results and summary-building errors are logged and turn
    into an empty result for that batch, so one bad batch never aborts the
    run.

    Called by ``run_extraction``, which schedules one task per directory group
    and gathers them concurrently.

    Args:
        group_name: The directory bucket name being processed (e.g. ``tools``
            or ``root``).
        group_files: The symbol records for files in this group.
        canonical_features: The canonical feature definitions from config that
            files may be mapped onto.
        config: The atlas config, forwarded to :func:`_gemini_generate` for
            model and proxy settings.
        semaphore: The shared concurrency limiter bounding parallel proxy load.
        batch_size: Maximum files per prompt (default 40; values < 1 become 1).

    Returns:
        A list of mapping dictionaries (feature id, file, symbols, data
        stores, evidence, confidence) from all batches. It is empty when the
        group is empty or every batch failed.
    """
    batch_size = max(1, batch_size)
    feature_list = _format_feature_list(canonical_features)
    batches = [
        group_files[i:i + batch_size] for i in range(0, len(group_files), batch_size)
    ]
    if len(batches) > 1:
        logger.info(
            "Group '%s': %d files split into %d batches of up to %d",
            group_name, len(group_files), len(batches), batch_size,
        )
    results = await asyncio.gather(*(
        _extract_feature_batch(
            group_name,
            group_name if len(batches) == 1 else f"{group_name} (batch {n}/{len(batches)})",
            batch,
            feature_list,
            len(canonical_features),
            config,
            semaphore,
        )
        for n, batch in enumerate(batches, start=1)
    ))
    return [mapping for batch_mappings in results for mapping in batch_mappings]
def _string_list(value: Any) -> list[str]:
    """Coerce an LLM-supplied field into a list of non-empty strings.

    A bare string becomes a one-element list (instead of being iterated
    character by character). List/tuple entries that are not non-empty
    strings are dropped, and any other type yields ``[]``.
    """
    if isinstance(value, str):
        return [value] if value.strip() else []
    if isinstance(value, (list, tuple)):
        return [item for item in value if isinstance(item, str) and item.strip()]
    return []


def _coerce_confidence(value: Any) -> float:
    """Parse a confidence score clamped to [0.0, 1.0]; unparseable or NaN -> 0.0."""
    try:
        conf = float(value)
    except (TypeError, ValueError):
        return 0.0
    if conf != conf:  # NaN never equals itself
        return 0.0
    return min(max(conf, 0.0), 1.0)


def _aggregate_mappings(
    all_mappings: list[dict[str, Any]],
    canonical_features: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Curate the raw extractor mappings into a deduplicated feature registry.

    This is the Curator stage of the swarm. It seeds one registry shell per
    canonical feature (id, human name, category, hint files). It then folds
    every mapping from the :func:`_extract_features_for_group` agents into the
    matching shell:
    - union files, symbols (classes, functions, constants) and data stores;
    - collect distinct evidence sentences;
    - record files under ``tools/`` as external tools;
    - keep the maximum confidence per feature.

    Model output is treated as untrusted, so malformed entries never crash the
    run:
    - non-dict mappings are skipped;
    - unknown or unhashable feature ids are skipped;
    - string-valued list fields are treated as one item;
    - non-string items are dropped;
    - unparseable confidences count as 0.0, and all confidences are clamped
      to [0, 1].

    This is pure in-memory aggregation with no I/O. Called once by
    ``run_extraction``, after all extractor groups have been gathered.

    Args:
        all_mappings: The flattened list of mapping dictionaries from every
            extractor group.
        canonical_features: The canonical feature definitions used to seed the
            registry shells.

    Returns:
        A list of feature registry dictionaries, one per canonical feature, in
        config order.
    """
    # Build feature shells from config
    feature_map: dict[str, dict[str, Any]] = {}
    for feat in canonical_features:
        feature_map[feat["id"]] = {
            "id": feat["id"],
            "human_name": feat["human_name"],
            "category": feat.get("category", ""),
            "description": "",
            # `or []` also covers an explicit `hint_files: null` in YAML.
            "files": list(feat.get("hint_files") or []),
            "symbols": [],
            "entrypoints": [],
            "data_stores": [],
            "external_tools": [],
            "confidence": 0.0,
            "evidence": [],
        }

    # Merge mappings into features
    for mapping in all_mappings:
        if not isinstance(mapping, dict):
            continue
        fid = mapping.get("feature_id")
        if isinstance(fid, str):
            fid = fid.strip()
        try:
            feat = feature_map.get(fid)
        except TypeError:  # unhashable id (e.g. a list) from a malformed response
            continue
        if feat is None:
            continue

        filepath = mapping.get("file")
        filepath = filepath.strip() if isinstance(filepath, str) else ""
        if filepath and filepath not in feat["files"]:
            feat["files"].append(filepath)

        for key in ("classes", "functions", "constants"):
            for name in _string_list(mapping.get(key)):
                if name not in feat["symbols"]:
                    feat["symbols"].append(name)
        for store in _string_list(mapping.get("data_stores")):
            if store not in feat["data_stores"]:
                feat["data_stores"].append(store)
        for sentence in _string_list(mapping.get("evidence")):
            if sentence not in feat["evidence"]:
                feat["evidence"].append(sentence)

        # Track tool files
        if filepath.startswith("tools/") and filepath not in feat["external_tools"]:
            feat["external_tools"].append(filepath)

        # Update confidence (take max)
        conf = _coerce_confidence(mapping.get("confidence", 0.0))
        if conf > feat["confidence"]:
            feat["confidence"] = conf

    return list(feature_map.values())
async def _generate_descriptions(
    features: list[dict[str, Any]],
    config: dict[str, Any],
    semaphore: asyncio.Semaphore,
) -> list[dict[str, Any]]:
    """Generate a technical description for every feature, concurrently.

    The final enrichment stage of the swarm. For each curated feature it
    schedules the nested :func:`_describe_one` coroutine. That coroutine
    acquires the shared ``semaphore`` and calls Gemini Flash via
    :func:`_gemini_generate` (the local OpenAI-compatible proxy) to write a
    2-3 sentence description onto the feature's ``description`` field in
    place. All per-feature tasks run together under ``asyncio.gather``, so
    descriptions are produced in parallel up to the concurrency limit. The
    only side effects are HTTP/LLM calls and mutation of the passed-in feature
    dictionaries.

    Called once by ``run_extraction``, after :func:`_aggregate_mappings` has
    built the registry.

    Args:
        features: The curated feature registry records to describe (mutated in
            place with a populated ``description`` field).
        config: The atlas config, forwarded to :func:`_gemini_generate`.
        semaphore: The shared concurrency limiter bounding parallel proxy load.

    Returns:
        The same feature dictionaries, in order, each carrying a non-empty
        ``description`` (generated text or a ``"<human_name> subsystem."``
        placeholder).
    """

    def _joined(values: list[Any], limit: int, sep: str = ", ") -> str:
        """Join the first *limit* values as strings (tolerates non-str entries)."""
        return sep.join(str(v) for v in values[:limit])

    async def _describe_one(feat: dict[str, Any]) -> dict[str, Any]:
        """Generate and attach a technical description for one feature.

        Closure over the enclosing ``config`` and ``semaphore``. Under the
        shared ``semaphore`` it builds a prompt from the feature's id, files,
        symbols, data stores and evidence, then calls :func:`_gemini_generate`.
        The stripped reply is written to ``feat["description"]``. If the call
        fails, or returns nothing (``_gemini_generate`` returns ``""`` rather
        than raising when all retries fail), a ``"<human_name> subsystem."``
        placeholder is stored instead so the pipeline never aborts.

        Args:
            feat: The feature registry record to describe; mutated in place.

        Returns:
            The same ``feat`` dictionary, now carrying a non-empty
            ``description`` field.
        """
        fallback = f"{feat['human_name']} subsystem."
        async with semaphore:
            try:
                prompt = f"""Generate a concise 2-3 sentence technical description for this Stargazer subsystem.
Feature: {feat['id']} ({feat['human_name']})
Category: {feat['category']}
Files: {_joined(feat['files'], 10)}
Key symbols: {_joined(feat['symbols'], 15)}
Data stores: {_joined(feat['data_stores'], 5)}
Evidence: {_joined(feat['evidence'], 5, ' | ')}
Write a precise technical description of what this subsystem does.
Focus on its actual function, not marketing language.
Return ONLY the description text, no JSON, no markdown."""
                desc = (await _gemini_generate(prompt, config)).strip()
            except Exception as e:
                logger.warning("Failed to generate description for %s: %s", feat["id"], e)
                desc = fallback
            if not desc:
                logger.warning("Empty description for %s, using placeholder", feat["id"])
                desc = fallback
            feat["description"] = desc
        return feat

    tasks = [_describe_one(f) for f in features]
    return list(await asyncio.gather(*tasks))
[docs]
async def async_main() -> None:
    """Async entry point: run the swarm and persist the feature registry.

    Drives a single end-to-end run for command-line use:
    1. loads the config;
    2. awaits ``run_extraction`` to build the feature registry;
    3. writes the result as pretty-printed JSON to
       ``outputs/feature_registry.json`` (``_OUTPUT_PATH``), creating the
       parent directory if needed;
    4. prints a summary banner with feature, confidence, file, symbol and
       timing counts.

    The JSON is first written to a sibling ``.tmp`` file and then moved over
    the target with ``os.replace``, so an interrupted or failing dump never
    leaves a truncated registry behind. Elapsed time uses a monotonic clock
    so wall-clock adjustments cannot skew it. The filesystem write and stdout
    output are its only side effects.

    Called by this module's :func:`main` through ``asyncio.run``.
    """
    t0 = time.monotonic()
    config = load_config()
    features = await run_extraction(config)

    # Write output atomically: dump to a temp file, then replace the target.
    _OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = _OUTPUT_PATH.with_name(_OUTPUT_PATH.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(features, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, _OUTPUT_PATH)
    except BaseException:
        # Don't leave a half-written temp file around; re-raise the original error.
        if tmp_path.exists():
            tmp_path.unlink()
        raise
    elapsed = time.monotonic() - t0

    # Summary
    high_conf = sum(1 for f in features if f.get("confidence", 0) >= 0.7)
    low_conf = sum(1 for f in features if f.get("confidence", 0) < 0.5)
    total_files = sum(len(f.get("files", [])) for f in features)
    total_symbols = sum(len(f.get("symbols", [])) for f in features)
    print(f"\n{'=' * 60}")
    print(" FEATURE EXTRACTION COMPLETE")
    print(f"{'=' * 60}")
    print(f" Features mapped: {len(features)}")
    print(f" High confidence: {high_conf}")
    print(f" Low confidence: {low_conf}")
    print(f" Total files mapped: {total_files}")
    print(f" Total symbols found: {total_symbols}")
    print(f" Time elapsed: {elapsed:.1f}s")
    print(f" Output: {_OUTPUT_PATH}")
    print(f"{'=' * 60}\n")
[docs]
def main() -> None:
    """Console entry point for ``python -m tools.feature_atlas.extract_features_swarm``.

    Configures root logging at INFO level with an
    ``asctime [LEVEL] message`` format. It then runs :func:`async_main` to
    completion via ``asyncio.run``. Beyond that global logging setup, all side
    effects come from :func:`async_main`.
    """
    logging.basicConfig(
        format="%(asctime)s [%(levelname)s] %(message)s",
        level=logging.INFO,
    )
    pipeline = async_main()
    asyncio.run(pipeline)


if __name__ == "__main__":
    main()