Source code for tools.feature_atlas.atlas_connection

"""FalkorDB connection helper for the Feature Interaction Atlas.

Connects to the same Redis/FalkorDB instance as the production knowledge
graph but uses a completely separate graph namespace:
``stargazer_feature_interaction_atlas``.

Reuses the project's ``config.Config.load()`` for Redis URL and TLS
kwargs, matching the pattern in ``memory_search.py``.

# fire -- SEPARATE GRAPH. SEPARATE KEYSPACE. ZERO RISK.
"""

from __future__ import annotations

import asyncio
import logging
import os
import sys
import time
from pathlib import Path
from typing import Any

# Ensure project root is on sys.path so we can import config, etc.
# This module lives at tools/feature_atlas/atlas_connection.py, so three
# ``.parent`` hops from this file land on the directory that contains the
# ``tools`` package. The insert must run before the project import below;
# it also lets the function-scope ``from config import Config`` resolve when
# an atlas script is launched directly rather than from the project root.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    # Prepend (index 0) so the project's own modules are found first.
    sys.path.insert(0, str(_PROJECT_ROOT))

# Name of the dedicated atlas graph; every graph handle in this module is
# selected with it.
from tools.feature_atlas import ATLAS_GRAPH_NAME

# Module-level logger used by the query wrappers to report failing Cypher.
logger = logging.getLogger(__name__)


async def get_redis() -> Any:
    """Open an async Redis connection configured from the project ``Config``.

    Loads ``config.Config`` to reuse the production Redis URL and TLS
    settings, opens a decoded async client, and then tries to set FalkorDB's
    ``RESULTSET_SIZE`` to ``-1`` so large atlas query results are not
    truncated.

    Raising the cap is best-effort: if the ``GRAPH.CONFIG`` command fails
    (for example because the server does not allow it) the failure is logged
    as a warning and the client is still returned, so callers keep working
    but may see capped result sets.

    The Feature Interaction Atlas shares the Redis instance with the
    production knowledge graph and is isolated by graph namespace, not by
    connection.

    NOTE(review): ``GRAPH.CONFIG SET`` looks like a server-wide setting
    rather than a per-graph one, so it probably also changes the cap for the
    production graph on the same instance -- confirm this is acceptable.

    Called by :func:`get_atlas_graph` whenever no client is supplied.

    Returns:
        Any: A connected ``redis.asyncio`` client created with
        ``decode_responses=True``.
    """
    # Imported lazily so merely importing this module does not require redis
    # or the project config to be importable.
    import redis.asyncio as aioredis

    from config import Config

    cfg = Config.load()
    ssl_kwargs = cfg.redis_connection_kwargs_for_url(cfg.redis_url)
    rc = await aioredis.from_url(
        cfg.redis_url,
        decode_responses=True,
        **ssl_kwargs,
    )

    # FalkorDB needs large results for atlas queries.
    try:
        await rc.execute_command("GRAPH.CONFIG", "SET", "RESULTSET_SIZE", -1)
    except Exception as exc:
        # Deliberately non-fatal, but no longer silent: a capped result set
        # would otherwise truncate atlas queries with no hint as to why.
        logger.warning(
            "Could not raise FalkorDB RESULTSET_SIZE; large atlas query "
            "results may be truncated: %s",
            exc,
        )
    return rc
async def get_atlas_graph(redis_client: Any | None = None) -> tuple[Any, Any]:
    """Return the atlas graph handle together with the Redis client behind it.

    Wraps the Redis client's connection pool in an async ``FalkorDB`` object
    and selects the graph named by ``ATLAS_GRAPH_NAME``, so atlas work goes
    to its own graph rather than the production knowledge graph. If the
    caller does not pass a client, one is opened through :func:`get_redis`.

    This is the entry point the atlas scripts use to obtain a graph handle
    before running queries or merges.

    Args:
        redis_client: An existing async Redis client to reuse. When ``None``
            a fresh connection is created via :func:`get_redis`.

    Returns:
        tuple[Any, Any]: ``(falkordb_graph, redis_client)`` -- the selected
        atlas graph and the Redis client backing it, returned so the caller
        can close it.
    """
    from falkordb.asyncio import FalkorDB

    client = redis_client
    if client is None:
        client = await get_redis()

    # Share the client's pool instead of opening a second set of connections.
    atlas = FalkorDB(connection_pool=client.connection_pool).select_graph(
        ATLAS_GRAPH_NAME
    )
    return atlas, client
