#!/usr/bin/env python3
"""One-time migration: assign UUID7 to all KG entities and stamp
src_uuid / tgt_uuid on all relationships.
Idempotent -- only touches nodes/edges where the uuid fields are NULL.
Usage::
python migrate_kg_uuids.py [--redis-url redis://localhost:6379/0]
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import os
from uuid6 import uuid7
import redis.asyncio as aioredis
from falkordb.asyncio import FalkorDB
# Configure root logging once, at import time, so every message emitted by
# this script carries a timestamp and its level.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Name of the FalkorDB graph whose entities/relationships are migrated.
GRAPH_NAME = "knowledge"
[docs]
def _scalar(result) -> int:
    """Return the single value of a one-row, one-column query result.

    Used for ``RETURN count(...)`` queries. An empty ``result_set`` is read
    as ``0`` instead of raising ``IndexError``.

    Args:
        result: A FalkorDB query result exposing ``result_set`` (a list of
            rows, each a list of column values).

    Returns:
        int: The first column of the first row, or 0 when there are no rows.
    """
    return result.result_set[0][0] if result.result_set else 0


async def migrate(redis_url: str, ssl_kwargs: dict | None = None) -> None:
    """Backfill UUID7 identifiers across the knowledge graph.

    Walks the FalkorDB graph named by ``GRAPH_NAME`` in three steps:

    1. Assign a fresh ``uuid7`` to every entity whose ``uuid`` is NULL, one
       node per query. Each write re-checks ``uuid IS NULL``, so an entity
       that gained a UUID after the initial fetch is never overwritten, and
       the logged count reflects UUIDs actually written.
    2. Stamp ``src_uuid`` / ``tgt_uuid`` onto every relationship missing
       *either* field, copying the endpoints' UUIDs. ``coalesce`` keeps any
       value already present. Edges whose endpoints still lack a UUID are
       skipped (and reported in step 3) rather than counted as stamped.
    3. Verify: count entities without ``uuid`` and relationships missing
       ``src_uuid`` or ``tgt_uuid``; any leftovers are logged as warnings.

    Only NULL fields are written, so re-running the migration leaves
    already-stamped records alone.

    The async Redis client opened from ``redis_url`` also backs the
    ``FalkorDB`` client through its connection pool, and is always closed in
    a ``finally`` block. Progress and counts go to the module ``logger``.
    Called by this module's ``main`` via ``asyncio.run``.

    Args:
        redis_url (str): Redis connection URL whose pool also backs FalkorDB.
        ssl_kwargs (dict | None): Optional SSL/mTLS keyword arguments forwarded
            to ``aioredis.from_url`` (typically from ``Config.redis_ssl_kwargs``).

    Returns:
        None.
    """
    rc = aioredis.from_url(redis_url, decode_responses=True, **(ssl_kwargs or {}))
    try:
        # Built inside ``try`` so the Redis client is closed even if this fails.
        db = FalkorDB(connection_pool=rc.connection_pool)
        graph = db.select_graph(GRAPH_NAME)

        # --- Step 1: assign UUIDs to entities ---
        logger.info("Fetching entities without UUIDs...")
        result = await graph.query("MATCH (e) WHERE e.uuid IS NULL RETURN id(e)")
        node_ids = [row[0] for row in result.result_set]
        logger.info(" %d entities need UUIDs", len(node_ids))
        assigned = 0
        for nid in node_ids:
            # Re-check ``uuid IS NULL`` at write time: between the fetch above
            # and this write another writer may have set it, or the id may now
            # refer to a different node. Never overwrite an existing UUID.
            res = await graph.query(
                "MATCH (e) WHERE id(e) = $nid AND e.uuid IS NULL "
                "SET e.uuid = $uuid "
                "RETURN count(e)",
                params={"nid": nid, "uuid": str(uuid7())},
            )
            assigned += _scalar(res)
        logger.info(" Assigned UUIDs to %d entities", assigned)

        # --- Step 2: stamp src_uuid / tgt_uuid on relationships ---
        logger.info("Stamping src_uuid/tgt_uuid on relationships...")
        result = await graph.query(
            "MATCH (a)-[r]->(b) "
            # Repair edges missing either field, not just src_uuid.
            "WHERE (r.src_uuid IS NULL OR r.tgt_uuid IS NULL) "
            # Skip edges whose endpoints have no UUID yet: stamping NULL would
            # write nothing but still be counted. Step 3 reports them.
            "AND a.uuid IS NOT NULL AND b.uuid IS NOT NULL "
            # coalesce keeps any value already present (only NULLs are filled).
            "SET r.src_uuid = coalesce(r.src_uuid, a.uuid), "
            "r.tgt_uuid = coalesce(r.tgt_uuid, b.uuid) "
            "RETURN count(r)"
        )
        rel_count = _scalar(result)
        logger.info(" Stamped %d relationships", rel_count)

        # --- Step 3: verify ---
        remaining = _scalar(
            await graph.query("MATCH (e) WHERE e.uuid IS NULL RETURN count(e)")
        )
        if remaining:
            logger.warning(
                " %d entities still missing UUIDs!",
                remaining,
            )
        else:
            logger.info(" All entities have UUIDs")

        remaining_rels = _scalar(
            await graph.query(
                "MATCH ()-[r]->() "
                "WHERE r.src_uuid IS NULL OR r.tgt_uuid IS NULL "
                "RETURN count(r)"
            )
        )
        if remaining_rels:
            logger.warning(
                " %d relationships still missing UUIDs!",
                remaining_rels,
            )
        else:
            logger.info(" All relationships have src/tgt UUIDs")

        logger.info("Migration complete.")
    finally:
        await rc.aclose()
[docs]
def main() -> None:
"""Parse CLI arguments and run the UUID backfill migration.
Command-line entry point for ``python migrate_kg_uuids.py``. Parses
``--redis-url`` (defaulting to ``$REDIS_URL`` or
``redis://localhost:6379/0``), best-effort loads SSL/mTLS settings from
``Config.redis_ssl_kwargs`` so a TLS-protected Redis can be reached, and
hands both to ``migrate`` through ``asyncio.run``.
Invoked only from this module's ``if __name__ == "__main__"`` guard; it has
no in-repo callers. Logging is configured at import time at module scope.
Returns:
None.
Raises:
SystemExit: Raised by ``argparse`` when given invalid arguments.
"""
parser = argparse.ArgumentParser(
description="Assign UUID7 to all KG entities",
)
parser.add_argument(
"--redis-url",
default=os.environ.get(
"REDIS_URL",
"redis://localhost:6379/0",
),
)
args = parser.parse_args()
try:
from config import Config
_ssl = Config.load().redis_ssl_kwargs()
except Exception:
_ssl = {}
asyncio.run(migrate(args.redis_url, ssl_kwargs=_ssl))
if __name__ == "__main__":
main()