Source code for init_redis_indexes

"""Create RediSearch HNSW vector indexes for messages.

Run standalone::

    python init_redis_indexes.py [--redis-url redis://localhost:6379/0]

Or import and call during bot startup::

    from init_redis_indexes import ensure_indexes
    await ensure_indexes(redis_client)

Requires the **RediSearch** module to be loaded on the Redis server
(e.g. FalkorDB image, Redis Stack, or ``redis-server --loadmodule redisearch.so``).

Note: The knowledge-graph indexes (entities/relationships) are managed
by :class:`knowledge_graph.KnowledgeGraphManager` via FalkorDB Cypher
``CREATE VECTOR INDEX`` commands, not by this module.
"""

from __future__ import annotations

import argparse
import asyncio
import logging
import os
import sys

import redis.asyncio as aioredis
from redis.commands.search.field import (
    NumericField,
    TagField,
    TextField,
    VectorField,
)
from redis.commands.search.index_definition import IndexDefinition, IndexType

logger = logging.getLogger(__name__)

VECTOR_DIM = 3072
DISTANCE_METRIC = "COSINE"

MSG_INDEX_NAME = "idx:messages"
MSG_PREFIX = "msg:"

HNSW_PARAMS = {
    "TYPE": "FLOAT32",
    "DIM": VECTOR_DIM,
    "DISTANCE_METRIC": DISTANCE_METRIC,
    "M": 16,
    "EF_CONSTRUCTION": 200,
}


async def _create_index(
    r: aioredis.Redis,
    index_name: str,
    prefix: str,
    schema: tuple,
) -> bool:
    """Create a single RediSearch index, returning True if created."""
    definition = IndexDefinition(
        prefix=[prefix],
        index_type=IndexType.HASH,
    )
    try:
        await r.ft(index_name).create_index(
            fields=schema,
            definition=definition,
        )
        logger.info(
            "Created RediSearch index '%s' (prefix=%s)",
            index_name, prefix,
        )
        return True
    except Exception as exc:
        if "Index already exists" in str(exc):
            logger.info("RediSearch index '%s' already exists", index_name)
            return False
        raise


[docs] async def ensure_indexes(r: aioredis.Redis) -> dict[str, bool]: """Ensure the message RediSearch index exists. Returns a dict mapping index name to whether it was freshly created. Note: Knowledge-graph vector indexes are managed by :class:`knowledge_graph.KnowledgeGraphManager` via FalkorDB Cypher. """ msg_schema = ( VectorField( "embedding", "HNSW", HNSW_PARAMS, ), TagField("platform"), TagField("channel_id"), TagField("user_id"), TextField("user_name", no_stem=True), TextField("text"), NumericField("timestamp", sortable=True), ) results: dict[str, bool] = {} results[MSG_INDEX_NAME] = await _create_index( r, MSG_INDEX_NAME, MSG_PREFIX, msg_schema, ) return results
async def _main(redis_url: str, ssl_kwargs: dict | None = None) -> None: """Internal helper: main. Args: redis_url (str): The redis url value. ssl_kwargs (dict | None): Optional SSL/mTLS keyword arguments. """ r = aioredis.from_url(redis_url, decode_responses=False, **(ssl_kwargs or {})) try: await r.ping() logger.info("Connected to Redis at %s", redis_url) results = await ensure_indexes(r) for name, created in results.items(): status = "CREATED" if created else "already exists" print(f" {name}: {status}") finally: await r.aclose() if __name__ == "__main__": logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) parser = argparse.ArgumentParser( description="Create RediSearch vector indexes for Stargazer v3", ) parser.add_argument( "--redis-url", default=os.environ.get( "REDIS_URL", "redis://localhost:6379/0", ), help=( "Redis connection URL " "(default: $REDIS_URL or redis://localhost:6379/0)" ), ) args = parser.parse_args() try: from config import Config _cfg = Config.load() _ssl = _cfg.redis_ssl_kwargs() except Exception: _ssl = {} print(f"Initializing RediSearch indexes on {args.redis_url} ...") try: asyncio.run(_main(args.redis_url, ssl_kwargs=_ssl)) print("Done.") except Exception as exc: print(f"ERROR: {exc}", file=sys.stderr) sys.exit(1)