Source code for migrate_kg_uuids

#!/usr/bin/env python3
"""One-time migration: assign UUID7 to all KG entities and stamp
src_uuid / tgt_uuid on all relationships.

Idempotent -- only touches nodes/edges where the uuid fields are NULL.

Usage::

    python migrate_kg_uuids.py [--redis-url redis://localhost:6379/0]
"""

from __future__ import annotations

import argparse
import asyncio
import logging
import os

from uuid6 import uuid7
import redis.asyncio as aioredis
from falkordb.asyncio import FalkorDB

# Name of the FalkorDB graph that holds the knowledge-graph entities.
GRAPH_NAME = "knowledge"

# Configure root logging once, at import time, so that progress messages
# from the migration are timestamped and tagged with their level.
logging.basicConfig(format="%(asctime)s [%(levelname)s] %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)


def _first_count(result) -> int:
    """Return the first cell of a ``count(...)`` query result, or 0 if empty."""
    rows = result.result_set
    return rows[0][0] if rows else 0


async def migrate(redis_url: str, ssl_kwargs: dict | None = None) -> None:
    """Backfill UUID7 identifiers across the knowledge graph.

    Works on the FalkorDB graph named by ``GRAPH_NAME`` in three steps:

    1. Assign a fresh ``uuid7`` to every node whose ``uuid`` is NULL
       (one query per node).
    2. Stamp ``src_uuid`` / ``tgt_uuid`` on every relationship that is
       missing either of them, copying the endpoint nodes' ``uuid``
       (a single set-based query).
    3. Verify that no node lacks ``uuid`` and no relationship lacks
       ``src_uuid`` or ``tgt_uuid``; leftovers are logged as warnings.

    The migration is idempotent: every write is guarded by an ``IS NULL``
    predicate, so values that are already present are never replaced by
    different ones and re-running the script is safe.

    Args:
        redis_url: Redis connection URL; the resulting connection pool is
            also handed to the ``FalkorDB`` client.
        ssl_kwargs: Optional extra keyword arguments forwarded to
            ``aioredis.from_url`` (``main`` passes the result of
            ``Config.redis_ssl_kwargs()`` here). ``None`` means no extras.

    Returns:
        None. Progress and verification results are reported through the
        module ``logger``.
    """
    rc = aioredis.from_url(redis_url, decode_responses=True, **(ssl_kwargs or {}))
    db = FalkorDB(connection_pool=rc.connection_pool)
    graph = db.select_graph(GRAPH_NAME)

    try:
        # --- Step 1: assign UUIDs to entities ---
        logger.info("Fetching entities without UUIDs...")
        result = await graph.query("MATCH (e) WHERE e.uuid IS NULL RETURN id(e)")
        node_ids = [row[0] for row in result.result_set]
        logger.info(" %d entities need UUIDs", len(node_ids))

        assigned = 0
        for nid in node_ids:
            # Re-check ``e.uuid IS NULL`` at write time: the node may have
            # been stamped (or its internal id reused by another node)
            # since the list above was fetched, and an existing UUID must
            # never be overwritten.
            written = await graph.query(
                "MATCH (e) WHERE id(e) = $nid AND e.uuid IS NULL "
                "SET e.uuid = $uuid "
                "RETURN count(e)",
                params={"nid": nid, "uuid": str(uuid7())},
            )
            assigned += _first_count(written)
        logger.info(" Assigned UUIDs to %d entities", assigned)

        # --- Step 2: stamp src_uuid / tgt_uuid on relationships ---
        logger.info("Stamping src_uuid/tgt_uuid on relationships...")
        # Match edges missing *either* field so a half-stamped edge is
        # repaired too; the values written are always the endpoints' UUIDs.
        result = await graph.query(
            "MATCH (a)-[r]->(b) "
            "WHERE r.src_uuid IS NULL OR r.tgt_uuid IS NULL "
            "SET r.src_uuid = a.uuid, r.tgt_uuid = b.uuid "
            "RETURN count(r)"
        )
        logger.info(" Stamped %d relationships", _first_count(result))

        # --- Step 3: verify ---
        check = await graph.query("MATCH (e) WHERE e.uuid IS NULL RETURN count(e)")
        remaining = _first_count(check)
        if remaining:
            logger.warning(" %d entities still missing UUIDs!", remaining)
        else:
            logger.info(" All entities have UUIDs")

        check_rels = await graph.query(
            "MATCH ()-[r]->() "
            "WHERE r.src_uuid IS NULL OR r.tgt_uuid IS NULL "
            "RETURN count(r)"
        )
        remaining_rels = _first_count(check_rels)
        if remaining_rels:
            logger.warning(" %d relationships still missing UUIDs!", remaining_rels)
        else:
            logger.info(" All relationships have src/tgt UUIDs")

        logger.info("Migration complete.")
    finally:
        # Always release the Redis connection, even if a query failed.
        await rc.aclose()


def main() -> None:
    """Parse CLI arguments and run the UUID backfill migration.

    Reads ``--redis-url`` (default: ``$REDIS_URL``, falling back to
    ``redis://localhost:6379/0``), makes a best-effort attempt to load
    SSL/mTLS keyword arguments via ``Config.load().redis_ssl_kwargs()``,
    and runs ``migrate`` under ``asyncio.run``.

    If the project ``config`` module cannot be imported or loading fails,
    the migration still proceeds with no SSL options; the failure is
    logged so a later TLS connection error can be traced back to it.

    Returns:
        None.

    Raises:
        SystemExit: Raised by ``argparse`` when given invalid arguments.
    """
    parser = argparse.ArgumentParser(
        description="Assign UUID7 to all KG entities",
    )
    parser.add_argument(
        "--redis-url",
        default=os.environ.get(
            "REDIS_URL",
            "redis://localhost:6379/0",
        ),
    )
    args = parser.parse_args()

    try:
        # Imported lazily: the project config is optional for this script.
        from config import Config

        _ssl = Config.load().redis_ssl_kwargs()
    except Exception as exc:
        # Deliberate best-effort: fall back to a plain connection, but do
        # not hide why the SSL settings were unavailable.
        logger.warning(
            "Could not load Redis SSL settings from Config (%s: %s); "
            "continuing without SSL options",
            type(exc).__name__,
            exc,
        )
        _ssl = {}

    asyncio.run(migrate(args.redis_url, ssl_kwargs=_ssl))


# Run the migration only when executed as a script, never on import.
if __name__ == "__main__":
    main()