"""Create RediSearch HNSW vector indexes for messages.

Run standalone::

    python init_redis_indexes.py [--redis-url redis://localhost:6379/0]

Or import and call during bot startup::

    from init_redis_indexes import ensure_indexes
    await ensure_indexes(redis_client)

Requires the **RediSearch** module to be loaded on the Redis server
(e.g. FalkorDB image, Redis Stack, or ``redis-server --loadmodule redisearch.so``).

Note: The knowledge-graph indexes (entities/relationships) are managed
by :class:`knowledge_graph.KnowledgeGraphManager` via FalkorDB Cypher
``CREATE VECTOR INDEX`` commands, not by this module.
"""
from __future__ import annotations

import argparse
import asyncio
import logging
import os
import sys

import redis.asyncio as aioredis
from redis.commands.search.field import (
    NumericField,
    TagField,
    TextField,
    VectorField,
)
from redis.commands.search.index_definition import IndexDefinition, IndexType
from redis.exceptions import ResponseError
logger = logging.getLogger(__name__)

# Dimensionality of the ``embedding`` vector field; every stored vector
# must have exactly this many FLOAT32 components.
VECTOR_DIM = 3072
DISTANCE_METRIC = "COSINE"

# Name of the message index and the HASH key prefix it covers.
MSG_INDEX_NAME = "idx:messages"
MSG_PREFIX = "msg:"

# Attributes for the HNSW vector field, passed verbatim to FT.CREATE.
# M bounds the outgoing edges per graph node; EF_CONSTRUCTION is the
# candidate-list size used while building the graph.
HNSW_PARAMS = dict(
    TYPE="FLOAT32",
    DIM=VECTOR_DIM,
    DISTANCE_METRIC=DISTANCE_METRIC,
    M=16,
    EF_CONSTRUCTION=200,
)
async def _create_index(
    r: aioredis.Redis,
    index_name: str,
    prefix: str,
    schema: tuple,
) -> bool:
    """Create a single RediSearch index over HASH keys unless it already exists.

    Args:
        r: Async Redis client connected to a server with RediSearch loaded.
        index_name: Name of the RediSearch index to create.
        prefix: Key prefix; HASH keys starting with it are indexed.
        schema: Field definitions passed to ``create_index``.

    Returns:
        True if the index was created, False if it already existed.

    Raises:
        ResponseError: Any server error reply other than "Index already exists".
        Other client errors (e.g. connection failures) propagate unchanged.
    """
    definition = IndexDefinition(prefix=[prefix], index_type=IndexType.HASH)
    try:
        await r.ft(index_name).create_index(fields=schema, definition=definition)
    except ResponseError as exc:
        # An existing index is reported as a server error reply; only that
        # specific reply is treated as success, everything else is re-raised.
        if "index already exists" not in str(exc).lower():
            raise
        logger.info("RediSearch index '%s' already exists", index_name)
        return False
    logger.info(
        "Created RediSearch index '%s' (prefix=%s)",
        index_name, prefix,
    )
    return True
async def ensure_indexes(r: aioredis.Redis) -> dict[str, bool]:
    """Ensure the message RediSearch index exists.

    Creates the ``MSG_INDEX_NAME`` index over HASH keys prefixed with
    ``MSG_PREFIX``. The index has an HNSW vector field ``embedding``
    (configured by ``HNSW_PARAMS``) plus tag, text and numeric metadata
    fields.

    Args:
        r: Async Redis client connected to a server with RediSearch loaded.

    Returns:
        A dict mapping each index name to True if it was freshly created,
        or False if it already existed.

    Note: Knowledge-graph vector indexes are managed by
    :class:`knowledge_graph.KnowledgeGraphManager` via FalkorDB Cypher.
    """
    msg_schema = (
        VectorField("embedding", "HNSW", HNSW_PARAMS),
        TagField("platform"),
        TagField("channel_id"),
        TagField("user_id"),
        # no_stem: user names are indexed as written, without stemming.
        TextField("user_name", no_stem=True),
        TextField("text"),
        NumericField("timestamp", sortable=True),
    )
    created = await _create_index(r, MSG_INDEX_NAME, MSG_PREFIX, msg_schema)
    return {MSG_INDEX_NAME: created}
async def _main(redis_url: str, ssl_kwargs: dict | None = None) -> None:
    """Connect to Redis, ensure the indexes exist, and print one status line each.

    Args:
        redis_url (str): Connection URL handed to ``redis.asyncio.from_url``.
        ssl_kwargs (dict | None): Optional SSL/mTLS keyword arguments forwarded
            to the client; a falsy value means no extra arguments.
    """
    extra_kwargs = ssl_kwargs if ssl_kwargs else {}
    client = aioredis.from_url(redis_url, decode_responses=False, **extra_kwargs)
    try:
        await client.ping()
        logger.info("Connected to Redis at %s", redis_url)
        outcome = await ensure_indexes(client)
        for index_name, was_created in outcome.items():
            label = "CREATED" if was_created else "already exists"
            print(f" {index_name}: {label}")
    finally:
        # Close the client even when ping or index creation raises.
        await client.aclose()
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )
    parser = argparse.ArgumentParser(
        description="Create RediSearch vector indexes for Stargazer v3",
    )
    parser.add_argument(
        "--redis-url",
        default=os.environ.get(
            "REDIS_URL", "redis://localhost:6379/0",
        ),
        help=(
            "Redis connection URL "
            "(default: $REDIS_URL or redis://localhost:6379/0)"
        ),
    )
    args = parser.parse_args()

    # SSL/mTLS settings come from the project config when available. Running
    # standalone without a ``config`` module is supported and stays quiet;
    # any other failure keeps the no-SSL fallback but is logged, since
    # silently dropping TLS settings would otherwise surface later as a
    # confusing connection error.
    _ssl = {}
    try:
        from config import Config

        _ssl = Config.load().redis_ssl_kwargs()
    except Exception as exc:
        _config_missing = (
            isinstance(exc, ModuleNotFoundError) and exc.name == "config"
        )
        if not _config_missing:
            logger.warning(
                "Could not load Redis SSL settings from config; "
                "connecting without them",
                exc_info=True,
            )
        _ssl = {}

    print(f"Initializing RediSearch indexes on {args.redis_url} ...")
    try:
        asyncio.run(_main(args.redis_url, ssl_kwargs=_ssl))
        print("Done.")
    except Exception as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        sys.exit(1)