"""Create the RediSearch indexes used across Stargazer v3.

``ensure_indexes`` creates eight indexes: the message HNSW vector index
plus the tool/skill/dangerous-command/benign-tech centroid vector indexes
and the observability, debug-observability, and LLM-request event-log
indexes.

Run standalone::

    python init_redis_indexes.py [--redis-url redis://localhost:6379/0]

Or import and call during bot startup::

    from init_redis_indexes import ensure_indexes
    await ensure_indexes(redis_client)

Requires the **RediSearch** module to be loaded on the Redis server
(e.g. FalkorDB image, Redis Stack, or ``redis-server --loadmodule redisearch.so``).

Note: The knowledge-graph indexes (entities/relationships) are managed
by :class:`knowledge_graph.KnowledgeGraphManager` via FalkorDB Cypher
``CREATE VECTOR INDEX`` commands, not by this module.
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import os
import sys
import redis.asyncio as aioredis
from redis.commands.search.field import (
NumericField,
TagField,
TextField,
VectorField,
)
from redis.commands.search.index_definition import IndexDefinition, IndexType
logger = logging.getLogger(__name__)

# Vector settings consumed by HNSW_PARAMS, and therefore shared by every
# vector field this module declares.
VECTOR_DIM = 3072
DISTANCE_METRIC = "COSINE"

# Message vector index and the key prefix it covers.
MSG_INDEX_NAME = "idx:messages"
MSG_PREFIX = "msg:"

# Tool / skill centroid vectors (vector classifier). One HASH per item.
TOOL_INDEX_NAME = "idx:tool_embeddings"
TOOL_EMB_PREFIX = "tool_emb:"
SKILL_INDEX_NAME = "idx:skill_embeddings"
SKILL_EMB_PREFIX = "skill_emb:"

# Destructive-command centroid vectors (similarity guard). One HASH per category.
DANGEROUS_CMD_INDEX_NAME = "idx:dangerous_cmds"
DANGEROUS_CMD_EMB_PREFIX = "dangerous_cmd_emb:"

# Benign technical discussion centroids (counter-centroid for dangerous guard).
# ensure_indexes registers this index with the same schema as the
# dangerous-command index.
BENIGN_TECH_INDEX_NAME = "idx:benign_tech"
BENIGN_TECH_EMB_PREFIX = "benign_tech_emb:"

# Admin observability timeline (HASH per event; queried via FT.SEARCH).
OBS_INDEX_NAME = "idx:observability"
OBS_PREFIX = "obs:"

# Debug observability store — 1-hour TTL forensic event log (§1.1).
# llm_output and payload_json are stored in the HASH but intentionally
# omitted from the FT schema to prevent index memory bloat (A1 fix).
OBS_DEBUG_INDEX_NAME = "idx:obs_debug"
OBS_DEBUG_PREFIX = "obs_debug:"
# 3600 s = 1 hour. Not referenced elsewhere in the visible part of this
# module; presumably applied by whoever writes the obs_debug:* keys
# (TODO confirm against the writer).
OBS_DEBUG_TTL = 3600

# LLM request event log — pagination index for the admin UI.
LLM_REQ_INDEX_NAME = "idx:llm_req"
LLM_REQ_PREFIX = "llm_req:"
# §1.1 — field schema for idx:obs_debug.
#   * nine TAG fields (exact-match filters), declared in the order listed below
#   * ``preview`` as TEXT with stemming disabled (exact-match friendly)
#   * ``timestamp``, ``duration_ms`` and ``token_est`` as sortable NUMERIC fields
# NOTE: llm_output and payload_json live in the HASH only; they are deliberately
# left out of the schema so they are not indexed.
obs_debug_schema = (
    *(
        TagField(tag_name)
        for tag_name in (
            "event_type",
            "subsystem",
            "platform",
            "channel_id",
            "hub_id",
            "request_id",
            "run_id",
            "phase",
            "status",
        )
    ),
    TextField("preview", no_stem=True),
    *(
        NumericField(numeric_name, sortable=True)
        for numeric_name in ("timestamp", "duration_ms", "token_est")
    ),
)
# Attribute dict handed to every ``VectorField("embedding", "HNSW", ...)``
# built in ensure_indexes, so all vector indexes in this module share one
# element type, dimensionality and distance metric.
# ``M`` and ``EF_CONSTRUCTION`` are HNSW graph-construction settings; see the
# RediSearch vector-field documentation before changing them.
HNSW_PARAMS = {
    "TYPE": "FLOAT32",
    "DIM": VECTOR_DIM,
    "DISTANCE_METRIC": DISTANCE_METRIC,
    "M": 16,
    "EF_CONSTRUCTION": 200,
}
async def _create_index(
    r: aioredis.Redis,
    index_name: str,
    prefix: str,
    schema: tuple,
) -> bool:
    """Issue ``FT.CREATE`` for one index and report whether it was new.

    Builds a HASH-type index definition scoped to *prefix* and asks the
    server to create *index_name* with the given field *schema*.  An error
    whose text contains ``"Index already exists"`` is treated as a benign
    no-op, which makes the call safe to repeat on every startup; any other
    failure propagates unchanged.

    :func:`ensure_indexes` runs one of these per index spec, concurrently,
    through :func:`asyncio.gather`.

    Args:
        r: Async Redis client used to reach the search commands.
        index_name: Name of the index to create (e.g. ``idx:messages``).
        prefix: Key prefix the index covers (e.g. ``msg:``).
        schema: Tuple of RediSearch field definitions for the index.

    Returns:
        ``True`` when this call created the index, ``False`` when the server
        reported that it already existed.

    Raises:
        Exception: Any ``create_index`` failure other than the
            already-exists case is re-raised as-is (e.g. RediSearch not
            loaded on the server).
    """
    hash_definition = IndexDefinition(prefix=[prefix], index_type=IndexType.HASH)
    try:
        await r.ft(index_name).create_index(
            fields=schema,
            definition=hash_definition,
        )
    except Exception as exc:
        # A pre-existing index is signalled only through the error text;
        # everything else is a genuine failure for the caller to see.
        if "Index already exists" not in str(exc):
            raise
        logger.info("RediSearch index '%s' already exists", index_name)
        return False

    logger.info("Created RediSearch index '%s' (prefix=%s)", index_name, prefix)
    return True
async def ensure_indexes(r: aioredis.Redis) -> dict[str, bool]:
    """Ensure all eight Stargazer RediSearch indexes exist.

    Creates the message vector index plus the tool, skill, dangerous-command,
    benign-tech, observability, debug-observability, and LLM-request indexes,
    then runs the best-effort ``kind`` field migration on ``idx:messages``.

    Note: Knowledge-graph vector indexes are managed by
    :class:`knowledge_graph.KnowledgeGraphManager` via FalkorDB Cypher.

    Args:
        r: The async Redis client to issue the index commands against.

    Returns:
        A dict mapping each index name to ``True`` if it was freshly created
        by this call or ``False`` if it already existed.

    Raises:
        Exception: The first failure re-raised by :func:`_create_index`
            (anything other than "Index already exists") propagates out of
            :func:`asyncio.gather`.
    """
    msg_schema = (
        VectorField("embedding", "HNSW", HNSW_PARAMS),
        TagField("platform"),
        TagField("channel_id"),
        TagField("user_id"),
        TagField("kind"),
        TextField("user_name", no_stem=True),
        TextField("text"),
        NumericField("timestamp", sortable=True),
    )
    tool_schema = (
        VectorField("embedding", "HNSW", HNSW_PARAMS),
        TagField("name"),
        TextField("meta_json", no_stem=True),
    )
    skill_schema = (
        VectorField("embedding", "HNSW", HNSW_PARAMS),
        TagField("skill_id"),
        TextField("name", no_stem=True),
        TextField("description", no_stem=True),
        TextField("meta_json", no_stem=True),
    )
    dangerous_cmd_schema = (
        VectorField("embedding", "HNSW", HNSW_PARAMS),
        TagField("category_id"),
        TextField("meta_json", no_stem=True),
    )
    obs_schema = (
        TagField("event_type"),
        TagField("platform"),
        TagField("channel_id"),
        TagField("user_id"),
        TagField("tool_name"),
        TagField("request_id"),
        TextField("preview", no_stem=True),
        NumericField("timestamp", sortable=True),
        NumericField("http_status", sortable=True),
        TagField("http_service"),
    )
    llm_req_schema = (
        TagField("channel_id", separator="|"),
        TagField("platform", separator="|"),
        TagField("model", separator="|"),
        NumericField("timestamp", sortable=True),
        NumericField("http_status", sortable=True),
    )

    # Each index has a distinct name and "Index already exists" is handled
    # inside _create_index, so the eight FT.CREATE commands are independent
    # and race-safe to issue concurrently.
    index_specs: list[tuple[str, str, tuple]] = [
        (MSG_INDEX_NAME, MSG_PREFIX, msg_schema),
        (TOOL_INDEX_NAME, TOOL_EMB_PREFIX, tool_schema),
        (SKILL_INDEX_NAME, SKILL_EMB_PREFIX, skill_schema),
        (DANGEROUS_CMD_INDEX_NAME, DANGEROUS_CMD_EMB_PREFIX, dangerous_cmd_schema),
        # The benign-tech centroids use the same field layout as the
        # dangerous-command centroids, so the schema tuple is shared.
        (BENIGN_TECH_INDEX_NAME, BENIGN_TECH_EMB_PREFIX, dangerous_cmd_schema),
        (OBS_INDEX_NAME, OBS_PREFIX, obs_schema),
        (OBS_DEBUG_INDEX_NAME, OBS_DEBUG_PREFIX, obs_debug_schema),
        (LLM_REQ_INDEX_NAME, LLM_REQ_PREFIX, llm_req_schema),
    ]
    created_flags = await asyncio.gather(
        *(
            _create_index(r, name, prefix, schema)
            for name, prefix, schema in index_specs
        )
    )
    # gather() preserves input order, so flags line up with index_specs.
    results: dict[str, bool] = {
        spec[0]: created for spec, created in zip(index_specs, created_flags)
    }

    # Older deployments may have an idx:messages without the ``kind`` tag;
    # add it in place (best-effort, never raises).
    await _try_alter_messages_kind(r)
    return results
async def _try_alter_messages_kind(r: aioredis.Redis) -> None:
    """Add the ``kind`` TAG to ``idx:messages`` when the index predates that field.

    Best-effort migration for deployments whose message index was created
    before the ``kind`` tag existed: issues an ``FT.ALTER ... SCHEMA ADD`` via
    redis-py's ``alter_schema_add`` so existing indexes gain the field without
    a drop/rebuild.

    Freshly created indexes already include ``kind`` via the message schema in
    :func:`ensure_indexes`, so errors whose text contains "already",
    "duplicate" or "unknown index" are expected and swallowed silently.  Any
    other error is logged at warning level and not re-raised, which keeps
    startup resilient while still making a broken migration visible.

    Touches Redis only.  Called at the end of :func:`ensure_indexes`, after
    the batch of ``FT.CREATE`` commands completes.

    Args:
        r: The async Redis client to issue ``FT.ALTER`` against.
    """
    try:
        # alter_schema_add is the search-client helper that emits FT.ALTER;
        # it takes the list of fields to append to the schema.
        await r.ft(MSG_INDEX_NAME).alter_schema_add([TagField("kind")])
        logger.info("Altered RediSearch index '%s': added field 'kind'", MSG_INDEX_NAME)
    except Exception as exc:
        err = str(exc).lower()
        if "already" in err or "duplicate" in err or "unknown index" in err:
            # Field already present (or index absent): nothing to migrate.
            return
        # Unexpected failure: surface it, but never block startup on it.
        logger.warning(
            "idx:messages ALTER for kind skipped (%s)",
            exc,
        )
async def _main(redis_url: str, ssl_kwargs: dict | None = None) -> None:
    """Run the standalone flow: connect, ensure indexes, print the outcome.

    Builds an async Redis client for *redis_url* with
    ``decode_responses=False`` (plus any *ssl_kwargs*), pings it to verify
    connectivity, calls :func:`ensure_indexes`, and prints one line per index
    saying ``CREATED`` or ``already exists``.  The client is closed in a
    ``finally`` block, so it is released on failure as well.

    Invoked from this module's ``__main__`` section through
    :func:`asyncio.run`.

    Args:
        redis_url: The Redis connection URL to connect to.
        ssl_kwargs: Optional extra connection keyword arguments (SSL/mTLS
            settings); ``None`` or an empty dict means a plain connection.
    """
    extra_kwargs = ssl_kwargs or {}
    client = aioredis.from_url(redis_url, decode_responses=False, **extra_kwargs)
    try:
        await client.ping()
        logger.info("Connected to Redis at %s", redis_url)
        outcome = await ensure_indexes(client)
        for index_name, was_created in outcome.items():
            if was_created:
                label = "CREATED"
            else:
                label = "already exists"
            print(f" {index_name}: {label}")
    finally:
        await client.aclose()
# Script entry point: parse the Redis URL, derive optional SSL settings from
# the project config, then create the indexes and exit non-zero on failure.
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )
    parser = argparse.ArgumentParser(
        description="Create RediSearch vector indexes for Stargazer v3",
    )
    parser.add_argument(
        "--redis-url",
        default=os.environ.get(
            "REDIS_URL",
            "redis://localhost:6379/0",
        ),
        help=(
            "Redis connection URL "
            "(default: $REDIS_URL or redis://localhost:6379/0)"
        ),
    )
    args = parser.parse_args()

    # SSL/mTLS kwargs come from the project config when it is importable.
    # The script must still work on its own, so every failure falls back to
    # a plain connection -- but only a missing ``config`` module is silent.
    try:
        from config import Config

        _cfg = Config.load()
        _ssl = _cfg.redis_connection_kwargs_for_url(args.redis_url)
    except ImportError:
        # No project config on the path: expected for standalone use.
        _ssl = {}
    except Exception as exc:
        # Config exists but could not produce connection settings; say so,
        # otherwise a later TLS/connection error is very hard to diagnose.
        logger.warning(
            "Could not derive Redis SSL settings from config (%s); "
            "connecting without them",
            exc,
        )
        _ssl = {}

    print(f"Initializing RediSearch indexes on {args.redis_url} ...")
    try:
        asyncio.run(_main(args.redis_url, ssl_kwargs=_ssl))
        print("Done.")
    except Exception as exc:
        # Top-level boundary: report on stderr and signal failure to the shell.
        print(f"ERROR: {exc}", file=sys.stderr)
        sys.exit(1)