# Source code for classifiers.update_skill_embeddings

#!/usr/bin/env python3
"""Compute skill embeddings from SQLite index and store in Redis."""

from __future__ import annotations

import argparse
import asyncio
import jsonutil as json
import logging
import os
import sys
from pathlib import Path
from typing import Any

import numpy as np
import redis.asyncio as aioredis

# Put the parent of this file's directory (the repository root) first on
# sys.path so the project imports below (``classifiers``, ``config``,
# ``rag_system``) resolve when this file is executed directly as a script.
# Those imports are marked ``# noqa: E402`` because they must run after this.
sys.path.insert(
    0,
    os.path.abspath(os.path.join(os.path.dirname(__file__), "..")),
)

from classifiers.skill_catalog import (  # noqa: E402
    load_all_skills,
    skill_embedding_text,
)
from classifiers.vector_classifier import (  # noqa: E402
    SKILL_EMBEDDINGS_HASH_KEY,
    SKILL_METADATA_HASH_KEY,
)
from classifiers.redis_vector_index import (  # noqa: E402
    delete_skill_embedding_hash,
    store_skill_embedding_hash,
)
from config import Config  # noqa: E402
from rag_system.openrouter_embeddings import OpenRouterEmbeddings  # noqa: E402

# Import-time side effect: configure the root logger at INFO with a
# timestamped format so the CLI reports progress without extra setup.
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module logger used by every routine below.
logger = logging.getLogger(__name__)


async def _get_existing_ids(redis_client: aioredis.Redis) -> set[str]:
    """Collect the skill ids that already have an embedding stored in Redis.

    Performs one ``HKEYS`` read on the ``SKILL_EMBEDDINGS_HASH_KEY`` hash (the
    legacy monolithic embeddings map defined in
    :mod:`classifiers.vector_classifier`). Field names that come back as
    ``bytes`` are decoded as UTF-8; ``str`` names are kept as-is. The result
    can therefore be compared directly with SQLite ``skill_id`` values.

    Used by :func:`prune_skill_embedding_orphans` to find ids that SQLite no
    longer knows about. Used by :func:`update_skill_embeddings` to skip ids
    that are already embedded. Also called by
    ``scripts/verify_npx_skills_e2e.py``.

    Args:
        redis_client (aioredis.Redis): Async Redis connection used for the
            ``HKEYS`` read.

    Returns:
        set[str]: Skill ids present as fields of the embeddings hash.
    """
    field_names = await redis_client.hkeys(SKILL_EMBEDDINGS_HASH_KEY)
    skill_ids: set[str] = set()
    for field in field_names:
        skill_ids.add(field.decode("utf-8") if isinstance(field, bytes) else field)
    return skill_ids


async def prune_skill_embedding_orphans(
    *,
    db_path: str,
) -> int:
    """Delete Redis skill embeddings whose ids no longer exist in the SQLite catalog.

    Loads the authoritative skill rows from the SQLite index via
    :func:`classifiers.skill_catalog.load_all_skills`. Reads the ids currently
    in Redis via :func:`_get_existing_ids`. Removes every id present in Redis
    but absent from SQLite:

    * Orphans are deleted from ``SKILL_EMBEDDINGS_HASH_KEY`` and
      ``SKILL_METADATA_HASH_KEY``, using one multi-field ``HDEL`` per hash.
    * Each orphan's per-skill RediSearch document is dropped through
      :func:`classifiers.redis_vector_index.delete_skill_embedding_hash`.

    This keeps the vector index from routing to skills that have been deleted
    or renamed.

    Safety guard: when the catalog yields no rows, nothing is deleted and
    ``0`` is returned. Otherwise an empty or misconfigured database (e.g. a
    wrong ``--db`` path) would classify every stored embedding as an orphan
    and wipe the index.

    Opens its own async Redis connection (from :class:`config.Config` /
    ``REDIS_URL``) and always closes it. Reads the SQLite file. Makes no
    embedding calls.

    Args:
        db_path (str): Path to the skills index SQLite database to treat as
            the source of valid skill ids.

    Returns:
        int: Number of orphaned skill ids removed from Redis. This is ``0``
        when there are none, or when the catalog is empty.
    """
    cfg = Config.load()
    redis_url = cfg.redis_url or os.environ.get(
        "REDIS_URL",
        "redis://localhost:6379/0",
    )
    redis_client = aioredis.from_url(
        redis_url,
        decode_responses=True,
        **cfg.redis_connection_kwargs_for_url(redis_url),
    )
    try:
        rows = load_all_skills(Path(db_path))
        if not rows:
            # An empty catalog would make every Redis entry an "orphan";
            # refuse to wipe the index on what is most likely misconfiguration.
            logger.warning(
                "No skills in database %s; skipping orphan pruning", db_path
            )
            return 0
        valid = {r["skill_id"] for r in rows}
        existing = await _get_existing_ids(redis_client)
        # Sorted for deterministic deletion order and reproducible runs.
        orphans = sorted(existing - valid)
        if not orphans:
            logger.info("No orphan skill embeddings in Redis")
            return 0
        # HDEL accepts many fields at once: one round-trip per hash instead of
        # two round-trips per orphan.
        await redis_client.hdel(SKILL_EMBEDDINGS_HASH_KEY, *orphans)
        await redis_client.hdel(SKILL_METADATA_HASH_KEY, *orphans)
        for oid in orphans:
            await delete_skill_embedding_hash(redis_client, oid)
        logger.info("Pruned %d orphan skill embedding keys from Redis", len(orphans))
        return len(orphans)
    finally:
        await redis_client.aclose()
