# Source code for tools.visual_memory_tools

"""Visual Memory Tools - repost, search, and manage visual entities.   # 💀🔥

Gives Star the ability to retrieve and re-share stored images from her
visual memory graph, search by text description, and manage entities.

These tools operate directly against FalkorDB and the image storage
directory — they don't need the VisualMemoryEngine instance since the
tools service runs in a separate process from inference.
"""

from __future__ import annotations

import asyncio
import logging
import os
from pathlib import Path
from typing import TYPE_CHECKING

import jsonutil as json

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

# Directory holding stored images. The VISUAL_MEMORY_IMAGE_DIR environment
# variable overrides the built-in relative path (which, per the original
# note, matches the visual_memory.py default).
_DEFAULT_IMAGE_DIR = os.getenv("VISUAL_MEMORY_IMAGE_DIR", "data/visual_memory/images")

# ═══════════════════════════════════════════════════════════════════════
# Tool: repost_image                                                # 📷
# ═══════════════════════════════════════════════════════════════════════

# Identifier of the tool.
TOOL_NAME = "repost_image"

# Human-readable summary of what the tool does and when to use it.
TOOL_DESCRIPTION = (
    "Retrieve and repost a previously seen image from visual memory. "
    "Search by entity label (e.g. a person's name), entity ID, or natural "
    "language description. "
    "The image is sent to the current channel. "
    "Use this when someone asks you to show, repost, or share an image "
    "you've seen before. "
    "Also use it to 'recall' what someone looks like or to show a "
    "previously identified object/scene."
)

# JSON-schema style description of the arguments accepted by run().
# Neither argument is marked required in the schema; run() itself rejects
# calls that supply neither.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "query": {
            "type": "string",
            "description": (
                "Search query: a person's name/label, or a text description "
                "of the image (e.g. 'red car', 'person with glasses'). "
                "Used for fuzzy label match first, then SigLIP "
                "text-to-image search as fallback."
            ),
        },
        "entity_id": {
            "type": "string",
            "description": (
                "Direct entity ID (e.g. 've_01j...') to retrieve. If "
                "provided, query is ignored."
            ),
        },
    },
    "required": [],
}


async def _get_kg(ctx: ToolContext):
    """Return the graph object held by the context's KG manager, if any.

    Args:
        ctx: Tool execution context; its ``kg_manager`` attribute is read
            if present.

    Returns:
        The manager's ``_graph`` attribute when it is truthy, otherwise its
        ``graph`` attribute; ``None`` when there is no manager or neither
        attribute exists.
    """
    manager = getattr(ctx, "kg_manager", None)
    if manager is None:
        return None

    # Prefer the private handle; fall back to the public attribute name.
    private_graph = getattr(manager, "_graph", None)
    if private_graph:
        return private_graph
    return getattr(manager, "graph", None)


async def _find_entity_by_label(kg, query: str) -> dict | None:
    """Search for a visual entity by case-insensitive label substring match.

    Args:
        kg: Graph handle exposing an awaitable ``ro_query(cypher)`` whose
            result has a ``result_set`` list of rows.
        query: Text to look for inside ``VisualEntity.label``.

    Returns:
        A dict with ``entity_id``, ``label``, ``entity_type``,
        ``sighting_count`` and ``visual_traits`` for the match with the
        highest sighting count, or ``None`` when nothing matches or the
        query fails (failures are logged at debug level, not raised).
    """
    # Escape backslashes FIRST, then quotes. Doing it the other way round
    # re-escapes the backslash that was just placed in front of each quote
    # (' -> \' -> \\'), which leaves the quote live and lets the input
    # terminate the string literal.
    safe_q = query.replace("\\", "\\\\").replace("'", "\\'")
    cypher = (
        f"MATCH (v:VisualEntity) "
        f"WHERE toLower(v.label) CONTAINS toLower('{safe_q}') "
        f"RETURN v.entity_id AS eid, v.label AS label, "
        f"v.entity_type AS etype, v.sighting_count AS sc, "
        f"v.visual_traits AS traits "
        f"ORDER BY v.sighting_count DESC LIMIT 5"
    )
    try:
        result = await kg.ro_query(cypher)
        if result.result_set:
            # Rows are ordered by sighting count, so the first is the best hit.
            row = result.result_set[0]
            return {
                "entity_id": str(row[0] or ""),
                "label": str(row[1] or ""),
                "entity_type": str(row[2] or ""),
                "sighting_count": int(row[3] or 0),
                "visual_traits": str(row[4] or ""),
            }
    except Exception:
        # Deliberately best-effort: the caller treats None as "no match".
        logger.debug("[visual_memory_tools] Label search failed", exc_info=True)
    return None


async def _get_image_hash(kg, entity_id: str) -> str | None:
    """Get the most recent image hash for an entity via sighting lookup.

    Args:
        kg: Graph handle exposing an awaitable ``ro_query(cypher)`` whose
            result has a ``result_set`` list of rows.
        entity_id: ``VisualEntity.entity_id`` value to look up.

    Returns:
        The ``image_hash`` of the sighting with the latest timestamp (an
        empty string if that stored value is empty/null), or ``None`` when
        the entity has no sightings or the query fails (failures are logged
        at debug level, not raised).
    """
    # entity_id can come straight from a tool argument, so escape it before
    # embedding it in the Cypher string literal (backslashes first, then
    # quotes, so the escapes themselves are not re-escaped).
    safe_id = str(entity_id).replace("\\", "\\\\").replace("'", "\\'")
    cypher = (
        f"MATCH (v:VisualEntity {{entity_id: '{safe_id}'}})"
        f"-[:SEEN_IN]->(s:Sighting) "
        f"RETURN s.image_hash AS hash "
        f"ORDER BY s.timestamp DESC LIMIT 1"
    )
    try:
        result = await kg.ro_query(cypher)
        if result.result_set:
            return str(result.result_set[0][0] or "")
    except Exception:
        # Deliberately best-effort: the caller treats a falsy result as
        # "no sighting found".
        logger.debug("[visual_memory_tools] Hash lookup failed", exc_info=True)
    return None


