# Source code for tools.feature_atlas.run_interaction_analysis_swarm

"""Step 5: Gemini Flash swarm for interaction analysis.

Loads interaction prompts (already sorted by code evidence count), selects
the top N (default 50), and dispatches parallel Gemini Flash agents to
analyze each interaction. Handles JSON parse retries with backoff and
validates/clamps the returned scores.

Outputs ``outputs/interaction_analyses.jsonl``.

Usage:
    python -m tools.feature_atlas.run_interaction_analysis_swarm

# skull fire infinity -- THE SWARM READS THE BODY
"""

from __future__ import annotations

import asyncio
import json
import logging
import sys
import time
from pathlib import Path
from typing import Any

import yaml

# Directory holding this module; every Atlas artifact path is derived from it.
_ATLAS_DIR = Path(__file__).resolve().parent
# Repository root: two levels above the atlas package (feature_atlas -> tools -> root).
_PROJECT_ROOT = _ATLAS_DIR.parent.parent

# Put the repository root on sys.path (only once) so the absolute
# ``tools.feature_atlas`` imports used by this module resolve even when the
# file is executed directly rather than via ``python -m``.
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

logger = logging.getLogger(__name__)

# Swarm configuration, upstream prompt input, and this step's JSONL output.
_CONFIG_PATH = _ATLAS_DIR / "config.yaml"
_OUTPUTS_DIR = _ATLAS_DIR / "outputs"
_PROMPTS_PATH = _OUTPUTS_DIR / "interaction_prompts.jsonl"
_OUTPUT_PATH = _OUTPUTS_DIR / "interaction_analyses.jsonl"


def _load_config() -> dict[str, Any]:
    """Load and parse the Atlas swarm configuration from ``config.yaml``.

    Reads the module-level ``_CONFIG_PATH`` (``config.yaml`` next to this file)
    as UTF-8 and deserializes it with ``yaml.safe_load``. The resulting dict
    drives the analysis run: the ``swarm`` block supplies concurrency and retry
    settings, while the ``analysis`` block supplies ``top_n_pairs`` and the
    minimum evidence threshold. The only side effect is the synchronous file
    read; no Redis, FalkorDB, LLM, or network access occurs here.

    An empty file (for which ``yaml.safe_load`` returns ``None``) is normalised
    to an empty dict, so callers' ``config.get(...)`` lookups fall back to their
    defaults instead of failing with ``AttributeError``.

    Called by ``run_analysis`` (when no config is passed in) and by
    ``async_main`` in this module before dispatching the Gemini Flash swarm.

    Returns:
        dict[str, Any]: The parsed YAML configuration mapping (``{}`` for an
        empty file).

    Raises:
        FileNotFoundError: If ``config.yaml`` is absent at ``_CONFIG_PATH``.
        yaml.YAMLError: If the file contents are not valid YAML.
        TypeError: If the top-level YAML value is not a mapping.
    """
    with open(_CONFIG_PATH, "r", encoding="utf-8") as f:
        data = yaml.safe_load(f)
    if data is None:
        # Empty document: treat as "no overrides" rather than returning None.
        return {}
    if not isinstance(data, dict):
        # Fail here with a clear message instead of an AttributeError on the
        # first ``config.get(...)`` far away in the swarm.
        raise TypeError(
            f"Expected a mapping at the top level of {_CONFIG_PATH}, "
            f"got {type(data).__name__}"
        )
    return data


