"""Step 5: Gemini Flash swarm for interaction analysis.
Loads interaction prompts (already sorted by code evidence count), selects
the top N (default 50), and dispatches parallel Gemini Flash agents to
analyze each interaction. Handles JSON parse retries with backoff and
validates/clamps the returned scores.
Outputs ``outputs/interaction_analyses.jsonl``.
Usage:
python -m tools.feature_atlas.run_interaction_analysis_swarm
# skull fire infinity -- THE SWARM READS THE BODY
"""
from __future__ import annotations
import asyncio
import json
import logging
import sys
import time
from pathlib import Path
from typing import Any
import yaml
# Directory holding this module; every Atlas path below hangs off it.
_ATLAS_DIR = Path(__file__).resolve().parent

# Three levels above this file (presumably the repository root). Put it at the
# front of sys.path so the absolute ``tools.feature_atlas...`` imports resolve.
_PROJECT_ROOT = _ATLAS_DIR.parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

logger = logging.getLogger(__name__)

# Input config, upstream prompt artifact, and this step's output artifact.
_CONFIG_PATH = _ATLAS_DIR.joinpath("config.yaml")
_PROMPTS_PATH = _ATLAS_DIR.joinpath("outputs", "interaction_prompts.jsonl")
_OUTPUT_PATH = _ATLAS_DIR.joinpath("outputs", "interaction_analyses.jsonl")
def _load_config() -> dict[str, Any]:
    """Load and parse the Atlas swarm configuration from ``config.yaml``.

    Reads the module-level ``_CONFIG_PATH`` (``config.yaml`` next to this file)
    as UTF-8 and deserializes it with ``yaml.safe_load``. The resulting dict
    drives the analysis run: the ``swarm`` block supplies concurrency and retry
    settings, while the ``analysis`` block supplies ``top_n_pairs`` and the
    minimum evidence threshold. The only side effect is the synchronous file
    read; no Redis, FalkorDB, LLM, or network access occurs here.

    Called by ``run_analysis`` (when no config is passed in) and by
    ``async_main`` in this module before dispatching the Gemini Flash swarm.

    Returns:
        dict[str, Any]: The parsed YAML configuration mapping. An empty YAML
        file yields ``{}`` (so callers fall back to their defaults) rather
        than ``None``, which ``yaml.safe_load`` would return.

    Raises:
        FileNotFoundError: If ``config.yaml`` is absent at ``_CONFIG_PATH``.
        yaml.YAMLError: If the file contents are not valid YAML.
        TypeError: If the top-level YAML value is not a mapping.
    """
    with open(_CONFIG_PATH, "r", encoding="utf-8") as f:
        data = yaml.safe_load(f)
    if data is None:
        # Every caller reads the result with ``.get``; ``None`` would crash there.
        return {}
    if not isinstance(data, dict):
        raise TypeError(
            f"{_CONFIG_PATH} must contain a YAML mapping at the top level, "
            f"got {type(data).__name__}"
        )
    return data
def _load_prompts(path: Path | None = None) -> list[dict[str, Any]]:
    """Load the per-interaction prompt records from the upstream JSONL artifact.

    Reads ``path`` (by default the module-level ``_PROMPTS_PATH``, i.e.
    ``outputs/interaction_prompts.jsonl``) line by line, decoding each
    non-blank line as one JSON prompt record. The default file is produced by
    the earlier ``generate_interaction_prompts.py`` step and is already sorted
    by code evidence count, so the returned order is the priority order this
    swarm consumes. The only side effect is the synchronous file read; no
    Redis, FalkorDB, LLM, or network access occurs here.

    Called by :func:`run_analysis` at the start of a run to obtain the candidate
    pairs before filtering and selecting the top N.

    Args:
        path: Optional JSONL file to read instead of ``_PROMPTS_PATH``;
            ``None`` (the default) keeps the standard artifact location.

    Returns:
        list[dict[str, Any]]: The decoded prompt records in file order.

    Raises:
        FileNotFoundError: If the prompts JSONL is absent, signalling that
            ``generate_interaction_prompts.py`` has not been run yet.
        json.JSONDecodeError: If a non-blank line is not valid JSON; the
            message is prefixed with ``<path>:<line number>``.
    """
    prompts_path = _PROMPTS_PATH if path is None else Path(path)
    if not prompts_path.exists():
        raise FileNotFoundError(
            f"Prompts not found at {prompts_path}. "
            "Run generate_interaction_prompts.py first."
        )
    prompts: list[dict[str, Any]] = []
    with open(prompts_path, "r", encoding="utf-8") as f:
        for lineno, raw_line in enumerate(f, start=1):
            line = raw_line.strip()
            if not line:
                continue
            try:
                prompts.append(json.loads(line))
            except json.JSONDecodeError as e:
                # Point at the offending record; keep the exception type so
                # any existing JSONDecodeError/ValueError handlers still match.
                raise json.JSONDecodeError(
                    f"{prompts_path}:{lineno}: {e.msg}", e.doc, e.pos
                ) from e
    return prompts
