"""FalkorDB connection helper for the Feature Interaction Atlas.
Connects to the same Redis/FalkorDB instance as the production knowledge
graph but uses a completely separate graph namespace:
``stargazer_feature_interaction_atlas``.
Reuses the project's ``config.Config.load()`` for Redis URL and TLS
kwargs, matching the pattern in ``memory_search.py``.
# fire -- SEPARATE GRAPH. SEPARATE KEYSPACE. ZERO RISK.
"""
from __future__ import annotations
import asyncio
import logging
import os
import sys
import time
from pathlib import Path
from typing import Any
# Ensure project root is on sys.path so we can import config, etc.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
if str(_PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(_PROJECT_ROOT))
from tools.feature_atlas import ATLAS_GRAPH_NAME
logger = logging.getLogger(__name__)
[docs]
async def get_redis() -> Any:
    """Open an async Redis connection configured from the project ``Config``.

    Loads ``config.Config`` to reuse the project's Redis URL and the
    connection kwargs derived from it, opens a client with
    ``decode_responses=True``, and then asks FalkorDB to lift its
    ``RESULTSET_SIZE`` cap (``-1``) so large atlas query results are not
    truncated.

    Lifting the cap is best-effort: if the command fails the error is logged
    as a warning and the client is still returned.

    This is called by :func:`get_atlas_graph` whenever no client is supplied.

    Returns:
        Any: The awaited ``redis.asyncio`` client, created with
        ``decode_responses=True``.
    """
    # Imported lazily so importing this module does not require redis or the
    # project config to be importable.
    import redis.asyncio as aioredis

    from config import Config

    cfg = Config.load()
    ssl_kwargs = cfg.redis_connection_kwargs_for_url(cfg.redis_url)
    rc = await aioredis.from_url(
        cfg.redis_url,
        decode_responses=True,
        **ssl_kwargs,
    )

    # FalkorDB needs large results for atlas queries.
    # NOTE(review): GRAPH.CONFIG looks like a server-level setting rather than
    # a per-graph one -- confirm that changing it on the shared instance is
    # acceptable for the production graph as well.
    try:
        await rc.execute_command("GRAPH.CONFIG", "SET", "RESULTSET_SIZE", -1)
    except Exception as exc:
        # Deliberately non-fatal, but no longer silent: a failure here can
        # explain truncated atlas results later on.
        logger.warning(
            "Could not raise FalkorDB RESULTSET_SIZE; "
            "large atlas results may be truncated: %s",
            exc,
        )
    return rc
[docs]
async def get_atlas_graph(redis_client: Any | None = None) -> tuple[Any, Any]:
    """Return the atlas graph handle together with the Redis client behind it.

    Wraps the Redis client's connection pool in an async ``FalkorDB`` object
    and selects the graph named by ``ATLAS_GRAPH_NAME``.  If the caller does
    not hand in a client, one is opened through :func:`get_redis`.

    Args:
        redis_client: An existing async Redis client to reuse; when ``None``
            a fresh one is obtained from :func:`get_redis`.

    Returns:
        tuple[Any, Any]: ``(falkordb_graph, redis_client)`` -- the selected
        graph handle and the Redis client whose pool it uses, returned so the
        caller can close it.
    """
    from falkordb.asyncio import FalkorDB

    # Reuse the supplied client when there is one, otherwise connect now.
    client = redis_client if redis_client is not None else await get_redis()

    atlas = FalkorDB(connection_pool=client.connection_pool).select_graph(
        ATLAS_GRAPH_NAME
    )
    return atlas, client
[docs]
async def atlas_query(
    graph: Any,
    cypher: str,
    params: dict | None = None,
) -> Any:
    """Execute a Cypher statement via ``graph.query`` and log any failure.

    A thin wrapper around the graph handle's ``query`` coroutine.  When the
    call raises, the exception and the first 500 characters of the Cypher
    text are logged at error level and the exception is re-raised unchanged.
    The MERGE helpers in this module all write through this function.

    Args:
        graph: The selected FalkorDB atlas graph handle.
        cypher: The Cypher statement to execute.
        params: Optional query parameters, forwarded as ``params=``.

    Returns:
        Any: Whatever ``graph.query`` returns.

    Raises:
        Exception: Any error raised by ``graph.query``, after logging it.
    """
    try:
        outcome = await graph.query(cypher, params=params)
    except Exception as exc:
        # Truncate the statement so one huge query cannot flood the log.
        snippet = cypher[:500]
        logger.error("Atlas Cypher query failed: %s\nQuery: %s", exc, snippet)
        raise
    return outcome
