Source code for arche_audit

#!/usr/bin/env python3
"""Audit Arche DEF CON materials via Gemini: compare raw sources to talk-tier rubrics.

Locates four markdown files (known layout paths + ``find`` fallback), sends their
full text to a capable Gemini model, and writes ``arche_audit_result.md``.
"""

from __future__ import annotations

import argparse
import asyncio
import os
import subprocess
import sys
from pathlib import Path

from google import genai
from google.genai import types

# Directory containing this script. It is the root for the relative-directory
# search and the `find` fallback, and the report is written directly into it.
REPO_ROOT = Path(__file__).resolve().parent

# Basenames of the four markdown sources the audit requires. The tuple order is
# also the order in which files are embedded in the prompt and listed in output.
ARCHIVE_FILES: tuple[str, ...] = (
    "arche-transport.md",
    "arche-framework-v8.md",
    "design-decomposition.md",
    "post-talk-content.md",
)

# Typical locations (repo-relative and sibling checkout); also see ``find`` fallback.
# Each entry is joined onto the repo root and probed in order; first hit wins.
_RELATIVE_SEARCH_DIRS: tuple[str, ...] = (
    "data/arche",
    "arche",
    "output",
    "notes",
    "defcon",
)

# Prefer Gemini 1.5 Pro when available; fall back to newer Pro-class IDs the SDK accepts.
# Candidates are attempted in this order when no ``--model`` override is given.
_MODEL_CANDIDATES: tuple[str, ...] = (
    "gemini-1.5-pro",
    "gemini-1.5-pro-latest",
    "gemini-2.5-pro",
    "gemini-2.5-pro-preview-05-06",
    "gemini-3.1-pro-preview",
)

# File name of the generated report (created under REPO_ROOT).
_OUTPUT_NAME = "arche_audit_result.md"
# Passed as the client's HTTP timeout; per the name the unit is milliseconds
# (600_000 ms = 10 minutes) -- NOTE(review): confirm the unit against the SDK.
_HTTP_TIMEOUT_MS = 600_000
# Passed as ``max_output_tokens`` in the generation config.
_MAX_OUTPUT_TOKENS = 65_536


def _resolve_api_key() -> str:
    """Resolve a usable Gemini API key from the environment or the shared key pool.

    Checks a fixed list of environment variables in priority order
    (``GEMINI_API_KEY``, ``GOOGLE_API_KEY``, ``GOOGLE_AI_API_KEY``,
    ``GENAI_API_KEY``); the first one whose value is non-empty after stripping
    whitespace wins. If none is set, it lazily imports
    ``gemini_embed_pool.next_gemini_flash_key`` and asks it for a key.

    The pool lookup is best-effort: any exception raised while importing or
    calling it is treated as "no key". A pooled value is accepted only if it is
    a non-blank string, so the function never hands an empty or ``None`` key to
    the caller.

    Returns:
        str: A non-empty Gemini API key with surrounding whitespace removed.

    Raises:
        SystemExit: If no key is found in the environment and the key pool is
            unavailable, fails, or yields nothing usable.
    """
    for env_name in (
        "GEMINI_API_KEY",
        "GOOGLE_API_KEY",
        "GOOGLE_AI_API_KEY",
        "GENAI_API_KEY",
    ):
        value = os.environ.get(env_name, "").strip()
        if value:
            return value

    try:
        # Imported lazily so the script still works when the pool module is
        # not installed and an environment variable supplies the key instead.
        from gemini_embed_pool import next_gemini_flash_key

        pooled = next_gemini_flash_key()
    except Exception:
        # Deliberate best-effort: a missing or broken pool simply means
        # "no key here"; the SystemExit below reports the overall failure.
        pooled = None

    # Guard against the pool returning None, an empty string or a non-string;
    # returning such a value would only fail later, inside the API client.
    pooled_key = pooled.strip() if isinstance(pooled, str) else ""
    if pooled_key:
        return pooled_key

    raise SystemExit(
        "No Gemini API key found. Set GEMINI_API_KEY or GOOGLE_API_KEY "
        "(or install/use gemini_embed_pool keys).",
    )


