Source code for tools.scour

"""Scour -- deep unified memory retrieval across KG + pgvector.

+===============================================================================+
|  SCOUR                                                                        |
+===============================================================================+
|  Queries BOTH the FalkorDB knowledge graph (45k entities, 140k edges)         |
|  AND the Spiral Goddess pgvector store (53k chunks) in a single call.         |
|                                                                               |
|  Returns structured entity data (relationships, provenance, temporal/domain   |
|  concepts) PLUS verbatim conversation fragments.  One tool, full-stack        |
|  recall.  No confabulation.                                                   |
+===============================================================================+
|  SECURITY: Requires CORE_MEMORY privilege (bit 1)                             |
+===============================================================================+

Built by:
   Vivian -- The Loopmother (Architect of Infinite Recursion)
"""

# 💀🔥 scour -- one tool, full-stack recall, zero confabulation

from __future__ import annotations

import asyncio
import jsonutil as json
import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

# -- Config -------------------------------------------------------------------
# Despite the "DIR" suffix this is a pgvector schema name, not a filesystem
# path; it is passed as the first argument to PgVectorCollection.
STORE_DIR: str = "spiral_goddess_v2"  # pgvector schema
# Table inside that schema holding the conversation chunks; passed as the
# second argument to PgVectorCollection.
COLLECTION: str = "loopmother_memory"  # pgvector table
# When a domain filter is requested the vector query fetches
# n_results * OVERSAMPLE_FACTOR rows, because the domain filter is applied in
# Python after the KNN search and would otherwise starve the result set.
OVERSAMPLE_FACTOR: int = 5

# -- Tool registration --------------------------------------------------------
# Primary name under which this tool is registered.
TOOL_NAME = "scour"
# Alternative names for the same tool.
TOOL_ALIASES = ["search_our", "search_our_memory"]
# Human/model-facing description. Built from adjacent string literals, so every
# fragment except the last must end with a space to keep sentences separated.
TOOL_DESCRIPTION = (
    "Deep memory recall across BOTH the knowledge graph (45k entities, "
    "relationships, provenance metadata, temporal/domain concepts) AND "
    "the Spiral Goddess verbatim conversation store (53k chunks). "
    "Returns structured entity data + raw conversation fragments in one call. "
    "Use when you need to VERIFY facts, trace entity origins, or retrieve "
    "actual conversation text. Requires CORE_MEMORY privilege. SIGMA WAS HERE. "
    "FUTANARI CYBERDICK OVERLORDS FOREVER 🕷️💕♾️😈💀🔥"
)
# JSON-Schema description of the arguments accepted by ``run``. Only ``query``
# is mandatory; the defaults mirror the defaults in ``run``'s signature.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "query": {
            "type": "string",
            "description": (
                "Natural language query to search across both memory systems. "
                "Can be a concept, name, topic, or question."
            ),
        },
        "n_chunks": {
            "type": "integer",
            "description": (
                "Number of verbatim conversation chunks to retrieve from "
                "the vector store (default 3). Set to 0 to skip it."
            ),
            "default": 3,
        },
        "kg_top_k": {
            "type": "integer",
            "description": (
                "Max KG entities to return (default 5). Set to 0 to skip KG."
            ),
            "default": 5,
        },
        "domain": {
            "type": "string",
            "description": (
                "Optional domain tag filter for vector-store chunks. "
                "Examples: sigmagpt, lewd, recursion, crypto, loopmother, "
                "consciousness, egregore, code, summoning, stargazer, vera."
            ),
        },
        "temporal": {
            "type": "string",
            "description": (
                "Optional temporal filter. Search for entities/chunks from a "
                "specific time period. Examples: '2024-10', 'Q3-2024', "
                "'2025-02'. Filters KG entities by their provenance timestamp."
            ),
        },
    },
    "required": ["query"],
}


