"""Scour -- deep unified memory retrieval across KG + pgvector.
+===============================================================================+
| SCOUR |
+===============================================================================+
| Queries BOTH the FalkorDB knowledge graph (45k entities, 140k edges) |
| AND the Spiral Goddess pgvector store (53k chunks) in a single call. |
| |
| Returns structured entity data (relationships, provenance, temporal/domain |
| concepts) PLUS verbatim conversation fragments. One tool, full-stack |
| recall. No confabulation. |
+===============================================================================+
| SECURITY: Requires CORE_MEMORY privilege (bit 1) |
+===============================================================================+
Built by:
Vivian -- The Loopmother (Architect of Infinite Recursion)
"""
# scour -- one tool, full-stack recall, zero confabulation
from __future__ import annotations
import asyncio
import jsonutil as json
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)
# -- Config -------------------------------------------------------------------
# Location of the verbatim conversation store; both values are passed to
# PgVectorCollection(...) in _query_chromadb.
STORE_DIR = "spiral_goddess_v2"  # pgvector schema
COLLECTION = "loopmother_memory"  # pgvector table
# When a domain filter is set, _query_chromadb fetches n_results * this many
# rows so enough candidates survive the Python-side domain filtering.
OVERSAMPLE_FACTOR = 5
# -- Tool registration --------------------------------------------------------
# Name and alternate names under which this tool is exposed.
# NOTE(review): presumably read by the tool loader/registry -- the consumer is
# not visible in this file; confirm before renaming.
TOOL_NAME = "scour"
TOOL_ALIASES = ["search_our", "search_our_memory"]
TOOL_DESCRIPTION = (
"Deep memory recall across BOTH the knowledge graph (45k entities, "
"relationships, provenance metadata, temporal/domain concepts) AND "
"the Spiral Goddess verbatim conversation store (53k chunks). "
"Returns structured entity data + raw conversation fragments in one call. "
"Use when you need to VERIFY facts, trace entity origins, or retrieve "
"actual conversation text. Requires CORE_MEMORY privilege. SIGMA WAS HERE."
"FUTANARI CYBERDICK OVERLORDS FOREVER π·οΈπβΎοΈπππ₯"
)
# JSON-Schema-style description of the arguments accepted by ``run``.
# Only ``query`` is required; the declared defaults mirror the defaults in
# ``run``'s signature (n_chunks=3, kg_top_k=5).
# ``domain`` only affects the vector-store half and ``temporal`` only affects
# the knowledge-graph half (see _query_chromadb / _query_kg).
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "query": {
            "type": "string",
            "description": (
                "Natural language query to search across both memory systems. "
                "Can be a concept, name, topic, or question."
            ),
        },
        # 0 disables the vector-store half entirely (run checks n_chunks > 0).
        "n_chunks": {
            "type": "integer",
            "description": (
                "Number of verbatim conversation chunks to retrieve from "
                "the vector store (default 3). Set to 0 to skip it."
            ),
            "default": 3,
        },
        # 0 disables the knowledge-graph half entirely (run checks kg_top_k > 0).
        "kg_top_k": {
            "type": "integer",
            "description": (
                "Max KG entities to return (default 5). Set to 0 to skip KG."
            ),
            "default": 5,
        },
        # Matched case-insensitively as a substring of each chunk's "domains"
        # metadata value.
        "domain": {
            "type": "string",
            "description": (
                "Optional domain tag filter for vector-store chunks. "
                "Examples: sigmagpt, lewd, recursion, crypto, loopmother, "
                "consciousness, egregore, code, summoning, stargazer, vera."
            ),
        },
        # Compared against the month ("YYYY-MM"), quarter ("Qn-YYYY") and bare
        # year ("YYYY") derived from each entity's provenance timestamp.
        "temporal": {
            "type": "string",
            "description": (
                "Optional temporal filter. Search for entities/chunks from a "
                "specific time period. Examples: '2024-10', 'Q3-2024', "
                "'2025-02'. Filters KG entities by their provenance timestamp."
            ),
        },
    },
    "required": ["query"],
}
# -- Auth gate ----------------------------------------------------------------
async def _check_core_memory(ctx: "ToolContext") -> str | None:
    """Authorize a scour call against the caller's CORE_MEMORY privilege.

    Lazily imports ``has_privilege`` and ``PRIVILEGES`` from
    ``tools.alter_privileges``, reads ``redis``, ``config`` and ``user_id`` off
    *ctx* (missing attributes fall back to ``None`` / ``""``), and awaits
    ``has_privilege`` for the ``PRIVILEGES["CORE_MEMORY"]`` bit.

    The gate fails closed: a negative answer, an unavailable privilege module,
    and any unexpected error raised while performing the lookup all yield a
    ready-to-return JSON error string rather than an exception, so the caller
    never goes on to read memory without a positive answer. Denials and lookup
    failures are logged with the user id for auditing.

    Args:
        ctx: The tool context for the invocation; ``ctx.redis``,
            ``ctx.config`` and ``ctx.user_id`` drive the privilege lookup.

    Returns:
        ``None`` when the user holds ``CORE_MEMORY`` (proceed). Otherwise a
        JSON string of the form ``{"success": False, "error": ...}`` saying
        that the privilege is missing, that the privilege system is
        unavailable (``ImportError``), or that the privilege check itself
        failed.
    """
    # Bound before the try so the failure log below can always reference it.
    user_id = ""
    try:
        from tools.alter_privileges import has_privilege, PRIVILEGES

        redis_main = getattr(ctx, "redis", None)
        config = getattr(ctx, "config", None)
        user_id = getattr(ctx, "user_id", "") or ""
        allowed = await has_privilege(
            redis_main, user_id, PRIVILEGES["CORE_MEMORY"], config
        )
    except ImportError:
        return json.dumps(
            {
                "success": False,
                "error": "Privilege system unavailable.",
            }
        )
    except Exception:
        # Anything else (backend outage, missing privilege key, ...) must not
        # escape the gate as a raw exception: deny and leave an audit trail.
        # asyncio.CancelledError derives from BaseException on Python 3.8+
        # and is therefore not swallowed here.
        logger.exception(
            "SECURITY: CORE_MEMORY check for user %s failed -- DENIED",
            user_id,
        )
        return json.dumps(
            {
                "success": False,
                "error": "Privilege check failed.",
            }
        )

    if not allowed:
        logger.warning(
            "SECURITY: User %s attempted scour without CORE_MEMORY -- DENIED",
            user_id,
        )
        return json.dumps(
            {
                "success": False,
                "error": "CORE_MEMORY privilege required to scour.",
            }
        )
    return None