# Score fields clamped to [0, 1] on every analysis record returned by the LLM.
_SCORE_KEYS = ("synergy_score", "risk_score", "weirdness_score", "confidence")

# Fields that must be lists on every analysis record; anything else becomes [].
_LIST_KEYS = (
    "failure_modes",
    "security_risks",
    "recommended_tests",
    "recommended_constraints",
    "source_refs",
)

# System prompt sent with every interaction-analysis request.
_ANALYSIS_SYSTEM_INSTRUCTION = (
    "You are a precise code analysis agent. "
    "You output only valid JSON objects. "
    "You analyze actual code interactions, not theoretical ones. "
    "You cite real file names and function names."
)


def _clamp_score(value: Any) -> float:
    """Coerce an LLM-supplied score to a float in ``[0.0, 1.0]``.

    Non-numeric values map to ``0.0``. Booleans are rejected as well (``bool``
    is an ``int`` subclass, so JSON ``true`` would otherwise become ``1.0``),
    and so is NaN: :func:`json.loads` accepts the ``NaN`` literal, and a plain
    ``max(0.0, min(1.0, nan))`` clamp evaluates to ``1.0``.
    """
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return 0.0
    if value != value:  # NaN is the only value unequal to itself.
        return 0.0
    # Clamp before converting so a huge JSON integer cannot overflow float().
    return float(max(0.0, min(1.0, value)))


def _normalize_analysis(analysis: Any, source_id: Any, target_id: Any) -> dict[str, Any]:
    """Validate a decoded LLM response and coerce it into a well-formed record.

    Forces ``source_id``/``target_id`` to the requested pair, clamps every
    score in ``_SCORE_KEYS`` via :func:`_clamp_score` (missing scores become
    ``0.0``), and replaces any non-list value in ``_LIST_KEYS`` with ``[]``.
    Mutates and returns ``analysis``.

    Raises:
        ValueError: If ``analysis`` is not a dict or lacks a ``summary`` key.
    """
    if not isinstance(analysis, dict):
        raise ValueError("Response is not a JSON object")
    if "summary" not in analysis:
        raise ValueError("Missing 'summary' field")
    # Never trust the model to echo the IDs back correctly.
    analysis["source_id"] = source_id
    analysis["target_id"] = target_id
    for score_key in _SCORE_KEYS:
        analysis[score_key] = _clamp_score(analysis.get(score_key, 0.0))
    for list_key in _LIST_KEYS:
        if not isinstance(analysis.get(list_key), list):
            analysis[list_key] = []
    return analysis


