#!/usr/bin/env python3
"""Compute skill embeddings from SQLite index and store in Redis."""
from __future__ import annotations
import argparse
import asyncio
import jsonutil as json
import logging
import os
import sys
from pathlib import Path
from typing import Any
import numpy as np
import redis.asyncio as aioredis
# Put the project root (the parent of this file's directory) at the front of
# sys.path so the ``classifiers``, ``config`` and ``rag_system`` imports that
# follow resolve when this file is executed directly.
sys.path.insert(
    0, os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
)
from classifiers.skill_catalog import ( # noqa: E402
load_all_skills,
skill_embedding_text,
)
from classifiers.vector_classifier import ( # noqa: E402
SKILL_EMBEDDINGS_HASH_KEY,
SKILL_METADATA_HASH_KEY,
)
from classifiers.redis_vector_index import ( # noqa: E402
delete_skill_embedding_hash,
store_skill_embedding_hash,
)
from config import Config # noqa: E402
from rag_system.openrouter_embeddings import OpenRouterEmbeddings # noqa: E402
# Module-level root logging configuration: INFO level with timestamped lines.
# NOTE: this runs at import time, so importing this module also configures the
# root logger (basicConfig is a no-op if the root logger already has handlers).
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
async def _get_existing_ids(redis_client: aioredis.Redis) -> set[str]:
    """Collect the skill ids that already have an embedding stored in Redis.

    Reads every field name of the ``SKILL_EMBEDDINGS_HASH_KEY`` hash (imported
    from :mod:`classifiers.vector_classifier`) with a single ``HKEYS`` call.
    Any ``bytes`` field names are decoded as UTF-8, so the result can be
    compared directly with SQLite ``skill_id`` strings. That covers clients
    created without ``decode_responses``.

    Used in this module for incremental embedding (skip ids already present)
    and orphan pruning (ids present in Redis but missing from SQLite).

    Args:
        redis_client (aioredis.Redis): Async Redis connection to read from.

    Returns:
        set[str]: The skill ids currently present in the embeddings hash.
    """
    raw_fields = await redis_client.hkeys(SKILL_EMBEDDINGS_HASH_KEY)
    decoded: set[str] = set()
    for field in raw_fields:
        if isinstance(field, bytes):
            field = field.decode("utf-8")
        decoded.add(field)
    return decoded
[docs]
async def prune_skill_embedding_orphans(
    *,
    db_path: str,
) -> int:
    """Delete Redis skill embeddings whose ids no longer exist in the SQLite catalog.

    Loads the skill rows from the SQLite index via
    :func:`classifiers.skill_catalog.load_all_skills` and reads the ids
    currently in Redis via :func:`_get_existing_ids`. It then removes every id
    that is present in Redis but absent from SQLite:

    * the field is HDEL'd from ``SKILL_EMBEDDINGS_HASH_KEY`` and
      ``SKILL_METADATA_HASH_KEY``, one multi-field HDEL per hash;
    * the per-skill document is removed through
      :func:`classifiers.redis_vector_index.delete_skill_embedding_hash`.

    Orphans are only discovered through the embeddings hash. Ids that exist
    solely in the metadata hash or the per-skill documents are not detected.

    Safety guard: if the SQLite catalog yields no rows at all, nothing is
    deleted and ``0`` is returned. Otherwise an empty or failed catalog build
    would classify every stored embedding as an orphan and wipe them all.

    Opens its own async Redis connection (from :class:`config.Config`, falling
    back to ``REDIS_URL``) and always closes it. Makes no embedding calls.

    Args:
        db_path (str): Path to the skills index SQLite database whose
            ``skill_id`` values are treated as the valid set.

    Returns:
        int: Number of orphaned skill ids removed from Redis (``0`` if none,
        or if the catalog was empty and pruning was skipped).
    """
    cfg = Config.load()
    redis_url = cfg.redis_url or os.environ.get(
        "REDIS_URL",
        "redis://localhost:6379/0",
    )
    redis_client = aioredis.from_url(
        redis_url,
        decode_responses=True,
        **cfg.redis_connection_kwargs_for_url(redis_url),
    )
    try:
        rows = load_all_skills(Path(db_path))
        if not rows:
            # An empty catalog would make *every* Redis entry an "orphan";
            # refuse to mass-delete rather than wipe the whole index.
            logger.warning(
                "No skills in database %s; skipping orphan pruning", db_path
            )
            return 0
        valid = {r["skill_id"] for r in rows}
        existing = await _get_existing_ids(redis_client)
        # Sorted for deterministic delete order and reproducible logs.
        orphans = sorted(existing - valid)
        if not orphans:
            logger.info("No orphan skill embeddings in Redis")
            return 0
        # HDEL accepts many fields, so each hash is cleaned in one round trip.
        await redis_client.hdel(SKILL_EMBEDDINGS_HASH_KEY, *orphans)
        await redis_client.hdel(SKILL_METADATA_HASH_KEY, *orphans)
        for oid in orphans:
            await delete_skill_embedding_hash(redis_client, oid)
        logger.info("Pruned %d orphan skill embedding keys from Redis", len(orphans))
        return len(orphans)
    finally:
        await redis_client.aclose()
