"""Step 6: Import interaction analyses into FalkorDB.
Reads ``outputs/interaction_analyses.jsonl`` and MERGEs each analysis
as an InteractionAnalysis node linked from its InteractionPrompt.
Usage:
python -m tools.feature_atlas.import_interaction_analysis
# skull -- WRITING THE DIAGNOSIS
"""
from __future__ import annotations
import asyncio
import json
import logging
import sys
import time
from pathlib import Path
from typing import Any
# Directory three levels above this file (presumably the repository root --
# confirm against the project layout). It is added to the front of sys.path,
# only if not already present, which presumably lets the absolute
# ``tools.feature_atlas`` import used further down resolve regardless of the
# current working directory.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Directory containing this file, and the JSONL file read by _load_analyses().
_ATLAS_DIR = Path(__file__).resolve().parent
_ANALYSES_PATH = _ATLAS_DIR / "outputs" / "interaction_analyses.jsonl"
def _load_analyses() -> list[dict[str, Any]]:
    """Parse ``outputs/interaction_analyses.jsonl`` into a list of records.

    The file location comes from the module-level ``_ANALYSES_PATH``. Every
    line is stripped of surrounding whitespace; lines that are empty after
    stripping are ignored, and each remaining line is decoded with
    ``json.loads``. No services are contacted -- this is a local file read.

    Within this module it is used by :func:`import_analyses` (as the fallback
    source when no ``analyses`` argument is given) and by :func:`async_main`
    (which needs the full list for its summary).

    Returns:
        The decoded records, in file order. The importer expects each record
        to expose ``source_id`` / ``target_id`` keys.

    Raises:
        FileNotFoundError: If ``_ANALYSES_PATH`` does not exist; the message
            points at ``run_interaction_analysis_swarm.py`` as the producer.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    if _ANALYSES_PATH.exists():
        with open(_ANALYSES_PATH, "r", encoding="utf-8") as handle:
            stripped_lines = (raw.strip() for raw in handle)
            # Blank (whitespace-only) lines are skipped, not treated as errors.
            return [json.loads(text) for text in stripped_lines if text]

    raise FileNotFoundError(
        f"Analyses not found at {_ANALYSES_PATH}. "
        "Run run_interaction_analysis_swarm.py first."
    )
async def import_analyses(analyses: list[dict[str, Any]] | None = None) -> int:
    """Write interaction analyses into the atlas graph and count the successes.

    This is Step 6 of the Feature Atlas pipeline. A graph handle and its
    client are obtained from ``get_atlas_graph``; every record is then passed
    to ``merge_interaction_analysis``. A record whose merge raises is logged
    at ERROR level and skipped, so one bad record does not abort the batch.
    After the loop a read-only ``count(InteractionAnalysis)`` query is run and
    its result logged; a failure of that verification query is logged as a
    warning and otherwise ignored.

    The client returned by ``get_atlas_graph`` is closed in a ``finally``
    clause, so it is released even when the coroutine is cancelled or an
    unexpected error escapes the loop.

    Within this module the function is called by :func:`async_main`.

    Args:
        analyses: Optional pre-loaded list of analysis records (each expected
            to carry ``source_id`` / ``target_id`` keys, which are used in the
            failure log message). When ``None``, the records are read via
            :func:`_load_analyses`.

    Returns:
        The number of records for which ``merge_interaction_analysis``
        completed without raising.

    Raises:
        FileNotFoundError: Propagated from :func:`_load_analyses` when
            ``analyses`` is ``None`` and the JSONL file is missing.
        json.JSONDecodeError: Propagated from :func:`_load_analyses` when the
            JSONL file contains an invalid line.
    """
    # Imported lazily, as in the original, so that importing this module does
    # not require the atlas connection module to be importable.
    from tools.feature_atlas.atlas_connection import (
        get_atlas_graph,
        merge_interaction_analysis,
    )

    if analyses is None:
        analyses = _load_analyses()

    graph, rc = await get_atlas_graph()
    loaded = 0
    try:
        for analysis in analyses:
            try:
                await merge_interaction_analysis(graph, analysis)
            except Exception as e:
                # A malformed (non-dict) record must not crash the error
                # handler itself, otherwise the best-effort batch would abort.
                is_mapping = isinstance(analysis, dict)
                logger.error(
                    "Failed to import analysis %s -> %s: %s",
                    analysis.get("source_id") if is_mapping else None,
                    analysis.get("target_id") if is_mapping else None,
                    e,
                )
            else:
                loaded += 1

        # Verify: log how many InteractionAnalysis nodes the graph now holds.
        try:
            result = await graph.ro_query(
                "MATCH (a:InteractionAnalysis) RETURN count(a) AS cnt"
            )
            count = result.result_set[0][0] if result.result_set else 0
            logger.info(
                "Verification: %d InteractionAnalysis nodes in atlas graph",
                count,
            )
        except Exception as e:
            # Verification is informational only; never fail the import on it.
            logger.warning("Verification failed: %s", e)
    finally:
        # Always release the client, including on cancellation.
        await rc.aclose()

    return loaded
async def async_main() -> None:
    """Run Step 6 end to end and print a summary to stdout.

    Loads the records once via :func:`_load_analyses` (so the summary has an
    accurate denominator), hands that same list to :func:`import_analyses`,
    and prints how many records were imported and how long the run took.

    The duration is measured with ``time.perf_counter``, a monotonic clock,
    so the reported elapsed time is not distorted by wall-clock adjustments
    during the run.

    Within this module it is invoked by :func:`main` through ``asyncio.run``;
    according to the original documentation, ``run_atlas.py`` also awaits it
    directly as its Step 6.

    Returns:
        None. The effects are the graph writes performed by
        :func:`import_analyses` and the report printed to stdout.

    Raises:
        FileNotFoundError: Propagated from :func:`_load_analyses` when the
            JSONL file is missing.
        json.JSONDecodeError: Propagated from :func:`_load_analyses` when the
            JSONL file contains an invalid line.
    """
    started = time.perf_counter()
    analyses = _load_analyses()
    loaded = await import_analyses(analyses)
    elapsed = time.perf_counter() - started

    rule = "=" * 60
    print(f"\n{rule}")
    print(" INTERACTION ANALYSES IMPORTED TO FALKORDB")
    print(rule)
    print(f" Analyses imported: {loaded}/{len(analyses)}")
    print(f" Time elapsed: {elapsed:.1f}s")
    print(" Graph: stargazer_feature_interaction_atlas")
    print(f"{rule}\n")
def main() -> None:
    """Command-line entry point: configure logging, then run the import.

    Sets up root logging at INFO level with a timestamped format, so the log
    records emitted during the import are visible on the console, and then
    drives :func:`async_main` to completion with ``asyncio.run``. Within this
    module it is called only from the ``if __name__ == "__main__"`` guard.

    Returns:
        None.
    """
    log_format = "%(asctime)s [%(levelname)s] %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)
    asyncio.run(async_main())
# Run the import only when this file is executed as a script (e.g. via
# ``python -m tools.feature_atlas.import_interaction_analysis``), not when it
# is imported by another module.
if __name__ == "__main__":
    main()