# Source code for tools.feature_atlas.import_interaction_analysis

"""Step 6: Import interaction analyses into FalkorDB.

Reads ``outputs/interaction_analyses.jsonl`` and MERGEs each analysis
as an InteractionAnalysis node linked from its InteractionPrompt.

Usage:
    python -m tools.feature_atlas.import_interaction_analysis

# skull -- WRITING THE DIAGNOSIS
"""

from __future__ import annotations

import asyncio
import json
import logging
import sys
import time
from pathlib import Path
from typing import Any

# Directory that holds this module; the ``outputs`` folder lives beneath it.
_ATLAS_DIR = Path(__file__).resolve().parent

# Two levels above this module's directory. It is placed at the front of
# ``sys.path`` (once) -- presumably so the absolute ``tools.feature_atlas``
# import used further down resolves when the file is run directly.
_PROJECT_ROOT = _ATLAS_DIR.parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

logger = logging.getLogger(__name__)

# JSONL file consumed by ``_load_analyses``.
_ANALYSES_PATH = _ATLAS_DIR / "outputs" / "interaction_analyses.jsonl"


def _load_analyses() -> list[dict[str, Any]]:
    """Load every record from the interaction-analysis JSONL file.

    The file at the module-level ``_ANALYSES_PATH`` is read as UTF-8 text,
    one JSON document per line. Each line is stripped of surrounding
    whitespace; lines that are empty after stripping are ignored and every
    remaining line is decoded with ``json.loads``. Nothing other than the
    local filesystem is touched.

    Within this module it is used by :func:`import_analyses` (when no
    explicit list is passed) and by :func:`async_main`.

    Returns:
        list[dict[str, Any]]: The decoded records, in file order.

    Raises:
        FileNotFoundError: If ``_ANALYSES_PATH`` does not exist; the message
            points at ``run_interaction_analysis_swarm.py`` as the producer.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    source = _ANALYSES_PATH
    if not source.exists():
        raise FileNotFoundError(
            f"Analyses not found at {source}. "
            "Run run_interaction_analysis_swarm.py first."
        )

    with open(source, "r", encoding="utf-8") as handle:
        # Strip first, then filter, so whitespace-only lines are skipped too.
        stripped_lines = (raw.strip() for raw in handle)
        return [json.loads(text) for text in stripped_lines if text]


async def import_analyses(analyses: list[dict[str, Any]] | None = None) -> int:
    """Import interaction analyses into the atlas graph and count successes.

    This is Step 6 of the Feature Atlas pipeline. Every analysis dict is
    passed to ``merge_interaction_analysis`` together with the graph handle
    obtained from ``get_atlas_graph``. A record whose merge raises is logged
    at ERROR level and skipped, so a single bad record does not abort the
    batch. After the loop a read-only query counts the
    ``InteractionAnalysis`` nodes and logs the total as a sanity check; if
    that query fails, only a warning is logged.

    The client returned alongside the graph is closed in a ``finally``
    block, so it is released even if the coroutine is cancelled or an
    unexpected error escapes the import loop.

    Args:
        analyses: Optional pre-loaded list of analysis dicts. When ``None``,
            the records are read from disk via :func:`_load_analyses`.

    Returns:
        int: Number of analyses whose merge completed without raising.

    Raises:
        FileNotFoundError: Propagated from :func:`_load_analyses` when
            ``analyses`` is ``None`` and the JSONL file is missing.
        json.JSONDecodeError: Propagated from :func:`_load_analyses` when
            ``analyses`` is ``None`` and the file contains invalid JSON.
    """
    # Function-scope import, kept as in the original.
    # NOTE(review): presumably deferred so that importing this module does
    # not require the connection helpers -- confirm before hoisting.
    from tools.feature_atlas.atlas_connection import (
        get_atlas_graph,
        merge_interaction_analysis,
    )

    if analyses is None:
        analyses = _load_analyses()

    graph, rc = await get_atlas_graph()
    loaded = 0
    try:
        for analysis in analyses:
            try:
                await merge_interaction_analysis(graph, analysis)
                loaded += 1
            except Exception as e:
                logger.error(
                    "Failed to import analysis %s -> %s: %s",
                    analysis.get("source_id"),
                    analysis.get("target_id"),
                    e,
                )

        # Verify: report how many InteractionAnalysis nodes the graph holds.
        try:
            result = await graph.ro_query(
                "MATCH (a:InteractionAnalysis) RETURN count(a) AS cnt"
            )
            count = result.result_set[0][0] if result.result_set else 0
            logger.info(
                "Verification: %d InteractionAnalysis nodes in atlas graph",
                count,
            )
        except Exception as e:
            logger.warning("Verification failed: %s", e)
    finally:
        # ``except Exception`` does not intercept asyncio.CancelledError or
        # KeyboardInterrupt, so the close must live in ``finally`` to avoid
        # leaking the client on those paths.
        await rc.aclose()

    return loaded
async def async_main() -> None:
    """Run Step 6 end to end and print a summary of the import.

    Loads the analyses once via :func:`_load_analyses` (so the summary has
    an accurate denominator), hands that list to :func:`import_analyses`,
    and prints a short banner to stdout with the imported/total counts and
    the elapsed time. The graph name in the banner is a fixed label.

    The duration is measured with ``time.perf_counter`` so that a system
    clock adjustment during the run cannot distort the reported figure.

    Awaited by :func:`main` through ``asyncio.run``.

    Returns:
        None: The results are the graph writes performed by
        :func:`import_analyses` and the text printed to stdout.
    """
    started = time.perf_counter()
    analyses = _load_analyses()
    loaded = await import_analyses(analyses)
    elapsed = time.perf_counter() - started

    print(f"\n{'=' * 60}")
    print(" INTERACTION ANALYSES IMPORTED TO FALKORDB")
    print(f"{'=' * 60}")
    print(f" Analyses imported: {loaded}/{len(analyses)}")
    print(f" Time elapsed: {elapsed:.1f}s")
    print(" Graph: stargazer_feature_interaction_atlas")
    print(f"{'=' * 60}\n")
def main() -> None:
    """Synchronous command-line entry point for the Step 6 import.

    Configures the root logger at INFO level with a timestamped format, so
    the log records emitted during the import are visible on the console,
    then drives :func:`async_main` to completion with ``asyncio.run``.

    Returns:
        None.
    """
    log_format = "%(asctime)s [%(levelname)s] %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)
    asyncio.run(async_main())
# Run the import only when this file is executed as a script or with ``-m``.
if __name__ == "__main__":
    main()