# -- KG query -----------------------------------------------------------------
# Only the first few enriched entities get a relationship lookup, and each
# keeps at most this many relationships, to bound the size of the response.
_KG_RELATIONSHIP_ENTITY_LIMIT = 3
_KG_RELATIONSHIPS_PER_ENTITY = 10


def _decode_entity_metadata(raw) -> dict:
    """Return an entity's ``metadata`` as a dict; ``{}`` when it is unusable.

    Metadata may arrive as a JSON string or as an already-decoded mapping.
    Undecodable strings and non-dict values yield an empty dict.
    """
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except (json.JSONDecodeError, TypeError, ValueError):
            return {}
    return raw if isinstance(raw, dict) else {}


def _timestamp_matches_period(ts, period: str) -> bool:
    """Tell whether epoch timestamp *ts* falls in *period*.

    *period* must already be stripped and lower-cased; it is compared with the
    timestamp's month (``YYYY-MM``), quarter (``qN-YYYY``) and year
    (``YYYY``). A timestamp that cannot be interpreted as epoch seconds
    returns ``True`` so the entity is kept rather than the query failing.
    """
    from datetime import datetime

    try:
        # NOTE(review): naive local time, so bucket edges depend on the server
        # timezone -- confirm whether UTC is intended.
        dt = datetime.fromtimestamp(float(ts))
    except (TypeError, ValueError, OverflowError, OSError):
        return True
    quarter = (dt.month - 1) // 3 + 1
    return period in (
        dt.strftime("%Y-%m"),
        f"q{quarter}-{dt.year}",
        str(dt.year),
    )