async def _analyze_pair(
    prompt_record: dict[str, Any],
    config: dict[str, Any],
    semaphore: asyncio.Semaphore,
) -> dict[str, Any] | None:
    """Analyze one source/target feature pair with a Gemini Flash agent.

    Sends the pair's pre-rendered prompt to Gemini Flash and returns a validated
    analysis record scoring the interaction's synergy, risk, weirdness, and
    confidence. It exists so the swarm can fan many of these out concurrently
    under a shared concurrency cap while keeping each result well-formed: it
    retries on parse or call failures with backoff, normalizes the LLM output,
    and guarantees the score and list fields have sane types.

    Acquires ``semaphore`` to bound concurrency, then calls
    :func:`tools.feature_atlas.extract_features_swarm._gemini_generate`, which
    POSTs to the local OpenAI-compatible proxy (default
    ``http://localhost:3000/openai/chat/completions``) -- no direct API keys, the
    proxy handles auth. Raw output is cleaned with
    :func:`tools.feature_atlas.extract_features_swarm._scrub_llm_json` before
    :func:`json.loads`, then passed through :func:`_normalize_analysis`, which
    forces the correct ``source_id``/``target_id``, clamps the four score fields
    to ``[0, 1]`` (NaN, booleans, and non-numbers become ``0.0``), and coerces
    the list fields to lists. On a JSON decode error it sleeps one second
    between attempts; on any other error it backs off ``2 ** attempt`` seconds.
    The semaphore stays held during those sleeps. Logs progress and per-attempt
    failures via the module ``logger``; touches no Redis, FalkorDB, KG, or
    event bus.

    Called by :func:`run_analysis`, which builds one coroutine per selected pair
    and awaits them together with :func:`asyncio.gather`.

    Args:
        prompt_record: One prompt record from ``_load_prompts`` supplying
            ``source_id``, ``target_id``, and the rendered ``prompt`` text.
        config: The run configuration; its ``swarm`` block supplies
            ``retry_attempts`` (at least one attempt is always made) and the
            proxy/model settings used downstream.
        semaphore: The shared concurrency limiter held for the duration of the
            analysis.

    Returns:
        dict[str, Any] | None: The validated analysis record on success, or
        ``None`` when all retry attempts fail.
    """
    # Function-local import, presumably to defer loading the extraction swarm
    # module until a pair is actually analyzed.
    from tools.feature_atlas.extract_features_swarm import (
        _gemini_generate,
        _scrub_llm_json,
    )

    source_id = prompt_record["source_id"]
    target_id = prompt_record["target_id"]
    prompt = prompt_record["prompt"]

    async with semaphore:
        # ``or {}`` also covers an explicit ``swarm: null`` in config.yaml.
        swarm_cfg = config.get("swarm") or {}
        # A non-positive setting would otherwise skip the call entirely.
        max_retries = max(1, int(swarm_cfg.get("retry_attempts", 3)))

        for attempt in range(max_retries):
            try:
                raw = await _gemini_generate(
                    prompt,
                    config,
                    system_instruction=_ANALYSIS_SYSTEM_INSTRUCTION,
                )
                # Scrub LLM wrapper garbage (fences, prose) before parsing.
                analysis = _normalize_analysis(
                    json.loads(_scrub_llm_json(raw)), source_id, target_id
                )
                logger.info(
                    "Analyzed %s -> %s (risk=%.2f, synergy=%.2f, weird=%.2f)",
                    source_id,
                    target_id,
                    analysis["risk_score"],
                    analysis["synergy_score"],
                    analysis["weirdness_score"],
                )
                return analysis
            except json.JSONDecodeError as e:
                logger.warning(
                    "JSON parse error for %s -> %s (attempt %d/%d): %s",
                    source_id,
                    target_id,
                    attempt + 1,
                    max_retries,
                    e,
                )
                if attempt < max_retries - 1:
                    await asyncio.sleep(1)
            except Exception as e:
                logger.warning(
                    "Analysis error for %s -> %s (attempt %d/%d): %s",
                    source_id,
                    target_id,
                    attempt + 1,
                    max_retries,
                    e,
                )
                if attempt < max_retries - 1:
                    await asyncio.sleep(2 ** attempt)

    logger.error(
        "All attempts failed for %s -> %s", source_id, target_id
    )
    return None
def _select_pairs(
    prompts: list[dict[str, Any]], top_n: int, min_evidence: int
) -> list[dict[str, Any]]:
    """Return up to ``top_n`` prompts, preferring those meeting ``min_evidence``.

    Records whose ``evidence_count`` (missing/null treated as 0) is at least
    ``min_evidence`` come first, in file order; the list is then padded with
    the below-threshold records, also in file order. Because the prompts file
    is produced already sorted by evidence count, the padding is the
    highest-evidence remainder. A negative ``top_n`` selects nothing.
    """
    qualified: list[dict[str, Any]] = []
    padding: list[dict[str, Any]] = []
    for record in prompts:
        if (record.get("evidence_count") or 0) >= min_evidence:
            qualified.append(record)
        else:
            padding.append(record)
    return (qualified + padding)[: max(0, top_n)]


