# Source code for tools.feature_atlas.load_features_to_falkor

"""Step 2: Load Feature nodes into FalkorDB.

Reads ``outputs/feature_registry.json`` and MERGEs each feature as a
node into the ``stargazer_feature_interaction_atlas`` graph.

Fully idempotent -- safe to re-run.

Usage:
    python -m tools.feature_atlas.load_features_to_falkor

# skull -- BONES INTO THE GRAPH
"""

from __future__ import annotations

import asyncio
import json
import logging
import sys
import time
from pathlib import Path
from typing import Any

# Directory three levels above this file. Given the module path
# ``tools.feature_atlas.load_features_to_falkor`` that is the directory that
# contains the ``tools`` package. It is put at the front of sys.path (once --
# the membership test avoids duplicates) so the absolute ``tools.feature_atlas``
# import performed inside load_features() resolves regardless of the current
# working directory.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

# Module-level logger. This module attaches no handlers at import time;
# main() configures root logging when the module is run as a script.
logger = logging.getLogger(__name__)

# Directory containing this module; the feature registry is read from the
# ``outputs/feature_registry.json`` file underneath it.
_ATLAS_DIR = Path(__file__).resolve().parent
_FEATURE_REGISTRY_PATH = _ATLAS_DIR / "outputs" / "feature_registry.json"


def load_feature_registry() -> list[dict[str, Any]]:
    """Read and parse the feature registry JSON file.

    Opens the file at the module-level ``_FEATURE_REGISTRY_PATH``
    (``outputs/feature_registry.json`` next to this module) as UTF-8 and
    returns whatever ``json.load`` produces. The top-level JSON value is not
    validated here; callers in this module iterate it as a list of dicts.

    Used by :func:`load_features` as the default source when no ``features``
    argument is given, and by :func:`async_main` to obtain the feature list
    (and therefore the total count shown in its summary).

    Returns:
        The deserialized registry content, expected to be one dict per
        feature.

    Raises:
        FileNotFoundError: If the registry file does not exist. The message
            points the operator at ``extract_features_swarm.py``.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # EAFP: attempt the open directly instead of checking ``exists()`` first,
    # so there is no window between the check and the open in which the file
    # can disappear, and only one filesystem lookup is made.
    try:
        with open(_FEATURE_REGISTRY_PATH, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        # Re-raise with the actionable message. ``from None`` keeps the
        # traceback as short as it was with the explicit existence check.
        raise FileNotFoundError(
            f"Feature registry not found at {_FEATURE_REGISTRY_PATH}. "
            "Run extract_features_swarm.py first."
        ) from None
async def load_features(features: list[dict[str, Any]] | None = None) -> int:
    """MERGE each feature into the atlas graph and return how many succeeded.

    Obtains a graph handle and its client from ``get_atlas_graph``, then
    passes every feature dict to ``merge_feature``. Each success is counted
    and logged with the feature's confidence and file/symbol counts; a
    failure for one feature is logged and skipped so the rest of the batch
    still runs. After the loop a read-only ``count(f)`` query over
    ``Feature`` nodes is issued purely to log a verification total; a failure
    of that query is only a warning.

    The client returned by ``get_atlas_graph`` is always closed, including
    when the coroutine is cancelled or an unexpected error escapes the loop.

    Re-runs are intended to be safe because writes go through
    ``merge_feature``; the MERGE semantics themselves live in that helper,
    not in this function.

    Args:
        features: Optional pre-loaded list of feature dicts. When ``None``,
            the list is read via :func:`load_feature_registry`.

    Returns:
        The number of features for which ``merge_feature`` completed without
        raising.
    """
    # Imported at call time, so the project connection module is only loaded
    # when a load is actually requested.
    from tools.feature_atlas.atlas_connection import (
        get_atlas_graph,
        merge_feature,
    )

    if features is None:
        features = load_feature_registry()

    graph, rc = await get_atlas_graph()
    loaded = 0

    # try/finally guarantees the client is closed even when something that is
    # not an ``Exception`` subclass (asyncio.CancelledError, KeyboardInterrupt)
    # interrupts the batch.
    try:
        for feat in features:
            try:
                await merge_feature(graph, feat)
                loaded += 1
                logger.info(
                    "Loaded feature: %s (confidence=%.2f, %d files, %d symbols)",
                    feat["id"],
                    feat.get("confidence", 0),
                    len(feat.get("files", [])),
                    len(feat.get("symbols", [])),
                )
            except Exception as e:
                # A malformed (non-dict) entry must not crash the handler
                # itself, otherwise one bad record would abort the batch.
                feat_id = feat.get("id", "?") if isinstance(feat, dict) else "?"
                logger.error("Failed to load feature %s: %s", feat_id, e)

        # Verify: best-effort node count for the operator; never fatal.
        try:
            result = await graph.ro_query(
                "MATCH (f:Feature) RETURN count(f) AS cnt"
            )
            count = result.result_set[0][0] if result.result_set else 0
            logger.info("Verification: %d Feature nodes in atlas graph", count)
        except Exception as e:
            logger.warning("Verification query failed: %s", e)
    finally:
        await rc.aclose()

    return loaded
async def async_main() -> None:
    """Run the feature load end to end and print a summary to stdout.

    Reads the registry once via :func:`load_feature_registry` (so the summary
    has an accurate total), hands that list to :func:`load_features` for the
    graph writes, times the whole operation, and prints a short report.

    Invoked by :func:`main` through ``asyncio.run``. Being a coroutine, it
    can also be awaited directly by other async code.

    Returns:
        None. The effects are the writes performed by :func:`load_features`
        and the report printed to stdout.

    Raises:
        FileNotFoundError: Propagated from :func:`load_feature_registry` when
            the registry file is missing.
    """
    # perf_counter is monotonic, so the reported duration cannot be skewed by
    # wall-clock adjustments the way a time.time() difference can.
    t0 = time.perf_counter()
    features = load_feature_registry()
    loaded = await load_features(features)
    elapsed = time.perf_counter() - t0

    rule = "=" * 60
    print(f"\n{rule}")
    print(" FEATURE NODES LOADED TO FALKORDB")
    print(rule)
    print(f" Features loaded: {loaded}/{len(features)}")
    print(f" Time elapsed: {elapsed:.1f}s")
    print(" Graph: stargazer_feature_interaction_atlas")
    print(f"{rule}\n")
def main() -> None:
    """Synchronous command-line entry point for the feature load.

    Sets up root logging at INFO level with a timestamped format, so the
    per-feature and verification log lines emitted during the load are
    visible on the console, then drives :func:`async_main` to completion with
    ``asyncio.run``.

    Called from this module's ``if __name__ == "__main__"`` guard.

    Returns:
        None.
    """
    log_format = "%(asctime)s [%(levelname)s] %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)

    # Bridge from the synchronous shell invocation into the async pipeline.
    asyncio.run(async_main())
# Run the loader only when this file is executed as a script or with
# ``python -m``; importing the module has no such side effect.
if __name__ == "__main__":
    main()