"""Master orchestrator for the Feature Interaction Atlas pipeline.

Runs all steps in order, or a single step via ``--step``.

Usage:
    # Full pipeline
    python -m tools.feature_atlas.run_atlas

    # Individual steps
    python -m tools.feature_atlas.run_atlas --step extract-symbols
    python -m tools.feature_atlas.run_atlas --step discover-features
    python -m tools.feature_atlas.run_atlas --step extract-features
    python -m tools.feature_atlas.run_atlas --step load-features
    python -m tools.feature_atlas.run_atlas --step detect-interactions
    python -m tools.feature_atlas.run_atlas --step generate-prompts
    python -m tools.feature_atlas.run_atlas --step analyze-top50
    python -m tools.feature_atlas.run_atlas --step import-analyses
    python -m tools.feature_atlas.run_atlas --step export-demo

    # Resume the full pipeline from a given step (inclusive)
    python -m tools.feature_atlas.run_atlas --from-step load-features

    # Discover features from an unknown codebase (no canonical list needed)
    python -m tools.feature_atlas.run_atlas --discover-features

    # Query CLI
    python -m tools.feature_atlas.run_atlas --step query

# fire skull spider infinity heart -- THE FULL BODYGRAPH PIPELINE
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import sys
import time
from pathlib import Path
# The project root is three directory levels above this file.  It is pushed to
# the front of ``sys.path`` (once) so the absolute ``tools.feature_atlas.*``
# imports performed by the step functions can be resolved.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
if not any(entry == str(_PROJECT_ROOT) for entry in sys.path):
    sys.path.insert(0, str(_PROJECT_ROOT))

# Module-level logger; step failures are reported through it.
logger = logging.getLogger(__name__)
# Every value accepted by the ``--step`` command-line option: the individual
# step names plus the pseudo-step "all".
VALID_STEPS = [
    "extract-symbols",
    "discover-features",
    "extract-features",
    "load-features",
    "detect-interactions",
    "generate-prompts",
    "analyze-top50",
    "import-analyses",
    "export-demo",
    "query",
    "all",
]

# Steps that do NOT require FalkorDB (can run offline).
OFFLINE_STEPS = {
    "discover-features",
    "extract-features",
    "extract-symbols",
}

# Steps that require the Gemini API.
GEMINI_STEPS = {
    "analyze-top50",
    "discover-features",
    "extract-features",
}
def _banner() -> None:
    """Write the pipeline's title block to stdout.

    Purely cosmetic: the boxed "Bodygraph Demo" header is emitted with a
    single ``print`` call and nothing is returned.  ``run_pipeline`` calls
    this once before it starts executing steps.
    """
    title_block = """