async def run_analysis(config: dict[str, Any] | None = None) -> list[dict[str, Any]]:
    """Select the top feature pairs and run the Gemini Flash analysis swarm.

    Orchestrates the whole step: it loads the prompt records, picks which pairs to
    analyze, fans out one Gemini agent per pair under a concurrency cap, and
    collects the successful results. Pair selection (see :func:`_select_pairs`)
    prefers candidates whose code evidence count meets the configured
    ``min_evidence_threshold``; if fewer than ``top_n_pairs`` qualify, it pads up
    to ``top_n_pairs`` with the highest-evidence remaining pairs so the swarm
    always runs at full requested width when prompts are available.

    Loads the config via :func:`_load_config` when none is passed, reads prompts
    via :func:`_load_prompts`, builds an :class:`asyncio.Semaphore` from the
    ``swarm.max_concurrency`` setting (floored at 1 so the swarm cannot
    deadlock), and dispatches :func:`_analyze_pair` for each selected pair with
    :func:`asyncio.gather` (``return_exceptions=True``). Each pair's LLM call
    goes through the local proxy used by
    :func:`tools.feature_atlas.extract_features_swarm._gemini_generate`; dict
    results are kept, raised exceptions are logged with their traceback, and a
    final success/failure tally is logged via the module ``logger``. No Redis,
    FalkorDB, KG, event bus, or output-file writes happen here -- persistence
    is left to :func:`async_main`.

    Called by :func:`async_main` as the core of the command-line run.

    Args:
        config: The run configuration; loaded from ``config.yaml`` via
            :func:`_load_config` when ``None``. Its ``analysis`` block supplies
            ``top_n_pairs`` and ``min_evidence_threshold`` and its ``swarm`` block
            supplies ``max_concurrency``.

    Returns:
        list[dict[str, Any]]: The successfully completed analysis records (failed
        pairs are dropped, not represented).
    """
    if config is None:
        config = _load_config()
    prompts = _load_prompts()

    # ``or {}`` also covers explicit ``null`` blocks in config.yaml.
    analysis_cfg = config.get("analysis") or {}
    top_n = analysis_cfg.get("top_n_pairs", 50)
    min_evidence = analysis_cfg.get("min_evidence_threshold", 1)

    selected = _select_pairs(prompts, top_n, min_evidence)
    logger.info(
        "Selected %d pairs for analysis (%d had evidence >= %d)",
        len(selected),
        sum(1 for p in selected if (p.get("evidence_count") or 0) >= min_evidence),
        min_evidence,
    )

    swarm_cfg = config.get("swarm") or {}
    # Semaphore(0) would block every task forever; always allow one worker.
    max_concurrency = max(1, int(swarm_cfg.get("max_concurrency", 10)))
    semaphore = asyncio.Semaphore(max_concurrency)

    # Run analysis swarm; gather preserves input order, so results zip with
    # the selected records.
    results = await asyncio.gather(
        *(_analyze_pair(record, config, semaphore) for record in selected),
        return_exceptions=True,
    )

    analyses: list[dict[str, Any]] = []
    failures = 0
    for record, result in zip(selected, results):
        if isinstance(result, dict):
            analyses.append(result)
            continue
        failures += 1
        # ``None`` means _analyze_pair already logged its exhausted retries.
        if isinstance(result, BaseException):
            logger.error(
                "Analysis task failed for %s -> %s: %s",
                record.get("source_id"),
                record.get("target_id"),
                result,
                exc_info=result,
            )

    logger.info(
        "Interaction analysis finished: %d succeeded, %d failed",
        len(analyses),
        failures,
    )
    return analyses
