"""tools/kg_anchoring_tools.py — Agent-facing tools for the Limbic Anchoring
pipeline (Phase 5.2).
Exposes:
kg_anchor_messages — Create SYNTHESIZED_FROM edges between messages and
existing graph entities.
"""
from __future__ import annotations
import logging
from typing import Any
# Module-level logger named after this module (via ``__name__``).
logger = logging.getLogger(__name__)


async def kg_anchor_messages(
    graph_query: Any,
    message_uuids: list[str],
    entity_uuids: list[str],
    relation: str = "SYNTHESIZED_FROM",
) -> dict[str, Any]:
    """Create typed edges between Message nodes and existing graph entities.

    Issues one batched Cypher statement that MERGEs an ``(entity)-[:relation]->
    (message)`` edge for every pair in the cross-product of ``entity_uuids`` and
    ``message_uuids`` whose nodes both exist. Because MERGE is used, calling it
    multiple times with the same arguments leaves exactly one edge per pair.

    Args:
        graph_query: Async callable ``(cypher, params) -> result``.
        message_uuids: List of ``redis_key`` values identifying Message nodes.
        entity_uuids: List of entity ``uuid`` values.
        relation: Relationship type (default ``"SYNTHESIZED_FROM"``). Must be
            a plain ASCII identifier (letters, digits, underscore, not starting
            with a digit) because it is embedded in the Cypher text.

    Returns:
        Dict with ``{"edges_created": int, "skipped": bool}``.
        ``edges_created`` is the ``count(*)`` value returned by the statement,
        i.e. the number of rows that passed through the MERGE; pairs whose
        edge already existed are included in that count. It is ``0`` when
        ``result.result_set[0][0]`` cannot be read from the result.

    Raises:
        ValueError: If ``relation`` is not a plain ASCII identifier.

    Edge cases:
        - Empty ``message_uuids`` or ``entity_uuids``: returns immediately with
          ``{"edges_created": 0, "skipped": True}`` — no Cypher emitted and
          ``relation`` is not inspected.
    """
    if not message_uuids or not entity_uuids:
        return {"edges_created": 0, "skipped": True}

    # A relationship type cannot be supplied as a Cypher query parameter, so
    # ``relation`` has to be spliced into the statement text. Restrict it to a
    # bare identifier so it can neither break the syntax nor inject clauses.
    if not (
        isinstance(relation, str)
        and relation.isascii()
        and relation.isidentifier()
    ):
        raise ValueError(
            "relation must be a plain identifier "
            f"(letters, digits, underscore), got {relation!r}"
        )

    # Build a single batch MERGE so we avoid N² Cypher round-trips.
    # The UNWIND pattern processes the full cross-product in one statement.
    cypher = (
        "UNWIND $msg_keys AS mk "
        "UNWIND $ent_uuids AS eu "
        "MATCH (m:Message {redis_key: mk}) "
        "MATCH (e {uuid: eu}) "
        f"MERGE (e)-[:{relation}]->(m) "
        "RETURN count(*)"
    )
    result = await graph_query(
        cypher,
        {
            "msg_keys": message_uuids,
            "ent_uuids": entity_uuids,
        },
    )

    # Best-effort read of the single count cell; a result without a usable
    # ``result_set`` is reported as zero rather than raised.
    try:
        edges_created = result.result_set[0][0]
    except (AttributeError, IndexError, TypeError):
        edges_created = 0

    logger.info(
        "kg_anchor_messages: created %d %s edges (%d msg × %d entity)",
        edges_created,
        relation,
        len(message_uuids),
        len(entity_uuids),
    )
    return {"edges_created": edges_created, "skipped": False}