# Source code for tools.feature_atlas.detect_code_interactions

"""Step 3: Detect code-grounded interactions between features.

Static analysis only: analyzes imports, shared files, shared data stores,
and shared env vars between feature pairs using the repo symbol index.

Outputs ``outputs/code_interactions.json`` and loads CODE_INTERACTS_WITH
edges into FalkorDB.

Usage:
    python -m tools.feature_atlas.detect_code_interactions

# fire skull -- FINDING THE NERVES BETWEEN ORGANS
"""

from __future__ import annotations

import asyncio
import json
import logging
import sys
import time
from collections import defaultdict
from pathlib import Path
from typing import Any

import yaml

# Three directory levels above this file; prepended to ``sys.path`` (once) so
# that absolute ``tools.*`` imports used later in this module can resolve even
# when the file is run from a different working directory.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Directory containing this module; every input/output path below hangs off it.
_ATLAS_DIR = Path(__file__).resolve().parent
# YAML configuration read by ``_load_config``.
_CONFIG_PATH = _ATLAS_DIR / "config.yaml"
# JSON inputs read by ``_load_features`` and ``_load_symbols``.
_FEATURE_REGISTRY_PATH = _ATLAS_DIR / "outputs" / "feature_registry.json"
_SYMBOLS_PATH = _ATLAS_DIR / "outputs" / "repo_symbols.json"
# JSON output written by ``async_main``.
_OUTPUT_PATH = _ATLAS_DIR / "outputs" / "code_interactions.json"


def _load_config() -> dict[str, Any]:
    """Parse ``config.yaml`` from the atlas directory and return its contents.

    The file at the module-level ``_CONFIG_PATH`` is opened as UTF-8 text and
    handed to ``yaml.safe_load``. Nothing besides that single file read
    happens here.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file (expected to be a
        mapping).

    Raises:
        FileNotFoundError: If the configuration file does not exist.
        yaml.YAMLError: If the file cannot be parsed as YAML.
    """
    with _CONFIG_PATH.open(mode="r", encoding="utf-8") as config_file:
        parsed = yaml.safe_load(config_file)
    return parsed


def _load_features() -> list[dict[str, Any]]:
    """Read the feature registry JSON and return its decoded contents.

    The registry lives at the module-level ``_FEATURE_REGISTRY_PATH``
    (``outputs/feature_registry.json``). The records it holds are the
    ``features`` input of :func:`detect_static_interactions`, which reads each
    record's ``id``, ``files`` and ``data_stores`` keys. This function only
    reads that one file.

    Returns:
        The decoded JSON document (expected to be a list of feature records).

    Raises:
        FileNotFoundError: If the registry file does not exist.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    registry_text = _FEATURE_REGISTRY_PATH.read_text(encoding="utf-8")
    return json.loads(registry_text)


def _load_symbols() -> list[dict[str, Any]]:
    """Read the repository symbol index JSON and return its decoded contents.

    The index lives at the module-level ``_SYMBOLS_PATH``
    (``outputs/repo_symbols.json``). Its per-file records are the ``symbols``
    input of :func:`detect_static_interactions`, which reads each record's
    ``file``, ``type``, ``imports`` and ``env_vars`` keys. This function only
    reads that one file.

    Returns:
        The decoded JSON document (expected to be a list of per-file symbol
        records).

    Raises:
        FileNotFoundError: If the symbol index file does not exist.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    index_text = _SYMBOLS_PATH.read_text(encoding="utf-8")
    return json.loads(index_text)


def _build_file_to_features(features: list[dict[str, Any]]) -> dict[str, list[str]]:
    """Build a reverse index from file path to the feature IDs that claim it.

    Inverts the feature registry so that, for each file path listed under a
    feature's ``files`` key, the resulting mapping records every feature that
    references it. This is the foundation for detecting the
    ``shared_memory_layer`` mechanism, where two features touching the same
    file imply an implicit interaction. Pure in-memory transformation with no
    Redis, knowledge-graph, LLM, or filesystem side effects.

    Invoked by :func:`detect_static_interactions` in this module; no other
    internal callers were found.

    Args:
        features: The feature registry records, each carrying an ``id`` and a
            ``files`` list of claimed file paths.

    Returns:
        A mapping from file path to the list of feature IDs that claim it.
    """
    mapping: dict[str, list[str]] = defaultdict(list)
    for feat in features:
        for filepath in feat.get("files", []):
            mapping[filepath].append(feat["id"])
    return dict(mapping)