+======================================================================+
| STARGAZER FEATURE INTERACTION ATLAS v0 |
| "The Bodygraph Demo" |
| |
| Stargazer inspects her own codebase, identifies her major organs, |
| maps which organs touch, and shows the highest-risk/highest-value |
| feature interactions. |
+======================================================================+
"""
    print(title_block)
[docs]
async def step_discover_features() -> None:
    """Run step 1c: feature discovery (the "inhale protocol").

    Prints the step header, then imports
    ``tools.feature_atlas.discover_features`` at call time and awaits its
    ``async_main`` coroutine.  The import is deferred so the module is only
    loaded when this step actually runs.

    Registered in ``STEP_MAP`` under ``"discover-features"``.  It is not part
    of ``PIPELINE_ORDER``; it runs via ``--step discover-features`` or the
    ``--discover-features`` flag.
    """
    header = "\n>>> Step 1c: Running feature discovery swarm (inhale protocol)...\n"
    print(header)

    from tools.feature_atlas.discover_features import async_main as discover_entry

    await discover_entry()
[docs]
async def step_load_features() -> None:
    """Run step 2: load features into FalkorDB.

    Prints the step header, then imports
    ``tools.feature_atlas.load_features_to_falkor`` at call time and awaits
    its ``async_main`` coroutine.  The import is deferred so the loader is
    only pulled in when this step actually runs.

    Registered in ``STEP_MAP`` under ``"load-features"`` and listed in
    ``PIPELINE_ORDER`` after ``"extract-features"``.
    """
    header = "\n>>> Step 2: Loading features into FalkorDB...\n"
    print(header)

    from tools.feature_atlas.load_features_to_falkor import async_main as load_entry

    await load_entry()
[docs]
async def step_detect_interactions() -> None:
    """Run step 3: detect code interactions.

    Prints the step header, then imports
    ``tools.feature_atlas.detect_code_interactions`` at call time and awaits
    its ``async_main`` coroutine.  The import is deferred so the detector is
    only pulled in when this step actually runs.

    Registered in ``STEP_MAP`` under ``"detect-interactions"`` and listed in
    ``PIPELINE_ORDER`` after ``"load-features"``.
    """
    header = "\n>>> Step 3: Detecting code interactions...\n"
    print(header)

    from tools.feature_atlas.detect_code_interactions import async_main as detect_entry

    await detect_entry()
[docs]
async def step_generate_prompts() -> None:
    """Run step 4: generate interaction prompts.

    Prints the step header, then imports
    ``tools.feature_atlas.generate_interaction_prompts`` at call time and
    awaits its ``async_main`` coroutine.  The import is deferred so the prompt
    generator is only pulled in when this step actually runs.

    Registered in ``STEP_MAP`` under ``"generate-prompts"`` and listed in
    ``PIPELINE_ORDER`` between ``"detect-interactions"`` and
    ``"analyze-top50"``.
    """
    header = "\n>>> Step 4: Generating interaction prompts...\n"
    print(header)

    from tools.feature_atlas.generate_interaction_prompts import async_main as prompts_entry

    await prompts_entry()
[docs]
async def step_analyze_top50() -> None:
    """Run step 5: the interaction analysis swarm (Gemini Flash).

    Prints the step header, then imports
    ``tools.feature_atlas.run_interaction_analysis_swarm`` at call time and
    awaits its ``async_main`` coroutine.  The import is deferred so the swarm
    module is only pulled in when this step actually runs.

    Registered in ``STEP_MAP`` under ``"analyze-top50"`` (also a member of
    ``GEMINI_STEPS``) and listed in ``PIPELINE_ORDER`` after
    ``"generate-prompts"``.
    """
    header = "\n>>> Step 5: Running interaction analysis swarm (Gemini Flash)...\n"
    print(header)

    from tools.feature_atlas.run_interaction_analysis_swarm import async_main as swarm_entry

    await swarm_entry()
[docs]
async def step_import_analyses() -> None:
    """Run step 6: import analyses into FalkorDB.

    Prints the step header, then imports
    ``tools.feature_atlas.import_interaction_analysis`` at call time and
    awaits its ``async_main`` coroutine.  The import is deferred so the
    importer is only pulled in when this step actually runs.

    Registered in ``STEP_MAP`` under ``"import-analyses"`` and listed in
    ``PIPELINE_ORDER`` after ``"analyze-top50"``.
    """
    header = "\n>>> Step 6: Importing analyses into FalkorDB...\n"
    print(header)

    from tools.feature_atlas.import_interaction_analysis import async_main as import_entry

    await import_entry()
[docs]
async def step_export_demo() -> None:
    """Run step 8: generate the Sarah demo report.

    Prints the step header, then imports
    ``tools.feature_atlas.export_sarah_demo`` at call time and awaits its
    ``async_main`` coroutine.  The import is deferred so the exporter is only
    pulled in when this step actually runs.

    Registered in ``STEP_MAP`` under ``"export-demo"`` and is the last entry
    of ``PIPELINE_ORDER``.  (The number 7 is used by ``step_query``, which is
    not part of ``PIPELINE_ORDER``.)
    """
    header = "\n>>> Step 8: Generating Sarah demo report...\n"
    print(header)

    from tools.feature_atlas.export_sarah_demo import async_main as export_entry

    await export_entry()
[docs]
async def step_query() -> None:
    """Run step 7: hand control to the query CLI.

    Imports ``tools.feature_atlas.query_atlas`` at call time and awaits its
    ``async_main`` coroutine.  Unlike the other ``step_*`` functions, no
    header is printed here.

    Registered in ``STEP_MAP`` under ``"query"`` but left out of
    ``PIPELINE_ORDER``; ``async_main`` in this module awaits it directly when
    ``--step query`` is given.
    """
    from tools.feature_atlas.query_atlas import async_main as launch_query_cli

    await launch_query_cli()
# Dispatch table: CLI step name -> coroutine function implementing that step.
# ``run_pipeline`` looks names up here; a name missing from this mapping is
# reported as "Unknown step" and counted as a failure.
STEP_MAP = {
    "extract-symbols": step_extract_symbols,
    "discover-features": step_discover_features,
    "extract-features": step_extract_features,
    "load-features": step_load_features,
    "detect-interactions": step_detect_interactions,
    "generate-prompts": step_generate_prompts,
    "analyze-top50": step_analyze_top50,
    "import-analyses": step_import_analyses,
    "export-demo": step_export_demo,
    "query": step_query,
}
# Order of the steps run by the full pipeline.  Two STEP_MAP entries are
# intentionally absent: "query" and "discover-features" (the latter is run via
# ``--step discover-features`` or the ``--discover-features`` flag).  The same
# list supplies the accepted values of ``--from-step``.
PIPELINE_ORDER = [
    "extract-symbols",
    "extract-features",
    "load-features",
    "detect-interactions",
    "generate-prompts",
    "analyze-top50",
    "import-analyses",
    "export-demo",
]
[docs]
async def run_pipeline(steps: list[str] | None = None) -> None:
"""Run the full Feature Atlas pipeline or a chosen subset of steps.
Prints the banner, then awaits each requested step's coroutine in order,
resolving each name through ``STEP_MAP`` and timing it. Steps are
deliberately fault-tolerant: a failure is logged (via the module ``logger``)
and printed but does not abort the run, so the loop continues with the
remaining steps and a completed/failed summary is printed at the end. All
real work (FalkorDB writes, Gemini calls, file output) happens inside the
individual ``step_*`` coroutines this dispatches to; this function itself
only orchestrates and writes progress to stdout.
Called by ``async_main`` in this module with the appropriate step list (the
full ``PIPELINE_ORDER``, a resumed slice, the discover-features pair, or a
single ``--step``); it is not invoked from other modules.
Args:
steps: Ordered list of step names to run. Each must be a key of
``STEP_MAP``; unknown names are reported and counted as failures.
When ``None`` (the default), the full ``PIPELINE_ORDER`` is run.
"""
if steps is None:
steps = PIPELINE_ORDER
_banner()
total_t0 = time.time()
completed = 0
failed = 0
for step_name in steps:
step_fn = STEP_MAP.get(step_name)
if step_fn is None:
print(f"Unknown step: {step_name}")
failed += 1
continue
t0 = time.time()
try:
await step_fn()
elapsed = time.time() - t0
print(f" [{step_name}] completed in {elapsed:.1f}s\n")
completed += 1
except Exception as e:
elapsed = time.time() - t0
print(f" [{step_name}] FAILED after {elapsed:.1f}s: {e}\n")
logger.error("Step '%s' failed: %s", step_name, e, exc_info=True)
failed += 1
# Continue with remaining steps even if one fails
continue
total_elapsed = time.time() - total_t0
print(f"\n{'=' * 60}")
print(f" PIPELINE COMPLETE")
print(f"{'=' * 60}")
print(f" Steps completed: {completed}/{completed + failed}")
print(f" Steps failed: {failed}")
print(f" Total time: {total_elapsed:.1f}s")
print(f"{'=' * 60}\n")
[docs]
async def async_main() -> None:
"""Async entry point that parses CLI arguments and dispatches the pipeline.
Builds the ``argparse`` parser for the ``--step``, ``--from-step``, and
``--discover-features`` options, then routes accordingly: the discover flag
runs the symbol-extraction plus feature-discovery pair, ``--step query``
launches the interactive CLI directly, ``--step all`` runs the full
``PIPELINE_ORDER`` (optionally sliced from ``--from-step``), and any other
single step runs just that one. All actual execution is delegated to
``run_pipeline`` (or ``step_query``); this function only reads ``sys.argv``
and dispatches.
Called by ``main`` in this module via ``asyncio.run(async_main())``; it is
not invoked from other modules.
"""
parser = argparse.ArgumentParser(
description="Stargazer Feature Interaction Atlas -- Pipeline Orchestrator"
)
parser.add_argument(
"--step",
choices=VALID_STEPS,
default="all",
help="Run a specific step instead of the full pipeline",
)
parser.add_argument(
"--from-step",
choices=PIPELINE_ORDER,
help="Resume pipeline from this step (inclusive)",
)
parser.add_argument(
"--discover-features",
action="store_true",
help="Run feature discovery (inhale protocol) instead of using canonical features",
)
args = parser.parse_args()
if args.discover_features:
# Inhale protocol: scan + discover
await run_pipeline(["extract-symbols", "discover-features"])
elif args.step == "query":
await step_query()
elif args.step == "all":
if args.from_step:
idx = PIPELINE_ORDER.index(args.from_step)
steps = PIPELINE_ORDER[idx:]
else:
steps = PIPELINE_ORDER
await run_pipeline(steps)
else:
await run_pipeline([args.step])
[docs]
def main() -> None:
    """Synchronous entry point for the orchestrator command line.

    Sets up root logging at ``INFO`` level with a timestamped
    ``time [LEVEL] logger-name: message`` format, then runs ``async_main`` to
    completion on a fresh event loop via ``asyncio.run``.  Invoked by the
    ``__main__`` guard below.
    """
    log_format = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO)
    asyncio.run(async_main())


if __name__ == "__main__":
    main()