[docs] async def atlas_query( graph: Any, cypher: str, params: dict | None = None, ) -> Any: """Run a read-write Cypher query against the atlas graph with error logging. Thin wrapper over the FalkorDB graph's ``query`` method that logs the exception and a truncated copy of the failing Cypher before re-raising, so write failures surface with enough context to debug. This is the mutating path (it allows ``MERGE``/``SET``), distinct from the read-only :func:`atlas_ro_query`. It executes Cypher against FalkorDB. It is called by all of the MERGE helpers in this module -- :func:`merge_feature`, :func:`merge_code_interaction`, :func:`merge_interaction_prompt`, and :func:`merge_interaction_analysis` (the last also using it for a follow-up status update). Args: graph: The selected FalkorDB atlas graph handle. cypher: The Cypher statement to execute. params: Optional query parameters. Returns: Any: The FalkorDB query result object. Raises: Exception: Re-raises any error from the underlying ``query`` call after logging it. """ try: return await graph.query(cypher, params=params) except Exception as e: logger.error("Atlas Cypher query failed: %s\nQuery: %s", e, cypher[:500]) raise
[docs] async def atlas_ro_query( graph: Any, cypher: str, params: dict | None = None, ) -> Any: """Run a read-only Cypher query against the atlas graph with error logging. Mirrors :func:`atlas_query` but routes through the FalkorDB graph's ``ro_query`` method, which the engine can fan out to replicas and which forbids mutations -- the safe path for analytics and dashboards over the atlas. On failure it logs the error and a truncated copy of the Cypher before re-raising. It executes read-only Cypher against FalkorDB. No caller invokes this module-level wrapper today (the read paths in ``query_atlas`` and ``atlas_query_tool`` call ``graph.ro_query`` directly); it is provided as the read-only counterpart to :func:`atlas_query` for atlas tooling. Args: graph: The selected FalkorDB atlas graph handle. cypher: The read-only Cypher statement to execute. params: Optional query parameters. Returns: Any: The FalkorDB query result object. Raises: Exception: Re-raises any error from the underlying ``ro_query`` call after logging it. """ try: return await graph.ro_query(cypher, params=params) except Exception as e: logger.error("Atlas RO query failed: %s\nQuery: %s", e, cypher[:500]) raise
# ---------------------------------------------------------------------------
# MERGE helpers -- idempotent node/edge loaders
# ---------------------------------------------------------------------------
async def merge_feature(graph: Any, feature: dict[str, Any]) -> None:
    """Upsert a Feature node into the atlas graph.

    Runs a ``MERGE`` on ``(:Feature {id})``. ``created_at`` is stamped only
    when the node is first created; the descriptive properties and
    ``updated_at`` are written on every call, so re-running with the same id
    refreshes the node in place instead of duplicating it.

    The list-valued fields (files, symbols, entrypoints, data stores,
    external tools) are stored as JSON strings and ``confidence`` is coerced
    to ``float``.

    The write goes through :func:`atlas_query`.

    Args:
        graph: The selected FalkorDB atlas graph handle.
        feature: Feature mapping. ``id``, ``human_name`` and ``category`` are
            required (a missing key raises ``KeyError``); ``description``
            defaults to ``""``, the list fields to ``[]`` and ``confidence``
            to ``0.0``.
    """
    import json as _json

    now = time.time()

    # ON CREATE SET is a sub-clause of MERGE and has to come directly after
    # the MERGE pattern; placed after a standalone SET it is not valid Cypher.
    cypher = """
    MERGE (f:Feature {id: $id})
    ON CREATE SET f.created_at = $now
    SET f.human_name = $human_name,
        f.category = $category,
        f.description = $description,
        f.files = $files,
        f.symbols = $symbols,
        f.entrypoints = $entrypoints,
        f.data_stores = $data_stores,
        f.external_tools = $external_tools,
        f.confidence = $confidence,
        f.updated_at = $now
    """

    # Fields whose values are lists and are stored as JSON strings.
    list_fields = (
        "files",
        "symbols",
        "entrypoints",
        "data_stores",
        "external_tools",
    )
    params = {
        "id": feature["id"],
        "human_name": feature["human_name"],
        "category": feature["category"],
        "description": feature.get("description", ""),
        **{name: _json.dumps(feature.get(name, [])) for name in list_fields},
        "confidence": float(feature.get("confidence", 0.0)),
        "now": now,
    }
    await atlas_query(graph, cypher, params)