def _build_import_graph(
    symbols: list[dict[str, Any]],
) -> dict[str, set[str]]:
    """Build an import graph mapping each Python file to its imported modules.

    Walks the repo symbol index and, for every record of ``type`` ``python``,
    collects the set of module names that file imports. The resulting graph
    lets :func:`detect_static_interactions` trace ``direct_import`` edges by
    resolving each imported module back to a file owned by another feature.
    Non-Python records are skipped. Pure in-memory transformation with no
    external side effects.

    Invoked by :func:`detect_static_interactions` in this module; no other
    internal callers were found.

    Args:
        symbols: The per-file symbol records from the repo symbol index, each
            optionally carrying ``type``, ``file``, and ``imports`` fields.

    Returns:
        A mapping from Python file path to the set of module names it imports.
    """
    graph: dict[str, set[str]] = {}
    for record in symbols:
        if record.get("type") != "python":
            continue
        filepath = record.get("file", "")
        imports = set(record.get("imports", []))
        graph[filepath] = imports
    return graph


def _module_to_filepath(module: str, symbols: list[dict[str, Any]]) -> str | None:
    """Resolve a Python module name to its file path within the symbol index.

    Translates a dotted module name (for example ``foo.bar``) into the two
    plausible on-disk paths (``foo/bar.py`` and ``foo/bar/__init__.py``) and
    returns the first that is actually present in the symbol index. This lets
    :func:`detect_static_interactions` decide whether an import lands inside a
    file owned by another feature. Resolution is best-effort and string-based;
    modules outside the indexed repo (third-party packages, dynamic imports)
    resolve to ``None``. Pure in-memory lookup with no external side effects.

    Invoked by :func:`detect_static_interactions` in this module; no other
    internal callers were found.

    Args:
        module: The dotted Python module name to resolve.
        symbols: The per-file symbol records, used as the set of known files.

    Returns:
        The matching repo-relative file path, or ``None`` when no indexed file
        corresponds to the module.
    """
    # Try direct mapping: foo.bar -> foo/bar.py
    candidates = [
        module.replace(".", "/") + ".py",
        module.replace(".", "/") + "/__init__.py",
    ]
    known_files = {r.get("file", "") for r in symbols}
    for candidate in candidates:
        if candidate in known_files:
            return candidate
    return None


def _data_store_mechanism(shared_stores: set[str]) -> str:
    """Return the mechanism label for a non-empty set of shared store names."""
    lowered = [name.lower() for name in shared_stores]
    if any("falkor" in name for name in lowered):
        return "shared_falkordb_graph"
    if any("redis" in name for name in lowered):
        return "shared_redis_db"
    return "shared_vector_store"