def _load_prompts() -> list[dict[str, Any]]:
    """Return the interaction prompt records written by the prompt-generation step.

    Opens the module-level ``_PROMPTS_PATH`` (``outputs/interaction_prompts.jsonl``)
    as UTF-8 and decodes every line that is non-empty after stripping
    whitespace as a standalone JSON value. Blank or whitespace-only lines are
    skipped. Records are returned in file order; the upstream
    ``generate_interaction_prompts.py`` step writes them sorted by code
    evidence count, so this order is the priority order the swarm uses. The
    only side effect is reading the file.

    Called by :func:`run_analysis` to obtain the candidate pairs before
    filtering and top-N selection.

    Returns:
        list[dict[str, Any]]: The decoded prompt records, in file order.

    Raises:
        FileNotFoundError: If the prompts file does not exist, meaning
            ``generate_interaction_prompts.py`` has not been run yet.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    path = _PROMPTS_PATH
    if not path.exists():
        raise FileNotFoundError(
            f"Prompts not found at {path}. "
            "Run generate_interaction_prompts.py first."
        )

    with open(path, "r", encoding="utf-8") as handle:
        stripped_lines = (raw_line.strip() for raw_line in handle)
        return [json.loads(text) for text in stripped_lines if text]


# System prompt sent with every interaction-analysis request.
_ANALYSIS_SYSTEM_INSTRUCTION = (
    "You are a precise code analysis agent. "
    "You output only valid JSON objects. "
    "You analyze actual code interactions, not theoretical ones. "
    "You cite real file names and function names."
)

# Score fields that are coerced into [0, 1] on every analysis record.
_SCORE_KEYS = ("synergy_score", "risk_score", "weirdness_score", "confidence")

# Fields that must be lists; anything else (missing, null, string...) becomes [].
_LIST_KEYS = (
    "failure_modes",
    "security_risks",
    "recommended_tests",
    "recommended_constraints",
    "source_refs",
)


def _clamp_score(value: Any) -> float:
    """Coerce one LLM-supplied score into the closed interval ``[0.0, 1.0]``.

    Non-numeric values map to ``0.0``. Booleans also map to ``0.0``: they are
    ``int`` subclasses, but ``true``/``false`` is not a meaningful score. NaN
    (which ``json.loads`` accepts) maps to ``0.0``. Any other number, including
    +/-infinity, is clamped to the nearest bound.
    """
    import math

    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return 0.0
    number = float(value)
    if math.isnan(number):
        return 0.0
    return max(0.0, min(1.0, number))


def _normalize_analysis(
    analysis: Any, source_id: str, target_id: str
) -> dict[str, Any]:
    """Validate a parsed LLM response and normalise it in place.

    Forces the correct pair IDs, clamps the score fields via
    :func:`_clamp_score`, and replaces non-list values of the list fields
    with ``[]``.

    Raises:
        ValueError: If ``analysis`` is not a dict or lacks a ``summary`` key.
    """
    if not isinstance(analysis, dict):
        raise ValueError("Response is not a JSON object")
    if "summary" not in analysis:
        raise ValueError("Missing 'summary' field")

    # Never trust the model to echo the IDs back correctly.
    analysis["source_id"] = source_id
    analysis["target_id"] = target_id

    for score_key in _SCORE_KEYS:
        analysis[score_key] = _clamp_score(analysis.get(score_key, 0.0))

    for list_key in _LIST_KEYS:
        if not isinstance(analysis.get(list_key), list):
            analysis[list_key] = []

    return analysis


async def _analyze_pair(
    prompt_record: dict[str, Any],
    config: dict[str, Any],
    semaphore: asyncio.Semaphore,
) -> dict[str, Any] | None:
    """Analyze one source/target feature pair with a Gemini Flash agent.

    Sends the pair's pre-rendered prompt to Gemini Flash and returns a validated
    analysis record that scores the interaction's synergy, risk, weirdness, and
    confidence. The swarm fans many of these out concurrently under a shared
    concurrency cap. Each call keeps its result well-formed: it retries on
    parse or call failures, normalizes the LLM output, and guarantees sane
    types for the score and list fields.

    Acquires ``semaphore`` to bound concurrency, then calls
    :func:`tools.feature_atlas.extract_features_swarm._gemini_generate`.
    Raw output is cleaned with
    :func:`tools.feature_atlas.extract_features_swarm._scrub_llm_json` before
    :func:`json.loads`. The parsed value is then checked and normalised by
    :func:`_normalize_analysis`:

    - it must be a dict with a ``summary`` key;
    - ``source_id``/``target_id`` are forced to the correct values;
    - the four score fields are clamped to ``[0, 1]``, with booleans, NaN and
      non-numbers becoming ``0.0``;
    - the list fields are coerced to lists.

    Up to ``swarm.retry_attempts`` attempts are made (default 3, never fewer
    than one). After a JSON decode error it sleeps one second before the next
    attempt. After any other error, including the validation ``ValueError``,
    it backs off ``2 ** attempt`` seconds (1, 2, 4, ...). The semaphore stays
    held during these sleeps, so a retrying pair keeps its concurrency slot.
    Progress and per-attempt failures are logged via the module ``logger``.

    Called by :func:`run_analysis`, which builds one coroutine per selected pair
    and awaits them together with :func:`asyncio.gather`.

    Args:
        prompt_record: One prompt record from ``_load_prompts`` supplying
            ``source_id``, ``target_id``, and the rendered ``prompt`` text.
        config: The run configuration; its ``swarm`` block supplies
            ``retry_attempts``, and the whole mapping is forwarded to
            ``_gemini_generate``.
        semaphore: The shared concurrency limiter held for the duration of the
            analysis.

    Returns:
        dict[str, Any] | None: The validated analysis record on success, or
        ``None`` when all retry attempts fail.
    """
    from tools.feature_atlas.extract_features_swarm import (
        _gemini_generate,
        _scrub_llm_json,
    )

    source_id = prompt_record["source_id"]
    target_id = prompt_record["target_id"]
    prompt = prompt_record["prompt"]

    swarm_cfg = config.get("swarm") or {}
    # Always make at least one call; a retry_attempts of 0 would otherwise
    # skip the LLM entirely and report the pair as failed.
    max_retries = max(1, int(swarm_cfg.get("retry_attempts", 3)))

    async with semaphore:
        for attempt in range(max_retries):
            try:
                raw = await _gemini_generate(
                    prompt,
                    config,
                    system_instruction=_ANALYSIS_SYSTEM_INSTRUCTION,
                )
                # Scrub LLM wrapper garbage (fences, prose) before parsing.
                analysis = _normalize_analysis(
                    json.loads(_scrub_llm_json(raw)), source_id, target_id
                )
            except json.JSONDecodeError as e:
                # Must precede the generic handler: JSONDecodeError is a
                # ValueError subclass and gets the short, fixed retry delay.
                logger.warning(
                    "JSON parse error for %s -> %s (attempt %d/%d): %s",
                    source_id,
                    target_id,
                    attempt + 1,
                    max_retries,
                    e,
                )
                delay = 1
            except Exception as e:
                logger.warning(
                    "Analysis error for %s -> %s (attempt %d/%d): %s",
                    source_id,
                    target_id,
                    attempt + 1,
                    max_retries,
                    e,
                )
                delay = 2 ** attempt
            else:
                logger.info(
                    "Analyzed %s -> %s (risk=%.2f, synergy=%.2f, weird=%.2f)",
                    source_id,
                    target_id,
                    analysis["risk_score"],
                    analysis["synergy_score"],
                    analysis["weirdness_score"],
                )
                return analysis

            # No point sleeping after the final attempt.
            if attempt < max_retries - 1:
                await asyncio.sleep(delay)

        logger.error(
            "All attempts failed for %s -> %s", source_id, target_id
        )
        return None


async def run_analysis(config: dict[str, Any] | None = None) -> list[dict[str, Any]]:
    """Select the top feature pairs and run the Gemini Flash analysis swarm.

    Orchestrates the whole step. It loads the prompt records, picks which pairs
    to analyze, fans out one Gemini agent per pair under a concurrency cap, and
    collects the successful results.

    Selection prefers candidates whose ``evidence_count`` meets the configured
    ``min_evidence_threshold``. If fewer than ``top_n_pairs`` qualify, it pads
    up to ``top_n_pairs`` with the remaining pairs in file order, i.e. the
    highest-evidence ones, since the prompts file is pre-sorted by evidence.

    Loads the config via :func:`_load_config` when none is passed and reads
    prompts via :func:`_load_prompts`. It builds an :class:`asyncio.Semaphore`
    from ``swarm.max_concurrency``, clamped to at least 1 because
    ``Semaphore(0)`` would block every task forever. It then dispatches
    :func:`_analyze_pair` for each selected pair with :func:`asyncio.gather`
    (``return_exceptions=True``).

    Dict results are kept. Raised exceptions are logged, and both exceptions
    and ``None`` results (pairs whose retries were exhausted) are counted as
    failures. A success/failure summary is logged at the end. No Redis,
    FalkorDB, KG, event bus, or output-file writes happen here; persistence is
    left to :func:`async_main`.

    Called by :func:`async_main` as the core of the command-line run.

    Args:
        config: The run configuration; loaded from ``config.yaml`` via
            :func:`_load_config` when ``None``. Its ``analysis`` block supplies
            ``top_n_pairs`` and ``min_evidence_threshold`` and its ``swarm``
            block supplies ``max_concurrency``.

    Returns:
        list[dict[str, Any]]: The successfully completed analysis records
        (failed pairs are dropped, not represented).
    """
    if config is None:
        config = _load_config()
    prompts = _load_prompts()

    # ``or {}`` also covers a YAML key present with a null value.
    analysis_cfg = config.get("analysis") or {}
    top_n = analysis_cfg.get("top_n_pairs", 50)
    min_evidence = analysis_cfg.get("min_evidence_threshold", 1)

    def has_evidence(record: dict[str, Any]) -> bool:
        return record.get("evidence_count", 0) >= min_evidence

    # Filter and select top N pairs by evidence. Both halves keep file order,
    # which is already sorted by evidence count.
    candidates = [p for p in prompts if has_evidence(p)]
    if len(candidates) < top_n:
        # Pad with the highest-evidence pairs that missed the threshold.
        # The inverse predicate is used here instead of ``p not in candidates``,
        # which would be an O(n^2) dict-equality scan.
        remaining = [p for p in prompts if not has_evidence(p)]
        candidates.extend(remaining[: top_n - len(candidates)])
    selected = candidates[:top_n]

    logger.info(
        "Selected %d pairs for analysis (%d had evidence >= %d)",
        len(selected),
        sum(1 for p in selected if has_evidence(p)),
        min_evidence,
    )

    swarm_cfg = config.get("swarm") or {}
    max_concurrency = max(1, int(swarm_cfg.get("max_concurrency", 10)))
    semaphore = asyncio.Semaphore(max_concurrency)

    # Run analysis swarm
    results = await asyncio.gather(
        *(_analyze_pair(record, config, semaphore) for record in selected),
        return_exceptions=True,
    )

    analyses: list[dict[str, Any]] = []
    failures = 0
    for result in results:
        if isinstance(result, dict):
            analyses.append(result)
            continue
        # Either an exception escaped _analyze_pair, or it returned None
        # after exhausting its retries.
        failures += 1
        if isinstance(result, BaseException):
            logger.error("Analysis task failed: %s", result)

    if failures:
        logger.warning(
            "%d of %d interaction analyses failed", failures, len(selected)
        )
    logger.info(
        "Completed %d/%d interaction analyses", len(analyses), len(selected)
    )
    return analyses
async def async_main() -> None:
    """Run the interaction-analysis swarm, persist its output, and report.

    Asynchronous entry point for this step. It loads the configuration via
    :func:`_load_config`, hands it to :func:`run_analysis`, and writes every
    returned record as one JSON line (``ensure_ascii=False``) to the
    module-level ``_OUTPUT_PATH``, creating the output directory if needed and
    overwriting any previous file. It then prints a summary to stdout:

    - the number of analyses;
    - average and maximum risk and synergy scores;
    - the three highest-risk and three highest-synergy pairs;
    - the elapsed wall-clock time and the output path.

    Side effects are that filesystem write and stdout printing; nothing else
    is touched here.

    Called by :func:`main` via :func:`asyncio.run`.

    Returns:
        None
    """
    started = time.time()
    analyses = await run_analysis(_load_config())

    # Persist one JSON object per line.
    _OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    with open(_OUTPUT_PATH, "w", encoding="utf-8") as out:
        out.writelines(
            json.dumps(record, ensure_ascii=False) + "\n" for record in analyses
        )

    elapsed = time.time() - started

    # Summary statistics (all zero / empty when nothing succeeded).
    if not analyses:
        avg_risk = avg_synergy = max_risk = max_synergy = 0.0
        top_risk = top_synergy = []
    else:
        count = len(analyses)
        risks = [record.get("risk_score", 0) for record in analyses]
        synergies = [record.get("synergy_score", 0) for record in analyses]
        avg_risk = sum(risks) / count
        avg_synergy = sum(synergies) / count
        max_risk = max(risks)
        max_synergy = max(synergies)
        # reverse=True keeps ties in their original order (sort is stable).
        top_risk = sorted(
            analyses, key=lambda record: record.get("risk_score", 0), reverse=True
        )[:3]
        top_synergy = sorted(
            analyses, key=lambda record: record.get("synergy_score", 0), reverse=True
        )[:3]

    rule = "=" * 60
    print(f"\n{rule}")
    print(" INTERACTION ANALYSIS SWARM COMPLETE")
    print(rule)
    print(f" Analyses completed: {len(analyses)}")
    print(f" Avg risk score: {avg_risk:.3f}")
    print(f" Max risk score: {max_risk:.3f}")
    print(f" Avg synergy score: {avg_synergy:.3f}")
    print(f" Max synergy score: {max_synergy:.3f}")
    print()

    for heading, ranked, score_key in (
        (" Top 3 risks:", top_risk, "risk_score"),
        (" Top 3 synergies:", top_synergy, "synergy_score"),
    ):
        print(heading)
        for record in ranked:
            print(
                f" {record['source_id']} -> {record['target_id']}: "
                f"{record.get(score_key, 0):.3f}"
            )
        print()

    print(f" Time elapsed: {elapsed:.1f}s")
    print(f" Output: {_OUTPUT_PATH}")
    print(f"{rule}\n")
def main() -> None:
    """Command-line entry point: set up logging, then run the async pipeline.

    Invoked for ``python -m tools.feature_atlas.run_interaction_analysis_swarm``.
    It installs a basic INFO-level logging configuration with timestamped,
    level-tagged lines, so the swarm's progress and warnings are visible. It
    then runs :func:`async_main` to completion on a fresh event loop via
    :func:`asyncio.run`. Beyond the global logging configuration, all side
    effects come from :func:`async_main` (the JSONL write and stdout summary).

    Returns:
        None
    """
    log_format = "%(asctime)s [%(levelname)s] %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO)
    asyncio.run(async_main())
# Run the swarm only when executed as a script / ``python -m``, not on import.
if __name__ == "__main__":
    main()