Source code for tools.kg_anchoring_tools

"""tools/kg_anchoring_tools.py — Agent-facing tools for the Limbic Anchoring
pipeline (Phase 5.2).

Exposes:
  kg_anchor_messages  — Create SYNTHESIZED_FROM edges between messages and
                        existing graph entities.
"""

from __future__ import annotations

import logging
from typing import Any

logger = logging.getLogger(__name__)


async def kg_anchor_messages(
    graph_query: Any,
    message_uuids: list[str],
    entity_uuids: list[str],
    relation: str = "SYNTHESIZED_FROM",
) -> dict[str, Any]:
    """Create typed edges between Message nodes and existing graph entities.

    For every (message, entity) pair in the cross-product of the two input
    lists, a ``(entity)-[:<relation>]->(message)`` edge is MERGEd. Messages
    are matched on ``Message.redis_key``; entities are matched on a ``uuid``
    property without any label restriction.

    This is an idempotent MERGE operation — calling it multiple times with
    the same arguments produces exactly one edge per pair.

    Args:
        graph_query: Async callable ``(cypher, params) -> result``.
        message_uuids: List of ``redis_key`` values identifying Message nodes.
        entity_uuids: List of entity ``uuid`` values.
        relation: Relationship type (default ``"SYNTHESIZED_FROM"``). Must be
            a plain ASCII identifier (letters, digits and underscores, not
            starting with a digit) because it is spliced into the Cypher
            text.

    Returns:
        Dict with ``{"edges_created": int, "skipped": bool}``. Note that
        ``edges_created`` is the value of ``count(*)`` after the MERGE, i.e.
        the number of matched pairs processed; pairs whose edge already
        existed are counted as well. It falls back to ``0`` when the result
        object does not expose ``result_set[0][0]``.

    Raises:
        ValueError: If ``relation`` is not a plain ASCII identifier.

    Edge cases:
        - Empty ``message_uuids`` or ``entity_uuids``: returns immediately
          with ``{"edges_created": 0, "skipped": True}`` — no Cypher emitted
          and ``relation`` is not inspected.
    """
    if not message_uuids or not entity_uuids:
        return {"edges_created": 0, "skipped": True}

    # The relationship type is interpolated into the statement text rather
    # than bound as a parameter, so it must be validated here: anything other
    # than a bare identifier could inject arbitrary Cypher.
    if not (
        isinstance(relation, str)
        and relation.isascii()
        and relation.isidentifier()
    ):
        raise ValueError(
            "relation must be a plain identifier "
            f"(letters, digits, underscores), got {relation!r}"
        )

    # Build a single batch MERGE so we avoid N² Cypher round-trips.
    # The UNWIND pattern processes the full cross-product in one statement.
    cypher = (
        "UNWIND $msg_keys AS mk "
        "UNWIND $ent_uuids AS eu "
        "MATCH (m:Message {redis_key: mk}) "
        "MATCH (e {uuid: eu}) "
        f"MERGE (e)-[:{relation}]->(m) "
        "RETURN count(*)"
    )

    result = await graph_query(
        cypher,
        {
            "msg_keys": message_uuids,
            "ent_uuids": entity_uuids,
        },
    )

    # Best-effort extraction: tolerate result objects that lack a populated
    # ``result_set`` instead of failing after the write has already happened.
    try:
        edges_created = result.result_set[0][0]
    except (AttributeError, IndexError, TypeError):
        edges_created = 0

    logger.info(
        "kg_anchor_messages: created %d %s edges (%d msg × %d entity)",
        edges_created,
        relation,
        len(message_uuids),
        len(entity_uuids),
    )

    return {"edges_created": edges_created, "skipped": False}