#!/usr/bin/env python3
"""Copy monolithic tool/skill embedding hashes into per-key RediSearch HASH docs.
Run after ``ensure_indexes`` has created ``idx:tool_embeddings`` and
``idx:skill_embeddings``. Safe to re-run (overwrites ``tool_emb:*`` /
``skill_emb:*`` keys from legacy hash data).
Usage::
python -m classifiers.migrate_embeddings_redisearch
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import os
import sys
import redis.asyncio as aioredis
# Put this file's parent directory at the front of ``sys.path`` so the
# ``classifiers``, ``config`` and ``init_redis_indexes`` imports below resolve
# when this file is executed directly. This has to run before those imports,
# which is why they carry ``# noqa: E402``.
sys.path.insert(
0,
os.path.abspath(os.path.join(os.path.dirname(__file__), "..")),
)
from classifiers.redis_vector_index import ( # noqa: E402
migrate_legacy_skill_hashes_to_redisearch,
migrate_legacy_tool_hashes_to_redisearch,
)
from classifiers.vector_classifier import ( # noqa: E402
SKILL_EMBEDDINGS_HASH_KEY,
SKILL_METADATA_HASH_KEY,
TOOL_EMBEDDINGS_HASH_KEY,
TOOL_METADATA_HASH_KEY,
)
from config import Config # noqa: E402
from init_redis_indexes import ensure_indexes # noqa: E402
# Configure the root logger at import time: INFO level, with each record
# prefixed by timestamp, level name and logger name.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
async def _run(redis_url: str, connection_kwargs: dict) -> None:
    """Migrate the legacy tool and skill embedding hashes over one connection.

    Steps, in order:

    1. Build an async Redis client from *redis_url* with
       ``decode_responses=True`` plus *connection_kwargs*.
    2. ``PING`` the server so an unreachable Redis fails before any work.
    3. Await ``ensure_indexes`` so the RediSearch indexes are in place before
       documents are written.
    4. Await ``migrate_legacy_tool_hashes_to_redisearch`` with
       ``TOOL_EMBEDDINGS_HASH_KEY`` / ``TOOL_METADATA_HASH_KEY``, then
       ``migrate_legacy_skill_hashes_to_redisearch`` with
       ``SKILL_EMBEDDINGS_HASH_KEY`` / ``SKILL_METADATA_HASH_KEY``.
    5. Log the two values returned by the migrations as the tool and skill
       counts.

    The client is closed with ``aclose()`` in a ``finally`` block, so it is
    released whether or not any step raises.

    Args:
        redis_url: Redis connection URL to migrate against.
        connection_kwargs: Extra keyword arguments forwarded unchanged to
            ``redis.asyncio.from_url``.

    Raises:
        Exception: Nothing is caught here; any error from connecting, index
            setup or migration propagates to the caller after the client has
            been closed.
    """
    client = aioredis.from_url(redis_url, decode_responses=True, **connection_kwargs)
    try:
        # Fail fast on connectivity problems before touching indexes or data.
        await client.ping()
        await ensure_indexes(client)

        # (migration coroutine, legacy embeddings hash, legacy metadata hash),
        # executed strictly in this order: tools first, then skills.
        migration_plan = (
            (
                migrate_legacy_tool_hashes_to_redisearch,
                TOOL_EMBEDDINGS_HASH_KEY,
                TOOL_METADATA_HASH_KEY,
            ),
            (
                migrate_legacy_skill_hashes_to_redisearch,
                SKILL_EMBEDDINGS_HASH_KEY,
                SKILL_METADATA_HASH_KEY,
            ),
        )
        migrated_counts = []
        for migrate, embeddings_hash, metadata_hash in migration_plan:
            count = await migrate(
                client,
                embeddings_key=embeddings_hash,
                metadata_key=metadata_hash,
            )
            migrated_counts.append(count)

        tool_count, skill_count = migrated_counts
        logger.info(
            "Migration complete: %d tools, %d skills (RediSearch HASH keys)",
            tool_count,
            skill_count,
        )
    finally:
        await client.aclose()
def main(argv: list[str] | None = None) -> int:
    """CLI entrypoint: resolve the Redis URL and run the migration.

    Parses ``--redis-url`` from *argv*, loads configuration with
    ``Config.load()``, and picks the effective Redis URL using the precedence
    ``--redis-url`` > ``cfg.redis_url`` > ``$REDIS_URL`` >
    ``redis://localhost:6379/0``. An empty value at any level (including an
    exported-but-empty ``REDIS_URL``) is treated as unset and falls through to
    the next source. Connection keyword arguments are derived with
    ``cfg.redis_connection_kwargs_for_url`` and the async ``_run`` coroutine is
    executed with ``asyncio.run``.

    Only exceptions raised while running the migration are caught: they are
    logged with a traceback and turned into exit code ``1``. Errors from
    argument parsing, ``Config.load()`` or the connection-kwargs lookup are not
    caught here and propagate to the caller.

    Args:
        argv: Command-line arguments excluding the program name. ``None``
            (the default) makes ``argparse`` read ``sys.argv[1:]``.

    Returns:
        ``0`` on a successful migration, ``1`` if the migration raised.
    """
    parser = argparse.ArgumentParser(
        description="Migrate legacy embedding hashes to RediSearch tool_emb/skill_emb keys",
    )
    parser.add_argument(
        "--redis-url",
        default=None,
        help=(
            "Redis URL (default: config redis_url, then $REDIS_URL, "
            "then redis://localhost:6379/0)"
        ),
    )
    args = parser.parse_args(argv)
    cfg = Config.load()
    # Chain with ``or`` at every level so an empty ``REDIS_URL`` falls back to
    # the localhost default instead of yielding an empty, unusable URL.
    redis_url = (
        args.redis_url
        or cfg.redis_url
        or os.environ.get("REDIS_URL")
        or "redis://localhost:6379/0"
    )
    conn_kw = cfg.redis_connection_kwargs_for_url(redis_url)
    try:
        asyncio.run(_run(redis_url, conn_kw))
    except Exception as exc:
        # Top-level boundary: report the failure with its traceback and signal
        # it through the exit code rather than an unhandled exception.
        logger.exception("%s", exc)
        return 1
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())