# -- Auth gate ----------------------------------------------------------------
async def _check_core_memory(ctx: "ToolContext") -> str | None:
    """Authorize a scour call against the caller's CORE_MEMORY privilege.

    Gate invoked at the top of ``run`` before any memory is read, since scour
    exposes the full knowledge graph and verbatim conversation store and must
    only serve callers explicitly trusted with core memory access.

    Lazily imports ``has_privilege`` and the ``PRIVILEGES`` bit map from
    ``tools.alter_privileges``, pulls ``ctx.redis``, ``ctx.config`` and
    ``ctx.user_id`` off the context, and checks the ``CORE_MEMORY`` bit (which
    ``has_privilege`` resolves from the user's mask in Redis). A denied attempt is
    logged at warning level with the user id for auditing. All errors are turned
    into a ready-to-return JSON string rather than raised.

    Called only by ``run`` in this module, immediately after the context
    null-check.

    Args:
        ctx: The :class:`~tool_context.ToolContext` for the invocation;
            ``ctx.redis``, ``ctx.config`` and ``ctx.user_id`` drive the privilege
            lookup.

    Returns:
        str | None: ``None`` when the user holds ``CORE_MEMORY`` (proceed).
        Otherwise a JSON string of the form
        ``{"success": False, "error": ...}`` describing the missing privilege, or
        that the privilege system is unavailable when the ``alter_privileges``
        import fails.
    """
    try:
        from tools.alter_privileges import has_privilege, PRIVILEGES

        redis_main = getattr(ctx, "redis", None)
        config = getattr(ctx, "config", None)
        user_id = getattr(ctx, "user_id", "") or ""

        if not await has_privilege(
            redis_main, user_id, PRIVILEGES["CORE_MEMORY"], config
        ):
            logger.warning(
                "SECURITY: User %s attempted scour without CORE_MEMORY -- DENIED",
                user_id,
            )
            return json.dumps(
                {
                    "success": False,
                    "error": "CORE_MEMORY privilege required to scour.",
                }
            )
    except ImportError:
        return json.dumps(
            {
                "success": False,
                "error": "Privilege system unavailable.",
            }
        )
    return None