def _skill_metadata(row: Any) -> dict[str, Any]:
    """Build the metadata blob stored alongside a skill's embedding."""
    return {
        "skill_id": row["skill_id"],
        "name": row["name"],
        "description": row["description"],
        "skill_md_path": row["skill_md_path"],
        "skill_root": row["skill_root"],
    }


def _l2_normalize(vec: np.ndarray) -> np.ndarray:
    """Return ``vec`` scaled to unit L2 norm; a zero vector is returned unchanged."""
    norm = np.linalg.norm(vec)
    return vec / norm if norm > 0 else vec


async def _store_embedding_batch(
    redis_client: aioredis.Redis,
    vectors: dict[str, np.ndarray],
    metas: dict[str, dict[str, Any]],
) -> None:
    """Write one batch of embeddings and metadata to the legacy hashes and the per-skill index."""
    await redis_client.hset(
        SKILL_EMBEDDINGS_HASH_KEY,
        mapping={sid: json.dumps(vec.tolist()) for sid, vec in vectors.items()},
    )
    await redis_client.hset(
        SKILL_METADATA_HASH_KEY,
        mapping={sid: json.dumps(meta) for sid, meta in metas.items()},
    )
    for sid, vec in vectors.items():
        # Fresh float32 copy, matching the dtype previously produced by the
        # JSON round-trip this helper replaces.
        await store_skill_embedding_hash(
            redis_client,
            sid,
            np.array(vec, dtype=np.float32),
            metas[sid],
        )


async def update_skill_embeddings(
    *,
    db_path: str,
    force_all: bool = False,
) -> bool:
    """Embed skills from the SQLite catalog into Redis, incrementally by skill id.

    Steps:

    1. Load every skill row via
       :func:`classifiers.skill_catalog.load_all_skills`.
    2. Read the already-embedded ids via :func:`_get_existing_ids`. Unless
       ``force_all`` is set, limit the work to skills missing from Redis.
    3. For each pending skill, build the embedding text with
       :func:`classifiers.skill_catalog.skill_embedding_text`.
    4. Send the texts through an :class:`OpenRouterEmbeddings` client, with
       ``cfg.embedding_batch_size`` texts per request.
    5. L2-normalize each vector.
    6. Store each batch as soon as it is embedded:

       * the JSON vector under ``SKILL_EMBEDDINGS_HASH_KEY``;
       * a JSON metadata blob (id, name, description, paths) under
         ``SKILL_METADATA_HASH_KEY``;
       * a per-skill RediSearch document via
         :func:`classifiers.redis_vector_index.store_skill_embedding_hash`.

    Persisting per batch means a failure part-way through keeps the batches
    already embedded, so an incremental rerun resumes instead of starting
    over. Skills whose embedding comes back empty are logged and skipped. If
    the provider returns a different number of vectors than texts, a warning
    is logged and the unmatched skills are skipped.

    Side effects and resources:

    * Opens its own async Redis connection (from :class:`config.Config` /
      ``REDIS_URL``) and always closes it.
    * Always closes the embeddings client, even on error.
    * Reads the SQLite file and makes OpenRouter embedding HTTP calls.

    Args:
        db_path (str): Path to the skills index SQLite database to read skills
            from.
        force_all (bool): Recompute embeddings for every skill rather than
            only those missing from Redis.

    Returns:
        bool: ``True`` on completion. This includes the no-op cases where the
        database is empty or all skills already have embeddings. Errors
        propagate as exceptions.
    """
    cfg = Config.load()
    redis_url = cfg.redis_url or os.environ.get(
        "REDIS_URL",
        "redis://localhost:6379/0",
    )
    redis_client = aioredis.from_url(
        redis_url,
        decode_responses=True,
        **cfg.redis_connection_kwargs_for_url(redis_url),
    )
    try:
        rows = load_all_skills(Path(db_path))
        if not rows:
            logger.warning("No skills in database: %s", db_path)
            return True
        existing = await _get_existing_ids(redis_client)
        to_do = (
            rows
            if force_all
            else [r for r in rows if r["skill_id"] not in existing]
        )
        if not to_do:
            logger.info("All %d skills already have embeddings", len(rows))
            return True
        logger.info(
            "Embedding %d skills (%d already in Redis)", len(to_do), len(existing)
        )
        emb = OpenRouterEmbeddings(api_key=cfg.api_key or None)
        stored = 0
        try:
            batch_size = max(1, cfg.embedding_batch_size)
            for start in range(0, len(to_do), batch_size):
                chunk = to_do[start : start + batch_size]
                texts = [
                    skill_embedding_text(r["name"], r["description"]) for r in chunk
                ]
                vectors = await emb.embed_texts(texts)
                if len(vectors) != len(chunk):
                    # zip() below would silently drop the unmatched skills.
                    logger.warning(
                        "Embedding batch returned %d vectors for %d skills; "
                        "unmatched skills are skipped",
                        len(vectors),
                        len(chunk),
                    )
                batch_vecs: dict[str, np.ndarray] = {}
                batch_metas: dict[str, dict[str, Any]] = {}
                for row, vec in zip(chunk, vectors):
                    sid = row["skill_id"]
                    if vec.size == 0:
                        logger.warning("Empty embedding for %s", sid)
                        continue
                    batch_vecs[sid] = _l2_normalize(vec)
                    batch_metas[sid] = _skill_metadata(row)
                if batch_vecs:
                    await _store_embedding_batch(
                        redis_client, batch_vecs, batch_metas
                    )
                    stored += len(batch_vecs)
        finally:
            # Release the HTTP client even when embedding or storage fails.
            await emb.close()
        if stored:
            logger.info("Stored %d skill embeddings in Redis", stored)
        return True
    finally:
        await redis_client.aclose()
