"""Step 3: Detect code-grounded interactions between features.
Static analysis only: analyzes imports, shared files, shared data stores,
and shared env vars between feature pairs using the repo symbol index.
Outputs ``outputs/code_interactions.json`` and loads CODE_INTERACTS_WITH
edges into FalkorDB.
Usage:
python -m tools.feature_atlas.detect_code_interactions
# fire skull -- FINDING THE NERVES BETWEEN ORGANS
"""
from __future__ import annotations
import asyncio
import json
import logging
import sys
import time
from collections import defaultdict
from pathlib import Path
from typing import Any
import yaml
# Directory three levels above this file; given the ``python -m
# tools.feature_atlas...`` usage this is presumably the repository root.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
# Make project-absolute imports work when this file is run directly.
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))
logger = logging.getLogger(__name__)
# Directory containing this module; all inputs and outputs live beneath it.
_ATLAS_DIR = Path(__file__).resolve().parent
_CONFIG_PATH = _ATLAS_DIR / "config.yaml"
# Inputs read by this step.
_FEATURE_REGISTRY_PATH = _ATLAS_DIR / "outputs" / "feature_registry.json"
_SYMBOLS_PATH = _ATLAS_DIR / "outputs" / "repo_symbols.json"
# Output written by this step.
_OUTPUT_PATH = _ATLAS_DIR / "outputs" / "code_interactions.json"
def _load_config() -> dict[str, Any]:
    """Parse ``config.yaml`` next to this module and return its contents.

    The file at the module-level ``_CONFIG_PATH`` is opened as UTF-8 text and
    handed to ``yaml.safe_load``; whatever that call produces is returned
    unchanged. Nothing else in this module calls this helper.

    Returns:
        The object produced by ``yaml.safe_load`` for the config file.

    Raises:
        FileNotFoundError: If the config file does not exist.
        yaml.YAMLError: If the file cannot be parsed as YAML.
    """
    with _CONFIG_PATH.open("r", encoding="utf-8") as stream:
        config = yaml.safe_load(stream)
    return config
def _load_features() -> list[dict[str, Any]]:
    """Read the feature registry JSON from ``_FEATURE_REGISTRY_PATH``.

    The file is read as UTF-8 text and decoded with the standard ``json``
    module. The records are later consumed by
    :func:`detect_static_interactions`, which reads each feature's ``id``,
    ``files`` and ``data_stores`` keys.

    Returns:
        The decoded JSON document (expected to be a list of feature records).

    Raises:
        FileNotFoundError: If the registry file does not exist.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    raw_text = _FEATURE_REGISTRY_PATH.read_text(encoding="utf-8")
    return json.loads(raw_text)
def _load_symbols() -> list[dict[str, Any]]:
    """Read the repository symbol index JSON from ``_SYMBOLS_PATH``.

    The file is read as UTF-8 text and decoded with the standard ``json``
    module. :func:`detect_static_interactions` reads the ``file``, ``type``,
    ``imports`` and ``env_vars`` keys of each record.

    Returns:
        The decoded JSON document (expected to be a list of per-file records).

    Raises:
        FileNotFoundError: If the symbol index file does not exist.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    raw_text = _SYMBOLS_PATH.read_text(encoding="utf-8")
    return json.loads(raw_text)
