# Source code for tools.feature_atlas.run_atlas

"""Master orchestrator for the Feature Interaction Atlas pipeline.

Runs all steps in order, or a single step via --step.

Usage:
    # Full pipeline
    python -m tools.feature_atlas.run_atlas

    # Individual steps
    python -m tools.feature_atlas.run_atlas --step extract-symbols
    python -m tools.feature_atlas.run_atlas --step discover-features
    python -m tools.feature_atlas.run_atlas --step extract-features
    python -m tools.feature_atlas.run_atlas --step load-features
    python -m tools.feature_atlas.run_atlas --step detect-interactions
    python -m tools.feature_atlas.run_atlas --step generate-prompts
    python -m tools.feature_atlas.run_atlas --step analyze-top50
    python -m tools.feature_atlas.run_atlas --step import-analyses
    python -m tools.feature_atlas.run_atlas --step export-demo

    # Discover features from unknown codebase (no canonical list needed)
    python -m tools.feature_atlas.run_atlas --discover-features

    # Query CLI
    python -m tools.feature_atlas.run_atlas --step query

# fire skull spider infinity heart -- THE FULL BODYGRAPH PIPELINE
"""

from __future__ import annotations

import argparse
import asyncio
import logging
import sys
import time
from pathlib import Path

# Resolve the directory three levels above this file and put it at the front
# of sys.path (only if it is not already listed), so the absolute
# ``tools.feature_atlas.*`` imports performed by the step functions can
# resolve.
# NOTE(review): presumably this matters when the file is executed directly
# rather than via ``python -m`` -- confirm.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

# Module-level logger; handlers and level are configured by ``main``.
logger = logging.getLogger(__name__)

# Accepted values for the ``--step`` CLI option. "all" is not a real step:
# it is the option's default and selects the full pipeline run.
VALID_STEPS = [
    "extract-symbols",
    "discover-features",
    "extract-features",
    "load-features",
    "detect-interactions",
    "generate-prompts",
    "analyze-top50",
    "import-analyses",
    "export-demo",
    "query",
    "all",
]

# Steps that do NOT require FalkorDB (can run offline)
# NOTE(review): not consulted by the dispatch code visible in this module;
# appears to be informational unless other modules import it -- confirm.
OFFLINE_STEPS = {"extract-symbols", "extract-features", "discover-features"}

# Steps that require the Gemini API
# NOTE(review): likewise not consulted by the dispatch code visible here.
GEMINI_STEPS = {"extract-features", "analyze-top50", "discover-features"}


def _banner() -> None:
    """Print the ASCII art banner for the Feature Interaction Atlas pipeline.

    Writes the multi-line "Bodygraph Demo" header straight to stdout with
    ``print`` as a purely cosmetic side effect; it touches no Redis streams,
    FalkorDB, the knowledge graph, or any LLM, and returns nothing.

    Called by ``run_pipeline`` in this module at the start of every full or
    partial run so the operator sees the title block before steps execute. Note
    that the sibling ``query_atlas.py`` CLI uses its own ``print_banner`` helper,
    which is unrelated to this function.
    """
    print("""
+======================================================================+
|  STARGAZER FEATURE INTERACTION ATLAS v0                              |
|  "The Bodygraph Demo"                                                |
|                                                                      |
|  Stargazer inspects her own codebase, identifies her major organs,   |
|  maps which organs touch, and shows the highest-risk/highest-value   |
|  feature interactions.                                               |
+======================================================================+
""")


async def step_extract_symbols() -> None:
    """Run step 1a: extract repository symbols.

    Prints the step header, then imports and calls ``main`` from
    ``tools.feature_atlas.extract_repo_symbols``. The import is deferred to
    call time, so runs that skip this step never load that module.

    The delegated ``main`` is invoked as a plain synchronous call and
    therefore blocks the event loop until it returns; ``run_pipeline`` awaits
    its steps one at a time, so no other step is waiting on the loop
    meanwhile.

    The step name ``extract-symbols`` is listed in ``OFFLINE_STEPS`` and is
    the first entry of ``PIPELINE_ORDER``. Dispatched by ``run_pipeline``
    through ``STEP_MAP``.
    """
    print("\n>>> Step 1a: Extracting repo symbols (AST)...\n")

    # Deferred import: only pay for the extractor when this step actually runs.
    from tools.feature_atlas.extract_repo_symbols import (
        main as extract_symbols_main,
    )

    extract_symbols_main()