async def merge_code_interaction(graph: Any, interaction: dict[str, Any]) -> None:
    """Upsert a CODE_INTERACTS_WITH edge between two existing Feature nodes.

    Matches the source and target ``(:Feature)`` nodes by id and merges a
    ``CODE_INTERACTS_WITH`` relationship keyed on ``mechanism``. Because the
    key includes the mechanism, the same pair of features can carry several
    distinct interaction edges. ``created_at`` is stamped only when the edge
    is first created; confidence, the JSON-serialised file and symbol
    references, evidence, direction and ``updated_at`` are written on every
    call.

    Both endpoints are looked up with ``MATCH``, so if either Feature node is
    missing the statement matches nothing and no edge is written.

    The write goes through :func:`atlas_query`.

    Args:
        graph: The selected FalkorDB atlas graph handle.
        interaction: Interaction mapping. ``source_id``, ``target_id`` and
            ``mechanism`` are required (a missing key raises ``KeyError``);
            ``confidence`` defaults to ``0.0``, the reference lists to ``[]``
            and ``evidence``/``direction`` to ``""``.
    """
    import json as _json

    now = time.time()

    # ON CREATE SET is a sub-clause of MERGE and has to come directly after
    # the MERGE pattern; placed after a standalone SET it is not valid Cypher.
    cypher = """
    MATCH (src:Feature {id: $source_id})
    MATCH (tgt:Feature {id: $target_id})
    MERGE (src)-[e:CODE_INTERACTS_WITH {mechanism: $mechanism}]->(tgt)
    ON CREATE SET e.created_at = $now
    SET e.confidence = $confidence,
        e.file_refs = $file_refs,
        e.symbol_refs = $symbol_refs,
        e.evidence = $evidence,
        e.direction = $direction,
        e.updated_at = $now
    """

    params = {
        "source_id": interaction["source_id"],
        "target_id": interaction["target_id"],
        "mechanism": interaction["mechanism"],
        "confidence": float(interaction.get("confidence", 0.0)),
        "file_refs": _json.dumps(interaction.get("file_refs", [])),
        "symbol_refs": _json.dumps(interaction.get("symbol_refs", [])),
        "evidence": interaction.get("evidence", ""),
        "direction": interaction.get("direction", ""),
        "now": now,
    }
    await atlas_query(graph, cypher, params)
async def merge_interaction_prompt(
    graph: Any,
    source_id: str,
    target_id: str,
    prompt: str,
    status: str = "pending",
) -> None:
    """Upsert an InteractionPrompt node wired between two Feature nodes.

    Matches the source and target ``(:Feature)`` nodes and merges the path
    ``(src)-[:HAS_INTERACTION]->(prompt)-[:TARGETS]->(tgt)``, with the
    ``InteractionPrompt`` keyed on the ``(source_id, target_id)`` pair.
    ``created_at`` is stamped only when the path is first created; the prompt
    text, ``status`` and ``updated_at`` are written on every call, so a
    re-run updates the existing prompt instead of adding a second one.

    Both endpoints are looked up with ``MATCH``, so if either Feature node is
    missing the statement matches nothing and no prompt is written.

    The write goes through :func:`atlas_query`.

    Args:
        graph: The selected FalkorDB atlas graph handle.
        source_id: Id of the source Feature node.
        target_id: Id of the target Feature node.
        prompt: The generated interaction-analysis prompt text.
        status: Workflow status to record (defaults to ``pending``).
    """
    now = time.time()

    # ON CREATE SET is a sub-clause of MERGE and has to come directly after
    # the MERGE pattern; placed after a standalone SET it is not valid Cypher.
    cypher = """
    MATCH (src:Feature {id: $source_id})
    MATCH (tgt:Feature {id: $target_id})
    MERGE (src)-[:HAS_INTERACTION]->(p:InteractionPrompt {
        source_id: $source_id,
        target_id: $target_id
    })-[:TARGETS]->(tgt)
    ON CREATE SET p.created_at = $now
    SET p.prompt = $prompt,
        p.status = $status,
        p.updated_at = $now
    """

    params = {
        "source_id": source_id,
        "target_id": target_id,
        "prompt": prompt,
        "status": status,
        "now": now,
    }
    await atlas_query(graph, cypher, params)