def _build_file_to_features(features: list[dict[str, Any]]) -> dict[str, list[str]]:
"""Build a reverse index from file path to the feature IDs that claim it.
Inverts the feature registry so that, for each file path listed under a
feature's ``files`` key, the resulting mapping records every feature that
references it. This is the foundation for detecting the
``shared_memory_layer`` mechanism, where two features touching the same
file imply an implicit interaction. Pure in-memory transformation with no
Redis, knowledge-graph, LLM, or filesystem side effects.
Invoked by :func:`detect_static_interactions` in this module; no other
internal callers were found.
Args:
features: The feature registry records, each carrying an ``id`` and a
``files`` list of claimed file paths.
Returns:
A mapping from file path to the list of feature IDs that claim it.
"""
mapping: dict[str, list[str]] = defaultdict(list)
for feat in features:
for filepath in feat.get("files", []):
mapping[filepath].append(feat["id"])
return dict(mapping)
def _build_import_graph(
symbols: list[dict[str, Any]],
) -> dict[str, set[str]]:
"""Build an import graph mapping each Python file to its imported modules.
Walks the repo symbol index and, for every record of ``type`` ``python``,
collects the set of module names that file imports. The resulting graph
lets :func:`detect_static_interactions` trace ``direct_import`` edges by
resolving each imported module back to a file owned by another feature.
Non-Python records are skipped. Pure in-memory transformation with no
external side effects.
Invoked by :func:`detect_static_interactions` in this module; no other
internal callers were found.
Args:
symbols: The per-file symbol records from the repo symbol index, each
optionally carrying ``type``, ``file``, and ``imports`` fields.
Returns:
A mapping from Python file path to the set of module names it imports.
"""
graph: dict[str, set[str]] = {}
for record in symbols:
if record.get("type") != "python":
continue
filepath = record.get("file", "")
imports = set(record.get("imports", []))
graph[filepath] = imports
return graph
def _module_to_filepath(module: str, symbols: list[dict[str, Any]]) -> str | None:
"""Resolve a Python module name to its file path within the symbol index.
Translates a dotted module name (for example ``foo.bar``) into the two
plausible on-disk paths (``foo/bar.py`` and ``foo/bar/__init__.py``) and
returns the first that is actually present in the symbol index. This lets
:func:`detect_static_interactions` decide whether an import lands inside a
file owned by another feature. Resolution is best-effort and string-based;
modules outside the indexed repo (third-party packages, dynamic imports)
resolve to ``None``. Pure in-memory lookup with no external side effects.
Invoked by :func:`detect_static_interactions` in this module; no other
internal callers were found.
Args:
module: The dotted Python module name to resolve.
symbols: The per-file symbol records, used as the set of known files.
Returns:
The matching repo-relative file path, or ``None`` when no indexed file
corresponds to the module.
"""
# Try direct mapping: foo.bar -> foo/bar.py
candidates = [
module.replace(".", "/") + ".py",
module.replace(".", "/") + "/__init__.py",
]
known_files = {r.get("file", "") for r in symbols}
for candidate in candidates:
if candidate in known_files:
return candidate
return None
[docs]
def detect_static_interactions(
    features: list[dict[str, Any]],
    symbols: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Detect code-grounded interactions between feature pairs via static analysis.

    For every ordered pair of distinct registry entries ``(source, target)``
    up to four edges are emitted, in this order:

    * ``shared_memory_layer`` when both features list at least one common
      file. Confidence is ``0.5 + 0.1 * n`` for ``n`` shared files, capped at
      ``0.9``.
    * ``direct_import`` (confidence ``0.85``) when a file of the source imports
      a module that resolves, via :func:`_module_to_filepath`, to a file of
      the target. Only one evidence is kept per pair: the one with the
      lexicographically smallest source file, then module name, so the output
      is the same on every run.
    * ``shared_falkordb_graph`` / ``shared_redis_db`` / ``shared_vector_store``
      (confidence ``0.8``) when the two ``data_stores`` lists overlap. The
      mechanism is chosen from the shared store names: any name containing
      ``falkor`` wins, then any containing ``redis``, otherwise the vector
      store label is used.
    * ``shared_env_var`` (confidence ``0.6``) when files of both features
      reference a common environment variable according to the symbol index.

    Finally the edges are deduplicated on ``(source_id, target_id, mechanism)``,
    keeping the first edge with the highest confidence, and both counts are
    logged at INFO level. The function performs no I/O besides that log line.

    Args:
        features: Feature registry records, each with an ``id`` and optional
            ``files`` and ``data_stores`` lists.
        symbols: Per-file symbol records providing ``file``, ``type``,
            ``imports`` and ``env_vars``.

    Returns:
        The deduplicated list of edge dictionaries with the keys
        ``source_id``, ``target_id``, ``mechanism``, ``confidence``,
        ``file_refs``, ``symbol_refs``, ``evidence`` and ``direction``.

    Raises:
        KeyError: If a feature record has no ``id``.
    """
    interactions: list[dict[str, Any]] = []
    file_to_features = _build_file_to_features(features)
    import_graph = _build_import_graph(symbols)

    # Symbol lookup: file -> record (the last record wins for a repeated file).
    file_to_record: dict[str, dict[str, Any]] = {
        record.get("file", ""): record for record in symbols
    }

    # Per-feature lookup tables.
    feature_files: dict[str, set[str]] = {}
    feature_data_stores: dict[str, set[str]] = {}
    feature_env_vars: dict[str, set[str]] = {}
    for feat in features:
        fid = feat["id"]
        files = set(feat.get("files", []))
        feature_files[fid] = files
        feature_data_stores[fid] = set(feat.get("data_stores", []))
        # Union of the env vars referenced by any file of this feature.
        env_vars: set[str] = set()
        for filepath in files:
            env_vars.update(file_to_record.get(filepath, {}).get("env_vars", []))
        feature_env_vars[fid] = env_vars

    # Import evidence is computed once up front instead of once per pair.
    # Each distinct module is resolved a single time, and the reverse file
    # index tells us which features own the imported file. Iterating in sorted
    # order makes the retained evidence independent of set iteration order.
    resolved_modules: dict[str, str | None] = {}
    import_evidence: dict[tuple[str, str], tuple[str, str, str]] = {}
    for src_id, src_files in feature_files.items():
        for src_file in sorted(src_files):
            for module in sorted(import_graph.get(src_file, ())):
                if module not in resolved_modules:
                    resolved_modules[module] = _module_to_filepath(module, symbols)
                imp_file = resolved_modules[module]
                if not imp_file:
                    continue  # Module is not part of the indexed repo.
                for owner_id in file_to_features.get(imp_file, ()):
                    # One import evidence per pair is enough: keep the first.
                    import_evidence.setdefault(
                        (src_id, owner_id), (src_file, module, imp_file)
                    )

    feature_ids = [feat["id"] for feat in features]
    for i, src_id in enumerate(feature_ids):
        for j, tgt_id in enumerate(feature_ids):
            if i == j:
                continue
            direction = f"{src_id}_TO_{tgt_id}"

            # 1. Shared files
            shared = sorted(feature_files[src_id] & feature_files[tgt_id])
            if shared:
                interactions.append({
                    "source_id": src_id,
                    "target_id": tgt_id,
                    "mechanism": "shared_memory_layer",
                    "confidence": min(0.9, 0.5 + len(shared) * 0.1),
                    "file_refs": shared[:10],
                    "symbol_refs": [],
                    "evidence": (
                        f"{src_id} and {tgt_id} share {len(shared)} file(s): "
                        f"{', '.join(shared[:3])}"
                    ),
                    "direction": direction,
                })

            # 2. Import relationships
            import_hit = import_evidence.get((src_id, tgt_id))
            if import_hit is not None:
                src_file, module, imp_file = import_hit
                interactions.append({
                    "source_id": src_id,
                    "target_id": tgt_id,
                    "mechanism": "direct_import",
                    "confidence": 0.85,
                    "file_refs": [src_file, imp_file],
                    "symbol_refs": [module],
                    "evidence": (
                        f"{src_file} imports {module} "
                        f"which belongs to {tgt_id}"
                    ),
                    "direction": direction,
                })

            # 3. Shared data stores
            shared_ds = feature_data_stores[src_id] & feature_data_stores[tgt_id]
            if shared_ds:
                if any("falkor" in d.lower() for d in shared_ds):
                    mechanism = "shared_falkordb_graph"
                elif any("redis" in d.lower() for d in shared_ds):
                    mechanism = "shared_redis_db"
                else:
                    mechanism = "shared_vector_store"
                interactions.append({
                    "source_id": src_id,
                    "target_id": tgt_id,
                    "mechanism": mechanism,
                    "confidence": 0.8,
                    "file_refs": [],
                    "symbol_refs": [],
                    "evidence": (
                        f"{src_id} and {tgt_id} share data store(s): "
                        f"{', '.join(sorted(shared_ds)[:3])}"
                    ),
                    "direction": direction,
                })

            # 4. Shared env vars
            shared_env = sorted(feature_env_vars[src_id] & feature_env_vars[tgt_id])
            if shared_env:
                interactions.append({
                    "source_id": src_id,
                    "target_id": tgt_id,
                    "mechanism": "shared_env_var",
                    "confidence": 0.6,
                    "file_refs": [],
                    "symbol_refs": shared_env[:5],
                    "evidence": (
                        f"{src_id} and {tgt_id} reference shared env vars: "
                        f"{', '.join(shared_env[:3])}"
                    ),
                    "direction": direction,
                })

    # Deduplicate: keep the highest confidence interaction per
    # (src, tgt, mechanism). This only matters when the registry repeats a
    # feature id; otherwise each key is produced at most once above.
    seen: dict[tuple[str, str, str], dict[str, Any]] = {}
    for interaction in interactions:
        key = (
            interaction["source_id"],
            interaction["target_id"],
            interaction["mechanism"],
        )
        if key not in seen or interaction["confidence"] > seen[key]["confidence"]:
            seen[key] = interaction
    deduped = list(seen.values())

    logger.info(
        "Static analysis found %d interactions (%d before dedup)",
        len(deduped),
        len(interactions),
    )
    return deduped
[docs]
async def load_interactions_to_falkor(
    interactions: list[dict[str, Any]],
) -> int:
    """Persist detected interactions as CODE_INTERACTS_WITH edges in FalkorDB.

    Obtains a graph handle and its client from
    ``tools.feature_atlas.atlas_connection.get_atlas_graph`` and calls
    ``merge_code_interaction`` once per interaction. A failure on one edge is
    logged and skipped so it does not abort the rest of the load. The client
    is closed in a ``finally`` block, so the connection is released even when
    the load is interrupted by something that is not an ``Exception`` (for
    example task cancellation).

    Args:
        interactions: Edge dictionaries as produced by
            :func:`detect_static_interactions`.

    Returns:
        The number of edges that were merged without raising.
    """
    # Imported lazily so the static-analysis part of this module can be used
    # without the graph connection dependencies being importable.
    from tools.feature_atlas.atlas_connection import (
        get_atlas_graph,
        merge_code_interaction,
    )

    graph, rc = await get_atlas_graph()
    loaded = 0
    try:
        for interaction in interactions:
            try:
                await merge_code_interaction(graph, interaction)
            except Exception as e:
                # Deliberate best-effort: report the bad edge and carry on.
                logger.error(
                    "Failed to load interaction %s -> %s: %s",
                    interaction.get("source_id"),
                    interaction.get("target_id"),
                    e,
                )
            else:
                loaded += 1
    finally:
        await rc.aclose()
    return loaded
[docs]
async def async_main() -> None:
    """Run the code-interaction detection step end to end.

    Steps, in order:

    1. Load the feature registry and symbol index via :func:`_load_features`
       and :func:`_load_symbols`.
    2. Compute the edges with :func:`detect_static_interactions`.
    3. Write them as indented UTF-8 JSON to ``_OUTPUT_PATH``, creating the
       parent directory if needed.
    4. Persist them through :func:`load_interactions_to_falkor`.
    5. Print a summary to stdout: totals, a per-mechanism breakdown sorted by
       descending count, elapsed time and the output path.

    Elapsed time is measured with ``time.perf_counter`` so it cannot be
    distorted by wall-clock adjustments during the run.

    Returns:
        None.
    """
    t0 = time.perf_counter()
    features = _load_features()
    symbols = _load_symbols()

    # Phase 1: Static analysis
    logger.info("Phase 1: Static interaction detection...")
    interactions = detect_static_interactions(features, symbols)

    # Write output
    _OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    with open(_OUTPUT_PATH, "w", encoding="utf-8") as f:
        json.dump(interactions, f, indent=2, ensure_ascii=False)

    # Load to FalkorDB
    logger.info("Loading %d interactions to FalkorDB...", len(interactions))
    loaded = await load_interactions_to_falkor(interactions)
    elapsed = time.perf_counter() - t0

    # Summary
    mechanisms: dict[str, int] = defaultdict(int)
    for ix in interactions:
        mechanisms[ix["mechanism"]] += 1

    rule = "=" * 60
    print(f"\n{rule}")
    print(" CODE INTERACTION DETECTION COMPLETE")
    print(rule)
    print(f" Interactions found: {len(interactions)}")
    print(f" Loaded to FalkorDB: {loaded}")
    print(" By mechanism:")
    # Most frequent mechanism first; ties keep first-seen order.
    for mech, count in sorted(mechanisms.items(), key=lambda x: -x[1]):
        print(f" {mech:35s} {count:>4}")
    print(f" Time elapsed: {elapsed:.1f}s")
    print(f" Output: {_OUTPUT_PATH}")
    print(f"{rule}\n")
[docs]
def main() -> None:
    """Configure logging and run :func:`async_main` to completion.

    Root logging is set to INFO with a timestamped format, then the async
    pipeline is driven with ``asyncio.run``. This wrapper does nothing else;
    all file and graph side effects happen inside :func:`async_main`.

    Returns:
        None.
    """
    log_format = "%(asctime)s [%(levelname)s] %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)
    asyncio.run(async_main())
# Run the detection step only when executed as a script or via ``python -m``,
# not when this module is imported.
if __name__ == "__main__":
    main()