# Source code for init_redis_indexes

"""Create the RediSearch indexes used across Stargazer v3.

``ensure_indexes`` creates eight indexes: the message HNSW vector index
plus the tool/skill/dangerous-command/benign-tech centroid vector indexes
and the observability, debug-observability, and LLM-request event-log
indexes.

Run standalone::

    python init_redis_indexes.py [--redis-url redis://localhost:6379/0]

Or import and call during bot startup::

    from init_redis_indexes import ensure_indexes
    await ensure_indexes(redis_client)

Requires the **RediSearch** module to be loaded on the Redis server
(e.g. FalkorDB image, Redis Stack, or ``redis-server --loadmodule redisearch.so``).

Note: The knowledge-graph indexes (entities/relationships) are managed
by :class:`knowledge_graph.KnowledgeGraphManager` via FalkorDB Cypher
``CREATE VECTOR INDEX`` commands, not by this module.
"""

from __future__ import annotations

import argparse
import asyncio
import logging
import os
import sys

import redis.asyncio as aioredis
from redis.commands.search.field import (
    NumericField,
    TagField,
    TextField,
    VectorField,
)
from redis.commands.search.index_definition import IndexDefinition, IndexType

# Module-level logger used by the index-creation helpers below.
logger = logging.getLogger(__name__)

# Embedding width and similarity metric shared by every HNSW vector field;
# both are copied into HNSW_PARAMS below (vectors are declared FLOAT32 there).
VECTOR_DIM = 3072
DISTANCE_METRIC = "COSINE"

# Each *_INDEX_NAME / *_PREFIX pair below is handed to FT.CREATE by
# ensure_indexes: the index covers every HASH key starting with the prefix.
# Message HNSW vector index (embedding + platform/channel/user/kind tags).
MSG_INDEX_NAME = "idx:messages"
MSG_PREFIX = "msg:"

# Tool / skill centroid vectors (vector classifier). One HASH per item.
TOOL_INDEX_NAME = "idx:tool_embeddings"
TOOL_EMB_PREFIX = "tool_emb:"
SKILL_INDEX_NAME = "idx:skill_embeddings"
SKILL_EMB_PREFIX = "skill_emb:"
# Destructive-command centroid vectors (similarity guard). One HASH per category.
DANGEROUS_CMD_INDEX_NAME = "idx:dangerous_cmds"
DANGEROUS_CMD_EMB_PREFIX = "dangerous_cmd_emb:"
# Benign technical discussion centroids (counter-centroid for dangerous guard).
# ensure_indexes builds this index with the same schema as idx:dangerous_cmds.
BENIGN_TECH_INDEX_NAME = "idx:benign_tech"
BENIGN_TECH_EMB_PREFIX = "benign_tech_emb:"
# Admin observability timeline (HASH per event; queried via FT.SEARCH).
OBS_INDEX_NAME = "idx:observability"
OBS_PREFIX = "obs:"

# Debug observability store — 1-hour TTL forensic event log (§1.1)
# llm_output and payload_json are stored in the HASH but intentionally
# omitted from the FT schema to prevent index memory bloat (A1 fix).
OBS_DEBUG_INDEX_NAME = "idx:obs_debug"
OBS_DEBUG_PREFIX = "obs_debug:"
# NOTE(review): not referenced in this module; presumably applied as the key
# expiry by whatever writes obs_debug:* hashes — confirm at the call site.
OBS_DEBUG_TTL = 3600  # 1 hour

# LLM request event log — pagination index for the admin UI.
LLM_REQ_INDEX_NAME = "idx:llm_req"
LLM_REQ_PREFIX = "llm_req:"

# §1.1 — idx:obs_debug schema, assembled from three groups in this order:
#   * nine TAG fields (event_type ... status) for exact-match filtering;
#   * the TEXT field ``preview``, indexed with no_stem so terms match as typed;
#   * three sortable NUMERIC fields: timestamp, duration_ms, token_est.
# llm_output and payload_json are written to the HASH but are deliberately
# left out of this tuple, so they are stored without being indexed.
obs_debug_schema = (
    *(
        TagField(tag_name)
        for tag_name in (
            "event_type",
            "subsystem",
            "platform",
            "channel_id",
            "hub_id",
            "request_id",
            "run_id",
            "phase",
            "status",
        )
    ),
    TextField("preview", no_stem=True),
    *(
        NumericField(column, sortable=True)
        for column in ("timestamp", "duration_ms", "token_est")
    ),
)

# Attribute map shared by every HNSW VectorField in ensure_indexes: FLOAT32
# components, VECTOR_DIM wide, compared with DISTANCE_METRIC. M and
# EF_CONSTRUCTION are passed through unchanged as RediSearch HNSW build knobs.
HNSW_PARAMS = dict(
    TYPE="FLOAT32",
    DIM=VECTOR_DIM,
    DISTANCE_METRIC=DISTANCE_METRIC,
    M=16,
    EF_CONSTRUCTION=200,
)


