"""Visual Memory Tools - repost, search, and manage visual entities. # 💀🔥
Gives Star the ability to retrieve and re-share stored images from her
visual memory graph, search by text description, and manage entities.
These tools operate directly against FalkorDB and the image storage
directory — they don't need the VisualMemoryEngine instance since the
tools service runs in a separate process from inference.
"""
from __future__ import annotations
import asyncio
import logging
import os
from pathlib import Path
from typing import TYPE_CHECKING
import jsonutil as json
if TYPE_CHECKING:
from tool_context import ToolContext
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Directory holding stored images (matches visual_memory.py default).
# The VISUAL_MEMORY_IMAGE_DIR environment variable, when set, overrides the
# built-in relative path.
_DEFAULT_IMAGE_DIR = os.getenv(
    "VISUAL_MEMORY_IMAGE_DIR", "data/visual_memory/images"
)
# ═══════════════════════════════════════════════════════════════════════
# Tool: repost_image # 📷
# ═══════════════════════════════════════════════════════════════════════

# Name under which this tool is registered.
TOOL_NAME = "repost_image"

# Natural-language description of the tool, assembled one sentence at a time.
TOOL_DESCRIPTION = " ".join(
    [
        "Retrieve and repost a previously seen image from visual memory.",
        "Search by entity label (e.g. a person's name), entity ID, or "
        "natural language description.",
        "The image is sent to the current channel.",
        "Use this when someone asks you to show, repost, or share an image "
        "you've seen before.",
        "Also use it to 'recall' what someone looks like or to show a "
        "previously identified object/scene.",
    ]
)

# Schema of the free-text search argument.
_QUERY_SCHEMA = {
    "type": "string",
    "description": (
        "Search query: a person's name/label, or a text description of the "
        "image (e.g. 'red car', 'person with glasses'). "
        "Used for fuzzy label match first, then SigLIP text-to-image search "
        "as fallback."
    ),
}

# Schema of the direct-lookup argument.
_ENTITY_ID_SCHEMA = {
    "type": "string",
    "description": (
        "Direct entity ID (e.g. 've_01j...') to retrieve. "
        "If provided, query is ignored."
    ),
}

# JSON-schema style argument specification. Neither argument is listed as
# required here; run() itself rejects a call that supplies neither.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "query": _QUERY_SCHEMA,
        "entity_id": _ENTITY_ID_SCHEMA,
    },
    "required": [],
}
async def _get_kg(ctx: ToolContext):
    """Return the graph object held by the context's KG manager, if any.

    Args:
        ctx: Tool execution context; its ``kg_manager`` attribute is read.

    Returns:
        The manager's ``_graph`` attribute when it is truthy, otherwise its
        ``graph`` attribute; ``None`` when the context has no manager or the
        manager exposes neither attribute.
    """
    manager = getattr(ctx, "kg_manager", None)
    if manager is None:
        return None

    # Prefer the private handle; the public one is only consulted when the
    # private handle is missing or falsy.
    private_graph = getattr(manager, "_graph", None)
    if private_graph:
        return private_graph
    return getattr(manager, "graph", None)
async def _find_entity_by_label(kg, query: str) -> dict | None:
    """Find the most-sighted visual entity whose label contains *query*.

    The match is a case-insensitive substring test on ``v.label``; candidates
    are ordered by ``sighting_count`` descending and the first row is used.

    Args:
        kg: Graph handle exposing an awaitable ``ro_query(cypher)`` whose
            result carries a ``result_set`` sequence of rows.
        query: Text to look for inside entity labels.

    Returns:
        A dict with ``entity_id``, ``label``, ``entity_type``,
        ``sighting_count`` and ``visual_traits`` for the best match, or
        ``None`` when nothing matches or the lookup fails (failures are
        logged at debug level, not raised).
    """
    # Escape backslashes BEFORE quotes. The reverse order doubles the
    # backslash that was just placed in front of each quote, which leaves the
    # quote unescaped and lets the input break out of the string literal.
    safe_q = query.replace("\\", "\\\\").replace("'", "\\'")
    cypher = (
        "MATCH (v:VisualEntity) "
        f"WHERE toLower(v.label) CONTAINS toLower('{safe_q}') "
        "RETURN v.entity_id AS eid, v.label AS label, "
        "v.entity_type AS etype, v.sighting_count AS sc, "
        "v.visual_traits AS traits "
        "ORDER BY v.sighting_count DESC LIMIT 5"
    )
    try:
        result = await kg.ro_query(cypher)
        if result.result_set:
            row = result.result_set[0]
            return {
                "entity_id": str(row[0] or ""),
                "label": str(row[1] or ""),
                "entity_type": str(row[2] or ""),
                "sighting_count": int(row[3] or 0),
                "visual_traits": str(row[4] or ""),
            }
    except Exception:
        # Best-effort lookup: a failing graph query must not break the tool.
        logger.debug("[visual_memory_tools] Label search failed", exc_info=True)
    return None