[docs]
async def atlas_ro_query(
    graph: Any,
    cypher: str,
    params: dict | None = None,
) -> Any:
    """Execute a Cypher statement via ``graph.ro_query`` and log any failure.

    The read-only counterpart of :func:`atlas_query`: it forwards to the
    graph handle's ``ro_query`` coroutine instead of ``query``.  When the call
    raises, the exception and the first 500 characters of the Cypher text are
    logged at error level and the exception is re-raised unchanged.

    Args:
        graph: The selected FalkorDB atlas graph handle.
        cypher: The read-only Cypher statement to execute.
        params: Optional query parameters, forwarded as ``params=``.

    Returns:
        Any: Whatever ``graph.ro_query`` returns.

    Raises:
        Exception: Any error raised by ``graph.ro_query``, after logging it.
    """
    try:
        outcome = await graph.ro_query(cypher, params=params)
    except Exception as exc:
        # Truncate the statement so one huge query cannot flood the log.
        snippet = cypher[:500]
        logger.error("Atlas RO query failed: %s\nQuery: %s", exc, snippet)
        raise
    return outcome
# ---------------------------------------------------------------------------
# MERGE helpers -- idempotent node/edge loaders
# ---------------------------------------------------------------------------
[docs]
async def merge_feature(graph: Any, feature: dict[str, Any]) -> None:
    """Upsert a Feature node into the atlas graph, idempotently.

    Runs a ``MERGE`` on ``(:Feature {id})`` and sets the descriptive
    properties: human name, category, description, a float confidence, and
    the ``files``, ``symbols``, ``entrypoints``, ``data_stores`` and
    ``external_tools`` values serialised to JSON strings.  ``updated_at`` is
    stamped on every call; ``created_at`` only when the node is first created.

    The statement is sent through :func:`atlas_query`.  Per the module's
    documentation this is driven by
    ``tools/feature_atlas/load_features_to_falkor.py``.

    Args:
        graph: The selected FalkorDB atlas graph handle.
        feature: Feature mapping; ``id``, ``human_name`` and ``category`` are
            required, the remaining fields fall back to empty defaults.

    Raises:
        KeyError: If ``id``, ``human_name`` or ``category`` is missing.
    """
    import json as _json

    now = time.time()
    # ON CREATE SET is a MERGE action and has to follow the MERGE pattern
    # directly; placing it after a plain SET clause is not valid Cypher.
    cypher = """
    MERGE (f:Feature {id: $id})
    ON CREATE SET f.created_at = $now
    SET f.human_name = $human_name,
        f.category = $category,
        f.description = $description,
        f.files = $files,
        f.symbols = $symbols,
        f.entrypoints = $entrypoints,
        f.data_stores = $data_stores,
        f.external_tools = $external_tools,
        f.confidence = $confidence,
        f.updated_at = $now
    """
    params = {
        "id": feature["id"],
        "human_name": feature["human_name"],
        "category": feature["category"],
        "description": feature.get("description", ""),
        # List-valued fields are stored as JSON strings.
        "files": _json.dumps(feature.get("files", [])),
        "symbols": _json.dumps(feature.get("symbols", [])),
        "entrypoints": _json.dumps(feature.get("entrypoints", [])),
        "data_stores": _json.dumps(feature.get("data_stores", [])),
        "external_tools": _json.dumps(feature.get("external_tools", [])),
        "confidence": float(feature.get("confidence", 0.0)),
        "now": now,
    }
    await atlas_query(graph, cypher, params)