async def run(
    query: str | None = None,
    entity_id: str | None = None,
    ctx: ToolContext | None = None,
) -> str:
    """Retrieve and repost an image from visual memory.

    Resolution order: an explicit ``entity_id`` wins; otherwise ``query`` is
    matched against entity labels, and if that finds nothing and the context
    carries a ``visual_memory`` object, its ``query_by_text`` is tried. The
    newest sighting's image hash is then looked up, the matching
    ``<hash>.webp`` file is read from the image directory, and the bytes are
    appended to ``ctx.sent_files``.

    Args:
        query: Search text (label, name, or description).
        entity_id: Direct entity ID to retrieve. If given, ``query`` is not
            used for the lookup.
        ctx: Tool execution context.

    Returns:
        str: JSON result. On success it contains ``success: true`` plus
        entity and image details; otherwise an ``error`` message (and, for
        lookup failures, ``success: false``).
    """
    if not ctx:
        return json.dumps({"error": "No tool context available"})

    if not entity_id and not query:
        return json.dumps({
            "error": "Provide either 'query' (text search) or 'entity_id'",
        })

    # Resolve image directory from config. A missing config, a missing
    # attribute, or an attribute explicitly set to None all fall back to the
    # module default instead of crashing in Path(None).
    config = getattr(ctx, "config", None)
    configured_dir = (
        getattr(config, "visual_memory_image_dir", None) if config else None
    )
    image_dir = Path(
        configured_dir if configured_dir is not None else _DEFAULT_IMAGE_DIR
    )

    # Get FalkorDB graph
    kg = await _get_kg(ctx)
    if kg is None:
        return json.dumps({"error": "Knowledge graph not available"})

    resolved_eid = entity_id
    entity_info = None

    # If no direct entity_id, search by label
    if not resolved_eid and query:
        entity_info = await _find_entity_by_label(kg, query)
        if entity_info:
            resolved_eid = entity_info["entity_id"]
        else:
            # Try text-to-image search via the visual memory engine
            # (only works if visual_memory is on the ctx)
            vm = getattr(ctx, "visual_memory", None)
            if vm is not None:
                try:
                    text_matches = await vm.query_by_text(query, top_k=3)
                    if text_matches:
                        resolved_eid = text_matches[0].entity_id
                except Exception:
                    # Still best-effort (we fall through to the "not found"
                    # response), but leave a trace instead of hiding it.
                    logger.debug(
                        "[visual_memory_tools] Text search failed",
                        exc_info=True,
                    )

    if not resolved_eid:
        return json.dumps({
            "success": False,
            "error": f"No visual entity found matching '{query}'",
            "suggestion": (
                "Try a more specific name or description. "
                "I can only repost images I've previously "
                "processed and stored."
            ),
        })

    # Get the image hash from the sighting graph
    image_hash = await _get_image_hash(kg, resolved_eid)
    if not image_hash:
        return json.dumps({
            "success": False,
            "error": f"No sighting records found for entity {resolved_eid}",
        })

    # Read the image from disk. Attempt the read directly (off the event
    # loop) and map "missing" to its own message, rather than doing a
    # blocking exists() check that could race with the read.
    image_path = image_dir / f"{image_hash}.webp"
    try:
        image_bytes = await asyncio.to_thread(image_path.read_bytes)
    except (FileNotFoundError, NotADirectoryError):
        return json.dumps({
            "success": False,
            "error": (
                f"Image file not found on disk for hash {image_hash}. "
                f"The image may have been processed before storage was enabled "
                f"or may have been purged."
            ),
        })
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": f"Failed to read image: {e}",
        })

    # Send the image to the channel. sent_files is read defensively, like
    # the other optional context attributes above.
    sent_files = getattr(ctx, "sent_files", None)
    if sent_files is not None:
        sent_files.append({
            "data": image_bytes,
            "filename": f"visual_memory_{resolved_eid[:12]}.webp",
            "mimetype": "image/webp",
        })
    else:
        logger.debug(
            "[visual_memory_tools] ctx.sent_files unavailable; image not queued"
        )

    # entity_info is only populated by the label search; for direct-ID and
    # text-search hits the label/type fall back to "unknown".
    info = entity_info or {}
    traits_raw = info.get("visual_traits", "")
    # Traits are stored as a "|"-separated string with "_" for spaces.
    traits_list = [
        t.replace("_", " ")
        for t in (traits_raw.split("|") if traits_raw else [])
        if t
    ]

    return json.dumps({
        "success": True,
        "entity_id": resolved_eid,
        "label": info.get("label", "unknown"),
        "entity_type": info.get("entity_type", "unknown"),
        "visual_traits": traits_list,
        "image_hash": image_hash,
        "size_bytes": len(image_bytes),
        "message": f"Reposting stored image for '{info.get('label', resolved_eid)}'",
    })