async def _get_image_hash(kg, entity_id: str) -> str | None:
    """Get the most recent image hash for an entity via sighting lookup.

    Follows ``(:VisualEntity)-[:SEEN_IN]->(:Sighting)`` and picks the sighting
    with the greatest ``timestamp``.

    Args:
        kg: Graph handle exposing an awaitable ``ro_query(cypher)`` whose
            result carries a ``result_set`` sequence of rows.
        entity_id: Identifier of the visual entity to look up.

    Returns:
        The ``image_hash`` of the newest sighting as a string (empty when the
        stored value is null), or ``None`` when there is no sighting row or
        the lookup fails (failures are logged at debug level, not raised).
    """
    # entity_id can come straight from the tool arguments, so escape it before
    # embedding it in the string literal (backslashes first, then quotes).
    safe_id = entity_id.replace("\\", "\\\\").replace("'", "\\'")
    cypher = (
        f"MATCH (v:VisualEntity {{entity_id: '{safe_id}'}})"
        "-[:SEEN_IN]->(s:Sighting) "
        "RETURN s.image_hash AS hash "
        "ORDER BY s.timestamp DESC LIMIT 1"
    )
    try:
        result = await kg.ro_query(cypher)
        if result.result_set:
            return str(result.result_set[0][0] or "")
    except Exception:
        # Best-effort lookup: a failing graph query must not break the tool.
        logger.debug("[visual_memory_tools] Hash lookup failed", exc_info=True)
    return None
# Tool entry point.
async def run(
    query: str | None = None,
    entity_id: str | None = None,
    ctx: ToolContext | None = None,
) -> str:
    """Retrieve and repost an image from visual memory.

    Resolution order: an explicit ``entity_id`` is used as-is; otherwise
    ``query`` is matched against entity labels, and if that finds nothing and
    the context carries a ``visual_memory`` object, its ``query_by_text`` is
    tried. The newest sighting's image is then read from the image directory
    and appended to ``ctx.sent_files`` (when that list is present).

    Args:
        query: Search text (label, name, or description).
        entity_id: Direct entity ID to retrieve; takes precedence over query.
        ctx: Tool execution context.

    Returns:
        str: JSON result with success/error status.
    """
    if not ctx:
        return json.dumps({"error": "No tool context available"})
    if not entity_id and not query:
        return json.dumps({
            "error": "Provide either 'query' (text search) or 'entity_id'",
        })

    # Resolve image directory from config. A missing, None or empty setting
    # falls back to the module default instead of failing inside Path().
    config = getattr(ctx, "config", None)
    configured_dir = (
        getattr(config, "visual_memory_image_dir", None) if config else None
    )
    image_dir = Path(configured_dir or _DEFAULT_IMAGE_DIR)

    # Get FalkorDB graph
    kg = await _get_kg(ctx)
    if kg is None:
        return json.dumps({"error": "Knowledge graph not available"})

    resolved_eid = entity_id
    entity_info = None

    # If no direct entity_id, search by label
    if not resolved_eid and query:
        entity_info = await _find_entity_by_label(kg, query)
        if entity_info:
            resolved_eid = entity_info["entity_id"]
        else:
            # Try text-to-image search via the visual memory engine
            # (only works if visual_memory is on the ctx)
            vm = getattr(ctx, "visual_memory", None)
            if vm is not None:
                try:
                    text_matches = await vm.query_by_text(query, top_k=3)
                    if text_matches:
                        resolved_eid = text_matches[0].entity_id
                except Exception:
                    # Still best-effort, but leave a trace for debugging
                    # rather than swallowing the failure silently.
                    logger.debug(
                        "[visual_memory_tools] Text-to-image search failed",
                        exc_info=True,
                    )

    if not resolved_eid:
        return json.dumps({
            "success": False,
            "error": f"No visual entity found matching '{query}'",
            "suggestion": (
                "Try a more specific name or description. "
                "I can only repost images I've previously "
                "processed and stored."
            ),
        })

    # Get the image hash from the sighting graph
    image_hash = await _get_image_hash(kg, resolved_eid)
    if not image_hash:
        return json.dumps({
            "success": False,
            "error": f"No sighting records found for entity {resolved_eid}",
        })

    # Read the image from disk. The read runs in a worker thread, and a
    # missing file is detected from the read itself, so no separate blocking
    # exists() check runs on the event loop.
    image_path = image_dir / f"{image_hash}.webp"
    try:
        image_bytes = await asyncio.to_thread(image_path.read_bytes)
    except (FileNotFoundError, NotADirectoryError):
        return json.dumps({
            "success": False,
            "error": (
                f"Image file not found on disk for hash {image_hash}. "
                "The image may have been processed before storage was enabled "
                "or may have been purged."
            ),
        })
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": f"Failed to read image: {e}",
        })

    # Send the image to the channel
    if ctx.sent_files is not None:
        ctx.sent_files.append({
            "data": image_bytes,
            "filename": f"visual_memory_{resolved_eid[:12]}.webp",
            "mimetype": "image/webp",
        })

    # Label metadata is only available when the entity was found by label.
    info = entity_info or {}
    traits_raw = info.get("visual_traits", "")
    # Traits are stored as a "|"-separated string with "_" standing for spaces.
    traits_list = [
        t.replace("_", " ")
        for t in (traits_raw.split("|") if traits_raw else [])
        if t
    ]
    return json.dumps({
        "success": True,
        "entity_id": resolved_eid,
        "label": info.get("label", "unknown"),
        "entity_type": info.get("entity_type", "unknown"),
        "visual_traits": traits_list,
        "image_hash": image_hash,
        "size_bytes": len(image_bytes),
        "message": f"Reposting stored image for '{info.get('label', resolved_eid)}'",
    })