"""Visual Memory Graph -- cross-channel image pattern recognition for Star.
Gives Star the ability to recognize the same human or object across different
channels by maintaining a persistent visual entity graph in FalkorDB with dual
vector indexes (512d face embeddings via InsightFace/ArcFace, 768d appearance
embeddings via SigLIP).
Pipeline:
1. Image arrives on any platform (Discord, Matrix, WebChat)
2. Face detection + embedding (InsightFace ``buffalo_sc``)
3. General appearance embedding (SigLIP ``so400m-patch14-224``)
4. FalkorDB vector search for matches above threshold
5. Create new VisualEntity or update existing sighting
6. Cache results in Redis for fast context injection
# THE EYES SEE ALL.
"""
from __future__ import annotations
import asyncio
import hashlib
import io
import logging
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, TYPE_CHECKING
import numpy as np
if TYPE_CHECKING:
import redis.asyncio as aioredis
from knowledge_graph.manager import KnowledgeGraphManager
logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------
# Constants
# ----------------------------------------------------------------------

# Global admin toggle key (Redis)
_GLOBAL_TOGGLE_KEY = "stargazer:visual_memory_beta"

# Embedding dimensions
FACE_EMBEDDING_DIM = 512  # InsightFace ArcFace
APPEARANCE_EMBEDDING_DIM = 768  # SigLIP

# Redis keys for visual memory cache
_VISUAL_CACHE_PREFIX = "visual_memory:cache:"
_VISUAL_STATS_PREFIX = "visual_memory:stats:"

# FalkorDB vector index names
_FACE_INDEX_LABEL = "VisualEntity"
_FACE_INDEX_PROP = "face_embedding"
_APPEARANCE_INDEX_PROP = "appearance_embedding"

# Entity scoping tiers.
# Controls visibility boundaries for visual entity recognition.
# (These were previously declared twice in this module; this is now the
# single definition.)
SCOPE_USER = "user"  # tied to a specific user; only surfaces when owner is present
SCOPE_GENERAL = "general"  # visible globally across all channels
SCOPE_CORE = "core"  # admin-promoted; always matched, highest priority, never pruned
_VALID_SCOPES = frozenset({SCOPE_USER, SCOPE_GENERAL, SCOPE_CORE})

# Visual profile trait vocabulary for SigLIP scoring.
# Each tuple is (trait_key, text_query) -- SigLIP scores each query against
# a face crop and the top-scoring traits become the entity's profile.
_PROFILE_TRAITS: list[tuple[str, str]] = [
    # Hair
    ("dark_hair", "a person with dark hair"),
    ("light_hair", "a person with light or blonde hair"),
    ("red_hair", "a person with red hair"),
    ("short_hair", "a person with short hair"),
    ("long_hair", "a person with long hair"),
    ("bald", "a bald person"),
    ("curly_hair", "a person with curly hair"),
    # Facial features
    ("beard", "a person with a beard"),
    ("mustache", "a person with a mustache"),
    ("clean_shaven", "a clean shaven person"),
    # Accessories
    ("glasses", "a person wearing glasses"),
    ("sunglasses", "a person wearing sunglasses"),
    ("hat", "a person wearing a hat or cap"),
    ("headscarf", "a person wearing a headscarf or hijab"),
    # Demographics (broad, non-invasive)
    ("young", "a young person"),
    ("middle_aged", "a middle-aged person"),
    ("elderly", "an elderly person"),
    # Expression (snapshot, not identity)
    ("smiling", "a person smiling"),
]

# How many top-scoring traits to keep per profile
_PROFILE_TOP_K = 5

# Minimum SigLIP similarity for a trait to be considered present
_PROFILE_TRAIT_MIN_SCORE = 0.15
# ----------------------------------------------------------------------
# Data Models
# ----------------------------------------------------------------------
[docs]
@dataclass
class VisualMatch:
    """One graph entity that matched a query embedding.

    Bundles the entity's identity, the similarity score of the match and
    the bookkeeping fields read back from the graph, so a caller can
    choose between updating the existing entity and creating a new one.
    """

    # --- identity -----------------------------------------------------
    entity_id: str
    entity_type: str  # "face" | "object" | "scene"
    label: str
    similarity: float

    # --- visibility ---------------------------------------------------
    # One of "user" | "general" | "core".
    scope: str = SCOPE_GENERAL
    # sg_uuid of the owner; only meaningful when scope is "user".
    owner_user_id: str = ""

    # --- profile / linkage --------------------------------------------
    # Human-readable profile descriptors.
    visual_traits: list[str] = field(default_factory=list)
    # sg_uuid of the linked KG Person node, if any.
    linked_person_id: str = ""

    # --- sighting history ---------------------------------------------
    sighting_count: int = 0
    first_seen: float = 0.0
    last_seen: float = 0.0
    channels_seen: list[str] = field(default_factory=list)
[docs]
@dataclass
class RecognitionResult:
    """Everything recognised in a single image.

    Collects the entities that matched existing graph nodes, the ids of
    entities newly created for this image, and the pairs of entities that
    appeared together in it.
    """

    # Entities that matched an existing graph node.
    matches: list[VisualMatch] = field(default_factory=list)
    # entity_ids of nodes created while processing this image.
    new_entities: list[str] = field(default_factory=list)
    # (entity_id, entity_id) pairs seen together in this image.
    co_occurrences: list[tuple[str, str]] = field(default_factory=list)
    # Wall-clock cost of processing the image, in milliseconds.
    processing_time_ms: float = 0.0