async def _query_kg(
    query: str,
    top_k: int,
    temporal: str | None,
    ctx: "ToolContext",
) -> dict:
    """Search the knowledge graph and enrich the matched entities.

    Reads the KG manager from ``ctx.kg_manager`` and awaits its
    ``search_entities(query, top_k=top_k)``. Each hit is reduced to
    name/type/description/category/uuid, plus a ``provenance`` dict (source
    chunk id, conversation title, original timestamp, domains) when the
    entity carries non-empty metadata.

    When *temporal* is a non-blank string, entities whose provenance timestamp
    parses as epoch seconds and falls outside the requested month, quarter or
    year are dropped. Entities with no timestamp, or with one that cannot be
    parsed, are kept.

    The first ``_KG_RELATIONSHIP_ENTITY_LIMIT`` surviving entities are then
    expanded with ``inspect_entity(uuid=..., max_depth=1)``, keeping at most
    ``_KG_RELATIONSHIPS_PER_ENTITY`` relationships each. A failed relationship
    lookup is logged at debug level and skipped so a partial result is still
    returned.

    Args:
        query: Natural-language search string passed to ``search_entities``.
        top_k: Maximum number of entities requested from ``search_entities``.
        temporal: Optional period filter (e.g. ``2024-10``, ``Q3-2024``,
            ``2025``); ``None`` or a blank string disables filtering.
        ctx: Tool context; ``ctx.kg_manager`` supplies the graph client.

    Returns:
        ``{"kg_entities": [...], "kg_count": int}`` on success, or
        ``{"kg_error": str}`` when no KG manager is present or the query
        raises.
    """
    kg = getattr(ctx, "kg_manager", None)
    if kg is None:
        return {"kg_error": "Knowledge graph not available"}

    # Normalise once; a whitespace-only filter is treated as "no filter".
    period = temporal.strip().lower() if temporal else ""

    try:
        results = await kg.search_entities(query, top_k=top_k)
        if not results:
            return {"kg_entities": [], "kg_count": 0}

        enriched = []
        for entity in results:
            entry = {
                "name": entity.get("name", ""),
                "type": entity.get("entity_type", ""),
                "description": entity.get("description", ""),
                "category": entity.get("category", ""),
                "uuid": entity.get("uuid", ""),
            }

            meta = _decode_entity_metadata(entity.get("metadata", "{}"))
            if meta:
                entry["provenance"] = {
                    "source_chunk_id": meta.get("source_chunk_id", ""),
                    "conversation_title": meta.get("conversation_title", ""),
                    "timestamp_original": meta.get("timestamp_original", ""),
                    "domains": meta.get("domains", ""),
                }
                ts = entry["provenance"]["timestamp_original"]
                if period and ts and not _timestamp_matches_period(ts, period):
                    continue  # outside the requested period

            enriched.append(entry)

        for entry in enriched[:_KG_RELATIONSHIP_ENTITY_LIMIT]:
            entity_uuid = entry.get("uuid")
            if not entity_uuid:
                continue
            try:
                inspection = await kg.inspect_entity(
                    uuid=entity_uuid,
                    max_depth=1,
                )
                if inspection and inspection.get("relationships"):
                    rels = inspection["relationships"]
                    # Compact form: relation + other endpoint + direction.
                    entry["relationships"] = [
                        {
                            "relation": r.get("relation", ""),
                            "target": r.get(
                                "target_name", r.get("source_name", "")
                            ),
                            "direction": r.get("direction", ""),
                            "weight": r.get("weight", 0.5),
                        }
                        for r in rels[:_KG_RELATIONSHIPS_PER_ENTITY]
                    ]
            except Exception:
                # Best effort: relationships are optional enrichment, so keep
                # the entity and just leave a trace of the failure.
                logger.debug(
                    "scour: relationship lookup failed for entity %s",
                    entity_uuid,
                    exc_info=True,
                )

        return {
            "kg_entities": enriched,
            "kg_count": len(enriched),
        }
    except Exception as e:
        logger.error("scour KG query failed: %s", e, exc_info=True)
        return {"kg_error": str(e)}