def main(argv: list[str] | None = None) -> int:
    """Parse CLI flags and run the skill embedding update synchronously.

    Exposes the following flags:

    * ``--db``: SQLite path; defaults to ``Config.skills_index_db``.
    * ``--force-all``: recompute every skill rather than only missing ids.
    * ``--prune-orphans``: drop Redis keys absent from SQLite before the
      update.

    The resolved database path is passed to a nested ``_run`` coroutine,
    which ``asyncio.run`` drives to completion. ``_run`` optionally calls
    :func:`prune_skill_embedding_orphans` and then calls
    :func:`update_skill_embeddings`.

    Both helpers open their own Redis connection. Only the update step calls
    OpenRouter for embeddings; pruning makes no embedding calls.

    This is the module entry point, invoked under
    ``if __name__ == "__main__"`` (e.g.
    ``python -m classifiers.update_skill_embeddings``).

    Args:
        argv (list[str] | None): Arguments to parse instead of
            ``sys.argv[1:]``. ``None`` (the default) keeps normal CLI
            behavior. Useful for tests and programmatic invocation.

    Returns:
        int: ``0`` when the update reports success, ``1`` otherwise. Suitable
        as a process exit code.
    """
    parser = argparse.ArgumentParser(
        description="Compute skill embeddings from the SQLite index and store them in Redis.",
    )
    parser.add_argument("--db", default=None, help="SQLite path (default: config)")
    parser.add_argument(
        "--force-all",
        action="store_true",
        help="Recompute embeddings for every skill",
    )
    parser.add_argument(
        "--prune-orphans",
        action="store_true",
        help="HDEL Redis skill keys not in SQLite (before embedding update)",
    )
    args = parser.parse_args(argv)
    # Config is only consulted for the default path when --db was not given.
    db = args.db or Config.load().skills_index_db

    async def _run() -> bool:
        """Run optional orphan pruning, then the skill embedding update.

        This is a closure over the parsed ``args`` and the resolved ``db``
        path. When ``--prune-orphans`` was passed, it first awaits
        :func:`prune_skill_embedding_orphans`, which touches Redis only. It
        then awaits :func:`update_skill_embeddings` with the ``--force-all``
        flag; that step also calls OpenRouter.

        Returns:
            bool: The success flag from :func:`update_skill_embeddings`.
        """
        if args.prune_orphans:
            await prune_skill_embedding_orphans(db_path=db)
        return await update_skill_embeddings(
            db_path=db,
            force_all=args.force_all,
        )

    ok = asyncio.run(_run())
    return 0 if ok else 1
if __name__ == "__main__":
    # Script entry point: main() returns the process exit code.
    sys.exit(main())