# ----------------------------------------------------------------------
# Visual Memory Engine
# ----------------------------------------------------------------------
[docs]
class VisualMemoryEngine:
"""Cross-channel visual entity recognition backed by FalkorDB.
Manages the full lifecycle of visual entities: detection, embedding,
matching, sighting tracking, co-occurrence analysis, and context
injection. Models are lazy-loaded on first use to avoid startup cost
when no images are processed.
Thread Safety:
All CPU-heavy model inference runs in ``asyncio.to_thread()`` so
the async event loop is never blocked. FalkorDB queries use the
KG manager's existing concurrency/priority system.
"""
def __init__(
self,
kg_manager: KnowledgeGraphManager | None = None,
redis_client: aioredis.Redis | None = None,
config: Any = None,
) -> None:
self._kg = kg_manager
self._redis = redis_client
self._config = config
# Lazy-loaded models # π·οΈ
self._face_app: Any = None
self._siglip_model: Any = None
self._siglip_processor: Any = None
self._models_lock = asyncio.Lock()
self._models_loaded = False
self._indexes_created = False
# Config with defaults # π
self._face_threshold = float(
getattr(config, "visual_memory_face_threshold", 0.65)
)
self._object_threshold = float(
getattr(config, "visual_memory_object_threshold", 0.75)
)
self._insightface_model = str(
getattr(config, "visual_memory_insightface_model", "buffalo_sc")
)
self._siglip_model_name = str(
getattr(
config,
"visual_memory_siglip_model",
"google/siglip-so400m-patch14-224",
)
)
self._max_entities = int(
getattr(config, "visual_memory_max_entities_per_image", 10)
)
self._cache_ttl = int(
getattr(config, "visual_memory_cache_ttl_seconds", 300)
)
self._min_sightings = int(
getattr(config, "visual_memory_min_sightings_to_report", 2)
)
self._enabled = bool(
getattr(config, "visual_memory_enabled", True)
)
self._text_density_threshold = float(
getattr(config, "visual_memory_text_density_threshold", 0.15)
)
# Image storage directory for reposting # π·
self._image_dir = Path(
getattr(config, "visual_memory_image_dir", "data/visual_memory/images")
)
self._image_dir.mkdir(parents=True, exist_ok=True)
@property
def enabled(self) -> bool:
"""Whether visual memory processing is active."""
return self._enabled and self._kg is not None
async def _is_globally_enabled(self) -> bool:
"""Check the global admin toggle in Redis. # π’
Returns True only if the Redis key ``stargazer:visual_memory_beta``
is set to ``"1"``. Defaults to OFF β an admin must explicitly
enable via ``!visual_memory_beta on``.
"""
if not self._redis:
return False
try:
val = await self._redis.get(_GLOBAL_TOGGLE_KEY)
return val == "1" or val == b"1"
except Exception:
logger.debug(
"[visual_memory] Failed to check global toggle", exc_info=True
)
return False
# --------------------------------------------------------------
# Model Loading (lazy, thread-safe)
# --------------------------------------------------------------
async def _ensure_models(self) -> bool:
"""Load InsightFace and SigLIP models on first use.
Returns True if models are ready, False on failure. Thread-safe
via asyncio.Lock; actual model loading runs in a thread pool.
"""
if self._models_loaded:
return True
async with self._models_lock:
if self._models_loaded:
return True
try:
await asyncio.to_thread(self._load_models_sync)
self._models_loaded = True
logger.info(
"[visual_memory] Models loaded: InsightFace=%s, SigLIP=%s",
self._insightface_model,
self._siglip_model_name,
)
return True
except Exception:
logger.warning(
"[visual_memory] Failed to load vision models",
exc_info=True,
)
return False
def _load_models_sync(self) -> None:
    """Load both vision models synchronously (intended for a worker thread).

    Each model is loaded independently: a missing package or a failed
    initialisation is logged and leaves that model's handle(s) as
    ``None``; it does not raise and does not prevent the other model from
    loading.
    """
    # --- InsightFace (CPU execution provider) ---
    face_app: Any = None
    try:
        from insightface.app import FaceAnalysis

        face_app = FaceAnalysis(
            name=self._insightface_model,
            providers=["CPUExecutionProvider"],
        )
        face_app.prepare(ctx_id=-1, det_size=(640, 640))
        logger.info(
            "[visual_memory] InsightFace loaded: %s",
            self._insightface_model,
        )
    except ImportError:
        face_app = None
        logger.warning(
            "[visual_memory] insightface not installed, face recognition disabled"
        )
    except Exception:
        face_app = None
        logger.warning(
            "[visual_memory] InsightFace init failed", exc_info=True
        )
    self._face_app = face_app

    # --- SigLIP (processor + model via transformers) ---
    processor: Any = None
    model: Any = None
    try:
        from transformers import AutoProcessor, AutoModel

        processor = AutoProcessor.from_pretrained(self._siglip_model_name)
        model = AutoModel.from_pretrained(self._siglip_model_name)
        model.eval()
        logger.info(
            "[visual_memory] SigLIP loaded: %s", self._siglip_model_name
        )
    except ImportError:
        processor = model = None
        logger.warning(
            "[visual_memory] transformers not installed, "
            "object/scene recognition disabled"
        )
    except Exception:
        processor = model = None
        logger.warning(
            "[visual_memory] SigLIP init failed", exc_info=True
        )
    self._siglip_processor = processor
    self._siglip_model = model
# --------------------------------------------------------------
# FalkorDB Index Management
# --------------------------------------------------------------
[docs]
async def ensure_indexes(self) -> None:
"""Create the dual vector indexes for VisualEntity nodes.
Creates separate HNSW indexes for face (512d) and appearance
(768d) embeddings. Idempotent -- skips if already exists.
Also creates range indexes on Sighting nodes.
"""
if self._indexes_created or self._kg is None:
return
# Face embedding index (512d)
try:
await self._kg.query(
"CREATE VECTOR INDEX FOR (v:VisualEntity) "
"ON (v.face_embedding) "
"OPTIONS {dimension: 512, similarityFunction: 'cosine'}"
)
logger.info("[visual_memory] Face embedding vector index created (512d)")
except Exception as exc:
msg = str(exc).lower()
if "already" not in msg:
logger.warning(
"[visual_memory] Face index creation error: %s", exc
)
else:
logger.debug("[visual_memory] Face index already exists")
# Appearance embedding index (768d)
try:
await self._kg.query(
"CREATE VECTOR INDEX FOR (v:VisualEntity) "
"ON (v.appearance_embedding) "
"OPTIONS {dimension: 768, similarityFunction: 'cosine'}"
)
logger.info(
"[visual_memory] Appearance embedding vector index created (768d)"
)
except Exception as exc:
msg = str(exc).lower()
if "already" not in msg:
logger.warning(
"[visual_memory] Appearance index creation error: %s", exc
)
else:
logger.debug("[visual_memory] Appearance index already exists")
# Range indexes on Sighting nodes
for prop in ("timestamp", "channel_id", "platform", "image_hash"):
try:
await self._kg.query(
f"CREATE INDEX FOR (s:Sighting) ON (s.{prop})"
)
except Exception:
pass # already exists or other benign error
# Range indexes on VisualEntity
for prop in ("entity_type", "label", "entity_id", "first_seen", "last_seen", "scope", "owner_user_id"):
try:
await self._kg.query(
f"CREATE INDEX FOR (v:VisualEntity) ON (v.{prop})"
)
except Exception:
pass
self._indexes_created = True
logger.info("[visual_memory] All visual memory indexes verified")
# --------------------------------------------------------------
# Text-Heavy Image Filter
# --------------------------------------------------------------
def _is_text_heavy_sync(
    self, image_bytes: bytes,
) -> bool:
    """Decide whether an image looks like a screenshot/document.

    Three OpenCV-based signals are computed:

    1. **Edge density** -- fraction of pixels marked by Canny(50, 150).
    2. **Low colour** -- the largest per-channel standard deviation is
       below 35.
    3. **Bimodal histogram** -- the two fullest bins of a 16-bin
       grayscale histogram hold more than 55% of all pixels.

    An image is classified as text-heavy when the edge density exceeds
    ``self._text_density_threshold`` AND at least one of signals 2 and 3
    holds.

    Returns:
        True if the image should be skipped. Undecodable images, images
        with fewer than 100 pixels, and any internal error all yield
        False (fail-open).
    """
    try:
        import cv2

        decoded = cv2.imdecode(
            np.frombuffer(image_bytes, np.uint8), cv2.IMREAD_COLOR
        )
        if decoded is None:
            # Not decodable here; leave the decision to later stages.
            return False

        height, width = decoded.shape[:2]
        pixel_count = height * width
        if pixel_count < 100:
            return False

        # Signal 1: edge density.
        grayscale = cv2.cvtColor(decoded, cv2.COLOR_BGR2GRAY)
        edge_map = cv2.Canny(grayscale, 50, 150)
        edge_ratio = float(np.count_nonzero(edge_map)) / pixel_count

        # Signal 2: colour spread across the three channels.
        max_channel_std = max(
            float(decoded[:, :, ch].std()) for ch in range(3)
        )
        low_color = max_channel_std < 35.0

        # Signal 3: share of pixels in the two fullest histogram bins.
        histogram = cv2.calcHist(
            [grayscale], [0], None, [16], [0, 256]
        ).flatten()
        bin_shares = histogram / histogram.sum()
        top2 = float(np.sort(bin_shares)[-2:].sum())
        bimodal = top2 > 0.55

        looks_like_text = False
        if edge_ratio > self._text_density_threshold:
            looks_like_text = low_color or bimodal

        if looks_like_text:
            logger.debug(
                "[visual_memory] Skipping text-heavy image "
                "(edge_ratio=%.3f, max_std=%.1f, bimodal=%.2f)",
                edge_ratio,
                max_channel_std,
                top2,
            )
        return looks_like_text
    except Exception:
        logger.debug(
            "[visual_memory] Text detection failed, proceeding",
            exc_info=True,
        )
        return False  # fail-open: process the image anyway
