#!/usr/bin/env python3
"""One-time migration for the knowledge graph overhaul.
Nondestructive -- safe to run multiple times. Performs:
1. Creates vector + range indexes for new entity labels
(Rule, Directive, Role).
2. Creates the ``pinned`` range index on ALL entity labels
(including the 9 pre-existing ones).
3. Backfills ``pinned = false`` on every existing entity that
lacks the property (WHERE e.pinned IS NULL).
Usage::
python migrate_kg_overhaul.py [--redis-url redis://localhost:6379/0]
Reads ``$REDIS_URL`` by default.
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import os
import sys
import redis.asyncio as aioredis
from falkordb.asyncio import FalkorDB
# Module-level logger; level/format are configured in ``main``.
logger = logging.getLogger(__name__)
# Name of the FalkorDB graph this migration operates on.
GRAPH_NAME = "knowledge"
# Every entity label that receives indexes: the 9 pre-existing labels
# plus the three introduced by the overhaul (Rule, Directive, Role).
ENTITY_LABELS = [
"Person", "Concept", "Preference", "Fact", "Event",
"Location", "Organization", "Project", "Technology",
"Rule", "Directive", "Role",
]
def _is_duplicate_index_error(exc: Exception) -> bool:
    """Return True when *exc* says the index already exists (idempotent path)."""
    msg = str(exc).lower()
    return "already indexed" in msg or "already exists" in msg


async def _ensure_indexes(graph) -> None:
    """Steps 1 & 2: ensure vector + range indexes for every entity label.

    Each create call is individually guarded so a pre-existing index (the
    normal case on re-runs) is reported but never fatal.
    """
    print("\n Step 1/2: Ensuring indexes for all entity labels...")
    for label in ENTITY_LABELS:
        # Vector index on the embedding property.
        # NOTE(review): dim=3072 matches the embedding model in use elsewhere
        # in the project -- confirm if the model changes.
        try:
            await graph.create_node_vector_index(
                label, "embedding",
                dim=3072,
                similarity_function="cosine",
            )
            print(f" [CREATED] vector index on {label}.embedding")
        except Exception as exc:
            if _is_duplicate_index_error(exc):
                print(f" [EXISTS] vector index on {label}.embedding")
            else:
                print(f" [WARN] vector index on {label}: {exc}")
        # Range indexes: name, category, scope_id, pinned
        for prop in ("name", "category", "scope_id", "pinned"):
            try:
                await graph.create_node_range_index(label, prop)
                print(f" [CREATED] range index on {label}.{prop}")
            except Exception as exc:
                if _is_duplicate_index_error(exc):
                    print(f" [EXISTS] range index on {label}.{prop}")
                else:
                    print(f" [WARN] range index on {label}.{prop}: {exc}")


async def _backfill_pinned(graph) -> None:
    """Step 3: set ``pinned = false`` on every node that lacks the property.

    The IS NULL guard makes the query a no-op for already-migrated nodes,
    keeping the whole migration safe to re-run.
    """
    print("\n Step 3: Backfilling pinned = false on existing entities...")
    q = (
        "MATCH (e) "
        "WHERE e.pinned IS NULL "
        "SET e.pinned = false "
        "RETURN count(e) AS updated"
    )
    try:
        result = await graph.query(q)
        updated = result.result_set[0][0] if result.result_set else 0
        print(f" Backfilled {updated} entities with pinned = false")
    except Exception as exc:
        msg = str(exc).lower()
        # Querying a graph key that does not exist yet is expected on a
        # fresh install -- report it as "nothing to do", not a failure.
        if "empty graph" in msg or "no graph" in msg:
            print(" Graph is empty -- nothing to backfill")
        else:
            print(f" [WARN] Backfill query failed: {exc}")


async def _print_stats(graph) -> None:
    """Best-effort summary of node/relationship counts; never raises."""
    try:
        nr = await graph.query(
            "MATCH (n) RETURN count(n) AS nodes"
        )
        rr = await graph.query(
            "MATCH ()-[r]->() RETURN count(r) AS rels"
        )
        nodes = nr.result_set[0][0] if nr.result_set else 0
        rels = rr.result_set[0][0] if rr.result_set else 0
        print(f" Graph stats: {nodes} nodes, {rels} relationships")
    except Exception:
        # Stats are informational only -- a failure here must not fail
        # an otherwise-successful migration.
        pass


async def migrate(redis_url: str, ssl_kwargs: dict | None = None) -> None:
    """Run the nondestructive knowledge-graph overhaul migration.

    Ensures vector + range indexes for every label in ``ENTITY_LABELS``
    and backfills ``pinned = false`` on entities lacking the property.
    Safe to run multiple times.

    Args:
        redis_url (str): Redis connection URL,
            e.g. ``redis://localhost:6379/0``.
        ssl_kwargs (dict | None): Optional SSL/mTLS keyword arguments
            forwarded to ``redis.asyncio.from_url``.

    Raises:
        SystemExit: If Redis is unreachable (exit code 1).
    """
    r = aioredis.from_url(redis_url, decode_responses=False, **(ssl_kwargs or {}))
    try:
        await r.ping()
        print(f" Connected to Redis at {redis_url}")
    except Exception as exc:
        print(f" ERROR: Cannot connect to Redis: {exc}", file=sys.stderr)
        # Fix: close the connection before exiting instead of leaking it.
        await r.aclose()
        sys.exit(1)
    # Fix: the original only closed the connection on full success; the
    # try/finally guarantees cleanup even if a step raises.
    try:
        db = FalkorDB(connection_pool=r.connection_pool)
        graph = db.select_graph(GRAPH_NAME)
        await _ensure_indexes(graph)
        await _backfill_pinned(graph)
        print("\n Migration complete.")
        await _print_stats(graph)
    finally:
        await r.aclose()
def main() -> None:
    """CLI entry point: parse arguments, then run :func:`migrate`.

    Exits with status 1 if the migration raises.
    """
    logging.basicConfig(
        level=logging.WARNING,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )
    parser = argparse.ArgumentParser(
        description=(
            "Nondestructive migration for the knowledge graph overhaul. "
            "Creates indexes for new entity labels (Rule, Directive, Role), "
            "adds pinned range index on all labels, and backfills "
            "pinned=false on existing entities."
        ),
    )
    parser.add_argument(
        "--redis-url",
        default=os.environ.get(
            "REDIS_URL", "redis://localhost:6379/0",
        ),
        help=(
            "Redis connection URL "
            "(default: $REDIS_URL or redis://localhost:6379/0)"
        ),
    )
    args = parser.parse_args()
    # Best-effort: pick up SSL/mTLS settings from the project config when
    # it is importable; otherwise fall back to a plain connection.
    try:
        from config import Config
        _ssl = Config.load().redis_ssl_kwargs()
    except Exception:
        _ssl = {}
    # Fix: dropped the no-op f-string prefixes (no placeholders).
    print("Knowledge graph overhaul migration")
    print("=" * 50)
    try:
        asyncio.run(migrate(args.redis_url, ssl_kwargs=_ssl))
    except Exception as exc:
        print(f"\nERROR: {exc}", file=sys.stderr)
        sys.exit(1)
if __name__ == "__main__":
main()