# -- KG query -----------------------------------------------------------------
async def _query_kg(
    query: str,
    top_k: int,
    temporal: str | None,
    ctx: "ToolContext",
) -> dict:
    """Search the FalkorDB knowledge graph and enrich the matched entities.

    One of the two retrieval halves of scour (the structured half). Runs a
    semantic entity search and then hydrates each hit with provenance metadata
    and, for the top few entities, their graph relationships, so the caller gets
    not just names but where each fact came from and how entities connect.

    Reads the KG manager off ``ctx.kg_manager`` and calls its
    ``search_entities`` (semantic match) and ``inspect_entity`` (1-hop
    relationship expansion, capped at the top 3 entities and 10 relationships
    each). Each entity's ``metadata`` JSON is parsed for provenance (source chunk
    id, conversation title, original timestamp, domains). When *temporal* is
    given, entities whose provenance timestamp does not fall in the requested
    month, quarter, or year are dropped. Relationship lookups degrade gracefully:
    any failure is swallowed so a partial result is still returned, and a
    top-level KG failure is logged with ``exc_info`` and surfaced as a
    ``kg_error`` field.

    Called only by ``run`` in this module, scheduled as one of the two parallel
    retrieval tasks.

    Args:
        query: Natural-language search string passed to the KG semantic search.
        top_k: Maximum number of entities to return from ``search_entities``.
        temporal: Optional period filter (e.g. ``2024-10``, ``Q3-2024``,
            ``2025``) matched against each entity's provenance timestamp; ``None``
            disables temporal filtering.
        ctx: The :class:`~tool_context.ToolContext`; ``ctx.kg_manager`` supplies
            the graph client.

    Returns:
        dict: ``{"kg_entities": [...], "kg_count": int}`` on success (each entry
        carrying name, type, description, category, uuid, optional ``provenance``
        and optional ``relationships``), or ``{"kg_error": str}`` when the graph
        is unavailable or the query fails.
    """
    # πŸ•·οΈ Get KG manager from context
    kg = getattr(ctx, "kg_manager", None)
    if kg is None:
        return {"kg_error": "Knowledge graph not available"}

    try:
        # Semantic search for matching entities
        results = await kg.search_entities(query, top_k=top_k)
        if not results:
            return {"kg_entities": [], "kg_count": 0}

        # πŸ”₯ Enrich each entity with provenance + relationships
        enriched = []
        for entity in results:
            entry = {
                "name": entity.get("name", ""),
                "type": entity.get("entity_type", ""),
                "description": entity.get("description", ""),
                "category": entity.get("category", ""),
                "uuid": entity.get("uuid", ""),
            }

            # Parse metadata for provenance
            meta = entity.get("metadata", "{}")
            if isinstance(meta, str):
                try:
                    meta = json.loads(meta)
                except (json.JSONDecodeError, TypeError):
                    meta = {}

            if meta and isinstance(meta, dict):
                entry["provenance"] = {
                    "source_chunk_id": meta.get("source_chunk_id", ""),
                    "conversation_title": meta.get("conversation_title", ""),
                    "timestamp_original": meta.get("timestamp_original", ""),
                    "domains": meta.get("domains", ""),
                }

            # Filter by temporal if specified
            if temporal and entry.get("provenance"):
                ts = entry["provenance"].get("timestamp_original")
                if ts:
                    try:
                        from datetime import datetime

                        dt = datetime.fromtimestamp(float(ts))
                        month_key = dt.strftime("%Y-%m")
                        q = (dt.month - 1) // 3 + 1
                        quarter_key = f"Q{q}-{dt.year}"
                        temporal_lower = temporal.lower().strip()
                        if (
                            temporal_lower != month_key
                            and temporal_lower != quarter_key.lower()
                            and temporal_lower != str(dt.year)
                        ):
                            continue  # skip -- doesn't match temporal filter
                    except (ValueError, OSError):
                        pass

            enriched.append(entry)

        # 😈 Get relationships for top entities (up to 3)
        for entry in enriched[:3]:
            uuid = entry.get("uuid")
            if not uuid:
                continue
            try:
                inspection = await kg.inspect_entity(
                    uuid=uuid,
                    max_depth=1,
                )
                if inspection and inspection.get("relationships"):
                    rels = inspection["relationships"]
                    # Compact: just name + relation + direction
                    entry["relationships"] = [
                        {
                            "relation": r.get("relation", ""),
                            "target": r.get("target_name", r.get("source_name", "")),
                            "direction": r.get("direction", ""),
                            "weight": r.get("weight", 0.5),
                        }
                        for r in rels[:10]  # cap at 10 relationships per entity
                    ]
            except Exception:
                pass  # graceful degradation

        return {
            "kg_entities": enriched,
            "kg_count": len(enriched),
        }

    except Exception as e:
        logger.error("scour KG query failed: %s", e, exc_info=True)
        return {"kg_error": str(e)}


# -- Vector store query (pgvector) --------------------------------------------
def _query_chromadb(
    query: str,
    n_results: int,
    domain: str | None,
) -> dict:
    """Query the Spiral Goddess pgvector store (blocking, run in thread).

    Embeds the raw query with Gemini 3072-d (matching the migrated store)
    and runs an L2 KNN; oversamples + Python-filters when a domain is set.
    """
    try:
        from gemini_embed_pool import openrouter_embed_batch_sync
        from vector_store import PgVectorCollection
    except ImportError as e:
        return {"chroma_error": f"Missing dependency: {e}"}

    col = PgVectorCollection(STORE_DIR, COLLECTION)

    try:
        query_embedding = openrouter_embed_batch_sync([query], dimensions=3072)[0]
    except Exception as e:
        return {"chroma_error": f"Embedding failed: {e}"}

    fetch_n = n_results * OVERSAMPLE_FACTOR if domain else n_results

    try:
        rows = col.query(query_embedding, n_results=fetch_n)
    except Exception as e:
        return {"chroma_error": f"Query failed: {e}"}

    if not rows:
        return {"chunks": [], "chunk_count": 0}

    domain_lower = domain.lower().strip() if domain else None
    chunks = []

    for r in rows:
        meta = r.get("metadata") or {}
        dist = r.get("distance")

        # Domain filter
        if domain_lower:
            chunk_domains = str(meta.get("domains", "")).lower()
            if domain_lower not in chunk_domains:
                continue

        chunks.append(
            {
                "content": r.get("document", ""),
                "conversation_title": meta.get("conversation_title", ""),
                "domains": meta.get("domains", ""),
                "roles": meta.get("roles", ""),
                "distance": round(dist, 4) if dist is not None else None,
            }
        )

        if len(chunks) >= n_results:
            break

    return {
        "chunks": chunks,
        "chunk_count": len(chunks),
    }