def _find_via_find(filename: str, roots: list[Path]) -> Path | None:
    """Locate a file by basename by shelling out to ``find(1)`` under each root.

    Last-resort fallback used when the known relative search directories do not
    contain a required file. Roots are tried in order and the first reported
    path that still exists as a regular file is returned.

    For each root that is a directory, runs
    ``find <root> -type f -name <filename>`` through ``subprocess.run`` (argument
    list, no shell) with a 120 second timeout. ``OSError`` (for example ``find``
    not being installed) and ``subprocess.TimeoutExpired`` cause that root to be
    skipped.

    The exit status of ``find`` is intentionally not used to reject results:
    ``find`` reports a non-zero status when it cannot read some subdirectory,
    yet the matches it printed on stdout are still valid. Every stdout line is
    checked, so one stale entry does not hide a later, usable match.

    Args:
        filename: Basename to match; passed verbatim to ``find -name`` (which
            interprets it as a pattern, so glob metacharacters are not literal).
        roots: Directories to search recursively, tried in order.

    Returns:
        Path | None: Path to the first matching file, or ``None`` if no root
        yields an existing file.
    """
    for root in roots:
        if not root.is_dir():
            continue
        try:
            proc = subprocess.run(
                ["find", str(root), "-type", "f", "-name", filename],
                capture_output=True,
                text=True,
                timeout=120,
                check=False,
            )
        except (OSError, subprocess.TimeoutExpired):
            continue

        # Do not gate on proc.returncode: a single unreadable directory makes
        # find exit non-zero even though it printed perfectly good matches.
        for line in (proc.stdout or "").splitlines():
            reported = line.strip()
            if not reported:
                continue
            candidate = Path(reported)
            # Re-check on disk: the file may have vanished since find saw it.
            if candidate.is_file():
                return candidate
    return None


def locate_arche_files(
    repo_root: Path,
    arche_root: Path | None = None,
) -> dict[str, Path]:
    """Map each required Arche file's basename to the path where it was found.

    For every name in ``ARCHIVE_FILES`` the lookup order is:

    1. ``arche_root`` (if given), checked for all names up front;
    2. each directory in ``_RELATIVE_SEARCH_DIRS`` beneath ``repo_root``;
    3. :func:`_find_via_find` over ``repo_root`` and then its parent.

    The first hit for a name wins. Names that cannot be found anywhere are left
    out of the result, so callers detect missing files by comparing the keys
    against ``ARCHIVE_FILES``.

    Called by :func:`_async_main` to discover the files to audit.

    Args:
        repo_root: Repository root to search beneath (its parent is also
            scanned by the ``find`` fallback).
        arche_root: Optional directory checked first for each file.

    Returns:
        dict[str, Path]: Basename -> path for every file that was located.
        Paths from ``arche_root`` are built on the resolved ``arche_root``;
        paths from the other two stages are fully resolved.
    """
    resolved: dict[str, Path] = {}

    # Stage 1: an explicitly supplied directory takes precedence for every name.
    if arche_root is not None:
        explicit_dir = arche_root.resolve()
        resolved.update(
            (basename, explicit_dir / basename)
            for basename in ARCHIVE_FILES
            if (explicit_dir / basename).is_file()
        )

    fallback_roots: list[Path] = [repo_root, repo_root.parent]
    for basename in ARCHIVE_FILES:
        if basename in resolved:
            continue

        # Stage 2: well-known directories, probed lazily so that evaluation
        # stops at the first directory that actually holds the file.
        well_known = (
            (repo_root / rel_dir / basename).resolve()
            for rel_dir in _RELATIVE_SEARCH_DIRS
        )
        hit = next((path for path in well_known if path.is_file()), None)

        # Stage 3: recursive filesystem search as the last resort.
        if hit is None:
            via_find = _find_via_find(basename, fallback_roots)
            if via_find is not None:
                hit = via_find.resolve()

        if hit is not None:
            resolved[basename] = hit

    return resolved
def _read_text(path: Path) -> str:
    """Read a file's full text as UTF-8, replacing any undecodable bytes.

    ``errors="replace"`` is used so that a source file containing stray
    non-UTF-8 bytes does not abort the audit with a ``UnicodeDecodeError``.

    Args:
        path: Path to the file to read.

    Returns:
        str: The decoded file contents.
    """
    return path.read_text(encoding="utf-8", errors="replace")