async def step_extract_features() -> None:
    """Run step 1b: the feature extraction swarm.

    Prints the step header, then imports and awaits ``async_main`` from
    ``tools.feature_atlas.extract_features_swarm``. The import is deferred to
    call time, so runs that skip this step never load that module.

    The step name ``extract-features`` appears in both ``GEMINI_STEPS`` and
    ``OFFLINE_STEPS``, and follows ``extract-symbols`` in ``PIPELINE_ORDER``.
    Dispatched by ``run_pipeline`` through ``STEP_MAP``.
    """
    print("\n>>> Step 1b: Running feature extraction swarm (Gemini Flash)...\n")

    # Deferred import: only pay for the swarm module when this step runs.
    from tools.feature_atlas.extract_features_swarm import (
        async_main as extract_features_main,
    )

    await extract_features_main()
async def step_discover_features() -> None:
    """Run step 1c: the feature discovery swarm ("inhale protocol").

    Prints the step header, then imports and awaits ``async_main`` from
    ``tools.feature_atlas.discover_features``. The import is deferred to call
    time, so runs that skip this step never load that module.

    The step name ``discover-features`` appears in both ``GEMINI_STEPS`` and
    ``OFFLINE_STEPS`` but not in ``PIPELINE_ORDER``, so a default full run
    never executes it. It is reached through ``run_pipeline`` when
    ``async_main`` handles the ``--discover-features`` flag or
    ``--step discover-features``.
    """
    print("\n>>> Step 1c: Running feature discovery swarm (inhale protocol)...\n")

    # Deferred import: only pay for the discovery module when this step runs.
    from tools.feature_atlas.discover_features import (
        async_main as discover_features_main,
    )

    await discover_features_main()
async def step_load_features() -> None:
    """Run step 2: load features into FalkorDB.

    Prints the step header, then imports and awaits ``async_main`` from
    ``tools.feature_atlas.load_features_to_falkor``. The import is deferred
    to call time, so runs that skip this step never load that module.

    The step name ``load-features`` is absent from ``OFFLINE_STEPS`` and
    follows ``extract-features`` in ``PIPELINE_ORDER``. Dispatched by
    ``run_pipeline`` through ``STEP_MAP``.
    """
    print("\n>>> Step 2: Loading features into FalkorDB...\n")

    # Deferred import: only pay for the loader module when this step runs.
    from tools.feature_atlas.load_features_to_falkor import (
        async_main as load_features_main,
    )

    await load_features_main()
async def step_detect_interactions() -> None:
    """Run step 3: detect code interactions.

    Prints the step header, then imports and awaits ``async_main`` from
    ``tools.feature_atlas.detect_code_interactions``. The import is deferred
    to call time, so runs that skip this step never load that module.

    The step name ``detect-interactions`` is absent from ``OFFLINE_STEPS``
    and follows ``load-features`` in ``PIPELINE_ORDER``. Dispatched by
    ``run_pipeline`` through ``STEP_MAP``.
    """
    print("\n>>> Step 3: Detecting code interactions...\n")

    # Deferred import: only pay for the detector module when this step runs.
    from tools.feature_atlas.detect_code_interactions import (
        async_main as detect_interactions_main,
    )

    await detect_interactions_main()
async def step_generate_prompts() -> None:
    """Run step 4: generate interaction prompts.

    Prints the step header, then imports and awaits ``async_main`` from
    ``tools.feature_atlas.generate_interaction_prompts``. The import is
    deferred to call time, so runs that skip this step never load that
    module.

    The step name ``generate-prompts`` sits between ``detect-interactions``
    and ``analyze-top50`` in ``PIPELINE_ORDER``. Dispatched by
    ``run_pipeline`` through ``STEP_MAP``.
    """
    print("\n>>> Step 4: Generating interaction prompts...\n")

    # Deferred import: only pay for the prompt builder when this step runs.
    from tools.feature_atlas.generate_interaction_prompts import (
        async_main as generate_prompts_main,
    )

    await generate_prompts_main()