[docs]
async def merge_code_interaction(graph: Any, interaction: dict[str, Any]) -> None:
    """Upsert a CODE_INTERACTS_WITH edge between two existing Feature nodes.

    Matches the source and target ``(:Feature)`` nodes by id and merges a
    ``CODE_INTERACTS_WITH`` relationship keyed on its ``mechanism``.  It then
    sets the confidence, the JSON-serialised file and symbol references, the
    evidence text and the direction.  ``updated_at`` is stamped on every call;
    ``created_at`` only when the edge is first created.  Because the merge is
    keyed on ``mechanism``, the same pair of features can carry several edges
    with different mechanisms.  Both endpoint features must already exist
    (typically loaded with :func:`merge_feature` first).

    The statement is sent through :func:`atlas_query`.  Per the module's
    documentation this is driven by
    ``tools/feature_atlas/detect_code_interactions.py``.

    Args:
        graph: The selected FalkorDB atlas graph handle.
        interaction: Interaction mapping; ``source_id``, ``target_id`` and
            ``mechanism`` are required, the rest fall back to empty defaults.

    Raises:
        KeyError: If ``source_id``, ``target_id`` or ``mechanism`` is missing.
    """
    import json as _json

    now = time.time()
    # ON CREATE SET is a MERGE action and has to follow the MERGE pattern
    # directly; placing it after a plain SET clause is not valid Cypher.
    cypher = """
    MATCH (src:Feature {id: $source_id})
    MATCH (tgt:Feature {id: $target_id})
    MERGE (src)-[e:CODE_INTERACTS_WITH {mechanism: $mechanism}]->(tgt)
    ON CREATE SET e.created_at = $now
    SET e.confidence = $confidence,
        e.file_refs = $file_refs,
        e.symbol_refs = $symbol_refs,
        e.evidence = $evidence,
        e.direction = $direction,
        e.updated_at = $now
    """
    params = {
        "source_id": interaction["source_id"],
        "target_id": interaction["target_id"],
        "mechanism": interaction["mechanism"],
        "confidence": float(interaction.get("confidence", 0.0)),
        # Reference lists are stored as JSON strings.
        "file_refs": _json.dumps(interaction.get("file_refs", [])),
        "symbol_refs": _json.dumps(interaction.get("symbol_refs", [])),
        "evidence": interaction.get("evidence", ""),
        "direction": interaction.get("direction", ""),
        "now": now,
    }
    await atlas_query(graph, cypher, params)
[docs]
async def merge_interaction_prompt(
    graph: Any,
    source_id: str,
    target_id: str,
    prompt: str,
    status: str = "pending",
) -> None:
    """Upsert an InteractionPrompt node wired between two Feature nodes.

    Matches the source and target ``(:Feature)`` nodes and merges the path
    ``(src)-[:HAS_INTERACTION]->(:InteractionPrompt)-[:TARGETS]->(tgt)``, with
    the prompt node keyed on the ``(source_id, target_id)`` pair.  It then
    stores the prompt text and the workflow ``status``.  ``updated_at`` is
    stamped on every call; ``created_at`` only when the path is first created.

    The statement is sent through :func:`atlas_query`.  Per the module's
    documentation this is driven by
    ``tools/feature_atlas/generate_interaction_prompts.py``.

    Args:
        graph: The selected FalkorDB atlas graph handle.
        source_id: Id of the source Feature node.
        target_id: Id of the target Feature node.
        prompt: The generated interaction-analysis prompt text.
        status: Workflow status to record (defaults to ``pending``).
    """
    now = time.time()
    # ON CREATE SET is a MERGE action and has to follow the MERGE pattern
    # directly; placing it after a plain SET clause is not valid Cypher.
    cypher = """
    MATCH (src:Feature {id: $source_id})
    MATCH (tgt:Feature {id: $target_id})
    MERGE (src)-[:HAS_INTERACTION]->(p:InteractionPrompt {
        source_id: $source_id,
        target_id: $target_id
    })-[:TARGETS]->(tgt)
    ON CREATE SET p.created_at = $now
    SET p.prompt = $prompt,
        p.status = $status,
        p.updated_at = $now
    """
    params = {
        "source_id": source_id,
        "target_id": target_id,
        "prompt": prompt,
        "status": status,
        "now": now,
    }
    await atlas_query(graph, cypher, params)
