Source code for migrate_kg_uuids

#!/usr/bin/env python3
"""One-time migration: assign UUID7 to all KG entities and stamp
src_uuid / tgt_uuid on all relationships.

Idempotent -- only touches nodes/edges where the uuid fields are NULL.

Usage::

    python migrate_kg_uuids.py [--redis-url redis://localhost:6379/0]
"""

from __future__ import annotations

import argparse
import asyncio
import logging
import os
import sys

from uuid6 import uuid7
import redis.asyncio as aioredis
from falkordb.asyncio import FalkorDB

# Configure logging at import time: INFO and above, each record prefixed
# with a timestamp and its level name.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Name of the graph selected by the migration.
GRAPH_NAME = "knowledge"


def _first_scalar(result) -> int:
    """Return the first column of the first row of *result*, or 0 if empty."""
    rows = result.result_set
    return rows[0][0] if rows else 0


async def migrate(redis_url: str, ssl_kwargs: dict | None = None) -> None:
    """Assign a UUID7 to every entity and stamp endpoint UUIDs on relationships.

    The migration runs three steps against the ``GRAPH_NAME`` graph:

    1. Every node whose ``uuid`` is NULL gets a freshly generated UUID7.
    2. Every relationship missing ``src_uuid`` or ``tgt_uuid`` has both set
       from the ``uuid`` of its start and end node.
    3. The remaining NULLs are counted and a warning is logged if any are left.

    Nodes and relationships that already carry the fields are not modified,
    so the function can be re-run safely.

    Args:
        redis_url (str): URL passed to ``redis.asyncio.from_url``.
        ssl_kwargs (dict | None): Optional SSL/mTLS keyword arguments that are
            forwarded to ``redis.asyncio.from_url``. ``None`` means none.
    """
    rc = aioredis.from_url(
        redis_url,
        decode_responses=True,
        **(ssl_kwargs or {}),
    )
    try:
        # Build the graph handle inside the ``try`` so the client is closed
        # even if constructing it fails.
        db = FalkorDB(connection_pool=rc.connection_pool)
        graph = db.select_graph(GRAPH_NAME)

        # --- Step 1: assign UUIDs to entities ---
        logger.info("Fetching entities without UUIDs...")
        result = await graph.query(
            "MATCH (e) WHERE e.uuid IS NULL "
            "RETURN id(e)"
        )
        node_ids = [row[0] for row in result.result_set]
        logger.info(" %d entities need UUIDs", len(node_ids))

        for nid in node_ids:
            # Re-check ``e.uuid IS NULL`` at write time so a UUID written by
            # anything else after the fetch above is never overwritten.
            await graph.query(
                "MATCH (e) WHERE id(e) = $nid AND e.uuid IS NULL "
                "SET e.uuid = $uuid",
                params={"nid": nid, "uuid": str(uuid7())},
            )
        logger.info(" Assigned UUIDs to %d entities", len(node_ids))

        # --- Step 2: stamp src_uuid / tgt_uuid on relationships ---
        # Match on either field so an edge that only lacks ``tgt_uuid`` is
        # repaired as well; both fields are rewritten from the endpoints.
        logger.info("Stamping src_uuid/tgt_uuid on relationships...")
        result = await graph.query(
            "MATCH (a)-[r]->(b) "
            "WHERE r.src_uuid IS NULL OR r.tgt_uuid IS NULL "
            "SET r.src_uuid = a.uuid, r.tgt_uuid = b.uuid "
            "RETURN count(r)"
        )
        logger.info(" Stamped %d relationships", _first_scalar(result))

        # --- Step 3: verify ---
        remaining = _first_scalar(
            await graph.query(
                "MATCH (e) WHERE e.uuid IS NULL "
                "RETURN count(e)"
            )
        )
        if remaining:
            logger.warning(
                " %d entities still missing UUIDs!",
                remaining,
            )
        else:
            logger.info(" All entities have UUIDs")

        remaining_rels = _first_scalar(
            await graph.query(
                "MATCH ()-[r]->() "
                "WHERE r.src_uuid IS NULL OR r.tgt_uuid IS NULL "
                "RETURN count(r)"
            )
        )
        if remaining_rels:
            logger.warning(
                " %d relationships still missing UUIDs!",
                remaining_rels,
            )
        else:
            logger.info(" All relationships have src/tgt UUIDs")

        logger.info("Migration complete.")
    finally:
        await rc.aclose()
def main() -> None:
    """Parse command-line arguments and run the migration.

    The Redis URL comes from ``--redis-url``, falling back to the
    ``REDIS_URL`` environment variable and then to
    ``redis://localhost:6379/0``.

    SSL keyword arguments are loaded on a best-effort basis from
    ``config.Config.load().redis_ssl_kwargs()``. If that fails for any
    reason the migration still runs without SSL options, but the failure is
    logged instead of being silently discarded.
    """
    parser = argparse.ArgumentParser(
        description="Assign UUID7 to all KG entities",
    )
    parser.add_argument(
        "--redis-url",
        default=os.environ.get(
            "REDIS_URL",
            "redis://localhost:6379/0",
        ),
    )
    args = parser.parse_args()

    log = logging.getLogger(__name__)
    try:
        from config import Config

        _ssl = Config.load().redis_ssl_kwargs()
    except ImportError as exc:
        # The config module is optional for this standalone script.
        log.info(
            "Config not importable (%s); connecting without SSL options",
            exc,
        )
        _ssl = {}
    except Exception:
        # Keep the best-effort fallback, but make the failure visible so a
        # broken SSL configuration is not mistaken for "no SSL configured".
        log.warning(
            "Could not load Redis SSL settings; connecting without SSL options",
            exc_info=True,
        )
        _ssl = {}

    asyncio.run(migrate(args.redis_url, ssl_kwargs=_ssl))
# Run the CLI entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()