async def step_analyze_top50() -> None:
    """Run step 5: the interaction analysis swarm.

    Prints the step header, then imports and awaits ``async_main`` from
    ``tools.feature_atlas.run_interaction_analysis_swarm``. The import is
    deferred to call time, so runs that skip this step never load that
    module.

    The step name ``analyze-top50`` is listed in ``GEMINI_STEPS`` and follows
    ``generate-prompts`` in ``PIPELINE_ORDER``. Dispatched by
    ``run_pipeline`` through ``STEP_MAP``.
    """
    print("\n>>> Step 5: Running interaction analysis swarm (Gemini Flash)...\n")

    # Deferred import: only pay for the swarm module when this step runs.
    from tools.feature_atlas.run_interaction_analysis_swarm import (
        async_main as analysis_swarm_main,
    )

    await analysis_swarm_main()
async def step_import_analyses() -> None:
    """Run step 6: import analyses into FalkorDB.

    Prints the step header, then imports and awaits ``async_main`` from
    ``tools.feature_atlas.import_interaction_analysis``. The import is
    deferred to call time, so runs that skip this step never load that
    module.

    The step name ``import-analyses`` is absent from ``OFFLINE_STEPS`` and
    follows ``analyze-top50`` in ``PIPELINE_ORDER``. Dispatched by
    ``run_pipeline`` through ``STEP_MAP``.
    """
    print("\n>>> Step 6: Importing analyses into FalkorDB...\n")

    # Deferred import: only pay for the importer module when this step runs.
    from tools.feature_atlas.import_interaction_analysis import (
        async_main as import_analyses_main,
    )

    await import_analyses_main()
async def step_export_demo() -> None:
    """Run step 8: generate the Sarah demo report.

    Prints the step header, then imports and awaits ``async_main`` from
    ``tools.feature_atlas.export_sarah_demo``. The import is deferred to call
    time, so runs that skip this step never load that module.

    The step name ``export-demo`` is the last entry of ``PIPELINE_ORDER``.
    The header numbers it 8 even though it directly follows step 6 in the
    batch order; the interactive ``query`` step is kept out of that order.
    Dispatched by ``run_pipeline`` through ``STEP_MAP``.
    """
    print("\n>>> Step 8: Generating Sarah demo report...\n")

    # Deferred import: only pay for the exporter module when this step runs.
    from tools.feature_atlas.export_sarah_demo import (
        async_main as export_demo_main,
    )

    await export_demo_main()
async def step_query() -> None:
    """Launch the atlas query CLI.

    Imports and awaits ``async_main`` from ``tools.feature_atlas.query_atlas``.
    Unlike the other ``step_*`` coroutines, this one prints no header of its
    own. The import is deferred to call time, so runs that never query never
    load that module.

    The step name ``query`` is registered in ``STEP_MAP`` but left out of
    ``PIPELINE_ORDER``, so a full pipeline run never starts it. ``async_main``
    in this module awaits it directly when ``--step query`` is given.
    """
    # Deferred import: only pay for the query CLI when it is requested.
    from tools.feature_atlas.query_atlas import async_main as query_main

    await query_main()
# Dispatch table: CLI step name -> coroutine function implementing that step.
STEP_MAP = {
    "extract-symbols": step_extract_symbols,
    "discover-features": step_discover_features,
    "extract-features": step_extract_features,
    "load-features": step_load_features,
    "detect-interactions": step_detect_interactions,
    "generate-prompts": step_generate_prompts,
    "analyze-top50": step_analyze_top50,
    "import-analyses": step_import_analyses,
    "export-demo": step_export_demo,
    "query": step_query,
}

