Source code for classifiers.skill_catalog

"""SQLite-backed index for Agent Skills (SKILL.md discovery and metadata)."""

from __future__ import annotations

import hashlib
import logging
import re
import sqlite3
import time
from pathlib import Path
from typing import Any

import yaml

logger = logging.getLogger(__name__)

# Skip paths that cannot contain skills (aligned with agentskills client guide).
_SKIP_DIR_NAMES = frozenset(
    {
        ".git",
        "node_modules",
        "__pycache__",
        ".venv",
        "venv",
        "dist",
        "build",
        ".mypy_cache",
        ".pytest_cache",
    }
)

_MAX_SCAN_DEPTH = 8

# Hidden dirs are skipped except `.agents` — `npx skills add` installs under `.agents/skills/`.
_ALLOW_DOT_DIR_NAMES = frozenset({".agents"})


def _parse_skill_md(path: Path) -> dict[str, Any] | None:
    """Parse one SKILL.md file's YAML frontmatter and body.

    Reads the file, splits the leading ``---`` fenced YAML frontmatter from the
    markdown body, and pulls out the required ``name`` and ``description``
    fields. This is the single-file parsing primitive behind skill discovery:
    a file that is not a well-formed skill (no frontmatter, malformed YAML,
    missing/empty name or description) yields ``None`` so the caller can skip
    it. It also fingerprints the body with a SHA-256 hash used downstream to
    dedupe identical skills across corpora.

    Touches the filesystem (reads *path*) and logs warnings on read errors and
    debug on YAML errors; it does no network, Redis, or database I/O.

    Called by ``classifiers.ingest_skills`` while walking the discovered skill
    directories. No other in-repo callers were found.

    Args:
        path: Filesystem path to a candidate ``SKILL.md`` file.

    Returns:
        A dict with ``name``, ``description``, ``body``, and ``body_hash`` keys
        for a valid skill, or ``None`` when the file is unreadable or is not a
        complete skill definition.
    """
    try:
        raw = path.read_text(encoding="utf-8", errors="replace")
    except OSError as exc:
        logger.warning("Cannot read %s: %s", path, exc)
        return None

    if not raw.startswith("---"):
        return None

    m = re.match(
        r"^---\s*\n(.*?)\n---\s*\n(.*)$",
        raw,
        re.DOTALL,
    )
    if not m:
        return None

    fm_raw, body = m.group(1), m.group(2)
    try:
        fm = yaml.safe_load(fm_raw) or {}
    except yaml.YAMLError as exc:
        logger.debug("YAML frontmatter failed %s: %s", path, exc)
        return None

    if not isinstance(fm, dict):
        return None

    name = fm.get("name")
    description = fm.get("description")
    if not name or not description:
        return None
    name = str(name).strip()
    description = str(description).strip()
    if not name or not description:
        return None

    body_hash = hashlib.sha256(body.encode("utf-8")).hexdigest()
    return {
        "name": name,
        "description": description,
        "body": body,
        "body_hash": body_hash,
    }