[docs]
async def update_skill_embeddings(
    *,
    db_path: str,
    force_all: bool = False,
) -> bool:
    """Embed skills from the SQLite catalog into Redis, incrementally by skill id.

    Loads every skill row via :func:`classifiers.skill_catalog.load_all_skills`
    and the ids already stored in Redis via :func:`_get_existing_ids`. Unless
    ``force_all`` is set, only skills whose id is missing from Redis are
    embedded.

    For each pending skill, the embedding text is built with
    :func:`classifiers.skill_catalog.skill_embedding_text`. The texts are sent
    to an :class:`OpenRouterEmbeddings` client in batches of
    ``cfg.embedding_batch_size`` (minimum 1). Each returned vector is
    L2-normalized: zero-norm vectors are kept as-is, and empty vectors are
    skipped with a warning. Each vector is then stored three ways:

    * its JSON list under ``SKILL_EMBEDDINGS_HASH_KEY``;
    * a JSON metadata blob (``skill_id``, ``name``, ``description``,
      ``skill_md_path``, ``skill_root``) under ``SKILL_METADATA_HASH_KEY``;
    * a per-skill document written by
      :func:`classifiers.redis_vector_index.store_skill_embedding_hash`
      (vector passed as ``float32``).

    Redis is written only after every batch has been embedded, so an embedding
    failure part-way through leaves Redis untouched. Both the embeddings client
    and the Redis connection are always closed, including when an error
    propagates. The connection comes from :class:`config.Config`, falling back
    to ``REDIS_URL``.

    Args:
        db_path (str): Path to the skills index SQLite database.
        force_all (bool): Re-embed every skill instead of only the ones
            missing from Redis.

    Returns:
        bool: ``True`` whenever no exception is raised. This includes the
        no-op cases: an empty database, or nothing left to embed.
    """
    cfg = Config.load()
    redis_url = cfg.redis_url or os.environ.get(
        "REDIS_URL",
        "redis://localhost:6379/0",
    )
    redis_client = aioredis.from_url(
        redis_url,
        decode_responses=True,
        **cfg.redis_connection_kwargs_for_url(redis_url),
    )
    try:
        rows = load_all_skills(Path(db_path))
        if not rows:
            logger.warning("No skills in database: %s", db_path)
            return True
        existing = await _get_existing_ids(redis_client)
        to_do = (
            rows if force_all else [r for r in rows if r["skill_id"] not in existing]
        )
        if not to_do:
            logger.info("All %d skills already have embeddings", len(rows))
            return True
        logger.info(
            "Embedding %d skills (%d already in Redis)", len(to_do), len(existing)
        )

        vectors_by_id: dict[str, np.ndarray] = {}
        metas_by_id: dict[str, dict[str, Any]] = {}
        batch_size = max(1, cfg.embedding_batch_size)
        emb = OpenRouterEmbeddings(api_key=cfg.api_key or None)
        try:
            for start in range(0, len(to_do), batch_size):
                chunk = to_do[start : start + batch_size]
                texts = [
                    skill_embedding_text(r["name"], r["description"]) for r in chunk
                ]
                # list() so len() works whatever sequence type the client returns.
                vectors = list(await emb.embed_texts(texts))
                if len(vectors) != len(chunk):
                    # zip() below would otherwise drop the unmatched skills silently.
                    logger.warning(
                        "Got %d embeddings for %d skills in batch starting at %d",
                        len(vectors),
                        len(chunk),
                        start,
                    )
                for row, vec in zip(chunk, vectors):
                    if vec.size == 0:
                        logger.warning("Empty embedding for %s", row["skill_id"])
                        continue
                    norm = np.linalg.norm(vec)
                    if norm > 0:
                        vec = vec / norm
                    sid = row["skill_id"]
                    vectors_by_id[sid] = vec
                    metas_by_id[sid] = {
                        "skill_id": sid,
                        "name": row["name"],
                        "description": row["description"],
                        "skill_md_path": row["skill_md_path"],
                        "skill_root": row["skill_root"],
                    }
        finally:
            # Previously skipped whenever embedding raised, leaking the client.
            await emb.close()

        if vectors_by_id:
            await redis_client.hset(
                SKILL_EMBEDDINGS_HASH_KEY,
                mapping={
                    sid: json.dumps(vec.tolist()) for sid, vec in vectors_by_id.items()
                },
            )
            await redis_client.hset(
                SKILL_METADATA_HASH_KEY,
                mapping={sid: json.dumps(meta) for sid, meta in metas_by_id.items()},
            )
            for sid, vec in vectors_by_id.items():
                await store_skill_embedding_hash(
                    redis_client,
                    sid,
                    np.asarray(vec, dtype=np.float32),
                    metas_by_id[sid],
                )
            logger.info("Stored %d skill embeddings in Redis", len(vectors_by_id))
        return True
    finally:
        await redis_client.aclose()
