"""RediSearch KNN helpers for tool/skill/dangerous-command centroid embeddings.
Stores one Redis HASH per item under ``tool_emb:{name}`` / ``skill_emb:{id}`` /
``dangerous_cmd_emb:{category_id}`` / ``benign_tech_emb:{category_id}`` and
queries via ``FT.SEARCH`` (see :mod:`init_redis_indexes`).
Legacy monolithic hashes (:data:`TOOL_EMBEDDINGS_HASH_KEY`, etc.) remain supported
as a fallback until fully migrated.
"""
from __future__ import annotations
import jsonutil as json
import logging
from typing import Any
import numpy as np
import redis.asyncio as aioredis
from redis.commands.search.query import Query
from init_redis_indexes import (
BENIGN_TECH_EMB_PREFIX,
BENIGN_TECH_INDEX_NAME,
DANGEROUS_CMD_EMB_PREFIX,
DANGEROUS_CMD_INDEX_NAME,
SKILL_EMB_PREFIX,
SKILL_INDEX_NAME,
TOOL_EMB_PREFIX,
TOOL_INDEX_NAME,
VECTOR_DIM,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# RediSearch HNSW KNN is approximate; EF_RUNTIME controls query-time graph
# exploration. Without it, the true nearest neighbor is often missing from
# top-k on large indexes. Syntax: EF_RUNTIME must appear *before* ``AS``.
# See: https://redis.io/docs/latest/develop/interact/search-and-query/query/vector-search/
# Default ``ef_runtime`` applied by the KNN search helpers in this module when
# the caller does not pass one explicitly.
DEFAULT_KNN_EF_RUNTIME = 200
def _knn_clause(k: int, ef_runtime: int) -> str:
    """Return the ``FT.SEARCH`` query string for a KNN vector lookup.

    The result has the form ``*=>[KNN <k> @embedding $query_vec EF_RUNTIME
    <ef_runtime> AS score]``: it asks for the ``k`` nearest neighbours of the
    ``$query_vec`` query parameter on the ``@embedding`` field and exposes the
    distance under the alias ``score``. ``EF_RUNTIME`` is placed before ``AS``
    because that ordering is the required syntax.

    Pure string builder with no side effects; the KNN search helpers in this
    module wrap the returned string in a
    :class:`redis.commands.search.query.Query`.

    Args:
        k: Number of nearest neighbours to request from the vector index.
        ef_runtime: HNSW query-time exploration factor; larger values trade
            latency for recall.

    Returns:
        str: The RediSearch query string containing the KNN clause.
    """
    knn_args = f"KNN {k} @embedding $query_vec EF_RUNTIME {ef_runtime} AS score"
    return "*=>[" + knn_args + "]"
[docs]
def embedding_to_blob(vec: np.ndarray | list[float]) -> bytes:
    """Serialize an embedding vector to a little-endian FLOAT32 blob.

    Converts a numpy array or a list of floats into the byte string used both
    for the stored ``embedding`` HASH field and for the ``$query_vec`` query
    parameter. The element count is validated against :data:`VECTOR_DIM`
    before serializing, so a misshapen vector fails here instead of producing
    a corrupt index entry or query.

    Pure transform with no side effects. It is used by the store helpers and
    the KNN search helpers in this module.

    Args:
        vec: Embedding as a numpy array or a list of floats.

    Returns:
        bytes: The vector encoded as contiguous little-endian FLOAT32 values
        (``4 * VECTOR_DIM`` bytes).

    Raises:
        ValueError: If the vector's element count does not equal
            :data:`VECTOR_DIM`.
    """
    # "<f4" pins little-endian FLOAT32 regardless of the host byte order;
    # plain ``np.float32`` would serialize in native order, which only happens
    # to be little-endian on most hosts.
    arr = np.asarray(vec, dtype="<f4")
    if arr.size != VECTOR_DIM:
        raise ValueError(
            f"Embedding dim {arr.size} != {VECTOR_DIM}",
        )
    return arr.tobytes()
def _doc_str(doc: Any, field: str) -> str:
    """Read one field off a RediSearch result document as a ``str``.

    Looks up ``field`` as an attribute of ``doc``. ``bytes`` values are decoded
    as UTF-8 with undecodable bytes replaced, a missing attribute or ``None``
    becomes ``""``, and any other value is passed through ``str()``. Callers
    can therefore treat absent and empty values the same way.

    Pure accessor with no side effects; the KNN search helpers in this module
    use it to extract fields such as ``skill_id``, ``category_id``, ``name``,
    ``description`` and ``meta_json`` from each match.

    Args:
        doc: A result document exposing its fields as attributes.
        field: Name of the attribute to read.

    Returns:
        str: The decoded or stringified value, or ``""`` when the field is
        absent or ``None``.
    """
    value = getattr(doc, field, None)
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return "" if value is None else str(value)
[docs]
async def store_skill_embedding_hash(
    redis: aioredis.Redis,
    skill_id: str,
    vec: np.ndarray,
    meta: dict[str, Any],
) -> None:
    """Upsert the per-skill RediSearch HASH holding a skill's embedding.

    Writes (or overwrites) the ``skill_emb:{skill_id}`` HASH with the FLOAT32
    ``embedding`` blob plus the ``skill_id``, ``name``, ``description`` and
    JSON ``meta_json`` fields. ``name`` falls back to the skill id and
    ``description`` to the empty string when the entry is missing from
    ``meta`` *or present with a ``None`` value*. This is the inverse of
    :func:`delete_skill_embedding_hash`, and the stored document is what
    :func:`knn_search_skills` reads back.

    The embedding is encoded via :func:`embedding_to_blob` and a single
    ``HSET`` is issued. Within this module it is called by
    :func:`migrate_legacy_skill_hashes_to_redisearch`; the original
    documentation also lists ``classifiers/update_skill_embeddings.py`` and
    ``tests/test_vector_redisearch_knn.py`` as callers.

    Args:
        redis: Async Redis client used to issue the write.
        skill_id: Skill identifier, used as the key suffix and ``skill_id``
            field.
        vec: Embedding vector for the skill.
        meta: Skill metadata; its ``name`` and ``description`` entries populate
            the corresponding fields and the whole dict is serialized into
            ``meta_json``.

    Returns:
        None.

    Raises:
        ValueError: Propagated from :func:`embedding_to_blob` when ``vec`` has
            the wrong number of elements.
    """
    key = f"{SKILL_EMB_PREFIX}{skill_id}"
    blob = embedding_to_blob(vec)
    # ``dict.get(key, default)`` only applies the default when the key is
    # missing. Metadata loaded from JSON can carry explicit nulls, and the
    # Redis client rejects ``None`` as a field value, so treat None as absent.
    name = meta.get("name")
    if name is None:
        name = skill_id
    description = meta.get("description")
    if description is None:
        description = ""
    mapping: dict[str | bytes, str | bytes] = {
        "embedding": blob,
        "skill_id": skill_id,
        "name": name,
        "description": description,
        "meta_json": json.dumps(meta),
    }
    await redis.hset(key, mapping=mapping)
[docs]
async def delete_skill_embedding_hash(
    redis: aioredis.Redis,
    skill_id: str,
) -> None:
    """Remove the per-skill embedding HASH for ``skill_id``.

    Issues a single ``DEL`` on the ``skill_emb:{skill_id}`` key, undoing
    :func:`store_skill_embedding_hash`. No other keys are touched.

    The original documentation lists ``classifiers/update_skill_embeddings.py``
    (pruning orphaned skills during a rebuild) and
    ``tests/test_vector_redisearch_knn.py`` (fixture cleanup) as callers.

    Args:
        redis: Async Redis client used to issue the deletion.
        skill_id: Skill identifier whose ``skill_emb:`` HASH should be removed.
    """
    hash_key = f"{SKILL_EMB_PREFIX}{skill_id}"
    await redis.delete(hash_key)
[docs]
async def store_dangerous_cmd_embedding_hash(
    redis: aioredis.Redis,
    category_id: str,
    centroid: np.ndarray,
    metadata: dict[str, Any],
) -> None:
    """Upsert the per-category dangerous-command embedding HASH.

    Writes (or overwrites) ``dangerous_cmd_emb:{category_id}`` with three
    fields: the FLOAT32 ``embedding`` blob produced by
    :func:`embedding_to_blob`, the ``category_id``, and ``meta_json`` (the
    JSON-serialized ``metadata``). The stored document is what
    :func:`knn_search_dangerous_cmds` reads back, and
    :func:`delete_dangerous_cmd_embedding_hash` is the inverse operation.

    A single ``HSET`` is issued. The original documentation lists
    ``classifiers/update_dangerous_command_embeddings.py`` and
    ``tests/test_vector_redisearch_knn.py`` as callers.

    Args:
        redis: Async Redis client used to issue the write.
        category_id: Dangerous-command category id, used as key suffix and
            field value.
        centroid: Centroid embedding for the category.
        metadata: Category metadata serialized into ``meta_json``.

    Returns:
        None.
    """
    hash_key = f"{DANGEROUS_CMD_EMB_PREFIX}{category_id}"
    fields: dict[str | bytes, str | bytes] = {}
    fields["embedding"] = embedding_to_blob(centroid)
    fields["category_id"] = category_id
    fields["meta_json"] = json.dumps(metadata)
    await redis.hset(hash_key, mapping=fields)
[docs]
async def delete_dangerous_cmd_embedding_hash(
    redis: aioredis.Redis,
    category_id: str,
) -> None:
    """Remove the per-category dangerous-command embedding HASH.

    Issues a single ``DEL`` on ``dangerous_cmd_emb:{category_id}``, undoing
    :func:`store_dangerous_cmd_embedding_hash`. No other keys are touched.

    The original documentation lists
    ``classifiers/update_dangerous_command_embeddings.py`` (pruning orphaned
    categories during a corpus rebuild) and
    ``tests/test_vector_redisearch_knn.py`` (fixture cleanup) as callers.

    Args:
        redis: Async Redis client used to issue the deletion.
        category_id: Dangerous-command category id whose
            ``dangerous_cmd_emb:`` HASH should be removed.
    """
    hash_key = f"{DANGEROUS_CMD_EMB_PREFIX}{category_id}"
    await redis.delete(hash_key)
[docs]
async def store_benign_tech_embedding_hash(
    redis: aioredis.Redis,
    category_id: str,
    centroid: np.ndarray,
    metadata: dict[str, Any],
) -> None:
    """Upsert the per-category benign-technical embedding HASH.

    Writes (or overwrites) ``benign_tech_emb:{category_id}`` with three
    fields: the FLOAT32 ``embedding`` blob produced by
    :func:`embedding_to_blob`, the ``category_id``, and ``meta_json`` (the
    JSON-serialized ``metadata``). The stored document is what
    :func:`knn_search_benign_tech` reads back, and
    :func:`delete_benign_tech_embedding_hash` is the inverse operation.

    A single ``HSET`` is issued. The original documentation lists
    ``classifiers/update_benign_technical_embeddings.py`` and
    ``tests/test_vector_redisearch_knn.py`` as callers.

    Args:
        redis: Async Redis client used to issue the write.
        category_id: Benign-technical category id, used as key suffix and
            field value.
        centroid: Centroid embedding for the category.
        metadata: Category metadata serialized into ``meta_json``.

    Returns:
        None.
    """
    hash_key = f"{BENIGN_TECH_EMB_PREFIX}{category_id}"
    fields: dict[str | bytes, str | bytes] = {}
    fields["embedding"] = embedding_to_blob(centroid)
    fields["category_id"] = category_id
    fields["meta_json"] = json.dumps(metadata)
    await redis.hset(hash_key, mapping=fields)
[docs]
async def delete_benign_tech_embedding_hash(
    redis: aioredis.Redis,
    category_id: str,
) -> None:
    """Remove the per-category benign-technical embedding HASH.

    Issues a single ``DEL`` on ``benign_tech_emb:{category_id}``, undoing
    :func:`store_benign_tech_embedding_hash`. No other keys are touched.

    The original documentation lists
    ``classifiers/update_benign_technical_embeddings.py`` (pruning orphaned
    categories during a rebuild) and ``tests/test_vector_redisearch_knn.py``
    (fixture cleanup) as callers.

    Args:
        redis: Async Redis client used to issue the deletion.
        category_id: Benign-technical category id whose ``benign_tech_emb:``
            HASH should be removed.
    """
    hash_key = f"{BENIGN_TECH_EMB_PREFIX}{category_id}"
    await redis.delete(hash_key)
[docs]
async def knn_search_skills(
    redis: aioredis.Redis,
    query_embedding: np.ndarray,
    *,
    knn_k: int,
    ef_runtime: int = DEFAULT_KNN_EF_RUNTIME,
) -> list[dict[str, Any]]:
    """Find the skills whose embeddings are nearest a query vector.

    Runs an approximate ``FT.SEARCH`` KNN query against the index named by
    :data:`SKILL_INDEX_NAME` for the ``knn_k`` nearest neighbours of
    ``query_embedding``. Each match's ``score`` field (the vector distance) is
    converted to a similarity as ``1 - distance``; this assumes the index is
    configured with the cosine metric (defined in ``init_redis_indexes`` --
    confirm there). The stored ``meta_json`` is parsed per match; a payload
    that fails to parse *or does not decode to a JSON object* yields an empty
    ``metadata`` dict rather than failing the whole search.

    ``name`` and ``description`` come from the indexed fields and fall back to
    the parsed metadata (then to the skill id / empty string) when blank.
    Results are requested sorted ascending by distance, i.e. most-similar
    first. Any exception raised by the search call itself is logged at debug
    level and produces an empty list.

    Internally the query is encoded with :func:`embedding_to_blob`, the clause
    is built with :func:`_knn_clause`, and fields are read via
    :func:`_doc_str`. The original documentation lists the skill vector
    classifier in ``classifiers/vector_classifier.py`` and
    ``tests/test_vector_redisearch_knn.py`` as callers.

    Args:
        redis: Async Redis client used to run the search.
        query_embedding: Query embedding to match against skill embeddings.
        knn_k: Number of nearest neighbours to request; also the page size.
        ef_runtime: HNSW query-time exploration factor; defaults to
            :data:`DEFAULT_KNN_EF_RUNTIME`.

    Returns:
        list[dict[str, Any]]: Per-match dicts with ``skill_id``, ``name``,
        ``description``, ``score`` (similarity) and ``metadata``, most-similar
        first; empty when the search call fails.

    Raises:
        ValueError: Propagated from :func:`embedding_to_blob` when
            ``query_embedding`` has the wrong number of elements.
    """
    blob = embedding_to_blob(query_embedding)
    knn = _knn_clause(knn_k, ef_runtime)
    q = (
        Query(knn)
        .sort_by("score")
        .return_fields(
            "skill_id",
            "name",
            "description",
            "meta_json",
            "score",
        )
        .paging(0, knn_k)
        .dialect(2)
    )
    try:
        result = await redis.ft(SKILL_INDEX_NAME).search(
            q,
            query_params={"query_vec": blob},
        )
    except Exception:
        # Deliberate best-effort: a missing index or search error means
        # "no candidates", not a crash in the caller.
        logger.debug("RediSearch skill KNN failed", exc_info=True)
        return []
    out: list[dict[str, Any]] = []
    for doc in result.docs:
        dist_raw = getattr(doc, "score", None)
        # A match without a score is treated as maximally distant.
        cosine_dist = float(dist_raw) if dist_raw is not None else 1.0
        similarity = 1.0 - cosine_dist
        sid = _doc_str(doc, "skill_id")
        meta_raw = _doc_str(doc, "meta_json")
        meta: dict[str, Any] = {}
        if meta_raw:
            try:
                parsed = json.loads(meta_raw)
            except Exception:
                parsed = None
            # Valid JSON that is not an object (list, string, number) has no
            # ``.get`` and would crash the fallbacks below; ignore it.
            if isinstance(parsed, dict):
                meta = parsed
        out.append(
            {
                "skill_id": sid,
                "name": _doc_str(doc, "name") or meta.get("name", sid),
                "description": _doc_str(doc, "description")
                or meta.get("description", ""),
                "score": similarity,
                "metadata": meta,
            }
        )
    return out
[docs]
async def knn_search_dangerous_cmds(
    redis: aioredis.Redis,
    query_embedding: np.ndarray,
    *,
    knn_k: int,
    ef_runtime: int = DEFAULT_KNN_EF_RUNTIME,
) -> list[dict[str, Any]]:
    """Find the dangerous-command categories nearest a query vector.

    Runs an approximate ``FT.SEARCH`` KNN query against the index named by
    :data:`DANGEROUS_CMD_INDEX_NAME` for the ``knn_k`` nearest neighbours of
    ``query_embedding``. Each match's ``score`` field (the vector distance) is
    converted to a similarity as ``1 - distance`` (assumes the index uses the
    cosine metric -- confirm in ``init_redis_indexes``), and its ``meta_json``
    is parsed, falling back to ``{}`` when parsing raises.

    Results are requested sorted ascending by distance, i.e. most-similar
    first. Any exception raised by the search call is logged at debug level
    and produces an empty list, so a missing index fails open instead of
    crashing the caller.

    Internally the query is encoded with :func:`embedding_to_blob`, the clause
    is built with :func:`_knn_clause`, and fields are read via
    :func:`_doc_str`. The original documentation lists
    ``classifiers/dangerous_command_guard.py`` and
    ``tests/test_vector_redisearch_knn.py`` as callers.

    Args:
        redis: Async Redis client used to run the search.
        query_embedding: Query embedding for the command being screened.
        knn_k: Number of nearest neighbours to request; also the page size.
        ef_runtime: HNSW query-time exploration factor; defaults to
            :data:`DEFAULT_KNN_EF_RUNTIME`.

    Returns:
        list[dict[str, Any]]: Per-match dicts with ``category_id``, ``score``
        (similarity) and ``metadata``, most-similar first; empty when the
        search call fails.
    """
    query_blob = embedding_to_blob(query_embedding)
    # The builder methods are chained in the original; applying them one at a
    # time produces the same query.
    query = Query(_knn_clause(knn_k, ef_runtime))
    query = query.sort_by("score")
    query = query.return_fields("category_id", "meta_json", "score")
    query = query.paging(0, knn_k)
    query = query.dialect(2)
    index = DANGEROUS_CMD_INDEX_NAME
    try:
        result = await redis.ft(index).search(
            query,
            query_params={"query_vec": query_blob},
        )
    except Exception:
        logger.debug("RediSearch dangerous-cmd KNN failed", exc_info=True)
        return []
    matches: list[dict[str, Any]] = []
    for doc in result.docs:
        raw_distance = getattr(doc, "score", None)
        # A match without a score is treated as maximally distant.
        distance = 1.0 if raw_distance is None else float(raw_distance)
        category = _doc_str(doc, "category_id")
        meta_text = _doc_str(doc, "meta_json")
        metadata: dict[str, Any] = {}
        if meta_text:
            try:
                metadata = json.loads(meta_text)
            except Exception:
                metadata = {}
        matches.append(
            {
                "category_id": category,
                "score": 1.0 - distance,
                "metadata": metadata,
            }
        )
    return matches
[docs]
async def knn_search_benign_tech(
    redis: aioredis.Redis,
    query_embedding: np.ndarray,
    *,
    knn_k: int,
    ef_runtime: int = DEFAULT_KNN_EF_RUNTIME,
) -> list[dict[str, Any]]:
    """Find the benign-technical categories nearest a query vector.

    Runs an approximate ``FT.SEARCH`` KNN query against the index named by
    :data:`BENIGN_TECH_INDEX_NAME` for the ``knn_k`` nearest neighbours of
    ``query_embedding``. Each match's ``score`` field (the vector distance) is
    converted to a similarity as ``1 - distance`` (assumes the index uses the
    cosine metric -- confirm in ``init_redis_indexes``), and its ``meta_json``
    is parsed, falling back to ``{}`` when parsing raises. Per the original
    documentation this is the allow-list counterpart to
    :func:`knn_search_dangerous_cmds`.

    Results are requested sorted ascending by distance, i.e. most-similar
    first. Any exception raised by the search call is logged at debug level
    and produces an empty list.

    Internally the query is encoded with :func:`embedding_to_blob`, the clause
    is built with :func:`_knn_clause`, and fields are read via
    :func:`_doc_str`. The original documentation lists
    ``classifiers/dangerous_command_guard.py`` (typically with ``knn_k=1``)
    and ``tests/test_vector_redisearch_knn.py`` as callers.

    Args:
        redis: Async Redis client used to run the search.
        query_embedding: Query embedding for the command being screened.
        knn_k: Number of nearest neighbours to request; also the page size.
        ef_runtime: HNSW query-time exploration factor; defaults to
            :data:`DEFAULT_KNN_EF_RUNTIME`.

    Returns:
        list[dict[str, Any]]: Per-match dicts with ``category_id``, ``score``
        (similarity) and ``metadata``, most-similar first; empty when the
        search call fails.
    """
    query_blob = embedding_to_blob(query_embedding)
    clause = _knn_clause(knn_k, ef_runtime)
    wanted_fields = ("category_id", "meta_json", "score")
    query = (
        Query(clause)
        .sort_by("score")
        .return_fields(*wanted_fields)
        .paging(0, knn_k)
        .dialect(2)
    )
    search_index = redis.ft(BENIGN_TECH_INDEX_NAME) if False else None
    try:
        result = await redis.ft(BENIGN_TECH_INDEX_NAME).search(
            query,
            query_params={"query_vec": query_blob},
        )
    except Exception:
        logger.debug("RediSearch benign-tech KNN failed", exc_info=True)
        return []
    hits: list[dict[str, Any]] = []
    for doc in result.docs:
        raw_distance = getattr(doc, "score", None)
        # A match without a score is treated as maximally distant.
        if raw_distance is None:
            distance = 1.0
        else:
            distance = float(raw_distance)
        hit: dict[str, Any] = {
            "category_id": _doc_str(doc, "category_id"),
            "score": 1.0 - distance,
            "metadata": {},
        }
        meta_text = _doc_str(doc, "meta_json")
        if meta_text:
            try:
                hit["metadata"] = json.loads(meta_text)
            except Exception:
                hit["metadata"] = {}
        hits.append(hit)
    return hits
[docs]
async def redisearch_index_doc_count(
    redis: aioredis.Redis,
    index_name: str,
) -> int:
    """Report how many documents a RediSearch index currently holds.

    Calls ``FT.INFO`` (via ``redis.ft(index_name).info()``) and returns the
    ``num_docs`` entry coerced to ``int``. Keys may arrive as ``bytes`` or
    ``str``. Every failure path returns ``-1`` instead of raising: the info
    call raising (e.g. the index does not exist), a reply that is not a dict,
    and a ``num_docs`` entry that is missing or cannot be converted.

    The only side effect is the single ``FT.INFO`` round trip. The original
    documentation describes it as the population check run before issuing a
    KNN query and lists ``classifiers/vector_classifier.py``,
    ``classifiers/dangerous_command_guard.py``, ``tools/search_tools.py`` and
    ``tests/test_vector_redisearch_knn.py`` as callers.

    Args:
        redis: Async Redis client used to run ``FT.INFO``.
        index_name: Name of the RediSearch index to inspect.

    Returns:
        int: The document count reported by the index, or ``-1`` if the index
        is missing or the count cannot be determined.
    """
    try:
        info = await redis.ft(index_name).info()
    except Exception:
        return -1
    if isinstance(info, dict):
        for raw_key, raw_value in info.items():
            if isinstance(raw_key, bytes):
                key = raw_key.decode("utf-8")
            else:
                key = str(raw_key)
            if key != "num_docs":
                continue
            try:
                return int(raw_value)
            except (TypeError, ValueError):
                # Unparseable count: keep scanning the remaining entries.
                continue
    return -1
[docs]
async def scan_dangerous_cmd_category_ids(redis: aioredis.Redis) -> list[str]:
    """List every dangerous-command category id that has a stored embedding.

    Walks the keyspace with cursor-based ``SCAN`` calls matching
    ``dangerous_cmd_emb:*`` (``count=500`` per call), strips
    :data:`DANGEROUS_CMD_EMB_PREFIX` from each matching key, and returns the
    ids de-duplicated and sorted. ``bytes`` keys are decoded as UTF-8 with
    undecodable bytes replaced.

    The only side effect is the sequence of ``SCAN`` calls. The original
    documentation lists ``classifiers/update_dangerous_command_embeddings.py``
    as the caller, which uses the result to prune categories removed from the
    source corpus.

    Args:
        redis: Async Redis client used to scan the keyspace.

    Returns:
        list[str]: Sorted, de-duplicated category ids derived from the
        ``dangerous_cmd_emb:`` keys.
    """
    pattern = f"{DANGEROUS_CMD_EMB_PREFIX}*"
    prefix_len = len(DANGEROUS_CMD_EMB_PREFIX)
    found: set[str] = set()
    cursor: Any = 0
    while True:
        cursor, batch = await redis.scan(cursor, match=pattern, count=500)
        for raw_key in batch:
            if isinstance(raw_key, bytes):
                key = raw_key.decode("utf-8", errors="replace")
            else:
                key = str(raw_key)
            if key.startswith(DANGEROUS_CMD_EMB_PREFIX):
                found.add(key[prefix_len:])
        # A cursor of 0 signals that the iteration is complete.
        if cursor == 0:
            return sorted(found)
[docs]
async def scan_benign_tech_category_ids(redis: aioredis.Redis) -> list[str]:
    """List every benign-technical category id that has a stored embedding.

    Walks the keyspace with cursor-based ``SCAN`` calls matching
    ``benign_tech_emb:*`` (``count=500`` per call), strips
    :data:`BENIGN_TECH_EMB_PREFIX` from each matching key, and returns the ids
    de-duplicated and sorted. ``bytes`` keys are decoded as UTF-8 with
    undecodable bytes replaced.

    The only side effect is the sequence of ``SCAN`` calls. The original
    documentation lists ``classifiers/update_benign_technical_embeddings.py``
    as the caller, which uses the result to prune categories removed from the
    source corpus.

    Args:
        redis: Async Redis client used to scan the keyspace.

    Returns:
        list[str]: Sorted, de-duplicated category ids derived from the
        ``benign_tech_emb:`` keys.
    """
    pattern = f"{BENIGN_TECH_EMB_PREFIX}*"
    prefix_len = len(BENIGN_TECH_EMB_PREFIX)
    found: set[str] = set()
    cursor: Any = 0
    while True:
        cursor, batch = await redis.scan(cursor, match=pattern, count=500)
        for raw_key in batch:
            if isinstance(raw_key, bytes):
                key = raw_key.decode("utf-8", errors="replace")
            else:
                key = str(raw_key)
            if key.startswith(BENIGN_TECH_EMB_PREFIX):
                found.add(key[prefix_len:])
        # A cursor of 0 signals that the iteration is complete.
        if cursor == 0:
            return sorted(found)
[docs]
async def migrate_legacy_skill_hashes_to_redisearch(
    redis: aioredis.Redis,
    *,
    embeddings_key: str,
    metadata_key: str,
) -> int:
    """Backfill per-skill RediSearch HASHes from the legacy monolithic hashes.

    Reads the two legacy hashes -- ``embeddings_key`` (field per skill -> JSON
    vector) and ``metadata_key`` (field per skill -> JSON metadata) -- and, for
    each skill, decodes its vector into a FLOAT32 numpy array and writes a
    ``skill_emb:`` HASH via :func:`store_skill_embedding_hash`.

    The migration is best-effort per entry, so one bad legacy record cannot
    abort the run midway:

    * A vector payload that cannot be decoded or parsed is skipped (warning
      logged).
    * A vector whose element count differs from :data:`VECTOR_DIM` is skipped
      (warning logged) instead of letting the store helper raise
      ``ValueError``.
    * Metadata lookups tolerate both ``bytes`` and ``str`` field keys.
      Metadata that is absent, unparseable, or not a JSON object falls back to
      ``{"skill_id": sid}``.

    Side effects are the two ``HGETALL`` reads, one ``HSET`` per migrated
    skill, and an info-level log of the count. The original documentation
    lists ``classifiers/migrate_embeddings_redisearch.py`` as the caller.

    Args:
        redis: Async Redis client used for the reads and writes.
        embeddings_key: Legacy hash mapping skill id to JSON vector.
        metadata_key: Legacy hash mapping skill id to JSON metadata.

    Returns:
        int: The number of per-skill HASHes written (0 if the legacy
        embeddings hash is empty or missing).
    """
    emb_raw = await redis.hgetall(embeddings_key)
    meta_raw = await redis.hgetall(metadata_key)
    if not emb_raw:
        return 0
    n = 0
    for sid_b, vec_json_b in emb_raw.items():
        sid = sid_b.decode("utf-8") if isinstance(sid_b, bytes) else str(sid_b)
        try:
            # Decoding sits inside the ``try`` so a non-UTF-8 payload skips
            # this entry rather than aborting the whole migration.
            vec_json = (
                vec_json_b.decode("utf-8")
                if isinstance(vec_json_b, bytes)
                else str(vec_json_b)
            )
            vec = np.array(json.loads(vec_json), dtype=np.float32)
        except Exception:
            logger.warning(
                "Skipping legacy skill embedding %r: unparseable vector",
                sid,
                exc_info=True,
            )
            continue
        # Validate here so a stale or misshapen vector is skipped; otherwise
        # the store helper's ValueError would abort the run partway through.
        if vec.size != VECTOR_DIM:
            logger.warning(
                "Skipping legacy skill embedding %r: dim %d != %d",
                sid,
                vec.size,
                VECTOR_DIM,
            )
            continue
        # The metadata hash may be keyed by bytes or str depending on the
        # client's decode settings; try each form.
        mb = (
            meta_raw.get(sid_b)
            or meta_raw.get(sid)
            or meta_raw.get(sid.encode("utf-8"))
        )
        meta: dict[str, Any] = {"skill_id": sid}
        if mb:
            try:
                mjs = mb.decode("utf-8") if isinstance(mb, bytes) else str(mb)
                parsed = json.loads(mjs)
            except Exception:
                parsed = None
            # Only a JSON object is usable as metadata; the store helper calls
            # ``.get`` on it.
            if isinstance(parsed, dict):
                meta = parsed
        await store_skill_embedding_hash(redis, sid, vec, meta)
        n += 1
    logger.info(
        "Migrated %d skill embeddings to RediSearch HASH keys",
        n,
    )
    return n