# Order of a default full run. Two registered steps are intentionally absent:
# "query" and "discover-features", which only run when explicitly requested.
PIPELINE_ORDER = [
    "extract-symbols",
    "extract-features",
    "load-features",
    "detect-interactions",
    "generate-prompts",
    "analyze-top50",
    "import-analyses",
    "export-demo",
]
async def run_pipeline(steps: list[str] | None = None) -> None:
    """Run the requested pipeline steps in order and print a summary.

    Prints the banner, then resolves each step name through ``STEP_MAP`` and
    awaits the matching coroutine, timing it. The run is fault-tolerant by
    design: a step that raises is reported on stdout, logged with its
    traceback through the module ``logger``, counted as failed, and the loop
    moves on to the next step. A closing block reports how many steps
    completed, how many failed, and the total elapsed time.

    Args:
        steps: Ordered step names to run. Names that are not keys of
            ``STEP_MAP`` are reported as unknown and counted as failures.
            ``None`` (the default) runs the full ``PIPELINE_ORDER``.
    """
    if steps is None:
        steps = PIPELINE_ORDER

    _banner()

    # Durations are measured with the monotonic clock so that wall-clock
    # adjustments during a long run cannot skew or negate them.
    pipeline_start = time.monotonic()
    completed = 0
    failed = 0

    for step_name in steps:
        step_fn = STEP_MAP.get(step_name)
        if step_fn is None:
            print(f"Unknown step: {step_name}")
            failed += 1
            continue

        step_start = time.monotonic()
        try:
            # Only the step itself is guarded; reporting happens outside.
            await step_fn()
        except Exception as exc:
            # Deliberate best-effort boundary: record the failure and keep
            # going with the remaining steps.
            elapsed = time.monotonic() - step_start
            print(f" [{step_name}] FAILED after {elapsed:.1f}s: {exc}\n")
            logger.exception("Step '%s' failed: %s", step_name, exc)
            failed += 1
        else:
            elapsed = time.monotonic() - step_start
            print(f" [{step_name}] completed in {elapsed:.1f}s\n")
            completed += 1

    total_elapsed = time.monotonic() - pipeline_start
    rule = "=" * 60
    print(f"\n{rule}")
    print(" PIPELINE COMPLETE")
    print(rule)
    print(f" Steps completed: {completed}/{completed + failed}")
    print(f" Steps failed: {failed}")
    print(f" Total time: {total_elapsed:.1f}s")
    print(f"{rule}\n")
async def async_main(argv: list[str] | None = None) -> None:
    """Parse the command line and dispatch to the requested pipeline run.

    Routing, in priority order:

    * ``--discover-features`` runs ``extract-symbols`` then
      ``discover-features`` through ``run_pipeline``.
    * ``--step query`` awaits ``step_query`` directly (no banner/summary).
    * ``--step all`` (the default) runs ``PIPELINE_ORDER``, or the slice of
      it starting at ``--from-step`` (inclusive) when that option is given.
    * Any other ``--step`` value runs just that one step.

    Option combinations in which one option has no effect are still
    accepted, but a warning is logged so the operator is not misled:
    ``--from-step`` only applies to a full run, and ``--discover-features``
    takes precedence over an explicit ``--step``.

    Args:
        argv: Argument strings to parse instead of the process command line.
            ``None`` (the default) makes ``argparse`` read ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(
        description="Stargazer Feature Interaction Atlas -- Pipeline Orchestrator"
    )
    parser.add_argument(
        "--step",
        choices=VALID_STEPS,
        default="all",
        help="Run a specific step instead of the full pipeline",
    )
    parser.add_argument(
        "--from-step",
        choices=PIPELINE_ORDER,
        help="Resume pipeline from this step (inclusive)",
    )
    parser.add_argument(
        "--discover-features",
        action="store_true",
        help="Run feature discovery (inhale protocol) instead of using canonical features",
    )
    args = parser.parse_args(argv)

    # Surface options that the routing below is about to ignore.
    if args.discover_features and args.step != "all":
        logger.warning(
            "--step %s is ignored because --discover-features was given",
            args.step,
        )
    if args.from_step and (args.discover_features or args.step != "all"):
        logger.warning(
            "--from-step %s is ignored; it only applies to a full pipeline run",
            args.from_step,
        )

    if args.discover_features:
        # Inhale protocol: scan + discover
        await run_pipeline(["extract-symbols", "discover-features"])
    elif args.step == "query":
        await step_query()
    elif args.step == "all":
        steps = PIPELINE_ORDER
        if args.from_step:
            # argparse restricts --from-step to PIPELINE_ORDER entries, so
            # index() cannot fail here.
            steps = PIPELINE_ORDER[PIPELINE_ORDER.index(args.from_step):]
        await run_pipeline(steps)
    else:
        await run_pipeline([args.step])
def main() -> None:
    """Synchronous entry point: set up logging, then run the async CLI.

    Configures root logging at ``INFO`` level with a
    timestamp/level/logger-name format and hands control to ``async_main``
    via ``asyncio.run``. Invoked by the ``__main__`` guard below.
    """
    log_format = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)
    asyncio.run(async_main())


if __name__ == "__main__":
    main()