# Source code for migrate_kg_overhaul

#!/usr/bin/env python3
"""One-time migration for the knowledge graph overhaul.

Nondestructive -- safe to run multiple times.  Performs:

1. Creates vector + range indexes for new entity labels
   (Rule, Directive, Role).
2. Creates the ``pinned`` range index on ALL entity labels
   (including the 9 pre-existing ones).
3. Backfills ``pinned = false`` on every existing entity that
   lacks the property (WHERE e.pinned IS NULL).

Usage::

    python migrate_kg_overhaul.py [--redis-url redis://localhost:6379/0]

Reads ``$REDIS_URL`` by default.
"""

from __future__ import annotations

import argparse
import asyncio
import logging
import os
import sys

import redis.asyncio as aioredis
from falkordb.asyncio import FalkorDB

# Module-level logger; level/format are configured by main() via basicConfig.
logger = logging.getLogger(__name__)

# Name of the FalkorDB graph this migration operates on.
GRAPH_NAME = "knowledge"

# Every entity label that receives indexes: the 9 pre-existing labels plus
# the 3 new ones introduced by this overhaul (Rule, Directive, Role).
ENTITY_LABELS = [
    "Person", "Concept", "Preference", "Fact", "Event",
    "Location", "Organization", "Project", "Technology",
    "Rule", "Directive", "Role",
]


async def migrate(redis_url: str, ssl_kwargs: dict | None = None) -> None:
    """Run the nondestructive knowledge-graph overhaul migration.

    Connects to Redis/FalkorDB, ensures vector + range indexes for every
    label in ``ENTITY_LABELS``, then backfills ``pinned = false`` on any
    entity missing the property.  Safe to run multiple times.

    Args:
        redis_url: Redis connection URL (e.g. ``redis://localhost:6379/0``).
        ssl_kwargs: Optional SSL/mTLS keyword arguments forwarded to
            ``redis.asyncio.from_url``.
    """
    r = aioredis.from_url(redis_url, decode_responses=False, **(ssl_kwargs or {}))
    try:
        await r.ping()
        print(f" Connected to Redis at {redis_url}")
    except Exception as exc:
        print(f" ERROR: Cannot connect to Redis: {exc}", file=sys.stderr)
        sys.exit(1)

    # BUGFIX: originally aclose() was only reached on the success path; a
    # failure mid-migration leaked the connection.  Guard with try/finally.
    try:
        db = FalkorDB(connection_pool=r.connection_pool)
        graph = db.select_graph(GRAPH_NAME)

        # ------------------------------------------------------------------
        # Step 1 & 2: Ensure vector + range indexes for every label
        # ------------------------------------------------------------------
        print("\n Step 1/2: Ensuring indexes for all entity labels...")
        for label in ENTITY_LABELS:
            # Vector index on the embedding property.  "Already exists"
            # errors are expected on re-runs and reported as [EXISTS].
            try:
                await graph.create_node_vector_index(
                    label,
                    "embedding",
                    dim=3072,
                    similarity_function="cosine",
                )
                print(f" [CREATED] vector index on {label}.embedding")
            except Exception as exc:
                msg = str(exc).lower()
                if "already indexed" in msg or "already exists" in msg:
                    print(f" [EXISTS] vector index on {label}.embedding")
                else:
                    print(f" [WARN] vector index on {label}: {exc}")

            # Range indexes: name, category, scope_id, pinned
            for prop in ("name", "category", "scope_id", "pinned"):
                try:
                    await graph.create_node_range_index(label, prop)
                    print(f" [CREATED] range index on {label}.{prop}")
                except Exception as exc:
                    msg = str(exc).lower()
                    if "already indexed" in msg or "already exists" in msg:
                        print(f" [EXISTS] range index on {label}.{prop}")
                    else:
                        print(f" [WARN] range index on {label}.{prop}: {exc}")

        # ------------------------------------------------------------------
        # Step 3: Backfill pinned = false on existing entities
        # ------------------------------------------------------------------
        print("\n Step 3: Backfilling pinned = false on existing entities...")
        q = (
            "MATCH (e) "
            "WHERE e.pinned IS NULL "
            "SET e.pinned = false "
            "RETURN count(e) AS updated"
        )
        try:
            result = await graph.query(q)
            updated = result.result_set[0][0] if result.result_set else 0
            print(f" Backfilled {updated} entities with pinned = false")
        except Exception as exc:
            msg = str(exc).lower()
            # An empty graph is not an error for a backfill.
            if "empty graph" in msg or "no graph" in msg:
                print(" Graph is empty -- nothing to backfill")
            else:
                print(f" [WARN] Backfill query failed: {exc}")

        # ------------------------------------------------------------------
        # Summary (best-effort; stats failures never fail the migration)
        # ------------------------------------------------------------------
        print("\n Migration complete.")
        try:
            nr = await graph.query(
                "MATCH (n) RETURN count(n) AS nodes"
            )
            rr = await graph.query(
                "MATCH ()-[r]->() RETURN count(r) AS rels"
            )
            nodes = nr.result_set[0][0] if nr.result_set else 0
            rels = rr.result_set[0][0] if rr.result_set else 0
            print(f" Graph stats: {nodes} nodes, {rels} relationships")
        except Exception:
            pass
    finally:
        await r.aclose()
def main() -> None:
    """CLI entry point: parse arguments, load optional SSL config, run migrate().

    Exits with status 1 if the migration raises.
    """
    logging.basicConfig(
        level=logging.WARNING,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )
    parser = argparse.ArgumentParser(
        description=(
            "Nondestructive migration for the knowledge graph overhaul. "
            "Creates indexes for new entity labels (Rule, Directive, Role), "
            "adds pinned range index on all labels, and backfills "
            "pinned=false on existing entities."
        ),
    )
    parser.add_argument(
        "--redis-url",
        default=os.environ.get(
            "REDIS_URL",
            "redis://localhost:6379/0",
        ),
        help=(
            "Redis connection URL "
            "(default: $REDIS_URL or redis://localhost:6379/0)"
        ),
    )
    args = parser.parse_args()

    # Best-effort: pick up SSL/mTLS settings from the project config when the
    # config module is importable; otherwise run without SSL kwargs.
    try:
        from config import Config

        _ssl = Config.load().redis_ssl_kwargs()
    except Exception:
        _ssl = {}

    # FIX: these were f-strings without placeholders (ruff F541); the plain
    # strings below produce byte-identical output.
    print("Knowledge graph overhaul migration")
    print("=" * 50)
    try:
        asyncio.run(migrate(args.redis_url, ssl_kwargs=_ssl))
    except Exception as exc:
        print(f"\nERROR: {exc}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()