[docs]
async def merge_interaction_analysis(
    graph: Any,
    analysis: dict[str, Any],
) -> None:
    """Upsert an InteractionAnalysis node and mark its prompt completed.

    Matches the ``InteractionPrompt`` for the ``(source_id, target_id)`` pair
    and merges a ``(prompt)-[:PRODUCED]->(:InteractionAnalysis)`` node keyed
    on the same pair.  The node stores the summary and the narrative text
    fields, the synergy/risk/weirdness/confidence scores as floats, and the
    failure modes, security risks, recommended tests, recommended constraints
    and source refs as JSON strings.  ``updated_at`` is stamped on every call;
    ``created_at`` only when the node is first created.

    A second statement then sets the matching prompt's ``status`` to
    ``completed``.  Both statements are sent through :func:`atlas_query`.
    Per the module's documentation this is driven by
    ``tools/feature_atlas/import_interaction_analysis.py``.

    Args:
        graph: The selected FalkorDB atlas graph handle.
        analysis: Analysis mapping; ``source_id`` and ``target_id`` are
            required, every other field falls back to an empty default.

    Raises:
        KeyError: If ``source_id`` or ``target_id`` is missing.
    """
    import json as _json

    now = time.time()
    # ON CREATE SET is a MERGE action and has to follow the MERGE pattern
    # directly; placing it after a plain SET clause is not valid Cypher.
    cypher = """
    MATCH (p:InteractionPrompt {
        source_id: $source_id,
        target_id: $target_id
    })
    MERGE (p)-[:PRODUCED]->(a:InteractionAnalysis {
        source_id: $source_id,
        target_id: $target_id
    })
    ON CREATE SET a.created_at = $now
    SET a.summary = $summary,
        a.direct_interaction = $direct_interaction,
        a.indirect_interaction = $indirect_interaction,
        a.shared_state = $shared_state,
        a.memory_effects = $memory_effects,
        a.ncm_effects = $ncm_effects,
        a.routing_effects = $routing_effects,
        a.prompt_context_risks = $prompt_context_risks,
        a.failure_modes = $failure_modes,
        a.security_risks = $security_risks,
        a.synergy_score = $synergy_score,
        a.risk_score = $risk_score,
        a.weirdness_score = $weirdness_score,
        a.recommended_tests = $recommended_tests,
        a.recommended_constraints = $recommended_constraints,
        a.source_refs = $source_refs,
        a.confidence = $confidence,
        a.updated_at = $now
    """
    params = {
        "source_id": analysis["source_id"],
        "target_id": analysis["target_id"],
        "summary": analysis.get("summary", ""),
        "direct_interaction": analysis.get("direct_interaction", ""),
        "indirect_interaction": analysis.get("indirect_interaction", ""),
        "shared_state": analysis.get("shared_state", ""),
        "memory_effects": analysis.get("memory_effects", ""),
        "ncm_effects": analysis.get("ncm_effects", ""),
        "routing_effects": analysis.get("routing_effects", ""),
        "prompt_context_risks": analysis.get("prompt_context_risks", ""),
        # List-valued findings are stored as JSON strings.
        "failure_modes": _json.dumps(analysis.get("failure_modes", [])),
        "security_risks": _json.dumps(analysis.get("security_risks", [])),
        "synergy_score": float(analysis.get("synergy_score", 0.0)),
        "risk_score": float(analysis.get("risk_score", 0.0)),
        "weirdness_score": float(analysis.get("weirdness_score", 0.0)),
        "recommended_tests": _json.dumps(analysis.get("recommended_tests", [])),
        "recommended_constraints": _json.dumps(
            analysis.get("recommended_constraints", [])
        ),
        "source_refs": _json.dumps(analysis.get("source_refs", [])),
        "confidence": float(analysis.get("confidence", 0.0)),
        "now": now,
    }
    await atlas_query(graph, cypher, params)

    # Also mark the prompt as completed
    await atlas_query(
        graph,
        "MATCH (p:InteractionPrompt {source_id: $s, target_id: $t}) "
        "SET p.status = 'completed'",
        {"s": analysis["source_id"], "t": analysis["target_id"]},
    )