# --------------------------------------------------------------
# Embedding Extraction
# --------------------------------------------------------------
def _extract_faces_sync(
self, image_bytes: bytes,
) -> list[dict[str, Any]]:
"""Extract face embeddings from raw image bytes (sync, thread-pool).
Returns a list of dicts, each with:
- embedding: np.ndarray (512d, L2-normalized)
- bbox: [x1, y1, x2, y2]
- det_score: float (detection confidence)
"""
if self._face_app is None:
return []
try:
import cv2
nparr = np.frombuffer(image_bytes, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None:
return []
faces = self._face_app.get(img)
results = []
for face in faces[: self._max_entities]:
emb = getattr(face, "normed_embedding", None)
if emb is None:
emb = getattr(face, "embedding", None)
if emb is not None:
norm = np.linalg.norm(emb)
if norm > 0:
emb = emb / norm
if emb is not None:
results.append(
{
"embedding": emb.astype(np.float32),
"bbox": face.bbox.tolist()
if hasattr(face, "bbox")
else [],
"det_score": float(
getattr(face, "det_score", 0.0)
),
}
)
return results
except Exception:
logger.debug(
"[visual_memory] Face extraction failed", exc_info=True
)
return []
def _extract_appearance_sync(
self, image_bytes: bytes,
) -> np.ndarray | None:
"""Extract SigLIP appearance embedding (sync, thread-pool).
Returns a 768d L2-normalized numpy array, or None on failure.
"""
if self._siglip_model is None or self._siglip_processor is None:
return None
try:
import torch
from PIL import Image
img = Image.open(io.BytesIO(image_bytes)).convert("RGB")
inputs = self._siglip_processor(images=img, return_tensors="pt")
with torch.no_grad():
features = self._siglip_model.get_image_features(**inputs)
# L2 normalize
emb = features[0].numpy().astype(np.float32)
norm = np.linalg.norm(emb)
if norm > 0:
emb = emb / norm
return emb
except Exception:
logger.debug(
"[visual_memory] Appearance extraction failed", exc_info=True
)
return None
def _extract_visual_profile_sync(
self,
image_bytes: bytes,
bbox: list[float] | None = None,
) -> list[str]:
"""Score a face crop against the trait vocabulary via SigLIP. # π
Crops the image to the face bounding box (if provided), then
computes text-image similarity for each trait query. Returns
the top-K trait keys whose scores exceed the minimum threshold.
This gives Star human-readable descriptors to differentiate
between users: 'dark_hair, glasses' vs 'blonde, beard'.
"""
if self._siglip_model is None or self._siglip_processor is None:
return []
try:
import torch
from PIL import Image
img = Image.open(io.BytesIO(image_bytes)).convert("RGB")
# Crop to face region if bbox available (with padding)
if bbox and len(bbox) == 4:
w, h = img.size
x1, y1, x2, y2 = bbox
# Add 30% padding around face for context (hair, accessories)
pad_w = (x2 - x1) * 0.3
pad_h = (y2 - y1) * 0.3
x1 = max(0, int(x1 - pad_w))
y1 = max(0, int(y1 - pad_h))
x2 = min(w, int(x2 + pad_w))
y2 = min(h, int(y2 + pad_h))
if x2 > x1 and y2 > y1:
img = img.crop((x1, y1, x2, y2))
# Score all trait queries at once (batched)
trait_texts = [t[1] for t in _PROFILE_TRAITS]
inputs = self._siglip_processor(
text=trait_texts,
images=img,
return_tensors="pt",
padding=True,
)
with torch.no_grad():
outputs = self._siglip_model(**inputs)
# SigLIP logits_per_image: [1, num_texts]
logits = outputs.logits_per_image[0]
# Sigmoid for independent probabilities (SigLIP uses sigmoid, not softmax)
probs = torch.sigmoid(logits).numpy()
# Pick top traits above threshold
scored = [
(_PROFILE_TRAITS[i][0], float(probs[i]))
for i in range(len(_PROFILE_TRAITS))
if float(probs[i]) > _PROFILE_TRAIT_MIN_SCORE
]
scored.sort(key=lambda x: x[1], reverse=True)
top_traits = [s[0] for s in scored[:_PROFILE_TOP_K]]
if top_traits:
logger.debug(
"[visual_memory] Profile traits extracted: %s",
", ".join(top_traits),
)
return top_traits
except Exception:
logger.debug(
"[visual_memory] Profile extraction failed", exc_info=True
)
return []
# --------------------------------------------------------------
# FalkorDB Match & Store
# --------------------------------------------------------------
async def _match_face(
self, embedding: np.ndarray, top_k: int = 5,
*, scope_user_id: str = "",
) -> list[VisualMatch]:
"""Search FalkorDB for face embedding matches.
Scope filtering: user-scoped entities only match when
``scope_user_id`` matches their ``owner_user_id``, or when
they're scope=general/core (always visible).
"""
if self._kg is None:
return []
vec_str = ", ".join(f"{v:.8f}" for v in embedding.tolist())
# Build scope-aware WHERE clause # π
scope_filter = f"score >= {self._face_threshold}"
if scope_user_id:
# user-scoped entities only match their owner; general/core always match
scope_filter += (
f" AND (node.scope <> '{SCOPE_USER}' "
f"OR node.owner_user_id = '{scope_user_id}')"
)
query = (
f"CALL db.idx.vector.queryNodes("
f"'VisualEntity', 'face_embedding', {top_k}, "
f"vecf32([{vec_str}])) "
f"YIELD node, score "
f"WHERE {scope_filter} "
f"RETURN node.entity_id AS eid, node.entity_type AS etype, "
f"node.label AS label, score, "
f"node.sighting_count AS sc, "
f"node.first_seen AS fs, node.last_seen AS ls, "
f"node.scope AS scope, node.owner_user_id AS owner, "
f"node.visual_traits AS traits, node.linked_person_id AS pid"
)
try:
result = await self._kg.ro_query(query)
matches = []
for row in result.result_set or []:
traits_raw = str(row[9] or "")
matches.append(
VisualMatch(
entity_id=str(row[0] or ""),
entity_type=str(row[1] or "face"),
label=str(row[2] or ""),
similarity=float(row[3] or 0),
sighting_count=int(row[4] or 0),
first_seen=float(row[5] or 0),
last_seen=float(row[6] or 0),
scope=str(row[7] or SCOPE_GENERAL),
owner_user_id=str(row[8] or ""),
visual_traits=traits_raw.split("|") if traits_raw else [],
linked_person_id=str(row[10] or ""),
)
)
return matches
except Exception:
logger.debug(
"[visual_memory] Face match query failed", exc_info=True
)
return []
async def _match_appearance(
self, embedding: np.ndarray, top_k: int = 5,
*, scope_user_id: str = "",
) -> list[VisualMatch]:
"""Search FalkorDB for appearance embedding matches.
Same scope filtering as ``_match_face``.
"""
if self._kg is None:
return []
vec_str = ", ".join(f"{v:.8f}" for v in embedding.tolist())
scope_filter = f"score >= {self._object_threshold}"
if scope_user_id:
scope_filter += (
f" AND (node.scope <> '{SCOPE_USER}' "
f"OR node.owner_user_id = '{scope_user_id}')"
)
query = (
f"CALL db.idx.vector.queryNodes("
f"'VisualEntity', 'appearance_embedding', {top_k}, "
f"vecf32([{vec_str}])) "
f"YIELD node, score "
f"WHERE {scope_filter} "
f"RETURN node.entity_id AS eid, node.entity_type AS etype, "
f"node.label AS label, score, "
f"node.sighting_count AS sc, "
f"node.first_seen AS fs, node.last_seen AS ls, "
f"node.scope AS scope, node.owner_user_id AS owner, "
f"node.visual_traits AS traits, node.linked_person_id AS pid"
)
try:
result = await self._kg.ro_query(query)
matches = []
for row in result.result_set or []:
traits_raw = str(row[9] or "")
matches.append(
VisualMatch(
entity_id=str(row[0] or ""),
entity_type=str(row[1] or "object"),
label=str(row[2] or ""),
similarity=float(row[3] or 0),
sighting_count=int(row[4] or 0),
first_seen=float(row[5] or 0),
last_seen=float(row[6] or 0),
scope=str(row[7] or SCOPE_GENERAL),
owner_user_id=str(row[8] or ""),
visual_traits=traits_raw.split("|") if traits_raw else [],
linked_person_id=str(row[10] or ""),
)
)
return matches
except Exception:
logger.debug(
"[visual_memory] Appearance match query failed", exc_info=True
)
return []
async def _create_entity(
self,
entity_type: str,
face_embedding: np.ndarray | None,
appearance_embedding: np.ndarray | None,
channel_id: str,
platform: str,
user_id: str,
image_hash: str,
timestamp: float,
visual_traits: list[str] | None = None,
) -> str:
"""Create a new VisualEntity node in FalkorDB. # π
Auto-assigns scope based on entity type:
- face β ``user`` (tied to uploader's identity)
- object/scene β ``general`` (visible everywhere)
When ``visual_traits`` is provided, stores the profile on the
node for human-readable differentiation.
Returns the generated entity_id.
"""
if self._kg is None:
return ""
from uuid6 import uuid7
entity_id = f"ve_{uuid7()}"
now = timestamp or time.time()
# Auto-assign scope: faces are user-scoped, objects are general # π
scope = SCOPE_USER if entity_type == "face" else SCOPE_GENERAL
# Encode visual traits as pipe-delimited string for FalkorDB
traits_str = "|".join(visual_traits) if visual_traits else ""
# Build SET clauses for embeddings
set_parts = [
f"v.entity_id = '{entity_id}'",
f"v.entity_type = '{entity_type}'",
f"v.label = 'unknown_{entity_type}_{entity_id[:8]}'",
f"v.scope = '{scope}'",
f"v.owner_user_id = '{user_id}'",
f"v.visual_traits = '{traits_str}'",
f"v.linked_person_id = ''",
f"v.first_seen = {now}",
f"v.last_seen = {now}",
f"v.ingested_at = {time.time()}",
"v.sighting_count = 1",
f"v.created_by_user = '{user_id}'",
]
if face_embedding is not None:
vec_str = ", ".join(f"{v:.8f}" for v in face_embedding.tolist())
set_parts.append(f"v.face_embedding = vecf32([{vec_str}])")
if appearance_embedding is not None:
vec_str = ", ".join(
f"{v:.8f}" for v in appearance_embedding.tolist()
)
set_parts.append(f"v.appearance_embedding = vecf32([{vec_str}])")
set_clause = ", ".join(set_parts)
query = f"CREATE (v:VisualEntity) SET {set_clause}"
try:
await self._kg.query(query)
logger.info(
"[visual_memory] Created VisualEntity %s (%s) from %s:%s",
entity_id,
entity_type,
platform,
channel_id,
)
except Exception:
logger.warning(
"[visual_memory] Failed to create VisualEntity",
exc_info=True,
)
return ""
# Create initial sighting
await self._record_sighting(
entity_id, channel_id, platform, image_hash, timestamp, 1.0
)
return entity_id
async def _update_sighting(
self,
entity_id: str,
channel_id: str,
platform: str,
image_hash: str,
timestamp: float,
similarity: float,
) -> None:
"""Update an existing entity's sighting count and last_seen. # π"""
if self._kg is None:
return
now = timestamp or time.time()
query = (
f"MATCH (v:VisualEntity {{entity_id: '{entity_id}'}}) "
f"SET v.last_seen = {now}, "
f"v.sighting_count = v.sighting_count + 1"
)
try:
await self._kg.query(query)
except Exception:
logger.debug(
"[visual_memory] Failed to update sighting count",
exc_info=True,
)
await self._record_sighting(
entity_id, channel_id, platform, image_hash, timestamp, similarity
)
async def _record_sighting(
self,
entity_id: str,
channel_id: str,
platform: str,
image_hash: str,
timestamp: float,
confidence: float,
) -> None:
"""Create a Sighting node and SEEN_IN relationship. # π"""
if self._kg is None:
return
from uuid6 import uuid7
sighting_id = f"sight_{uuid7()}"
now = timestamp or time.time()
query = (
f"MATCH (v:VisualEntity {{entity_id: '{entity_id}'}}) "
f"CREATE (s:Sighting {{"
f"sighting_id: '{sighting_id}', "
f"timestamp: {now}, "
f"channel_id: '{channel_id}', "
f"platform: '{platform}', "
f"image_hash: '{image_hash}', "
f"confidence: {confidence}"
f"}}) "
f"CREATE (v)-[:SEEN_IN {{timestamp: {now}, confidence: {confidence}}}]->(s)"
)
try:
await self._kg.query(query)
except Exception:
logger.debug(
"[visual_memory] Failed to record sighting", exc_info=True
)
async def _record_co_occurrence(
self, entity_id_a: str, entity_id_b: str, timestamp: float,
) -> None:
"""Create or strengthen an APPEARS_WITH edge between two entities. # π·οΈ"""
if self._kg is None or entity_id_a == entity_id_b:
return
# Sort IDs for consistent edge direction
a, b = sorted([entity_id_a, entity_id_b])
now = timestamp or time.time()
query = (
f"MATCH (va:VisualEntity {{entity_id: '{a}'}}), "
f"(vb:VisualEntity {{entity_id: '{b}'}}) "
f"MERGE (va)-[r:APPEARS_WITH]->(vb) "
f"ON CREATE SET r.count = 1, r.first_seen = {now}, r.last_seen = {now} "
f"ON MATCH SET r.count = r.count + 1, r.last_seen = {now}"
)
try:
await self._kg.query(query)
except Exception:
logger.debug(
"[visual_memory] Failed to record co-occurrence",
exc_info=True,
)
# --------------------------------------------------------------
# Image Storage for Reposting
# --------------------------------------------------------------
def _store_image_sync(
    self,
    image_hash: str,
    image_bytes: bytes,
    mimetype: str,
) -> str | None:
    """Persist an image to disk as WebP for later retrieval.

    The image is converted to RGB, downscaled so its longest side is at
    most 1024 px, and written to ``{image_hash}.webp`` inside the
    configured image directory. If a non-empty file with that name
    already exists the call is a no-op.

    Args:
        image_hash: Basename (without extension) of the stored file.
        image_bytes: Encoded source image.
        mimetype: Accepted for interface compatibility; not used here.

    Returns:
        The file path on success, or ``None`` on any failure.
    """
    try:
        from PIL import Image

        out_path = self._image_dir / f"{image_hash}.webp"
        # An empty file is what an aborted write leaves behind; do not
        # mistake it for a stored image.
        if out_path.exists() and out_path.stat().st_size > 0:
            return str(out_path)

        # Convert to WebP for consistent format + compression
        img = Image.open(io.BytesIO(image_bytes)).convert("RGB")

        # Cap resolution to 1024px max dimension to save disk
        max_dim = 1024
        if max(img.size) > max_dim:
            img.thumbnail((max_dim, max_dim), Image.LANCZOS)

        try:
            img.save(out_path, format="WEBP", quality=85)
        except Exception:
            # Remove the partial file so a later call retries instead of
            # treating a truncated image as already stored.
            out_path.unlink(missing_ok=True)
            raise

        logger.debug(
            "[visual_memory] Stored image %s (%d bytes -> %s)",
            image_hash,
            len(image_bytes),
            out_path,
        )
        return str(out_path)
    except Exception:
        logger.debug(
            "[visual_memory] Failed to store image", exc_info=True
        )
        return None
[docs]
async def retrieve_image(
self, entity_id: str,
) -> tuple[bytes, str] | None:
"""Retrieve stored image bytes for a visual entity. # π·
Looks up the entity's most recent sighting image_hash,
then reads the corresponding WebP file from disk.
Returns (image_bytes, "image/webp") or None if not found.
"""
if self._kg is None:
return None
# Get the most recent image_hash for this entity
query = (
f"MATCH (v:VisualEntity {{entity_id: '{entity_id}'}})"
f"-[:SEEN_IN]->(s:Sighting) "
f"RETURN s.image_hash AS hash "
f"ORDER BY s.timestamp DESC LIMIT 1"
)
try:
result = await self._kg.ro_query(query)
if not result.result_set:
return None
image_hash = str(result.result_set[0][0] or "")
if not image_hash:
return None
image_path = self._image_dir / f"{image_hash}.webp"
if not image_path.exists():
return None
data = await asyncio.to_thread(image_path.read_bytes)
return data, "image/webp"
except Exception:
logger.debug(
"[visual_memory] Failed to retrieve image", exc_info=True
)
return None
# --------------------------------------------------------------
# KG Person Linking
# --------------------------------------------------------------
async def _link_to_person(
self, entity_id: str, user_id: str,
) -> bool:
"""Create VISUALLY_IDENTIFIED_AS edge from entity to Person.
Links a face VisualEntity to the KG Person node matching
the user_id. Also sets ``linked_person_id`` on the entity.
This is fire-and-forget from the pipeline.
"""
if self._kg is None:
return False
# Try to find Person node by sg_uuid
query = (
f"MATCH (v:VisualEntity {{entity_id: '{entity_id}'}}), "
f"(p:Person {{sg_uuid: '{user_id}'}}) "
f"SET v.linked_person_id = '{user_id}' "
f"MERGE (v)-[r:VISUALLY_IDENTIFIED_AS]->(p) "
f"ON CREATE SET r.created_at = {time.time()} "
f"RETURN v.entity_id"
)
try:
result = await self._kg.query(query)
linked = bool(result.result_set)
if linked:
logger.info(
"[visual_memory] Linked entity %s -> Person %s",
entity_id,
user_id,
)
return linked
except Exception:
logger.debug(
"[visual_memory] Failed to link entity to person",
exc_info=True,
)
return False
# --------------------------------------------------------------
# Main Processing Pipeline
# --------------------------------------------------------------
async def _extract_and_match(
    self,
    image_bytes: bytes,
    mimetype: str,
    channel_id: str,
    platform: str,
    user_id: str,
    timestamp: float,
) -> RecognitionResult:
    """Run the recognition pipeline for one image.

    Steps:
        1. Reject empty/tiny payloads and text-heavy images.
        2. Extract face records and the whole-image appearance embedding
           concurrently in worker threads, and store the image on disk.
        3. For every face: update the first matching entity, or create a
           new face entity (with profile traits) and schedule a
           background link to the uploader's Person node.
        4. When no faces were found, match/create a single ``"object"``
           entity from the appearance embedding.
        5. Record co-occurrence between every pair of distinct entities
           seen in the image.

    Args:
        image_bytes: Encoded image.
        mimetype: Passed through to image storage.
        channel_id: Channel the image arrived in.
        platform: Platform the image arrived on.
        user_id: Uploader id; used for scope filtering and ownership.
        timestamp: Time recorded on sightings and co-occurrences.

    Returns:
        A ``RecognitionResult`` (empty when the image was skipped).
    """
    t0 = time.monotonic()
    result = RecognitionResult()

    if not image_bytes or len(image_bytes) < 100:
        return result

    # Skip screenshots, code snippets, documents, etc. before spending
    # CPU on face/object embeddings.
    is_text = await asyncio.to_thread(
        self._is_text_heavy_sync, image_bytes
    )
    if is_text:
        logger.info(
            "[visual_memory] Skipped text-heavy image in %s:%s",
            platform,
            channel_id,
        )
        result.processing_time_ms = (time.monotonic() - t0) * 1000
        return result

    # Short content hash, used as the stored file name and sighting key.
    image_hash = hashlib.sha256(image_bytes).hexdigest()[:16]

    # Both extractors run in worker threads at the same time.
    faces, appearance_emb = await asyncio.gather(
        asyncio.to_thread(self._extract_faces_sync, image_bytes),
        asyncio.to_thread(self._extract_appearance_sync, image_bytes),
    )

    all_entity_ids: list[str] = []

    # Store image to disk for reposting.
    await asyncio.to_thread(
        self._store_image_sync, image_hash, image_bytes, mimetype
    )

    # --- Process each detected face ---
    for face_data in faces:
        face_emb = face_data["embedding"]

        matches = await self._match_face(
            face_emb, scope_user_id=user_id,
        )

        if matches:
            # First returned row is taken as the best match.
            best = matches[0]
            result.matches.append(best)
            all_entity_ids.append(best.entity_id)
            await self._update_sighting(
                best.entity_id,
                channel_id,
                platform,
                image_hash,
                timestamp,
                best.similarity,
            )
            logger.info(
                "[visual_memory] Face MATCH: %s (%.2f) in %s:%s",
                best.entity_id,
                best.similarity,
                platform,
                channel_id,
            )
            continue

        # Traits are only stored on newly created entities, so the
        # (model-inference) trait extraction is done only on this path.
        face_traits = await asyncio.to_thread(
            self._extract_visual_profile_sync,
            image_bytes,
            face_data.get("bbox"),
        )
        entity_id = await self._create_entity(
            "face",
            face_emb,
            appearance_emb,  # also store appearance for this face
            channel_id,
            platform,
            user_id,
            image_hash,
            timestamp,
            visual_traits=face_traits,
        )
        if not entity_id:
            continue
        result.new_entities.append(entity_id)
        all_entity_ids.append(entity_id)

        # Auto-link face to KG Person if user_id known.
        if user_id:
            link_task = asyncio.create_task(
                self._link_to_person(entity_id, user_id),
                name=f"visual_link_person_{entity_id[:8]}",
            )
            # The asyncio docs require keeping a reference to a created
            # task, otherwise it may be garbage-collected before it
            # finishes. The set is created lazily on the instance and
            # each task removes itself when done.
            pending_links = self.__dict__.setdefault(
                "_pending_link_tasks", set()
            )
            pending_links.add(link_task)
            link_task.add_done_callback(pending_links.discard)

    # --- Whole-image appearance, only when no faces were found ---
    if appearance_emb is not None and not faces:
        app_matches = await self._match_appearance(
            appearance_emb, scope_user_id=user_id,
        )

        if app_matches:
            best = app_matches[0]
            result.matches.append(best)
            all_entity_ids.append(best.entity_id)
            await self._update_sighting(
                best.entity_id,
                channel_id,
                platform,
                image_hash,
                timestamp,
                best.similarity,
            )
            logger.info(
                "[visual_memory] Appearance MATCH: %s (%.2f) in %s:%s",
                best.entity_id,
                best.similarity,
                platform,
                channel_id,
            )
        else:
            entity_id = await self._create_entity(
                "object",
                None,
                appearance_emb,
                channel_id,
                platform,
                user_id,
                image_hash,
                timestamp,
            )
            if entity_id:
                result.new_entities.append(entity_id)
                all_entity_ids.append(entity_id)

    # --- Co-occurrence tracking ---
    # Several faces can resolve to the same entity; de-duplicate (keeping
    # first-seen order) so no (x, x) pair and no repeated pair is reported.
    distinct_ids = list(dict.fromkeys(all_entity_ids))
    for i, eid_a in enumerate(distinct_ids):
        for eid_b in distinct_ids[i + 1:]:
            await self._record_co_occurrence(eid_a, eid_b, timestamp)
            result.co_occurrences.append((eid_a, eid_b))

    result.processing_time_ms = (time.monotonic() - t0) * 1000
    return result
[docs]
async def process_message_images(
    self, msg: Any,
) -> list[RecognitionResult]:
    """Run visual recognition over every image attachment of a message.

    Meant to be scheduled as a fire-and-forget background task by the
    message pipeline. When a Redis client is configured, the collected
    results are cached so they can later be injected as context.

    Args:
        msg: An IncomingMessage-like object. ``attachments``,
            ``channel_id``, ``platform`` and ``user_id`` are all read
            with ``getattr`` so missing attributes are tolerated.

    Returns:
        One RecognitionResult per image that was processed without
        error. Empty when the feature is disabled (locally or via the
        global toggle), when the message has no usable image
        attachments, or when the models could not be loaded.
    """
    # Instance flag first, then the Redis-backed global admin toggle.
    if not self.enabled or not await self._is_globally_enabled():
        return []

    from platforms.base import Attachment

    # Keep only attachments that declare an image mimetype and carry data.
    image_attachments: list[Attachment] = []
    for candidate in getattr(msg, "attachments", None) or []:
        mimetype = (getattr(candidate, "mimetype", "") or "").lower()
        if mimetype.startswith("image/") and getattr(candidate, "data", b""):
            image_attachments.append(candidate)
    if not image_attachments:
        return []

    # Models must be available before any embedding work is attempted.
    if not await self._ensure_models():
        return []
    await self.ensure_indexes()

    # A single timestamp is shared by every image of this message.
    timestamp = time.time()
    channel_id = getattr(msg, "channel_id", "")
    platform = getattr(msg, "platform", "")
    user_id = getattr(msg, "user_id", "")

    results: list[RecognitionResult] = []
    for attachment in image_attachments:
        try:
            outcome = await self._extract_and_match(
                attachment.data,
                attachment.mimetype,
                channel_id,
                platform,
                user_id,
                timestamp,
            )
            results.append(outcome)
            if not (outcome.matches or outcome.new_entities):
                continue
            logger.info(
                "[visual_memory] Processed image from %s:%s -- "
                "%d matches, %d new entities (%.0f ms)",
                platform,
                channel_id,
                len(outcome.matches),
                len(outcome.new_entities),
                outcome.processing_time_ms,
            )
        except Exception:
            # One bad image must not abort the remaining attachments.
            logger.warning(
                "[visual_memory] Failed to process image attachment",
                exc_info=True,
            )

    # Publish the results for later context injection.
    if results and self._redis:
        await self._cache_results(channel_id, platform, results)
    return results
# ──────────────────────────────────────────────────────────────
# Redis Cache for Context Injection
# ──────────────────────────────────────────────────────────────
async def _cache_results(
    self,
    channel_id: str,
    platform: str,
    results: list[RecognitionResult],
) -> None:
    """Write a flattened summary of recognition results to Redis.

    All matches, new entity ids and co-occurrence pairs from ``results``
    are merged into one JSON payload stored under a key built from the
    platform and channel, expiring after ``self._cache_ttl``. Redis
    errors are logged at debug level and otherwise ignored.
    """
    if not self._redis:
        return
    import jsonutil as json

    cache_key = f"{_VISUAL_CACHE_PREFIX}{platform}:{channel_id}"
    cached_at = time.time()

    # Flatten matches from every result into plain JSON-friendly dicts.
    match_records = []
    for res in results:
        for hit in res.matches:
            match_records.append(
                {
                    "entity_id": hit.entity_id,
                    "entity_type": hit.entity_type,
                    "label": hit.label,
                    "similarity": round(hit.similarity, 3),
                    "sighting_count": hit.sighting_count,
                    "first_seen": hit.first_seen,
                    "last_seen": hit.last_seen,
                    "scope": hit.scope,
                    "owner_user_id": hit.owner_user_id,
                    "visual_traits": hit.visual_traits,
                    "linked_person_id": hit.linked_person_id,
                }
            )

    created_ids = [eid for res in results for eid in res.new_entities]
    # Pairs are converted to lists for the JSON payload.
    pair_lists = [list(pair) for res in results for pair in res.co_occurrences]

    payload = {
        "timestamp": cached_at,
        "matches": match_records,
        "new_entities": created_ids,
        "co_occurrences": pair_lists,
    }
    try:
        await self._redis.set(
            cache_key,
            json.dumps(payload),
            ex=self._cache_ttl,
        )
    except Exception:
        # Caching is best-effort; recognition itself already succeeded.
        logger.debug(
            "[visual_memory] Failed to cache results", exc_info=True
        )
[docs]
async def get_visual_context(
    self, channel_id: str, platform: str,
) -> str | None:
    """Build the visual-memory context block for a channel.

    Reads the recognition summary cached in Redis for
    ``platform``/``channel_id`` and renders it as a short bullet list
    under a ``[VISUAL MEMORY RECALL ...]`` header.

    Cached fields are read with ``or`` fallbacks rather than
    ``dict.get`` defaults: the cache writer always stores every key, so
    an empty label or a ``None`` value would otherwise bypass the
    default (rendering ``****`` or raising and discarding the whole
    context).

    Args:
        channel_id: Channel whose cached results should be read.
        platform: Platform name used as part of the cache key.

    Returns:
        The formatted context string, or ``None`` when Redis is not
        configured, the global toggle is off, nothing is cached, no
        line qualifies for output, or reading/parsing the cache fails.
    """
    if not self._redis:
        return None
    # Global admin toggle: skip entirely when disabled.
    if not await self._is_globally_enabled():
        return None
    import jsonutil as json

    cache_key = f"{_VISUAL_CACHE_PREFIX}{platform}:{channel_id}"
    try:
        raw = await self._redis.get(cache_key)
        if not raw:
            return None
        data = json.loads(raw)
        matches = data.get("matches") or []
        new_entities = data.get("new_entities") or []
        if not matches and not new_entities:
            return None

        lines: list[str] = []
        # Only entities seen at least ``self._min_sightings`` times are
        # reported as recognized.
        for m in matches:
            sc = m.get("sighting_count") or 0
            if sc < self._min_sightings:
                continue
            m_scope = m.get("scope") or SCOPE_GENERAL
            m_owner = m.get("owner_user_id") or ""
            # No scope filtering happens here: user-scoped matches are
            # only tagged with their owner. Per the original author's
            # note, scope filtering is expected to have run at query
            # time -- TODO confirm against the matching code.
            scope_tag = ""
            if m_scope == SCOPE_CORE:
                scope_tag = " [CORE]"
            elif m_scope == SCOPE_USER and m_owner:
                scope_tag = f" [user-linked: {str(m_owner)[:8]}]"

            sim_pct = round((m.get("similarity") or 0) * 100, 1)
            etype = m.get("entity_type") or "entity"
            label = m.get("label") or "unknown"
            first_ts = m.get("first_seen") or 0
            # First sighting rendered as a relative age.
            age_str = _format_age(first_ts) if first_ts else "unknown"

            # Visual profile traits help tell similar entities apart.
            traits = m.get("visual_traits") or []
            traits_str = ""
            if isinstance(traits, list):
                readable = [str(t).replace("_", " ") for t in traits if t]
                if readable:
                    traits_str = f" | traits: {', '.join(readable)}"

            # Optional association with a known person/user.
            person_id = m.get("linked_person_id") or ""
            person_str = ""
            if person_id:
                person_str = f" | linked to user {str(person_id)[:8]}"

            lines.append(
                f"- Recognized {etype}: **{label}**{scope_tag} "
                f"(confidence: {sim_pct}%, "
                f"seen {sc} times, "
                f"first seen {age_str}"
                f"{traits_str}{person_str})"
            )

        if new_entities:
            lines.append(
                f"- {len(new_entities)} new visual "
                f"{'entity' if len(new_entities) == 1 else 'entities'} "
                f"catalogued for future recognition"
            )
        if not lines:
            return None
        return (
            "[VISUAL MEMORY RECALL -- "
            "pattern recognition results from image analysis]\n"
            + "\n".join(lines)
        )
    except Exception:
        # Context injection is optional; never let it break a response.
        logger.debug(
            "[visual_memory] Failed to read visual context cache",
            exc_info=True,
        )
        return None
# ──────────────────────────────────────────────────────────────
# Cross-Modal Text-to-Image Query
# ──────────────────────────────────────────────────────────────
[docs]
async def query_by_text(
    self, text: str, top_k: int = 5,
) -> list[VisualMatch]:
    """Search visual entities by a free-text description via SigLIP.

    The query is embedded with the SigLIP text encoder, L2-normalised,
    and handed to ``self._match_appearance`` together with ``top_k``.
    Enables queries like "find images with that red car" or
    "person in blue jacket".

    Args:
        text: Natural-language description to search for.
        top_k: Maximum number of matches requested from the search.

    Returns:
        The matches returned by ``_match_appearance``; an empty list
        when the SigLIP model/processor or knowledge graph is not
        available, when ``text`` is blank, or when encoding/search
        fails (failures are logged at debug level).
    """
    if (
        self._siglip_model is None
        or self._siglip_processor is None
        or self._kg is None
    ):
        return []
    # A blank query has no meaningful embedding; skip the model call.
    if not text or not text.strip():
        return []
    try:
        import torch

        def _encode_text() -> np.ndarray:
            # SigLIP's text tower was trained on inputs padded to the
            # tokenizer's max length; ``padding=True`` on a single
            # string applies no padding at all. Truncation keeps
            # over-long queries within the tokenizer's max length.
            inputs = self._siglip_processor(
                text=[text],
                return_tensors="pt",
                padding="max_length",
                truncation=True,
            )
            with torch.no_grad():
                features = self._siglip_model.get_text_features(**inputs)
            # ``.cpu()`` is a no-op on CPU tensors and makes the numpy
            # conversion safe if the features live on another device.
            emb = features[0].cpu().numpy().astype(np.float32)
            norm = np.linalg.norm(emb)
            if norm > 0:
                emb = emb / norm
            return emb

        # Run the blocking model call off the event loop.
        text_emb = await asyncio.to_thread(_encode_text)
        return await self._match_appearance(text_emb, top_k=top_k)
    except Exception:
        logger.debug(
            "[visual_memory] Text-to-image query failed", exc_info=True
        )
        return []
# ──────────────────────────────────────────────────────────────
# Entity Label Management
# ──────────────────────────────────────────────────────────────
[docs]
async def label_entity(
    self, entity_id: str, label: str,
) -> bool:
    """Assign a human-readable label to a visual entity.

    Called by admin tools or by Star herself when she learns
    someone's name.

    Args:
        entity_id: ``entity_id`` property of the VisualEntity node.
        label: New label text; may contain quotes or backslashes.

    Returns:
        True when the query returned at least one row (the entity
        exists and was updated); False when no knowledge graph is
        configured, the entity was not found, or the query failed.
    """
    if self._kg is None:
        return False

    def _cypher_escape(value: str) -> str:
        # Backslashes MUST be doubled before quotes are escaped;
        # otherwise the backslash added for a quote is doubled too and
        # the quote ends up unescaped, terminating the literal early.
        return str(value).replace("\\", "\\\\").replace("'", "\\'")

    safe_id = _cypher_escape(entity_id)
    safe_label = _cypher_escape(label)
    query = (
        f"MATCH (v:VisualEntity {{entity_id: '{safe_id}'}}) "
        f"SET v.label = '{safe_label}' "
        f"RETURN v.entity_id"
    )
    try:
        result = await self._kg.query(query)
        return bool(result.result_set)
    except Exception:
        logger.debug(
            "[visual_memory] Failed to label entity", exc_info=True
        )
        return False
[docs]
async def set_scope(
    self, entity_id: str, scope: str,
    *, owner_user_id: str | None = None,
) -> bool:
    """Change the visibility scope of a visual entity.

    Valid scopes are those listed in ``_VALID_SCOPES`` (``user``,
    ``general``, ``core``). When setting scope to ``user`` an
    ``owner_user_id`` should be provided; if it is omitted only the
    scope is updated and any stored owner is left unchanged. When
    promoting to ``general`` or ``core`` the owner is cleared.

    Args:
        entity_id: ``entity_id`` property of the VisualEntity node.
        scope: Target scope name.
        owner_user_id: Owner to record for ``user`` scope.

    Returns:
        True when the query returned at least one row; False when no
        knowledge graph is configured, the scope is invalid, the entity
        was not found, or the query failed.
    """
    if self._kg is None:
        return False
    if scope not in _VALID_SCOPES:
        logger.warning(
            "[visual_memory] Invalid scope '%s' for entity %s",
            scope,
            entity_id,
        )
        return False

    def _cypher_escape(value: str) -> str:
        # Escape backslashes first, then quotes, so caller-supplied ids
        # cannot terminate the Cypher string literal.
        return str(value).replace("\\", "\\\\").replace("'", "\\'")

    # ``scope`` was validated against _VALID_SCOPES above, so it is
    # safe to interpolate directly.
    set_parts = [f"v.scope = '{scope}'"]
    if scope == SCOPE_USER and owner_user_id:
        set_parts.append(
            f"v.owner_user_id = '{_cypher_escape(owner_user_id)}'"
        )
    elif scope in (SCOPE_GENERAL, SCOPE_CORE):
        # Clear owner restriction for global visibility.
        set_parts.append("v.owner_user_id = ''")
    set_clause = ", ".join(set_parts)
    query = (
        f"MATCH (v:VisualEntity {{entity_id: '{_cypher_escape(entity_id)}'}}) "
        f"SET {set_clause} "
        f"RETURN v.entity_id"
    )
    try:
        result = await self._kg.query(query)
        ok = bool(result.result_set)
        if ok:
            logger.info(
                "[visual_memory] Scope changed: %s -> %s",
                entity_id,
                scope,
            )
        return ok
    except Exception:
        logger.debug(
            "[visual_memory] Failed to set scope", exc_info=True
        )
        return False
[docs]
async def get_entity_history(
    self, entity_id: str, limit: int = 20,
) -> list[dict[str, Any]]:
    """Get sighting history for a visual entity.

    Args:
        entity_id: ``entity_id`` property of the VisualEntity node.
        limit: Maximum number of sightings to return. It is coerced to
            ``int`` before being placed in the query; a non-numeric or
            non-positive value yields an empty list.

    Returns:
        Sighting records (``sighting_id``, ``timestamp``,
        ``channel_id``, ``platform``, ``confidence``) ordered by
        timestamp descending. Empty when no knowledge graph is
        configured or the query fails.
    """
    if self._kg is None:
        return []
    # ``limit`` is interpolated into the query text, so force it to a
    # plain positive integer first.
    try:
        max_rows = int(limit)
    except (TypeError, ValueError):
        return []
    if max_rows <= 0:
        return []
    # Escape backslashes first, then quotes, so the id cannot break
    # out of the Cypher string literal.
    safe_id = str(entity_id).replace("\\", "\\\\").replace("'", "\\'")
    query = (
        f"MATCH (v:VisualEntity {{entity_id: '{safe_id}'}})"
        f"-[:SEEN_IN]->(s:Sighting) "
        f"RETURN s.sighting_id AS sid, s.timestamp AS ts, "
        f"s.channel_id AS ch, s.platform AS pl, "
        f"s.confidence AS conf "
        f"ORDER BY s.timestamp DESC LIMIT {max_rows}"
    )
    try:
        result = await self._kg.ro_query(query)
        # Null columns are normalised to empty strings / zeros.
        return [
            {
                "sighting_id": str(row[0] or ""),
                "timestamp": float(row[1] or 0),
                "channel_id": str(row[2] or ""),
                "platform": str(row[3] or ""),
                "confidence": float(row[4] or 0),
            }
            for row in (result.result_set or [])
        ]
    except Exception:
        logger.debug(
            "[visual_memory] Failed to get entity history",
            exc_info=True,
        )
        return []
[docs]
async def get_co_occurrences(
    self, entity_id: str,
) -> list[dict[str, Any]]:
    """Get entities that appear alongside this one most often.

    Args:
        entity_id: ``entity_id`` property of the VisualEntity node.

    Returns:
        Up to 10 records (``entity_id``, ``label``, ``entity_type``,
        ``co_occurrence_count``) ordered by the ``APPEARS_WITH``
        relationship count, highest first. Empty when no knowledge
        graph is configured or the query fails.
    """
    if self._kg is None:
        return []
    # Escape backslashes first, then quotes, so the id cannot break
    # out of the Cypher string literal.
    safe_id = str(entity_id).replace("\\", "\\\\").replace("'", "\\'")
    query = (
        f"MATCH (v:VisualEntity {{entity_id: '{safe_id}'}})"
        f"-[r:APPEARS_WITH]-(other:VisualEntity) "
        f"RETURN other.entity_id AS eid, other.label AS label, "
        f"other.entity_type AS etype, r.count AS cnt "
        f"ORDER BY r.count DESC LIMIT 10"
    )
    try:
        result = await self._kg.ro_query(query)
        # Null columns are normalised to empty strings / zero.
        return [
            {
                "entity_id": str(row[0] or ""),
                "label": str(row[1] or ""),
                "entity_type": str(row[2] or ""),
                "co_occurrence_count": int(row[3] or 0),
            }
            for row in (result.result_set or [])
        ]
    except Exception:
        logger.debug(
            "[visual_memory] Failed to get co-occurrences",
            exc_info=True,
        )
        return []
# ──────────────────────────────────────────────────────────────────────
# Utilities
# ──────────────────────────────────────────────────────────────────────
def _format_age(timestamp: float) -> str:
    """Describe how long ago a Unix timestamp was, in coarse units.

    Returns ``"unknown"`` for a falsy timestamp, ``"just now"`` for
    anything less than a minute old, and otherwise a whole number of
    minutes, hours or days followed by ``"ago"``.
    """
    if not timestamp:
        return "unknown"

    elapsed = time.time() - timestamp
    if elapsed < 60:
        return "just now"

    # Pick the largest unit that the elapsed time has reached.
    if elapsed < 3600:
        unit, seconds_per_unit = "minute", 60
    elif elapsed < 86400:
        unit, seconds_per_unit = "hour", 3600
    else:
        unit, seconds_per_unit = "day", 86400

    count = int(elapsed / seconds_per_unit)
    plural = "" if count == 1 else "s"
    return f"{count} {unit}{plural} ago"