async def merge_interaction_analysis(
    graph: Any,
    analysis: dict[str, Any],
) -> None:
    """Upsert an InteractionAnalysis node and mark its prompt completed.

    Matches the ``InteractionPrompt`` for the ``(source_id, target_id)`` pair
    and merges ``(prompt)-[:PRODUCED]->(:InteractionAnalysis)``, keyed on the
    same pair. ``created_at`` is stamped only when the node is first created;
    every other property and ``updated_at`` are written on every call.

    Stored properties fall into three groups:

    * free-text narratives, stored as given (default ``""``);
    * list-valued fields, stored as JSON strings (default ``[]``);
    * scores, coerced to ``float`` (default ``0.0``).

    After the upsert a second query sets the matching prompt's ``status`` to
    ``completed``. Both writes go through :func:`atlas_query`. The prompt is
    looked up with ``MATCH``, so if it does not exist neither query writes
    anything.

    Args:
        graph: The selected FalkorDB atlas graph handle.
        analysis: Analysis mapping. ``source_id`` and ``target_id`` are
            required (a missing key raises ``KeyError``); all narrative,
            list and score fields are optional.
    """
    import json as _json

    now = time.time()

    # ON CREATE SET is a sub-clause of MERGE and has to come directly after
    # the MERGE pattern; placed after a standalone SET it is not valid Cypher.
    cypher = """
    MATCH (p:InteractionPrompt {
        source_id: $source_id,
        target_id: $target_id
    })
    MERGE (p)-[:PRODUCED]->(a:InteractionAnalysis {
        source_id: $source_id,
        target_id: $target_id
    })
    ON CREATE SET a.created_at = $now
    SET a.summary = $summary,
        a.direct_interaction = $direct_interaction,
        a.indirect_interaction = $indirect_interaction,
        a.shared_state = $shared_state,
        a.memory_effects = $memory_effects,
        a.ncm_effects = $ncm_effects,
        a.routing_effects = $routing_effects,
        a.prompt_context_risks = $prompt_context_risks,
        a.failure_modes = $failure_modes,
        a.security_risks = $security_risks,
        a.synergy_score = $synergy_score,
        a.risk_score = $risk_score,
        a.weirdness_score = $weirdness_score,
        a.recommended_tests = $recommended_tests,
        a.recommended_constraints = $recommended_constraints,
        a.source_refs = $source_refs,
        a.confidence = $confidence,
        a.updated_at = $now
    """

    # Free-text narratives, passed through unchanged.
    text_fields = (
        "summary",
        "direct_interaction",
        "indirect_interaction",
        "shared_state",
        "memory_effects",
        "ncm_effects",
        "routing_effects",
        "prompt_context_risks",
    )
    # List-valued fields, stored as JSON strings.
    list_fields = (
        "failure_modes",
        "security_risks",
        "recommended_tests",
        "recommended_constraints",
        "source_refs",
    )
    # Numeric scores, coerced to float.
    score_fields = (
        "synergy_score",
        "risk_score",
        "weirdness_score",
        "confidence",
    )

    params: dict[str, Any] = {
        "source_id": analysis["source_id"],
        "target_id": analysis["target_id"],
        "now": now,
    }
    params.update((name, analysis.get(name, "")) for name in text_fields)
    params.update(
        (name, _json.dumps(analysis.get(name, []))) for name in list_fields
    )
    params.update(
        (name, float(analysis.get(name, 0.0))) for name in score_fields
    )

    await atlas_query(graph, cypher, params)

    # Also mark the prompt as completed
    await atlas_query(
        graph,
        "MATCH (p:InteractionPrompt {source_id: $s, target_id: $t}) "
        "SET p.status = 'completed'",
        {"s": analysis["source_id"], "t": analysis["target_id"]},
    )