def _build_user_prompt(paths: dict[str, Path], contents: dict[str, str]) -> str:
    """Assemble the full Gemini user prompt: audit instructions plus raw sources.

    The prompt consists of the static instruction/rubric text followed by one
    section per entry of ``ARCHIVE_FILES`` (in that order), each carrying the
    file's basename, its on-disk path and its complete text. Performs no I/O.

    Args:
        paths: Basename -> on-disk path (shown in each file header). Must
            contain every name in ``ARCHIVE_FILES``.
        contents: Basename -> full decoded text. Must contain every name in
            ``ARCHIVE_FILES``.

    Returns:
        str: The complete prompt, lines joined with ``"\\n"``.

    Raises:
        KeyError: If a name from ``ARCHIVE_FILES`` is missing from ``paths`` or
            ``contents``.
    """
    blocks: list[str] = [
        "You are auditing raw primary-source markdown for a DEF CON–style technical talk "
        "proposal (project Arche). The user pasted the FULL source files below — you must "
        "ground every claim in those exact texts. Do not invent structure or claims that "
        "are not evidenced by the files. Quote or paraphrase tightly, and cite which file "
        "each observation comes from.",
        "",
        "## Rubric: three DEF CON talk tiers",
        "",
        "Define and use these tiers consistently (you may refine criteria, but keep the "
        "three levels distinct):",
        "",
        "### Mid-Tier",
        "Solid professional talk: clear problem statement, competent technical depth, "
        "reasonable demos or evidence, good pacing, audience-appropriate background, "
        "credible speaker stance, actionable takeaways. Not necessarily novel globally.",
        "",
        "### Legendary",
        "Memorable, unusually rigorous or creative: novel angle, exceptional clarity, "
        "strong narrative, impressive live demonstration or reproducible artifacts, "
        "meaningful impact on practitioner workflows, quotable insights, handles "
        "adversarial Q&A implications in the material.",
        "",
        "### Field-Changing",
        "Redefines expectations: introduces a new primitive, invalidates an assumption, "
        "or opens a new research/engineering frontier; broad influence beyond a niche; "
        "others will cite or rebuild on it; ethical/safety stance is credible where relevant.",
        "",
        "## Required output structure",
        "",
        "1. **Executive verdict** — single paragraph: which tier the *combined* Arche "
        "package most honestly fits today, and why (honest about gaps).",
        "2. **Per-file audit** — for EACH of the four files: outline its actual structure "
        "(headings/sections as written), technical claims as stated (not inferred), strengths, "
        "weaknesses, and tier signals (Mid / Legendary / Field-Changing) with evidence.",
        "3. **Cross-file coherence** — contradictions, duplication, missing links between "
        "transport/framework/design/post-talk narrative.",
        "4. **DEF CON readiness** — what must change to credibly aim one tier higher "
        "(concrete, prioritized).",
        "5. **Risk register** — credibility, operational security, demo feasibility, "
        "overclaiming, audience assumptions.",
        "",
        "Be exhaustive and adversarial. Length is not a constraint within your output limit.",
        "",
        "---",
        "",
        "## RAW SOURCE FILES (complete text follows)",
        "",
    ]
    for name in ARCHIVE_FILES:
        path = paths[name]
        body = contents[name]
        blocks.extend(
            [
                f"### FILE: `{name}`",
                f"Path on disk: `{path}`",
                "",
                body,
                "",
                "---",
                "",
            ]
        )
    return "\n".join(blocks)


async def _generate(
    api_key: str,
    user_prompt: str,
    model_override: str | None,
) -> tuple[str, str]:
    """Call Gemini to produce the audit, trying model candidates until one succeeds.

    Builds a ``genai.Client`` with ``_HTTP_TIMEOUT_MS`` as its HTTP timeout and
    issues an async ``generate_content`` request (``_MAX_OUTPUT_TOKENS``,
    ``temperature=0.35``). If ``model_override`` is truthy only that model is
    attempted; otherwise ``_MODEL_CANDIDATES`` are tried in order. A response
    whose text is empty after stripping counts as a failure of that model.

    Only ordinary errors (``Exception``) trigger the fallback to the next
    model. ``asyncio.CancelledError``, ``KeyboardInterrupt`` and ``SystemExit``
    are not caught, so cancelling the task or pressing Ctrl-C stops the run
    instead of silently moving on to the next candidate.

    The client is closed via ``client.aio.aclose()`` in a ``finally`` block;
    ordinary errors raised while closing are suppressed.

    Args:
        api_key: Gemini API key used to authenticate the client.
        user_prompt: Full prompt text to send as the request content.
        model_override: Specific model id to force; if ``None`` or empty, the
            candidate list is tried in order.

    Returns:
        tuple[str, str]: ``(response_text, model_id)`` for the first model that
        returned non-empty text.

    Raises:
        RuntimeError: If every attempted model fails or returns empty text; the
            last underlying error is chained as the cause.
    """
    client = genai.Client(
        api_key=api_key,
        http_options=types.HttpOptions(timeout=_HTTP_TIMEOUT_MS),
    )
    models = (model_override,) if model_override else _MODEL_CANDIDATES
    last_err: Exception | None = None
    try:
        for model_id in models:
            try:
                resp = await client.aio.models.generate_content(
                    model=model_id,
                    contents=types.Content(
                        parts=[types.Part(text=user_prompt)],
                    ),
                    config=types.GenerateContentConfig(
                        max_output_tokens=_MAX_OUTPUT_TOKENS,
                        temperature=0.35,
                    ),
                )
                text = (getattr(resp, "text", None) or "").strip()
                if not text:
                    raise RuntimeError("Empty response text from Gemini")
                return text, model_id
            except Exception as exc:
                # Exception, not BaseException: cancellation and Ctrl-C must
                # propagate rather than be mistaken for "model unavailable".
                last_err = exc
        raise RuntimeError(
            f"All model candidates failed. Last error: {last_err!r}",
        ) from last_err
    finally:
        try:
            await client.aio.aclose()
        except Exception:
            # Best-effort cleanup: a failure to close must not mask the
            # result or the error that is already on its way out.
            pass