async def _create_index(
    r: aioredis.Redis,
    index_name: str,
    prefix: str,
    schema: tuple,
) -> bool:
    """Issue ``FT.CREATE`` for one HASH index and report whether it is new.

    Builds a HASH-type :class:`~redis.commands.search.index_definition.IndexDefinition`
    over the single key *prefix* and asks the server to create *index_name*
    with the field *schema*. A reply containing "Index already exists" is
    treated as success-without-work, which makes the call idempotent and safe
    to repeat on every startup; every other failure propagates. The server
    must have the RediSearch module loaded.

    Only Redis is touched. :func:`ensure_indexes` runs one of these per entry
    of its ``index_specs`` list, concurrently via :func:`asyncio.gather`.

    Args:
        r: Async Redis client that receives the ``FT.CREATE`` command.
        index_name: RediSearch index to create (e.g. ``idx:messages``).
        prefix: Key prefix the index should cover (e.g. ``msg:``).
        schema: Tuple of RediSearch field objects describing the index.

    Returns:
        bool: ``True`` when the index was created by this call, ``False`` when
        the server reported that it already existed.

    Raises:
        Exception: Any ``FT.CREATE`` failure other than the index already
            existing (for example, RediSearch not being loaded).
    """
    hash_definition = IndexDefinition(prefix=[prefix], index_type=IndexType.HASH)
    try:
        await r.ft(index_name).create_index(
            fields=schema,
            definition=hash_definition,
        )
    except Exception as err:
        # Only the "already exists" reply is benign; anything else must reach
        # the caller unchanged.
        if "Index already exists" not in str(err):
            raise
        logger.info("RediSearch index '%s' already exists", index_name)
        return False
    else:
        logger.info(
            "Created RediSearch index '%s' (prefix=%s)", index_name, prefix
        )
        return True


async def ensure_indexes(r: aioredis.Redis) -> dict[str, bool]:
    """Ensure all eight Stargazer RediSearch indexes exist.

    Creates the message vector index plus the tool, skill, dangerous-command,
    benign-tech, observability, debug-observability, and LLM-request indexes,
    then runs the best-effort ``kind``-tag migration on ``idx:messages``.

    The eight ``FT.CREATE`` commands are issued concurrently. Every one of
    them is awaited to completion before any failure is propagated, so no
    creation task is left running (or holding an unretrieved exception) once
    this coroutine returns or raises.

    Args:
        r: Async Redis client to create the indexes on.

    Returns:
        dict[str, bool]: Index name -> ``True`` if it was freshly created,
        ``False`` if it already existed.

    Raises:
        Exception: The first failing ``FT.CREATE`` (in the order of
            ``index_specs`` below) other than "Index already exists", e.g. when
            RediSearch is not loaded on the server.

    Note:
        Knowledge-graph vector indexes are managed by
        :class:`knowledge_graph.KnowledgeGraphManager` via FalkorDB Cypher.
    """
    msg_schema = (
        VectorField(
            "embedding",
            "HNSW",
            HNSW_PARAMS,
        ),
        TagField("platform"),
        TagField("channel_id"),
        TagField("user_id"),
        TagField("kind"),
        TextField("user_name", no_stem=True),
        TextField("text"),
        NumericField("timestamp", sortable=True),
    )
    tool_schema = (
        VectorField("embedding", "HNSW", HNSW_PARAMS),
        TagField("name"),
        TextField("meta_json", no_stem=True),
    )
    skill_schema = (
        VectorField("embedding", "HNSW", HNSW_PARAMS),
        TagField("skill_id"),
        TextField("name", no_stem=True),
        TextField("description", no_stem=True),
        TextField("meta_json", no_stem=True),
    )
    # Also used, unchanged, for the benign-tech counter-centroid index below.
    dangerous_cmd_schema = (
        VectorField("embedding", "HNSW", HNSW_PARAMS),
        TagField("category_id"),
        TextField("meta_json", no_stem=True),
    )
    obs_schema = (
        TagField("event_type"),
        TagField("platform"),
        TagField("channel_id"),
        TagField("user_id"),
        TagField("tool_name"),
        TagField("request_id"),
        TextField("preview", no_stem=True),
        NumericField("timestamp", sortable=True),
        NumericField("http_status", sortable=True),
        TagField("http_service"),
    )
    llm_req_schema = (
        TagField("channel_id", separator="|"),
        TagField("platform", separator="|"),
        TagField("model", separator="|"),
        NumericField("timestamp", sortable=True),
        NumericField("http_status", sortable=True),
    )

    index_specs: list[tuple[str, str, tuple]] = [
        (MSG_INDEX_NAME, MSG_PREFIX, msg_schema),
        (TOOL_INDEX_NAME, TOOL_EMB_PREFIX, tool_schema),
        (SKILL_INDEX_NAME, SKILL_EMB_PREFIX, skill_schema),
        (DANGEROUS_CMD_INDEX_NAME, DANGEROUS_CMD_EMB_PREFIX, dangerous_cmd_schema),
        (BENIGN_TECH_INDEX_NAME, BENIGN_TECH_EMB_PREFIX, dangerous_cmd_schema),
        (OBS_INDEX_NAME, OBS_PREFIX, obs_schema),
        (OBS_DEBUG_INDEX_NAME, OBS_DEBUG_PREFIX, obs_debug_schema),
        (LLM_REQ_INDEX_NAME, LLM_REQ_PREFIX, llm_req_schema),
    ]

    # Each index has a distinct name and "Index already exists" is handled
    # inside _create_index, so the eight FT.CREATE commands are independent
    # and race-safe to issue concurrently. return_exceptions=True makes gather
    # wait for all of them even when one fails; without it the first error is
    # propagated immediately while the remaining commands keep running
    # unobserved (and may outlive the client that the caller then closes).
    outcomes = await asyncio.gather(
        *(
            _create_index(r, name, prefix, schema)
            for name, prefix, schema in index_specs
        ),
        return_exceptions=True,
    )
    for outcome in outcomes:
        if isinstance(outcome, BaseException):
            raise outcome

    results: dict[str, bool] = {
        name: created
        for (name, _prefix, _schema), created in zip(index_specs, outcomes)
    }

    await _try_alter_messages_kind(r)
    return results