def detect_static_interactions(
    features: list[dict[str, Any]],
    symbols: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Detect code-grounded interactions between feature pairs via static analysis.

    For every ordered pair of distinct registry entries the function emits up
    to four edges:

    * ``shared_memory_layer`` when both features list the same file
      (confidence ``min(0.9, 0.5 + 0.1 * n_shared)``);
    * ``direct_import`` when a file of the source feature imports a module
      that resolves, via :func:`_module_to_filepath`, to a file of the target
      feature (confidence ``0.85``);
    * ``shared_falkordb_graph`` / ``shared_redis_db`` / ``shared_vector_store``
      when the two features declare a common data store (confidence ``0.8``);
    * ``shared_env_var`` when files of both features reference the same
      environment variable (confidence ``0.6``).

    Import resolution is done once per feature (and memoised per module name)
    instead of inside the pair loop, and source files and imports are visited
    in sorted order so that the single piece of import evidence kept per pair
    is the same on every run rather than depending on set iteration order.

    Finally the edges are deduplicated per ``(source, target, mechanism)``,
    keeping the highest-confidence edge (first one on ties), and the counts
    before and after are logged at INFO level. The function itself performs no
    I/O.

    Args:
        features: Feature registry records, each with an ``id`` and optional
            ``files`` and ``data_stores`` lists (missing or ``None`` is
            treated as empty).
        symbols: Per-file symbol records providing ``file``, ``type``,
            ``imports`` and ``env_vars``.

    Returns:
        The deduplicated list of edge dictionaries with keys ``source_id``,
        ``target_id``, ``mechanism``, ``confidence``, ``file_refs``,
        ``symbol_refs``, ``evidence`` and ``direction``.

    Raises:
        KeyError: If a feature record has no ``id``.
    """
    interactions: list[dict[str, Any]] = []

    import_graph = _build_import_graph(symbols)

    # Symbol lookup: file -> record (the last record wins for duplicate paths).
    file_to_record: dict[str, dict[str, Any]] = {
        record.get("file", ""): record for record in symbols
    }

    feature_files: dict[str, set[str]] = {}
    feature_data_stores: dict[str, set[str]] = {}
    feature_env_vars: dict[str, set[str]] = {}
    # Per feature: (importing file, module name, resolved file) triples for
    # every import that lands on an indexed file, in deterministic order.
    feature_import_edges: dict[str, list[tuple[str, str, str]]] = {}
    # Memo of module name -> resolved file so each module is resolved once.
    resolved_modules: dict[str, str | None] = {}

    for feat in features:
        fid = feat["id"]
        files = set(feat.get("files") or [])
        feature_files[fid] = files
        feature_data_stores[fid] = set(feat.get("data_stores") or [])

        # Collect env vars from all files in this feature.
        env_vars: set[str] = set()
        for filepath in files:
            env_vars.update(file_to_record.get(filepath, {}).get("env_vars", []))
        feature_env_vars[fid] = env_vars

        import_edges: list[tuple[str, str, str]] = []
        for src_file in sorted(files):
            for imp in sorted(import_graph.get(src_file, ())):
                if imp not in resolved_modules:
                    resolved_modules[imp] = _module_to_filepath(imp, symbols)
                imp_file = resolved_modules[imp]
                if imp_file:
                    import_edges.append((src_file, imp, imp_file))
        feature_import_edges[fid] = import_edges

    feature_ids = [f["id"] for f in features]

    for i, src_id in enumerate(feature_ids):
        src_files = feature_files[src_id]
        src_stores = feature_data_stores[src_id]
        src_env = feature_env_vars[src_id]
        src_import_edges = feature_import_edges[src_id]

        for j, tgt_id in enumerate(feature_ids):
            if i == j:
                continue

            tgt_files = feature_files[tgt_id]
            direction = f"{src_id}_TO_{tgt_id}"

            # 1. Shared files
            shared = sorted(src_files & tgt_files)
            if shared:
                interactions.append({
                    "source_id": src_id,
                    "target_id": tgt_id,
                    "mechanism": "shared_memory_layer",
                    "confidence": min(0.9, 0.5 + len(shared) * 0.1),
                    "file_refs": shared[:10],
                    "symbol_refs": [],
                    "evidence": (
                        f"{src_id} and {tgt_id} share {len(shared)} file(s): "
                        f"{', '.join(shared[:3])}"
                    ),
                    "direction": direction,
                })

            # 2. Import relationships -- one piece of evidence per pair is enough.
            import_hit = next(
                (edge for edge in src_import_edges if edge[2] in tgt_files),
                None,
            )
            if import_hit is not None:
                src_file, imp, imp_file = import_hit
                interactions.append({
                    "source_id": src_id,
                    "target_id": tgt_id,
                    "mechanism": "direct_import",
                    "confidence": 0.85,
                    "file_refs": [src_file, imp_file],
                    "symbol_refs": [imp],
                    "evidence": (
                        f"{src_file} imports {imp} "
                        f"which belongs to {tgt_id}"
                    ),
                    "direction": direction,
                })

            # 3. Shared data stores
            shared_ds = src_stores & feature_data_stores[tgt_id]
            if shared_ds:
                interactions.append({
                    "source_id": src_id,
                    "target_id": tgt_id,
                    "mechanism": _data_store_mechanism(shared_ds),
                    "confidence": 0.8,
                    "file_refs": [],
                    "symbol_refs": [],
                    "evidence": (
                        f"{src_id} and {tgt_id} share data store(s): "
                        f"{', '.join(sorted(shared_ds)[:3])}"
                    ),
                    "direction": direction,
                })

            # 4. Shared env vars
            shared_env = sorted(src_env & feature_env_vars[tgt_id])
            if shared_env:
                interactions.append({
                    "source_id": src_id,
                    "target_id": tgt_id,
                    "mechanism": "shared_env_var",
                    "confidence": 0.6,
                    "file_refs": [],
                    "symbol_refs": shared_env[:5],
                    "evidence": (
                        f"{src_id} and {tgt_id} reference shared env vars: "
                        f"{', '.join(shared_env[:3])}"
                    ),
                    "direction": direction,
                })

    # Deduplicate: keep the highest confidence interaction per
    # (src, tgt, mechanism). Still needed when the registry repeats an id.
    seen: dict[tuple[str, str, str], dict[str, Any]] = {}
    for interaction in interactions:
        key = (
            interaction["source_id"],
            interaction["target_id"],
            interaction["mechanism"],
        )
        if key not in seen or interaction["confidence"] > seen[key]["confidence"]:
            seen[key] = interaction

    deduped = list(seen.values())
    logger.info(
        "Static analysis found %d interactions (%d before dedup)",
        len(deduped),
        len(interactions),
    )
    return deduped
async def load_interactions_to_falkor(
    interactions: list[dict[str, Any]],
) -> int:
    """Persist detected interactions as CODE_INTERACTS_WITH edges in FalkorDB.

    Obtains a ``(graph, client)`` pair from
    ``tools.feature_atlas.atlas_connection.get_atlas_graph`` and calls
    ``merge_code_interaction(graph, interaction)`` once per edge. A failure on
    one edge is logged and skipped so that the remaining edges are still
    attempted. The client is closed in a ``finally`` block, so the connection
    is released even if the load is cancelled or aborted by a non-``Exception``
    error.

    Args:
        interactions: Edge dictionaries as produced by
            :func:`detect_static_interactions`.

    Returns:
        The number of edges for which ``merge_code_interaction`` completed
        without raising.
    """
    # Imported lazily so that importing this module does not require the
    # atlas connection module.
    from tools.feature_atlas.atlas_connection import (
        get_atlas_graph,
        merge_code_interaction,
    )

    graph, rc = await get_atlas_graph()
    loaded = 0
    try:
        for interaction in interactions:
            try:
                await merge_code_interaction(graph, interaction)
            except Exception as e:
                # Best-effort load: record the failure and keep going.
                logger.error(
                    "Failed to load interaction %s -> %s: %s",
                    interaction.get("source_id"),
                    interaction.get("target_id"),
                    e,
                )
            else:
                loaded += 1
    finally:
        await rc.aclose()
    return loaded
async def async_main() -> None:
    """Run the code-interaction detection step from inputs to summary.

    Loads the feature registry and symbol index with :func:`_load_features`
    and :func:`_load_symbols`, computes the edges with
    :func:`detect_static_interactions`, writes them as indented JSON to
    ``_OUTPUT_PATH`` (creating the parent directory if needed), and loads them
    into FalkorDB via :func:`load_interactions_to_falkor`. It then prints a
    summary with the edge count, the number loaded, a per-mechanism breakdown
    ordered by descending count, the elapsed time and the output path.

    Returns:
        None.
    """
    started_at = time.time()

    feature_records = _load_features()
    symbol_records = _load_symbols()

    # Phase 1: static analysis
    logger.info("Phase 1: Static interaction detection...")
    edges = detect_static_interactions(feature_records, symbol_records)

    # Persist the edges next to the other atlas outputs.
    _OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    with _OUTPUT_PATH.open("w", encoding="utf-8") as sink:
        json.dump(edges, sink, indent=2, ensure_ascii=False)

    # Push the edges into the graph database.
    logger.info("Loading %d interactions to FalkorDB...", len(edges))
    loaded = await load_interactions_to_falkor(edges)

    elapsed = time.time() - started_at

    # Tally edges per mechanism, remembering first-seen order for ties.
    per_mechanism: dict[str, int] = {}
    for edge in edges:
        label = edge["mechanism"]
        per_mechanism[label] = per_mechanism.get(label, 0) + 1
    ranked = sorted(per_mechanism.items(), key=lambda pair: pair[1], reverse=True)

    rule = "=" * 60
    report = [
        "",
        rule,
        " CODE INTERACTION DETECTION COMPLETE",
        rule,
        f" Interactions found: {len(edges)}",
        f" Loaded to FalkorDB: {loaded}",
        " By mechanism:",
    ]
    report.extend(f" {mech:35s} {count:>4}" for mech, count in ranked)
    report.extend(
        [
            f" Time elapsed: {elapsed:.1f}s",
            f" Output: {_OUTPUT_PATH}",
            rule,
            "",
        ]
    )
    # The leading and trailing empty entries reproduce the blank lines that
    # frame the banner.
    print("\n".join(report))
def main() -> None:
    """Configure logging and run :func:`async_main` to completion.

    Root logging is set to INFO with a timestamped format, then the async
    pipeline is executed with ``asyncio.run``. All other work happens inside
    :func:`async_main`.

    Returns:
        None.
    """
    log_format = "%(asctime)s [%(levelname)s] %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)
    asyncio.run(async_main())
# Script entry point: run the step only when this module is executed directly
# (for example ``python -m tools.feature_atlas.detect_code_interactions``).
if __name__ == "__main__":
    main()