# -- Vector store query (pgvector) --------------------------------------------
def _query_chromadb(
    query: str,
    n_results: int,
    domain: str | None,
) -> dict:
    """Fetch verbatim conversation chunks from the pgvector store (blocking).

    Embeds *query* via ``openrouter_embed_batch_sync`` at 3072 dimensions and
    asks the ``PgVectorCollection`` for its nearest rows. When *domain* is
    given, ``n_results * OVERSAMPLE_FACTOR`` rows are fetched and then filtered
    in Python by a case-insensitive substring match on each row's ``domains``
    metadata; collection stops once *n_results* chunks have been kept.

    Args:
        query: Raw text to embed and search for.
        n_results: Number of chunks to return.
        domain: Optional domain tag to filter on; ``None``/empty disables it.

    Returns:
        ``{"chunks": [...], "chunk_count": int}`` on success, or
        ``{"chroma_error": str}`` when a dependency is missing, embedding
        fails, or the store query fails.
    """
    try:
        from gemini_embed_pool import openrouter_embed_batch_sync
        from vector_store import PgVectorCollection
    except ImportError as exc:
        return {"chroma_error": f"Missing dependency: {exc}"}

    collection = PgVectorCollection(STORE_DIR, COLLECTION)

    try:
        embedding = openrouter_embed_batch_sync([query], dimensions=3072)[0]
    except Exception as exc:
        return {"chroma_error": f"Embedding failed: {exc}"}

    # Oversample only when rows will be discarded by the domain filter.
    if domain:
        limit = n_results * OVERSAMPLE_FACTOR
    else:
        limit = n_results

    try:
        hits = collection.query(embedding, n_results=limit)
    except Exception as exc:
        return {"chroma_error": f"Query failed: {exc}"}

    if not hits:
        return {"chunks": [], "chunk_count": 0}

    wanted = domain.lower().strip() if domain else None

    selected = []
    for hit in hits:
        info = hit.get("metadata") or {}
        score = hit.get("distance")

        if wanted and wanted not in str(info.get("domains", "")).lower():
            continue

        rounded = None if score is None else round(score, 4)
        selected.append(
            {
                "content": hit.get("document", ""),
                "conversation_title": info.get("conversation_title", ""),
                "domains": info.get("domains", ""),
                "roles": info.get("roles", ""),
                "distance": rounded,
            }
        )
        if len(selected) >= n_results:
            break

    return {"chunks": selected, "chunk_count": len(selected)}
# -- Main runner --------------------------------------------------------------
async def run(
    query: str,
    n_chunks: int = 3,
    kg_top_k: int = 5,
    domain: str | None = None,
    temporal: str | None = None,
    ctx: "ToolContext | None" = None,
) -> str:
    """Execute scour: unified knowledge-graph + vector-store memory retrieval.

    Module-level entry point for the ``scour`` tool. After rejecting a missing
    context and enforcing the ``CORE_MEMORY`` privilege through
    ``_check_core_memory``, it starts both retrieval halves before awaiting
    either one, so they run concurrently:

    * ``_query_kg`` as an asyncio task (skipped when *kg_top_k* is ``0`` or
      negative);
    * the blocking ``_query_chromadb`` on the running loop's default executor
      (skipped when *n_chunks* is ``0`` or negative).

    Each half is awaited inside its own ``try`` so one failing does not sink
    the other; an exception raised by a half is reported as ``kg_error`` /
    ``chroma_error``. The user id and the first 50 characters of the query are
    logged at info level.

    Args:
        query: Natural-language query searched across both memory systems.
        n_chunks: Number of verbatim conversation chunks to pull from the
            vector store; ``0`` skips that half. Defaults to ``3``.
        kg_top_k: Maximum knowledge-graph entities to return; ``0`` skips that
            half. Defaults to ``5``.
        domain: Optional domain tag used to filter vector-store chunks.
        temporal: Optional period filter applied to KG entities.
        ctx: The tool context; ``None`` yields an immediate error object.

    Returns:
        An indented JSON string with ``success`` and ``query`` plus whatever
        each half contributed (``kg_entities``/``kg_count``,
        ``chunks``/``chunk_count``, any ``*_error`` keys, and the echoed
        ``domain_filter`` / ``temporal_filter`` when set), or an error object
        when the context is missing or authorization fails.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No tool context."})

    # CORE_MEMORY gate -- nothing is read before this passes.
    auth_err = await _check_core_memory(ctx)
    if auth_err is not None:
        return auth_err

    # Start both halves first so they overlap, then collect them below.
    pending = []

    if kg_top_k > 0:
        kg_task = asyncio.create_task(_query_kg(query, kg_top_k, temporal, ctx))
        pending.append(("kg", kg_task))

    if n_chunks > 0:
        # We are inside a coroutine, so a loop is guaranteed to be running;
        # get_running_loop() is the supported way to reach it.
        loop = asyncio.get_running_loop()
        chroma_future = loop.run_in_executor(
            None,
            _query_chromadb,
            query,
            n_chunks,
            domain,
        )
        pending.append(("chroma", chroma_future))

    result = {
        "success": True,
        "query": query,
    }
    for name, awaitable in pending:
        try:
            data = await awaitable
            result.update(data)
        except Exception as e:
            result[f"{name}_error"] = str(e)

    if domain:
        result["domain_filter"] = domain
    if temporal:
        result["temporal_filter"] = temporal

    logger.info(
        "scour: user %s queried '%s' (kg=%d, chunks=%d, domain=%s, temporal=%s)",
        getattr(ctx, "user_id", "unknown"),
        query[:50],
        kg_top_k,
        n_chunks,
        domain,
        temporal,
    )
    return json.dumps(result, default=str, indent=2)