[docs] def stable_skill_id(skill_root: Path, corpus_root: Path) -> str: """Derive a deterministic skill ID from its path relative to the corpus. Produces a stable, content-independent identifier for a skill so the same skill keeps the same primary key across re-ingests (it depends only on location, not on the skill body). The relative posix path under *corpus_root* is hashed with SHA-256 and truncated to 32 hex chars; if the skill root is not actually under *corpus_root* the directory name is used as the fallback basis instead. A pure path/hash helper with no I/O. Called by ``classifiers.ingest_skills`` (to key each upserted row) and exercised by ``tests/test_skill_catalog.py``. Args: skill_root: Directory containing the skill (the dir that holds its ``SKILL.md``). corpus_root: Root of the skills corpus that *skill_root* lives under. Returns: A 32-character lowercase hex string usable as the skill's primary key. """ try: rel = skill_root.resolve().relative_to(corpus_root.resolve()) except ValueError: rel = skill_root.name rel_s = rel.as_posix() return hashlib.sha256(rel_s.encode("utf-8")).hexdigest()[:32]
[docs] def canonical_skill_sort_key(skill_dir: Path, corpus_root: Path) -> tuple[int, str]: """Rank a skill directory so canonical sources win when deduping. Returns a sort key that establishes source precedence among otherwise identical skills: git-cloned corpora under ``repos`` sort first (bucket 0), ``npx``-installed copies next (bucket 1), everything else after (bucket 2), and anything outside *corpus_root* last (bucket 99). When several directories share the same ``body_hash``, sorting by this key and keeping the first ensures the git-managed copy is treated as canonical rather than a transient ``npx`` install. A pure path-classification helper with no I/O. Called by ``classifiers.ingest_skills`` as the ``key`` when sorting discovered skill directories before deduplication. No other in-repo callers were found. Args: skill_dir: Directory of the skill being ranked. corpus_root: Root of the corpus used to compute the relative path and identify the ``repos`` / ``npx`` top-level bucket. Returns: A ``(bucket, relative_posix_path)`` tuple; lower buckets and lexicographically smaller paths sort first. """ try: rel = skill_dir.resolve().relative_to(corpus_root.resolve()) except ValueError: return (99, skill_dir.as_posix()) parts = rel.parts bucket = 2 if len(parts) >= 1 and parts[0] == "repos": bucket = 0 elif len(parts) >= 1 and parts[0] == "npx": bucket = 1 return (bucket, rel.as_posix())
[docs] def discover_skill_dirs( root: Path, *, max_depth: int = _MAX_SCAN_DEPTH, ) -> list[Path]: """Find every directory under *root* that directly holds a SKILL.md. Entry point for skill discovery: it resolves *root* and runs a depth-bounded recursive walk (see the inner ``walk`` closure) that collects each directory containing a ``SKILL.md`` and prunes that branch, since skills are not nested. Directories named in ``_SKIP_DIR_NAMES`` and hidden dot-directories (except the allow-listed ``.agents``) are ignored so VCS, cache, and dependency trees are not scanned. Touches the filesystem (directory iteration only); unreadable directories are silently skipped. Called by ``classifiers.ingest_skills`` to enumerate the corpus and exercised by ``tests/test_skill_catalog.py``. Args: root: Directory to scan recursively for skills. max_depth: Maximum recursion depth below *root* (default ``_MAX_SCAN_DEPTH``); deeper directories are not visited. Returns: The list of directories that directly contain a ``SKILL.md`` file. """ found: list[Path] = [] root = root.resolve() def walk(dir_path: Path, depth: int) -> None: """Recursively collect directories that directly contain a ``SKILL.md``. Depth-bounded (``max_depth``) walk used by :func:`discover_skill_dirs`: when a directory holds a ``SKILL.md`` it is appended to the enclosing ``found`` list and the branch is pruned (skills are not nested); otherwise each non-skipped subdirectory is recursed into. Skipped names and unreadable directories are ignored silently. Mutates the closed-over ``found`` list and returns nothing. Args: dir_path: Directory currently being scanned. depth: Current recursion depth; recursion stops past ``max_depth``. """ if depth > max_depth: return try: entries = list(dir_path.iterdir()) except OSError: return skill_file = dir_path / "SKILL.md" if skill_file.is_file(): found.append(dir_path) return for entry in entries: if not entry.is_dir(): continue if entry.name in _SKIP_DIR_NAMES: continue if entry.name.startswith(".") and entry.name not in _ALLOW_DOT_DIR_NAMES: continue walk(entry, depth + 1) if root.is_dir(): walk(root, 0) return found
[docs] def init_db(db_path: Path) -> None: """Create the SQLite skills table (and its parent dir) if absent. Idempotent schema bootstrap for the skill catalog database. It ensures the parent directory exists, opens *db_path*, and runs a ``CREATE TABLE IF NOT EXISTS skills`` so subsequent :func:`upsert_skill` and ``load_*`` calls have a table to work against. Safe to call repeatedly. Touches the filesystem and the SQLite database (creates directories, connects, commits DDL, then closes the connection); no other I/O. Called by ``classifiers.ingest_skills`` before populating the catalog and by ``tests/test_skill_catalog.py``. Args: db_path: Path to the SQLite database file to initialise. """ db_path.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(db_path)) try: conn.execute( """ CREATE TABLE IF NOT EXISTS skills ( skill_id TEXT PRIMARY KEY, name TEXT NOT NULL, description TEXT NOT NULL, skill_md_path TEXT NOT NULL, skill_root TEXT NOT NULL, corpus_root TEXT NOT NULL, body_hash TEXT NOT NULL, ingested_at REAL NOT NULL ) """, ) conn.commit() finally: conn.close()
[docs] def upsert_skill( db_path: Path, row: dict[str, Any], ) -> None: """Insert or replace a single skill row in the catalog database. Upserts one skill keyed by ``skill_id`` using ``INSERT OR REPLACE`` so a re-ingested skill overwrites its prior row rather than duplicating it. The ``ingested_at`` timestamp defaults to ``time.time()`` when the caller does not supply one, recording when the row was last written. Touches the SQLite database (connects, executes the upsert, commits, and closes); assumes the ``skills`` table already exists via :func:`init_db`. Called by ``classifiers.ingest_skills`` for each discovered skill. No other in-repo callers were found. Args: db_path: Path to the SQLite catalog database. row: Mapping with ``skill_id``, ``name``, ``description``, ``skill_md_path``, ``skill_root``, ``corpus_root``, and ``body_hash`` keys, plus an optional ``ingested_at`` epoch float. """ conn = sqlite3.connect(str(db_path)) try: conn.execute( """ INSERT OR REPLACE INTO skills ( skill_id, name, description, skill_md_path, skill_root, corpus_root, body_hash, ingested_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( row["skill_id"], row["name"], row["description"], row["skill_md_path"], row["skill_root"], row["corpus_root"], row["body_hash"], row.get("ingested_at", time.time()), ), ) conn.commit() finally: conn.close()
[docs] def load_skill_by_id(db_path: Path, skill_id: str) -> dict[str, Any] | None: """Load a single skill's catalog row by its ID. Point lookup against the ``skills`` table used to resolve a skill's metadata (including its ``skill_md_path`` on disk) from the stable ID. A missing database file or a missing row both yield ``None`` rather than raising, so callers can treat "unknown skill" uniformly. Touches the SQLite database (opens read-only via a ``SELECT``, then closes). Called by the ``activate_skill`` tool to fetch the row before reading the skill body, and exercised by ``tests/test_skill_catalog.py``. Args: db_path: Path to the SQLite catalog database. skill_id: Stable skill identifier (see :func:`stable_skill_id`). Returns: A dict of the skill's columns (without ``ingested_at``), or ``None`` if the database file or the row does not exist. """ if not db_path.is_file(): return None conn = sqlite3.connect(str(db_path)) try: cur = conn.execute( "SELECT skill_id, name, description, skill_md_path, skill_root, " "corpus_root, body_hash FROM skills WHERE skill_id = ?", (skill_id,), ) r = cur.fetchone() if not r: return None return { "skill_id": r[0], "name": r[1], "description": r[2], "skill_md_path": r[3], "skill_root": r[4], "corpus_root": r[5], "body_hash": r[6], } finally: conn.close()
[docs] def load_all_skills(db_path: Path) -> list[dict[str, Any]]: """Return every skill's metadata row from the catalog database. Full-table scan of ``skills`` used wherever the whole catalog is needed: counting ingested skills, building embeddings over all skills, and end-to-end verification. A missing database file yields an empty list rather than raising. Touches the SQLite database (opens, runs a ``SELECT`` of all rows, closes). Called by ``classifiers.ingest_skills`` and ``classifiers.update_skill_embeddings`` (to embed skills), by ``scripts/skills_corpus_pipeline.py`` and ``scripts/verify_npx_skills_e2e.py``, and by ``tests/test_skill_catalog.py``. Args: db_path: Path to the SQLite catalog database. Returns: A list of per-skill dicts (each without ``ingested_at``); empty if the database file does not exist. """ if not db_path.is_file(): return [] conn = sqlite3.connect(str(db_path)) try: cur = conn.execute( "SELECT skill_id, name, description, skill_md_path, skill_root, " "corpus_root, body_hash FROM skills", ) rows = [] for r in cur.fetchall(): rows.append( { "skill_id": r[0], "name": r[1], "description": r[2], "skill_md_path": r[3], "skill_root": r[4], "corpus_root": r[5], "body_hash": r[6], } ) return rows finally: conn.close()
[docs] def read_skill_body(skill_md_path: Path) -> tuple[str, str]: """Read a SKILL.md and split its markdown body from the raw text. Loads the file and strips the leading ``---`` fenced YAML frontmatter, returning both the body alone (for presenting/activating the skill) and the untouched full text (for callers that still need the frontmatter). When no frontmatter is present the whole file is treated as the body. Touches the filesystem (reads *skill_md_path*); no other I/O. Called by the ``activate_skill`` tool when surfacing a skill's instructions. No other in-repo callers were found. Args: skill_md_path: Path to the ``SKILL.md`` file to read. Returns: A ``(body, raw)`` tuple: the markdown body with frontmatter removed and whitespace-stripped, and the original unmodified file contents. """ raw = skill_md_path.read_text(encoding="utf-8", errors="replace") m = re.match( r"^---\s*\n.*?\n---\s*\n(.*)$", raw, re.DOTALL, ) if m: return m.group(1).strip(), raw return raw.strip(), raw
[docs] def skill_embedding_text(name: str, description: str) -> str: """Build the text representation of a skill for semantic embedding. Joins a skill's name and description into the single string that gets embedded for vector search, so a query can be matched against skills by meaning. Keeping this in one helper ensures ingestion and lookup embed skills identically (tier-1 style: name then description, newline-separated). A pure string helper with no I/O. Called by ``classifiers.update_skill_embeddings`` when computing the embedding for each skill row. No other in-repo callers were found. Args: name: The skill's name. description: The skill's description. Returns: The ``"name\\ndescription"`` string fed to the embedding model. """ return f"{name}\n{description}"