[docs]
def main() -> int:
    """Command-line entry point: parse flags, then prune and/or update embeddings.

    Flags:

    * ``--db``: SQLite skills index path. Falls back to
      ``Config.skills_index_db`` when omitted.
    * ``--force-all``: re-embed every skill, not only ids missing from Redis.
    * ``--prune-orphans``: before updating, delete Redis entries whose ids
      are absent from SQLite (see :func:`prune_skill_embedding_orphans`).

    Both coroutines run inside a single ``asyncio.run`` call. Each opens and
    closes its own Redis connection. Only :func:`update_skill_embeddings`
    calls OpenRouter; pruning touches Redis and SQLite only.

    Returns:
        int: Process exit code. ``0`` if the update reported success,
        ``1`` otherwise.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--db", default=None, help="SQLite path (default: config)")
    cli.add_argument(
        "--force-all",
        action="store_true",
        help="Recompute embeddings for every skill",
    )
    cli.add_argument(
        "--prune-orphans",
        action="store_true",
        help="HDEL Redis skill keys not in SQLite (before embedding update)",
    )
    opts = cli.parse_args()
    settings = Config.load()
    db_path = opts.db or settings.skills_index_db

    async def _prune_then_update() -> bool:
        """Optionally prune orphans, then run the embedding update.

        Returns:
            bool: Whatever :func:`update_skill_embeddings` returns.
        """
        if opts.prune_orphans:
            await prune_skill_embedding_orphans(db_path=db_path)
        return await update_skill_embeddings(
            db_path=db_path,
            force_all=opts.force_all,
        )

    succeeded = asyncio.run(_prune_then_update())
    if succeeded:
        return 0
    return 1
if __name__ == "__main__":
    # sys.exit raises SystemExit with main()'s return value as the exit status.
    sys.exit(main())