async def async_main() -> None:
    """Run the analysis swarm end to end, persist results, and print a summary.

    The asynchronous entry point for this step: it loads the configuration, runs
    the swarm, writes the completed analyses to disk, and prints a human-readable
    summary (counts, average and maximum risk and synergy scores, and the top
    three risk and synergy pairs) plus the elapsed time. This is what turns the
    in-memory analysis list into the durable
    ``outputs/interaction_analyses.jsonl`` artifact that later Atlas steps import.

    Loads config via :func:`_load_config`, delegates the actual analysis to
    :func:`run_analysis` (which drives the Gemini Flash agents through the local
    proxy), then creates the output directory and writes each record as one JSON
    line. The JSONL is written to a sibling ``.tmp`` file and atomically swapped
    over the module-level ``_OUTPUT_PATH``, so an interrupted run never leaves a
    truncated artifact behind. A warning is logged when no analyses succeeded
    (the output is then an empty file). The only side effects are that
    filesystem write, logging, and stdout printing; no Redis, FalkorDB, KG, or
    event bus is touched here.

    Called by :func:`main` via :func:`asyncio.run`; ``main`` in turn runs under
    this module's ``__main__`` guard.

    Returns:
        None
    """
    # perf_counter is monotonic, unlike time.time(), so durations stay correct.
    t0 = time.perf_counter()
    config = _load_config()
    analyses = await run_analysis(config)

    if not analyses:
        logger.warning(
            "No interaction analyses succeeded; %s will be empty", _OUTPUT_PATH
        )

    # Write JSONL output atomically: fill a temp file, then replace the target.
    _OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = _OUTPUT_PATH.with_name(_OUTPUT_PATH.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            for a in analyses:
                f.write(json.dumps(a, ensure_ascii=False) + "\n")
        tmp_path.replace(_OUTPUT_PATH)
    except BaseException:
        if tmp_path.exists():
            tmp_path.unlink()
        raise
    elapsed = time.perf_counter() - t0

    # Summary
    risks = [a.get("risk_score", 0) for a in analyses]
    synergies = [a.get("synergy_score", 0) for a in analyses]
    avg_risk = sum(risks) / len(risks) if risks else 0.0
    avg_synergy = sum(synergies) / len(synergies) if synergies else 0.0
    max_risk = max(risks, default=0.0)
    max_synergy = max(synergies, default=0.0)
    # sorted(..., reverse=True) is stable, so ties keep their original order.
    top_risk = sorted(analyses, key=lambda a: a.get("risk_score", 0), reverse=True)[:3]
    top_synergy = sorted(
        analyses, key=lambda a: a.get("synergy_score", 0), reverse=True
    )[:3]

    rule = "=" * 60
    print(f"\n{rule}")
    print(" INTERACTION ANALYSIS SWARM COMPLETE")
    print(rule)
    print(f" Analyses completed: {len(analyses)}")
    print(f" Avg risk score: {avg_risk:.3f}")
    print(f" Max risk score: {max_risk:.3f}")
    print(f" Avg synergy score: {avg_synergy:.3f}")
    print(f" Max synergy score: {max_synergy:.3f}")
    print()
    print(" Top 3 risks:")
    for a in top_risk:
        print(f" {a['source_id']} -> {a['target_id']}: {a.get('risk_score', 0):.3f}")
    print()
    print(" Top 3 synergies:")
    for a in top_synergy:
        print(f" {a['source_id']} -> {a['target_id']}: {a.get('synergy_score', 0):.3f}")
    print()
    print(f" Time elapsed: {elapsed:.1f}s")
    print(f" Output: {_OUTPUT_PATH}")
    print(f"{rule}\n")
def main() -> None:
    """Configure logging and run the analysis swarm from a synchronous context.

    The synchronous command-line entry point invoked when this module is run as
    ``python -m tools.feature_atlas.run_interaction_analysis_swarm``. It installs
    a basic INFO-level logging configuration (timestamped, level-tagged lines) so
    the swarm's progress and warnings are visible, then drives the async pipeline
    by handing :func:`async_main` to :func:`asyncio.run`.

    Side effects are limited to the global logging configuration and whatever
    :func:`async_main` does (the JSONL write and stdout summary); no Redis,
    FalkorDB, KG, or event bus is touched here.

    Called only from this module's ``if __name__ == "__main__":`` guard.

    Returns:
        None
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )
    asyncio.run(async_main())


# Script entry point: only runs when executed, not when imported.
if __name__ == "__main__":
    main()