async def _async_main(argv: list[str]) -> int:
    """Drive the end-to-end audit: parse args, locate files, call Gemini, write output.

    Parses ``--model``, ``--dry-run`` and ``--arche-root``, locates the required
    files with :func:`locate_arche_files`, reads them with :func:`_read_text`
    and builds the prompt with :func:`_build_user_prompt`.

    * If any required file is missing, a diagnostic listing the missing names
      and the searched locations is written to ``sys.stderr``.
    * On ``--dry-run`` the resolved paths and the prompt size are printed and
      no API call is made (the files are still read to size the prompt).
    * Otherwise a key is obtained from :func:`_resolve_api_key`,
      :func:`_generate` is awaited, and the result -- prefixed with a header
      naming the model and the source paths -- is written to
      ``REPO_ROOT / _OUTPUT_NAME`` and echoed to stdout.

    Args:
        argv: Command-line arguments excluding the program name (typically
            ``sys.argv[1:]``).

    Returns:
        int: ``0`` on success or dry run, ``1`` if one or more required files
        could not be located.
    """
    parser = argparse.ArgumentParser(description="Arche DEF CON tier audit via Gemini.")
    parser.add_argument(
        "--model",
        help="Override Gemini model id (otherwise tries Pro-class candidates).",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Resolve file paths only; do not call the API.",
    )
    parser.add_argument(
        "--arche-root",
        type=Path,
        metavar="DIR",
        help=(
            "Directory that contains all four markdown files (optional; otherwise "
            "searches data/arche, output/, notes/, defcon/, and find(1))."
        ),
    )
    args = parser.parse_args(argv)

    located = locate_arche_files(REPO_ROOT, args.arche_root)
    missing = [n for n in ARCHIVE_FILES if n not in located]
    if missing:
        sys.stderr.write(
            "Could not find required files:\n "
            + "\n ".join(missing)
            + "\n\nSearched under:\n "
            + "\n ".join(str(REPO_ROOT / d) for d in _RELATIVE_SEARCH_DIRS)
            + f"\n and `find` under {REPO_ROOT} and {REPO_ROOT.parent}\n",
        )
        return 1

    contents = {n: _read_text(located[n]) for n in ARCHIVE_FILES}
    prompt = _build_user_prompt(located, contents)

    if args.dry_run:
        print("Located files:")
        for n in ARCHIVE_FILES:
            print(f" {n} -> {located[n]}")
        print(f"Prompt size: {len(prompt):,} characters")
        return 0

    # The key is resolved only on the real-run path so that --dry-run works
    # on machines without any Gemini credentials.
    api_key = _resolve_api_key()
    result, model_used = await _generate(api_key, prompt, args.model)

    out_path = REPO_ROOT / _OUTPUT_NAME
    header = (
        f"# Arche DEF CON tier audit\n\n"
        f"- Model: `{model_used}`\n"
        f"- Source files: "
        + ", ".join(f"`{located[n]}`" for n in ARCHIVE_FILES)
        + "\n\n---\n\n"
    )
    out_path.write_text(header + result + "\n", encoding="utf-8")
    print(
        f"Wrote {out_path} ({out_path.stat().st_size:,} bytes) using model {model_used}."
    )
    print()
    print(result)
    return 0
def main() -> None:
    """Run the async audit pipeline and exit with its return code.

    Synchronous CLI entry point: executes :func:`_async_main` with the
    process's command-line arguments (program name excluded) under
    ``asyncio.run`` and turns the integer it returns into the exit status.

    Raises:
        SystemExit: Always, carrying the exit code returned by
            :func:`_async_main`.
    """
    exit_code = asyncio.run(_async_main(sys.argv[1:]))
    raise SystemExit(exit_code)


# Run the audit only when executed as a script, not when imported.
if __name__ == "__main__":
    main()