#!/usr/bin/env python3
"""One-time migration for the knowledge graph overhaul.

Nondestructive -- safe to run multiple times. Performs:

1. Creates vector + range indexes for new entity labels
   (Rule, Directive, Role).
2. Creates the ``pinned`` range index on ALL entity labels
   (including the 9 pre-existing ones).
3. Backfills ``pinned = false`` on every existing entity that
   lacks the property (``WHERE e.pinned IS NULL``).

Usage::

    python migrate_kg_overhaul.py [--redis-url redis://localhost:6379/0]

Reads ``$REDIS_URL`` by default.
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import os
import sys
import redis.asyncio as aioredis
from falkordb.asyncio import FalkorDB
# Module-level logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)

# Name of the graph that the migration selects and operates on.
GRAPH_NAME: str = "knowledge"

# Every entity label that receives indexes, in the order they are processed.
ENTITY_LABELS: list[str] = [
    # The 9 labels that pre-date the overhaul.
    "Person", "Concept", "Preference",
    "Fact", "Event", "Location",
    "Organization", "Project", "Technology",
    # Labels introduced by the overhaul.
    "Rule", "Directive", "Role",
]
def _index_already_exists(exc: Exception) -> bool:
    """Return True if *exc* reports that the requested index is already present."""
    text = str(exc).lower()
    return "already indexed" in text or "already exists" in text


async def migrate(redis_url: str, ssl_kwargs: dict | None = None) -> None:
    """Apply the knowledge-graph overhaul migration to FalkorDB.

    Performs the nondestructive overhaul in three steps: ensures both the
    vector index and the ``name`` / ``category`` / ``scope_id`` / ``pinned``
    range indexes exist for every label in ``ENTITY_LABELS`` (including the
    new ``Rule`` / ``Directive`` / ``Role`` labels), then backfills
    ``pinned = false`` on nodes that lack the property. "already indexed" /
    "already exists" errors are treated as no-ops and the backfill only
    touches nodes where ``pinned IS NULL``, so re-running against an
    already-migrated graph is safe.

    Opens an async Redis client at ``redis_url``, shares its connection pool
    with the ``FalkorDB`` client, and drives the graph named by ``GRAPH_NAME``.
    Index or query failures other than the no-op cases are reported as
    ``[WARN]`` lines and do not abort the run. Progress, per-index status and
    final node/relationship counts are printed to stdout. The Redis client is
    always closed before returning, including when the function exits early
    or an unexpected exception propagates.

    Args:
        redis_url: Redis connection URL whose pool also backs FalkorDB.
        ssl_kwargs: Optional SSL/mTLS keyword arguments forwarded to
            ``aioredis.from_url`` (``main`` passes the result of
            ``Config.redis_ssl_kwargs``).

    Raises:
        SystemExit: Exits with status 1 if the initial Redis ``ping`` fails.
    """
    r = aioredis.from_url(redis_url, decode_responses=False, **(ssl_kwargs or {}))
    try:
        try:
            await r.ping()
        except Exception as exc:
            print(f" ERROR: Cannot connect to Redis: {exc}", file=sys.stderr)
            sys.exit(1)
        print(f" Connected to Redis at {redis_url}")

        db = FalkorDB(connection_pool=r.connection_pool)
        graph = db.select_graph(GRAPH_NAME)

        # --------------------------------------------------------------
        # Step 1 & 2: Ensure vector + range indexes for every label
        # --------------------------------------------------------------
        print("\n Step 1/2: Ensuring indexes for all entity labels...")
        for label in ENTITY_LABELS:
            # Vector index on the embedding property.
            try:
                await graph.create_node_vector_index(
                    label,
                    "embedding",
                    dim=3072,
                    similarity_function="cosine",
                )
                print(f" [CREATED] vector index on {label}.embedding")
            except Exception as exc:
                if _index_already_exists(exc):
                    print(f" [EXISTS] vector index on {label}.embedding")
                else:
                    print(f" [WARN] vector index on {label}: {exc}")

            # Range indexes: name, category, scope_id, pinned
            for prop in ("name", "category", "scope_id", "pinned"):
                try:
                    await graph.create_node_range_index(label, prop)
                    print(f" [CREATED] range index on {label}.{prop}")
                except Exception as exc:
                    if _index_already_exists(exc):
                        print(f" [EXISTS] range index on {label}.{prop}")
                    else:
                        print(f" [WARN] range index on {label}.{prop}: {exc}")

        # --------------------------------------------------------------
        # Step 3: Backfill pinned = false on existing entities
        # --------------------------------------------------------------
        print("\n Step 3: Backfilling pinned = false on existing entities...")
        # NOTE: the pattern has no label filter, so this updates every node in
        # the graph that lacks ``pinned``, not only those in ENTITY_LABELS.
        q = (
            "MATCH (e) "
            "WHERE e.pinned IS NULL "
            "SET e.pinned = false "
            "RETURN count(e) AS updated"
        )
        try:
            result = await graph.query(q)
            updated = result.result_set[0][0] if result.result_set else 0
            print(f" Backfilled {updated} entities with pinned = false")
        except Exception as exc:
            msg = str(exc).lower()
            if "empty graph" in msg or "no graph" in msg:
                print(" Graph is empty -- nothing to backfill")
            else:
                print(f" [WARN] Backfill query failed: {exc}")

        # --------------------------------------------------------------
        # Summary
        # --------------------------------------------------------------
        print("\n Migration complete.")
        try:
            nr = await graph.query("MATCH (n) RETURN count(n) AS nodes")
            rr = await graph.query("MATCH ()-[r]->() RETURN count(r) AS rels")
            nodes = nr.result_set[0][0] if nr.result_set else 0
            rels = rr.result_set[0][0] if rr.result_set else 0
            print(f" Graph stats: {nodes} nodes, {rels} relationships")
        except Exception as exc:
            # Stats are informational only; report the failure instead of
            # hiding it, but never fail the migration because of it.
            print(f" [WARN] Could not read graph stats: {exc}")
    finally:
        # Release the connection pool on every exit path (success, the
        # SystemExit raised above, or an unexpected error).
        await r.aclose()
def main() -> None:
    """Parse CLI arguments and run the overhaul migration.

    Command-line entry point for ``python migrate_kg_overhaul.py``. Configures
    logging, parses ``--redis-url`` (defaulting to ``$REDIS_URL`` or
    ``redis://localhost:6379/0``), and best-effort loads SSL/mTLS settings from
    ``Config.redis_ssl_kwargs``. If those settings cannot be loaded, a warning
    is logged and the migration proceeds without SSL keyword arguments. The
    values are then handed to ``migrate`` through ``asyncio.run``.

    Raises:
        SystemExit: Exits with status 1 if the migration raises, or via
            ``argparse`` when given invalid arguments.
    """
    logging.basicConfig(
        level=logging.WARNING,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )

    parser = argparse.ArgumentParser(
        description=(
            "Nondestructive migration for the knowledge graph overhaul. "
            "Creates indexes for new entity labels (Rule, Directive, Role), "
            "adds pinned range index on all labels, and backfills "
            "pinned=false on existing entities."
        ),
    )
    parser.add_argument(
        "--redis-url",
        default=os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
        help="Redis connection URL (default: $REDIS_URL or redis://localhost:6379/0)",
    )
    args = parser.parse_args()

    # SSL settings are optional: the project config may be unavailable when
    # this script runs on its own. Fall back to no SSL kwargs, but say so, so
    # that a later TLS connection failure can be traced to this step.
    try:
        from config import Config

        ssl_kwargs = Config.load().redis_ssl_kwargs()
    except Exception as exc:
        logger.warning(
            "Could not load Redis SSL settings from Config (%s); "
            "continuing without SSL keyword arguments",
            exc,
        )
        ssl_kwargs = {}

    print("Knowledge graph overhaul migration")
    print("=" * 50)
    try:
        asyncio.run(migrate(args.redis_url, ssl_kwargs=ssl_kwargs))
    except Exception as exc:
        print(f"\nERROR: {exc}", file=sys.stderr)
        sys.exit(1)
if __name__ == "__main__":
    # Run the CLI only when executed as a script, not when imported.
    main()