"""Step 2: Load Feature nodes into FalkorDB.
Reads ``outputs/feature_registry.json`` and MERGEs each feature as a
node into the ``stargazer_feature_interaction_atlas`` graph.
Fully idempotent -- safe to re-run.
Usage:
python -m tools.feature_atlas.load_features_to_falkor
# skull -- BONES INTO THE GRAPH
"""
from __future__ import annotations
import asyncio
import json
import logging
import sys
import time
from pathlib import Path
from typing import Any
# Directory that holds this module; the registry lives under its ``outputs/``.
_ATLAS_DIR = Path(__file__).resolve().parent
# Repository root: two levels above the atlas directory (three above this
# file). It is prepended to sys.path, presumably so the lazy
# ``tools.feature_atlas.*`` imports resolve even when this file is not run via
# ``python -m`` from the repository root.
_PROJECT_ROOT = _ATLAS_DIR.parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path[:0] = [str(_PROJECT_ROOT)]
logger = logging.getLogger(__name__)
# Input produced by the extraction step (extract_features_swarm.py).
_FEATURE_REGISTRY_PATH = _ATLAS_DIR.joinpath("outputs", "feature_registry.json")
[docs]
def load_feature_registry(path: Path | str | None = None) -> list[dict[str, Any]]:
    """Read and parse the feature registry produced by the extraction swarm.

    Loads the canonical list of features discovered upstream so the loader has
    something to MERGE into the graph. It is the single source of truth for
    "which features exist" at the start of Step 2. For that reason it fails
    loudly rather than returning an empty list when the registry is absent or
    malformed.

    This is a pure local-filesystem read with no service interactions. By
    default it opens ``outputs/feature_registry.json`` (the module-level
    ``_FEATURE_REGISTRY_PATH``). Pass ``path`` to read a registry from another
    location, for example a fixture in tests.

    Callers:
        - :func:`load_features`, as the default source when no ``features``
          argument is supplied.
        - :func:`async_main`, to obtain the total count for the summary
          report.

    Args:
        path: Optional override for the registry location. ``None`` (the
            default) reads ``_FEATURE_REGISTRY_PATH`` exactly as before.

    Returns:
        list[dict[str, Any]]: One feature dict per registry entry. Each entry
        typically carries the ``id``, ``confidence``, ``files`` and
        ``symbols`` keys consumed by ``merge_feature``.

    Raises:
        FileNotFoundError: If the registry file does not exist, signalling
            that ``extract_features_swarm.py`` has not been run yet.
        json.JSONDecodeError: If the registry file is not valid JSON.
        ValueError: If the file is valid JSON but its top level is not a list.
    """
    # Only touch the module constant when no override is given.
    registry_path = _FEATURE_REGISTRY_PATH if path is None else Path(path)
    try:
        # EAFP: opening directly avoids the race between a separate exists()
        # check and the open() call.
        with open(registry_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError as err:
        raise FileNotFoundError(
            f"Feature registry not found at {registry_path}. "
            "Run extract_features_swarm.py first."
        ) from err
    # A non-list document (e.g. a JSON object) would otherwise be iterated key
    # by key downstream and every "feature" would fail. Reject it up front.
    if not isinstance(data, list):
        raise ValueError(
            f"Feature registry at {registry_path} must contain a JSON list of "
            f"features, got {type(data).__name__}."
        )
    return data
[docs]
def _safe_len(value: Any) -> int:
    """Return ``len(value)``, or 0 when *value* has no length (e.g. ``None``)."""
    try:
        return len(value)
    except TypeError:
        return 0


async def load_features(features: list[dict[str, Any]] | None = None) -> int:
    """MERGE every Feature into the atlas knowledge graph in FalkorDB.

    This is Step 2 of the Feature Atlas pipeline. It turns the registry of
    discovered features into actual ``Feature`` nodes in the graph, so later
    steps (interaction detection, prompt generation, querying) have vertices
    to attach edges and analyses to. The whole operation is idempotent and
    safe to re-run, which is the contract callers and the orchestrator rely
    on.

    What it does, in order:

    1. When no ``features`` argument is supplied, falls back to
       :func:`load_feature_registry`, which reads
       ``outputs/feature_registry.json`` from the local filesystem.
    2. Opens a connection to the ``stargazer_feature_interaction_atlas``
       graph via ``get_atlas_graph``.
    3. MERGEs each feature through ``merge_feature`` and logs a per-feature
       line with its confidence and file/symbol counts. A feature whose merge
       raises is logged with its traceback and skipped rather than aborting
       the batch.
    4. Runs a read-only ``count(Feature)`` query to log a verification total.
    5. Closes the Redis client in a ``finally`` block, so the connection is
       released even on cancellation or an unexpected error.

    Called by :func:`async_main` in this module. That coroutine is dispatched
    as Step 2 by ``run_atlas.py``\\ 's ``step_load_features`` (imported there
    as ``async_main as run``).

    Args:
        features: Optional pre-loaded list of feature dicts. When ``None``,
            the features are read from ``outputs/feature_registry.json`` via
            :func:`load_feature_registry`.

    Returns:
        int: The number of features successfully MERGEd into the atlas graph.
        Features that raised during the merge are excluded from the count.
    """
    from tools.feature_atlas.atlas_connection import (
        get_atlas_graph,
        merge_feature,
    )

    if features is None:
        features = load_feature_registry()
    graph, rc = await get_atlas_graph()
    loaded = 0
    try:
        for feat in features:
            # Non-dict entries cannot be described. Use an empty view so that
            # building the log line never raises and aborts the batch.
            info = feat if isinstance(feat, dict) else {}
            feature_id = info.get("id", "?")
            # Keep the try around the merge only, so a logging/formatting
            # problem is never reported as a failed load of a merged feature.
            try:
                await merge_feature(graph, feat)
            except Exception as e:
                logger.exception("Failed to load feature %s: %s", feature_id, e)
                continue
            loaded += 1
            logger.info(
                "Loaded feature: %s (confidence=%.2f, %d files, %d symbols)",
                feature_id,
                info.get("confidence", 0),
                _safe_len(info.get("files")),
                _safe_len(info.get("symbols")),
            )

        # Verify: this counts every Feature node in the graph, which may
        # include nodes that were not part of this batch.
        try:
            result = await graph.ro_query(
                "MATCH (f:Feature) RETURN count(f) AS cnt"
            )
            count = result.result_set[0][0] if result.result_set else 0
            logger.info("Verification: %d Feature nodes in atlas graph", count)
        except Exception as e:
            logger.warning("Verification query failed: %s", e)
    finally:
        await rc.aclose()
    return loaded
[docs]
async def async_main() -> None:
    """Run Step 2 end to end: load the registry and import features into FalkorDB.

    This is the coroutine that performs the feature load for command-line and
    orchestrated runs. It reads the registry once so the report has an
    accurate denominator, drives the FalkorDB import, and prints a
    human-readable summary so an operator can see how the step went at a
    glance.

    It reads ``outputs/feature_registry.json`` via
    :func:`load_feature_registry` and delegates the graph writes to
    :func:`load_features`, which connects to the
    ``stargazer_feature_interaction_atlas`` graph in FalkorDB. It times the
    run with the monotonic ``time.perf_counter`` clock, so wall-clock
    adjustments cannot skew the elapsed time, and writes a formatted report
    to stdout.

    It is invoked both by :func:`main` here (via ``asyncio.run``) and by
    ``run_atlas.py``\\ 's ``step_load_features``, which imports it as
    ``async_main as run`` and awaits it directly.

    Returns:
        None: All output is side effects, namely the FalkorDB writes
        performed by :func:`load_features` and the summary printed to stdout.
    """
    t0 = time.perf_counter()
    features = load_feature_registry()
    loaded = await load_features(features)
    elapsed = time.perf_counter() - t0

    rule = "=" * 60
    # One print for the whole report. The text is identical to printing each
    # line separately: a leading blank line, then a trailing blank line.
    print(
        "\n".join(
            [
                f"\n{rule}",
                " FEATURE NODES LOADED TO FALKORDB",
                rule,
                f" Features loaded: {loaded}/{len(features)}",
                f" Time elapsed: {elapsed:.1f}s",
                " Graph: stargazer_feature_interaction_atlas",
                f"{rule}\n",
            ]
        )
    )
[docs]
def main() -> None:
    """Run the Step 2 feature load from the command line.

    This backs ``python -m tools.feature_atlas.load_features_to_falkor``. It
    bridges a synchronous shell invocation into the async pipeline:

    1. Configures root logging at INFO, so the per-feature lines and the
       verification count logged by :func:`load_features` reach the console.
    2. Drives :func:`async_main` to completion with ``asyncio.run``.

    The orchestrated path in ``run_atlas.py`` awaits :func:`async_main`
    directly and never goes through this wrapper.

    Returns:
        None.
    """
    log_format = "%(asctime)s [%(levelname)s] %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO)
    step = async_main()
    asyncio.run(step)


# Execute only when run as a script/module, not when imported (e.g. by
# run_atlas.py).
if __name__ == "__main__":
    main()