async def _try_alter_messages_kind(r: aioredis.Redis) -> None:
    """Add the ``kind`` TAG to ``idx:messages`` when the index predates that field.

    Best-effort migration for deployments whose message index was created
    before the ``kind`` tag existed: issues ``FT.ALTER ... SCHEMA ADD`` so the
    existing index gains the field without a drop/rebuild. Freshly created
    indexes already include ``kind`` via the message schema in
    :func:`ensure_indexes`, so the expected "already exists"/"duplicate"
    errors (and "unknown index" when the index is absent) are swallowed
    silently. Any other error is logged as a warning and not re-raised,
    keeping startup resilient while still surfacing a migration that could
    not run.

    Touches Redis only. Called once at the end of :func:`ensure_indexes`,
    after the batch of ``FT.CREATE`` commands completes.

    Args:
        r: The async Redis client to issue ``FT.ALTER`` against.
    """
    try:
        # redis-py exposes FT.ALTER ... SCHEMA ADD as ``alter_schema_add``;
        # the search client has no ``alter_index`` method.
        await r.ft(MSG_INDEX_NAME).alter_schema_add([TagField("kind")])
        logger.info(
            "Altered RediSearch index '%s': added field 'kind'", MSG_INDEX_NAME
        )
    except Exception as exc:
        err = str(exc).lower()
        if "already" in err or "duplicate" in err or "unknown index" in err:
            return
        logger.warning(
            "idx:messages ALTER for kind skipped (%s)",
            exc,
        )


async def _main(redis_url: str, ssl_kwargs: dict | None = None) -> None:
    """Connect to Redis, ensure all indexes, and print a per-index status report.

    The async body of the standalone CLI: builds a binary
    (``decode_responses=False``) async Redis client from *redis_url* (and any
    *ssl_kwargs*), pings to verify connectivity, runs :func:`ensure_indexes`
    to create every Stargazer RediSearch index, and prints ``CREATED`` /
    ``already exists`` for each. The client is always closed in the
    ``finally`` block.

    Opens a Redis connection and writes to stdout; performs no
    KG/LLM/filesystem work beyond that. Called only from this module's
    ``__main__`` block via :func:`asyncio.run` when the file is executed
    directly.

    Args:
        redis_url: The Redis connection URL to connect to.
        ssl_kwargs: Optional SSL/mTLS connection keyword arguments derived
            from :class:`config.Config`; ``None`` for a plain connection.
    """
    r = aioredis.from_url(redis_url, decode_responses=False, **(ssl_kwargs or {}))
    try:
        await r.ping()
        logger.info("Connected to Redis at %s", redis_url)
        results = await ensure_indexes(r)
        for name, created in results.items():
            status = "CREATED" if created else "already exists"
            print(f" {name}: {status}")
    finally:
        await r.aclose()


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )
    parser = argparse.ArgumentParser(
        description="Create RediSearch vector indexes for Stargazer v3",
    )
    parser.add_argument(
        "--redis-url",
        default=os.environ.get(
            "REDIS_URL",
            "redis://localhost:6379/0",
        ),
        help=(
            "Redis connection URL "
            "(default: $REDIS_URL or redis://localhost:6379/0)"
        ),
    )
    args = parser.parse_args()

    # SSL/mTLS kwargs come from the project config when it is importable and
    # loads cleanly; otherwise fall back to a plain connection, but say so,
    # because a TLS-only server will then reject the connection.
    try:
        from config import Config

        _cfg = Config.load()
        _ssl = _cfg.redis_connection_kwargs_for_url(args.redis_url)
    except Exception as exc:
        logger.warning(
            "Config unavailable; connecting without SSL kwargs (%s)", exc
        )
        _ssl = {}

    print(f"Initializing RediSearch indexes on {args.redis_url} ...")
    try:
        asyncio.run(_main(args.redis_url, ssl_kwargs=_ssl))
        print("Done.")
    except Exception as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        sys.exit(1)