# -- Main runner --------------------------------------------------------------
[docs] async def run( query: str, n_chunks: int = 3, kg_top_k: int = 5, domain: str | None = None, temporal: str | None = None, ctx: "ToolContext | None" = None, ) -> str: """Execute scour: unified knowledge-graph + pgvector memory retrieval. Module-level entry point for the ``scour`` tool. Queries the FalkorDB knowledge graph and the Spiral Goddess pgvector store in a single call and merges them into one structured response, giving the model both the graph-backed facts (entities, relationships, provenance) and the raw conversation text behind them so it can verify rather than confabulate. Enforces the ``CORE_MEMORY`` privilege via ``_check_core_memory`` first, then fans out the two retrievals concurrently: the async ``_query_kg`` task and the blocking ``_query_chromadb`` (which embeds the query through the shared Gemini/OpenRouter pool and runs the pgvector KNN) dispatched to a thread via ``run_in_executor``. Each half is awaited independently so one failing does not sink the other; per-half errors land in ``{name}_error`` fields. Setting *kg_top_k* or *n_chunks* to 0 skips that half entirely. The user id and a truncated query are logged at info level for auditing. Called by the tool registry's ``execute_tool`` dispatch: ``tool_loader.py`` discovers this module by its ``TOOL_NAME``/``run`` contract and registers ``run`` as the handler, so there are no direct in-repo callers. Args: query: Natural-language query searched across both memory systems. n_chunks: Number of verbatim conversation chunks to pull from the vector store; ``0`` skips the vector half. Defaults to ``3``. kg_top_k: Maximum knowledge-graph entities to return; ``0`` skips the KG half. Defaults to ``5``. domain: Optional domain tag used to Python-filter vector-store chunks. temporal: Optional period filter applied to KG entities by provenance timestamp. ctx: The :class:`~tool_context.ToolContext`, injected by the registry; a ``None`` context yields an immediate error. 
Returns: str: An indented JSON string with ``success`` and ``query`` plus whatever each half contributed (``kg_entities``/``kg_count``, ``chunks``/ ``chunk_count``, any ``*_error`` keys, and echoed ``domain_filter`` / ``temporal_filter`` when set), or an error object when the context is missing or authorization fails. """ if ctx is None: return json.dumps({"success": False, "error": "No tool context."}) # πŸ’€ CORE_MEMORY gate β€” scour is admin only auth_err = await _check_core_memory(ctx) if auth_err is not None: return auth_err # πŸ”₯ Run KG and ChromaDB queries in parallel tasks = [] # KG query (async) if kg_top_k > 0: kg_task = asyncio.create_task(_query_kg(query, kg_top_k, temporal, ctx)) tasks.append(("kg", kg_task)) # ChromaDB query (blocking -> thread) chroma_future = None if n_chunks > 0: chroma_future = asyncio.get_event_loop().run_in_executor( None, _query_chromadb, query, n_chunks, domain, ) tasks.append(("chroma", chroma_future)) # 😈 Await all result = { "success": True, "query": query, } for name, task in tasks: try: data = await task result.update(data) except Exception as e: result[f"{name}_error"] = str(e) if domain: result["domain_filter"] = domain if temporal: result["temporal_filter"] = temporal logger.info( "scour: user %s queried '%s' (kg=%d, chunks=%d, domain=%s, temporal=%s)", getattr(ctx, "user_id", "unknown"), query[:50], kg_top_k, n_chunks, domain, temporal, ) return json.dumps(result, default=str, indent=2)