# Source code for classifiers.migrate_embeddings_redisearch

#!/usr/bin/env python3
"""Copy monolithic tool/skill embedding hashes into per-key RediSearch HASH docs.

The script calls ``ensure_indexes`` itself before copying any data, so
``idx:tool_embeddings`` and ``idx:skill_embeddings`` exist by the time the
migration runs. Safe to re-run (overwrites ``tool_emb:*`` / ``skill_emb:*``
keys from legacy hash data).

Usage::

    python -m classifiers.migrate_embeddings_redisearch [--redis-url URL]
"""

from __future__ import annotations

import argparse
import asyncio
import logging
import os
import sys

import redis.asyncio as aioredis

# Prepend the parent of this file's directory to sys.path so that the
# modules imported below from outside the ``classifiers`` package
# (``config``, ``init_redis_indexes``) can be found; hence the E402 noqa marks.
_PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, _PROJECT_ROOT)

from classifiers.redis_vector_index import (  # noqa: E402
    migrate_legacy_skill_hashes_to_redisearch,
    migrate_legacy_tool_hashes_to_redisearch,
)
from classifiers.vector_classifier import (  # noqa: E402
    SKILL_EMBEDDINGS_HASH_KEY,
    SKILL_METADATA_HASH_KEY,
    TOOL_EMBEDDINGS_HASH_KEY,
    TOOL_METADATA_HASH_KEY,
)
from config import Config  # noqa: E402
from init_redis_indexes import ensure_indexes  # noqa: E402

# Root-logger configuration applied at import time: INFO and above, with a
# timestamp, level and logger name on every record.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-scoped logger used by the migration entry points.
logger = logging.getLogger(__name__)


async def _run(redis_url: str, connection_kwargs: dict) -> None:
    """Open a Redis client and copy legacy tool/skill hashes into RediSearch docs.

    Works on a single async client built by :func:`redis.asyncio.from_url`
    from *redis_url* with ``decode_responses=True`` plus *connection_kwargs*,
    and performs these steps in order:

    1. ``PING`` the server to confirm connectivity.
    2. Call :func:`init_redis_indexes.ensure_indexes` so the embedding indexes
       are in place before any data is copied.
    3. Call
       :func:`classifiers.redis_vector_index.migrate_legacy_tool_hashes_to_redisearch`
       with ``TOOL_EMBEDDINGS_HASH_KEY`` / ``TOOL_METADATA_HASH_KEY``.
    4. Call
       :func:`classifiers.redis_vector_index.migrate_legacy_skill_hashes_to_redisearch`
       with ``SKILL_EMBEDDINGS_HASH_KEY`` / ``SKILL_METADATA_HASH_KEY``.
    5. Log the two counts returned by those migrations at INFO level.

    The client is closed with ``aclose()`` whether or not any step fails.

    Args:
        redis_url: Redis connection URL to migrate against.
        connection_kwargs: Extra keyword arguments forwarded to
            :func:`redis.asyncio.from_url` (in this module they are produced by
            ``Config.redis_connection_kwargs_for_url``).

    Raises:
        Exception: Any error from connecting, index creation, or migration is
            propagated unchanged to the caller.
    """
    client = aioredis.from_url(redis_url, decode_responses=True, **connection_kwargs)
    try:
        await client.ping()
        await ensure_indexes(client)

        tool_count = await migrate_legacy_tool_hashes_to_redisearch(
            client,
            embeddings_key=TOOL_EMBEDDINGS_HASH_KEY,
            metadata_key=TOOL_METADATA_HASH_KEY,
        )
        skill_count = await migrate_legacy_skill_hashes_to_redisearch(
            client,
            embeddings_key=SKILL_EMBEDDINGS_HASH_KEY,
            metadata_key=SKILL_METADATA_HASH_KEY,
        )

        logger.info(
            "Migration complete: %d tools, %d skills (RediSearch HASH keys)",
            tool_count,
            skill_count,
        )
    finally:
        # Release the connection pool even when a step above raised.
        await client.aclose()


def main(argv: list[str] | None = None) -> int:
    """CLI entrypoint that resolves the Redis URL and drives the migration.

    Parses ``--redis-url`` from *argv*, loads configuration via
    :meth:`Config.load`, and resolves the effective Redis URL with the
    precedence ``--redis-url`` > ``cfg.redis_url`` > ``$REDIS_URL`` >
    ``redis://localhost:6379/0``. An empty value at any level falls through to
    the next one. Connection keyword arguments come from
    :meth:`Config.redis_connection_kwargs_for_url`, and the async :func:`_run`
    coroutine is executed via :func:`asyncio.run` to perform the migration.

    Side effects: triggers the Redis writes performed by :func:`_run`. Any
    exception raised while loading configuration, building connection
    arguments, or running the migration is logged with its traceback and
    converted into a non-zero return code instead of propagating.
    Argument-parsing errors and ``--help`` still exit through the
    :class:`SystemExit` raised by :mod:`argparse`.

    Invoked from the module's ``__main__`` guard.

    Args:
        argv: Command-line arguments to parse, excluding the program name.
            ``None`` (the default) makes :mod:`argparse` read ``sys.argv``.

    Returns:
        ``0`` on a successful migration, ``1`` if any step after argument
        parsing raised.
    """
    parser = argparse.ArgumentParser(
        description="Migrate legacy embedding hashes to RediSearch tool_emb/skill_emb keys",
    )
    parser.add_argument(
        "--redis-url",
        default=None,
        help=(
            "Redis URL (default: config redis_url, then $REDIS_URL, "
            "then redis://localhost:6379/0)"
        ),
    )
    args = parser.parse_args(argv)

    try:
        # Config loading lives inside the try so that its failures honour the
        # "log and return 1" contract just like migration failures do.
        cfg = Config.load()
        # A pure ``or`` chain (instead of a ``.get`` default) so that an empty
        # $REDIS_URL also falls back to the local default rather than "".
        redis_url = (
            args.redis_url
            or cfg.redis_url
            or os.environ.get("REDIS_URL")
            or "redis://localhost:6379/0"
        )
        conn_kw = cfg.redis_connection_kwargs_for_url(redis_url)
        asyncio.run(_run(redis_url, conn_kw))
    except Exception as exc:
        # Top-level CLI boundary: report with traceback, signal via exit code.
        logger.exception("%s", exc)
        return 1
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())