#!/usr/bin/env python3
"""One-time migration: assign UUID7 to all KG entities and stamp
src_uuid / tgt_uuid on all relationships.
Idempotent -- only touches nodes/edges where the uuid fields are NULL.
Usage::
python migrate_kg_uuids.py [--redis-url redis://localhost:6379/0]
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import os
import sys
from uuid6 import uuid7
import redis.asyncio as aioredis
from falkordb.asyncio import FalkorDB
# Root logging is configured at import time; each record is rendered as
# "<timestamp> [<LEVEL>] <message>" at INFO level and above.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)

# Name of the graph this script selects and migrates.
GRAPH_NAME = "knowledge"
# [docs]  (Sphinx viewcode link residue -- not part of the program)
async def migrate(redis_url: str, ssl_kwargs: dict | None = None) -> None:
    """Assign a UUID7 to every node lacking one and stamp endpoint UUIDs on edges.

    Runs three steps against the ``GRAPH_NAME`` graph:

    1. Every node whose ``uuid`` property is NULL gets a fresh UUID7 string.
    2. Every relationship missing ``src_uuid`` or ``tgt_uuid`` has the
       missing field(s) filled from the ``uuid`` of its start / end node.
    3. The graph is re-queried and a warning is logged if any node or
       relationship is still missing its UUID fields.

    Only NULL fields are written, so running the migration again does not
    change values that are already present.

    Args:
        redis_url (str): Redis connection URL passed to
            ``redis.asyncio.from_url``.
        ssl_kwargs (dict | None): Optional extra keyword arguments (e.g.
            SSL/mTLS settings) forwarded to ``from_url``. ``None`` or an
            empty dict adds nothing.
    """
    rc = aioredis.from_url(redis_url, decode_responses=True, **(ssl_kwargs or {}))
    try:
        # Graph setup lives inside the ``try`` so the Redis client is closed
        # even if constructing the FalkorDB handle fails.
        db = FalkorDB(connection_pool=rc.connection_pool)
        graph = db.select_graph(GRAPH_NAME)

        # --- Step 1: assign UUIDs to entities ---
        logger.info("Fetching entities without UUIDs...")
        result = await graph.query(
            "MATCH (e) WHERE e.uuid IS NULL "
            "RETURN id(e)"
        )
        node_ids = [row[0] for row in result.result_set]
        logger.info(" %d entities need UUIDs", len(node_ids))
        for nid in node_ids:
            # The NULL check is repeated at write time: a UUID assigned by
            # someone else since the fetch above must not be overwritten.
            await graph.query(
                "MATCH (e) WHERE id(e) = $nid AND e.uuid IS NULL "
                "SET e.uuid = $uuid",
                params={"nid": nid, "uuid": str(uuid7())},
            )
        logger.info(" Assigned UUIDs to %d entities", len(node_ids))

        # --- Step 2: stamp src_uuid / tgt_uuid on relationships ---
        logger.info("Stamping src_uuid/tgt_uuid on relationships...")
        # Match edges missing EITHER field (not just src_uuid), and use
        # coalesce() so a field that is already set keeps its value.
        result = await graph.query(
            "MATCH (a)-[r]->(b) "
            "WHERE r.src_uuid IS NULL OR r.tgt_uuid IS NULL "
            "SET r.src_uuid = coalesce(r.src_uuid, a.uuid), "
            "r.tgt_uuid = coalesce(r.tgt_uuid, b.uuid) "
            "RETURN count(r)"
        )
        logger.info(" Stamped %d relationships", _first_scalar(result))

        # --- Step 3: verify ---
        check = await graph.query(
            "MATCH (e) WHERE e.uuid IS NULL "
            "RETURN count(e)"
        )
        remaining = _first_scalar(check)
        if remaining:
            logger.warning(
                " %d entities still missing UUIDs!", remaining,
            )
        else:
            logger.info(" All entities have UUIDs")

        check_rels = await graph.query(
            "MATCH ()-[r]->() "
            "WHERE r.src_uuid IS NULL OR r.tgt_uuid IS NULL "
            "RETURN count(r)"
        )
        remaining_rels = _first_scalar(check_rels)
        if remaining_rels:
            logger.warning(
                " %d relationships still missing UUIDs!",
                remaining_rels,
            )
        else:
            logger.info(" All relationships have src/tgt UUIDs")

        logger.info("Migration complete.")
    finally:
        await rc.aclose()


def _first_scalar(result) -> int:
    """Return the first column of the first row of *result*, or 0 if it has no rows."""
    rows = result.result_set
    return rows[0][0] if rows else 0
# [docs]  (Sphinx viewcode link residue -- not part of the program)
def main() -> None:
    """Parse command-line options and run the migration.

    The Redis URL is taken from ``--redis-url``; its default is the
    ``REDIS_URL`` environment variable, or ``redis://localhost:6379/0`` when
    that is unset.

    SSL keyword arguments are loaded on a best-effort basis from
    ``config.Config.load().redis_ssl_kwargs()``. If the import or the load
    fails, a warning is logged and the migration runs without extra SSL
    arguments.
    """
    parser = argparse.ArgumentParser(
        description="Assign UUID7 to all KG entities",
    )
    parser.add_argument(
        "--redis-url",
        default=os.environ.get(
            "REDIS_URL", "redis://localhost:6379/0",
        ),
    )
    args = parser.parse_args()

    try:
        from config import Config

        _ssl = Config.load().redis_ssl_kwargs()
    except Exception as exc:
        # Deliberately best-effort, but no longer silent: a broken config
        # would otherwise fall back to a non-SSL connection without a trace.
        logger.warning(
            "Could not load Redis SSL settings from config (%s: %s); "
            "continuing without them",
            type(exc).__name__,
            exc,
        )
        _ssl = {}

    asyncio.run(migrate(args.redis_url, ssl_kwargs=_ssl))
# Run the migration only when this file is executed as a script.
if __name__ == "__main__":
    main()