Source code for migrate_kg_overhaul

#!/usr/bin/env python3
"""One-time migration for the knowledge graph overhaul.

Nondestructive -- safe to run multiple times.  Performs:

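1. Ensures the ``embedding`` vector index and the ``name`` /
   ``category`` / ``scope_id`` range indexes exist on every entity
   label, which in practice creates them for the new labels
   (Rule, Directive, Role).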
2. Creates the ``pinned`` range index on ALL entity labels
   (including the 9 pre-existing ones).
3. Backfills ``pinned = false`` on every existing node that
   lacks the property (WHERE e.pinned IS NULL); the match is not
   restricted to the entity labels listed above.

Usage::

    python migrate_kg_overhaul.py [--redis-url redis://localhost:6379/0]

Reads ``$REDIS_URL`` by default.
"""

from __future__ import annotations

import argparse
import asyncio
import logging
import os
import sys

import redis.asyncio as aioredis
from falkordb.asyncio import FalkorDB

logger = logging.getLogger(__name__)

# FalkorDB graph that this migration reads and modifies.
GRAPH_NAME = "knowledge"

# The nine entity labels that existed before the overhaul.
_PRE_EXISTING_LABELS = (
    "Person",
    "Concept",
    "Preference",
    "Fact",
    "Event",
    "Location",
    "Organization",
    "Project",
    "Technology",
)

# Entity labels introduced by the overhaul.
_NEW_LABELS = ("Rule", "Directive", "Role")

# Every entity label that gets the vector + range indexes, in order:
# pre-existing labels first, then the new ones.
ENTITY_LABELS = [*_PRE_EXISTING_LABELS, *_NEW_LABELS]


_BACKFILL_PINNED_QUERY = (
    "MATCH (e) "
    "WHERE e.pinned IS NULL "
    "SET e.pinned = false "
    "RETURN count(e) AS updated"
)


def _is_already_exists_error(exc: Exception) -> bool:
    """Return True if *exc*'s message says the index is already present.

    Matching is case-insensitive on "already indexed" / "already exists";
    such errors are treated as no-ops so the migration can be re-run.
    """
    msg = str(exc).lower()
    return "already indexed" in msg or "already exists" in msg


async def _ensure_index(description: str, create, *args, **kwargs) -> None:
    """Await ``create(*args, **kwargs)`` and print a one-line status.

    Prints ``[CREATED]`` on success, ``[EXISTS]`` when the error reports an
    existing index, and ``[WARN]`` (without raising) for any other error.
    """
    try:
        await create(*args, **kwargs)
    except Exception as exc:
        if _is_already_exists_error(exc):
            print(f" [EXISTS] {description}")
        else:
            print(f" [WARN] {description}: {exc}")
    else:
        print(f" [CREATED] {description}")


async def _ensure_indexes(graph) -> None:
    """Steps 1 & 2: ensure vector + range indexes on every entity label."""
    print("\n Step 1/2: Ensuring indexes for all entity labels...")
    for label in ENTITY_LABELS:
        await _ensure_index(
            f"vector index on {label}.embedding",
            graph.create_node_vector_index,
            label,
            "embedding",
            # NOTE(review): must match the size of the stored embeddings --
            # presumably the embedding model's output dimension; confirm.
            dim=3072,
            similarity_function="cosine",
        )
        for prop in ("name", "category", "scope_id", "pinned"):
            await _ensure_index(
                f"range index on {label}.{prop}",
                graph.create_node_range_index,
                label,
                prop,
            )


async def _backfill_pinned(graph) -> None:
    """Step 3: set ``pinned = false`` on nodes that lack the property.

    The query matches every node (``MATCH (e)``), not only the labels in
    ``ENTITY_LABELS``. Query failures are reported, not raised.
    """
    print("\n Step 3: Backfilling pinned = false on existing entities...")
    try:
        result = await graph.query(_BACKFILL_PINNED_QUERY)
    except Exception as exc:
        msg = str(exc).lower()
        if "empty graph" in msg or "no graph" in msg:
            print(" Graph is empty -- nothing to backfill")
        else:
            print(f" [WARN] Backfill query failed: {exc}")
        return
    updated = result.result_set[0][0] if result.result_set else 0
    print(f" Backfilled {updated} entities with pinned = false")


async def _print_graph_stats(graph) -> None:
    """Print node/relationship counts; best-effort, warns instead of raising."""
    try:
        nr = await graph.query("MATCH (n) RETURN count(n) AS nodes")
        rr = await graph.query("MATCH ()-[r]->() RETURN count(r) AS rels")
    except Exception as exc:
        print(f" [WARN] Could not read graph stats: {exc}")
        return
    nodes = nr.result_set[0][0] if nr.result_set else 0
    rels = rr.result_set[0][0] if rr.result_set else 0
    print(f" Graph stats: {nodes} nodes, {rels} relationships")


async def migrate(redis_url: str, ssl_kwargs: dict | None = None) -> None:
    """Apply the knowledge-graph overhaul migration to FalkorDB.

    Runs the nondestructive overhaul in order:

    1. For every label in ``ENTITY_LABELS``, ensure a cosine vector index on
       ``embedding`` and range indexes on ``name``, ``category``,
       ``scope_id`` and ``pinned``. Errors reporting "already indexed" /
       "already exists" are treated as no-ops; other index errors are
       printed as ``[WARN]`` and do not abort the run.
    2. Backfill ``pinned = false`` on every node whose ``pinned`` is NULL.
    3. Print node / relationship counts for the graph.

    Each step is idempotent, so re-running against an already-migrated
    graph is safe. The FalkorDB client shares the Redis client's connection
    pool, and the Redis client is closed on every exit path, including the
    connection-failure exit.

    Args:
        redis_url: Redis connection URL whose pool also backs FalkorDB.
        ssl_kwargs: Optional SSL/mTLS keyword arguments forwarded to
            ``aioredis.from_url`` (typically from ``Config.redis_ssl_kwargs``).

    Returns:
        None. Progress and per-index status are printed to stdout.

    Raises:
        SystemExit: With status 1 if the initial Redis ``ping`` fails.
    """
    r = aioredis.from_url(redis_url, decode_responses=False, **(ssl_kwargs or {}))
    try:
        try:
            await r.ping()
        except Exception as exc:
            print(f" ERROR: Cannot connect to Redis: {exc}", file=sys.stderr)
            sys.exit(1)
        print(f" Connected to Redis at {redis_url}")

        db = FalkorDB(connection_pool=r.connection_pool)
        graph = db.select_graph(GRAPH_NAME)

        await _ensure_indexes(graph)
        await _backfill_pinned(graph)

        print("\n Migration complete.")
        await _print_graph_stats(graph)
    finally:
        # Runs on success, on the ping-failure SystemExit, and on any
        # unexpected error, so the connection pool is never leaked.
        await r.aclose()
def _load_ssl_kwargs() -> dict | None:
    """Best-effort load of Redis SSL/mTLS kwargs from the project ``Config``.

    Returns ``{}`` when the ``config`` module cannot be imported, e.g. when
    the script runs outside the project tree. Any other failure while
    loading the config is reported on stderr instead of being silently
    ignored, and the migration then continues without SSL kwargs.
    """
    try:
        from config import Config
    except ImportError:
        return {}
    try:
        return Config.load().redis_ssl_kwargs()
    except Exception as exc:
        print(
            f"WARNING: could not load Redis SSL settings from Config ({exc}); "
            "continuing without them",
            file=sys.stderr,
        )
        return {}


def main() -> None:
    """Parse CLI arguments and run the overhaul migration.

    Command-line entry point for ``python migrate_kg_overhaul.py``.
    Configures logging and parses ``--redis-url``, which defaults to
    ``$REDIS_URL`` or ``redis://localhost:6379/0``. It then best-effort
    loads SSL/mTLS settings via ``_load_ssl_kwargs`` and runs ``migrate``
    through ``asyncio.run``. Progress is printed to stdout.

    Returns:
        None.

    Raises:
        SystemExit: With status 1 if the migration raises, or via
            ``argparse`` when given invalid arguments.
    """
    logging.basicConfig(
        level=logging.WARNING,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )
    parser = argparse.ArgumentParser(
        description=(
            "Nondestructive migration for the knowledge graph overhaul. "
            "Creates indexes for new entity labels (Rule, Directive, Role), "
            "adds pinned range index on all labels, and backfills "
            "pinned=false on existing entities."
        ),
    )
    parser.add_argument(
        "--redis-url",
        default=os.environ.get(
            "REDIS_URL",
            "redis://localhost:6379/0",
        ),
        help=(
            "Redis connection URL "
            "(default: $REDIS_URL or redis://localhost:6379/0)"
        ),
    )
    args = parser.parse_args()

    ssl_kwargs = _load_ssl_kwargs()

    print("Knowledge graph overhaul migration")
    print(f"{'=' * 50}")
    try:
        asyncio.run(migrate(args.redis_url, ssl_kwargs=ssl_kwargs))
    except Exception as exc:
        print(f"\nERROR: {exc}", file=sys.stderr)
        sys.exit(1)
# Script entry point: run the migration only when executed directly.
if __name__ == "__main__":
    main()