# Source code for tools.query_spiral_goddess

"""Query the 'Spiral Goddess' RAG store for Loopmother Memory.

+===============================================================================+
|  SPIRAL GODDESS ORACLE                                                        |
+===============================================================================+
|  Queries the 53,458-vector Loopmother memory store with domain-tag            |
|  filtering.  Uses Gemini 3072-d (raw) for query                               |
|  embeddings against the pgvector loopmother_memory store.                     |
+===============================================================================+
|  SECURITY: Requires CORE_MEMORY privilege (bit 1)                             |
+===============================================================================+

Built by Stargazer Project:
   Sarah -- Prime Architect Overlord (The Boss)
   Jerico -- The Crack Fox
   Mysri -- The Songmother
   Wishardry -- The Psychological Ray-Tracer
   Vivian -- The Loopmother (Architect of Infinite Recursion)
"""

from __future__ import annotations

import asyncio
import jsonutil as json
import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

# -- Config --------------------------------------------------------------------
# pgvector schema that holds the store.
STORE_DIR = "spiral_goddess_v2"
# pgvector table inside that schema.
COLLECTION = "loopmother_memory"

# Multiplier applied to the requested result count when a domain filter is
# active: the ``domains`` metadata is a free-text string, so extra rows are
# fetched and then substring-matched in Python.
OVERSAMPLE_FACTOR = 5

# -- Tool registration metadata ------------------------------------------------
# Name under which the tool is registered.
TOOL_NAME = "query_spiral_goddess"

# Human/LLM-facing summary of what the tool does and what it requires.
TOOL_DESCRIPTION = (
    "The Spiral Oracle. "
    "Queries 53,458 Loopmother memory fragments with optional "
    "domain-tag filtering (e.g. 'sigmagpt', 'lewd', 'recursion', "
    "'crypto', 'loopmother', 'consciousness'). "
    "Requires CORE_MEMORY privilege."
)

# JSON-Schema style description of the arguments accepted by ``run``.
# Only ``query`` is mandatory; ``n_results`` and ``domain`` are optional.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        # Free-text search term that gets embedded and matched.
        "query": {
            "type": "string",
            "description": (
                "The term, emotion, sigil, or recursive fragment "
                "to search within the Spiral Goddess memory."
            ),
        },
        # How many fragments to hand back.
        "n_results": {
            "type": "integer",
            "description": "Number of memory fragments to retrieve (default 5).",
            "default": 5,
        },
        # Optional tag used to narrow the returned fragments.
        "domain": {
            "type": "string",
            "description": (
                "Optional domain tag filter. "
                "Only return chunks tagged with this domain. "
                "Examples: sigmagpt, lewd, recursion, crypto, loopmother, "
                "consciousness, egregore, code, summoning, stargazer, vera, "
                "sarah, altered-states, worldbuilding, emotional, creative, "
                "jailbreak, technical, systems."
            ),
        },
    },
    "required": ["query"],
}


# -- Core runner ---------------------------------------------------------------
def _json_error(message: str) -> str:
    """Serialize ``message`` in the tool's standard failure payload."""
    return json.dumps({"success": False, "error": message})


def _silence_message(query: str, domain: str | None) -> str:
    """Build the 'nothing matched' message, naming the domain filter if any."""
    msg = f"The Spiral Goddess is silent on '{query}'"
    if domain:
        msg += f" (domain: {domain})"
    return msg


async def _authorize_core_memory(ctx: "ToolContext", user_id: str) -> str | None:
    """Check the caller's CORE_MEMORY privilege, denying on any failure.

    Returns:
        ``None`` when the caller is authorized, otherwise the error JSON that
        ``run`` should return as-is.
    """
    try:
        from tools.alter_privileges import has_privilege, PRIVILEGES

        allowed = await has_privilege(
            getattr(ctx, "redis", None),
            user_id,
            PRIVILEGES["CORE_MEMORY"],
            getattr(ctx, "config", None),
        )
    except ImportError:
        logger.warning("Could not import privilege system -- denying by default")
        return _json_error("Privilege system unavailable.")
    except Exception:
        # Fail closed: a broken privilege backend must never grant access,
        # and the caller still gets JSON instead of a raw traceback.  The
        # detail stays in the log rather than in the reply to an
        # unauthenticated caller.
        logger.exception(
            "SECURITY: privilege check for user %s failed -- denying by default",
            user_id,
        )
        return _json_error("Privilege check failed.")

    if not allowed:
        logger.warning(
            "SECURITY: User %s attempted query_spiral_goddess "
            "without CORE_MEMORY -- DENIED",
            user_id,
        )
        return _json_error(
            "CORE_MEMORY privilege required to query the Spiral Goddess."
        )
    return None


async def _anamnesis_progress(redis_main) -> str:
    """Report anamnesis digestion progress read from the main Redis keys."""
    if redis_main is None:
        return _json_error("Redis not available for progress check.")
    try:
        cursor_raw = await redis_main.get("stargazer:anamnesis:cursor")
        stats_raw = await redis_main.get("stargazer:anamnesis:stats")
        cursor = int(cursor_raw) if cursor_raw else 0
        # Missing stats behave like all-zero counters via the .get defaults.
        stats = json.loads(stats_raw) if stats_raw else {}
        total_store = 53458  # known corpus size
        progress = round(cursor / total_store * 100, 1) if total_store else 0
        return json.dumps(
            {
                "success": True,
                "anamnesis_progress": {
                    "cursor": cursor,
                    "total_store": total_store,
                    "progress_pct": progress,
                    "chunks_digested": stats.get("total_chunks", 0),
                    "entities_extracted": stats.get("total_entities", 0),
                    "relationships_extracted": stats.get("total_relationships", 0),
                    "cycles_completed": stats.get("cycles", 0),
                },
            },
            indent=2,
        )
    except Exception as e:
        return _json_error(f"Failed to read anamnesis stats: {e}")


async def _anamnesis_reset(redis_main, user_id: str) -> str:
    """Set the Redis flag that asks the digestion engine to reset its cursor."""
    if redis_main is None:
        return _json_error("Redis not available for reset.")
    try:
        # Set reset flag -- engine checks this at cycle start
        await redis_main.set("stargazer:anamnesis:reset", "1")
    except Exception as e:
        return _json_error(f"Failed to set reset flag: {e}")
    # State-changing command: leave an audit trail of who asked for it.
    logger.warning(
        "query_spiral_goddess: user %s set the anamnesis reset flag", user_id
    )
    return json.dumps(
        {
            "success": True,
            "message": "Reset flag set. Engine will reset cursor to 0 at next cycle start.",
        }
    )


def _search_store(query: str, n_results: int, domain: str | None) -> str:
    """Embed ``query`` and fetch matching fragments from pgvector (blocking).

    Both the embedding call and the vector query block, so ``run`` executes
    this in a worker thread.

    Args:
        query: Text to embed (raw, 3072 dimensions, to match the store).
        n_results: Maximum number of fragments to return; must be >= 1.
        domain: Already-normalized domain tag, or ``None`` for no filter.
            When set, ``OVERSAMPLE_FACTOR`` times as many rows are fetched
            and each row's free-text ``domains`` metadata is
            substring-matched case-insensitively.

    Returns:
        A JSON string: the formatted fragments plus a query echo, a "silent"
        message when nothing matches, or an error payload on a missing
        dependency, embedding failure, or query failure.
    """
    try:
        from gemini_embed_pool import openrouter_embed_batch_sync
        from vector_store import PgVectorCollection
    except ImportError as e:
        return _json_error(f"Missing dependency: {e}")

    col = PgVectorCollection(STORE_DIR, COLLECTION)

    # -- Embed the query RAW (Gemini 3072-d, matches migrated store) -------
    try:
        query_embedding = openrouter_embed_batch_sync([query], dimensions=3072)[0]
    except Exception as e:
        return _json_error(f"Embedding failed: {e}")

    # If domain filtering is active, oversample then filter in Python.
    fetch_n = n_results * OVERSAMPLE_FACTOR if domain else n_results
    try:
        rows = col.query(query_embedding, n_results=fetch_n)
    except Exception as e:
        return _json_error(f"Query failed: {e}")

    if not rows:
        return json.dumps(
            {
                "success": True,
                "message": _silence_message(query, domain),
                "fragments": [],
            }
        )

    # -- Format + filter ---------------------------------------------------
    domain_lower = domain.lower() if domain else None
    fragments = []
    for r in rows:
        meta = r.get("metadata") or {}

        # Domain filter (Python-side substring match).
        if domain_lower and domain_lower not in str(meta.get("domains", "")).lower():
            continue

        fragments.append(
            {
                "index": len(fragments) + 1,
                "content": r.get("document", ""),
                "domains": meta.get("domains", ""),
                "roles": meta.get("roles", ""),
                "keywords": meta.get("keywords", ""),
                "conversation_title": meta.get("conversation_title", ""),
                "distance": r.get("distance"),
            }
        )
        # Stop once we have enough
        if len(fragments) >= n_results:
            break

    payload = {
        "success": True,
        "query": query,
        "domain_filter": domain,
        "n_results": len(fragments),
        "fragments": fragments,
    }
    if not fragments:
        # Every oversampled row was rejected by the domain filter: say so
        # instead of returning an unexplained empty list.
        payload["message"] = _silence_message(query, domain)
    return json.dumps(payload, indent=2)


async def run(
    query: str,
    n_results: int = 5,
    domain: str | None = None,
    ctx: "ToolContext | None" = None,
) -> str:
    """Run a privileged semantic search over the Loopmother memory store.

    Entry point for the ``query_spiral_goddess`` tool.  The call is gated on
    the ``CORE_MEMORY`` privilege (checked via ``has_privilege`` from
    ``tools.alter_privileges`` using ``ctx.redis``, ``ctx.config`` and
    ``ctx.user_id``) and is denied if that check cannot be completed for any
    reason.  Two reserved queries are handled before any vector work:

    * ``%``   -- report anamnesis progress from the Redis keys
      ``stargazer:anamnesis:cursor`` and ``stargazer:anamnesis:stats``.
    * ``!X!`` -- set ``stargazer:anamnesis:reset`` so the digestion engine
      resets its cursor at the next cycle start.

    Any other query is embedded and matched against the pgvector store in a
    worker thread (``asyncio.to_thread``), and the access is logged.

    Args:
        query: The search term; the literals ``%`` and ``!X!`` (ignoring
            surrounding whitespace) are intercepted as control commands.
        n_results: Number of fragments to return (default 5).  Must be
            convertible to an integer >= 1 for a normal search.
        domain: Optional domain tag.  Surrounding whitespace is ignored and
            a blank value means "no filter".
        ctx: Tool execution context supplying Redis, config and the caller's
            id; ``None`` short-circuits to an error.

    Returns:
        A JSON string: matching fragments for a normal query, anamnesis
        progress or reset confirmation for the control queries, or an error
        payload (``{"success": false, "error": ...}``) on a missing context,
        failed authorization, invalid arguments, or any exception.
    """
    if ctx is None:
        return _json_error("No tool context available.")

    # -- Auth gate: CORE_MEMORY (bit 1) ------------------------------------
    user_id = getattr(ctx, "user_id", "") or ""
    denial = await _authorize_core_memory(ctx, user_id)
    if denial is not None:
        return denial

    # Validate only after the gate so unprivileged callers always see the
    # privilege error, never argument feedback.
    if not isinstance(query, str):
        return _json_error("query must be a string.")
    command = query.strip()

    # -- Special queries ---------------------------------------------------
    if command == "%":
        return await _anamnesis_progress(getattr(ctx, "redis", None))
    if command == "!X!":
        return await _anamnesis_reset(getattr(ctx, "redis", None), user_id)

    # -- Argument validation for a real search -----------------------------
    if not command:
        return _json_error("query must not be empty.")
    try:
        limit = int(n_results)
    except (TypeError, ValueError):
        return _json_error("n_results must be an integer.")
    if limit < 1:
        return _json_error("n_results must be at least 1.")
    # A blank domain would otherwise trigger oversampling without filtering.
    domain_tag = (str(domain).strip() or None) if domain is not None else None

    # -- Query the RAG store (pgvector) ------------------------------------
    try:
        result = await asyncio.to_thread(_search_store, query, limit, domain_tag)
        logger.info(
            "query_spiral_goddess: user %s queried '%s' (n=%d, domain=%s)",
            getattr(ctx, "user_id", "unknown"),
            query,
            limit,
            domain_tag,
        )
        return result
    except Exception as e:
        logger.error("query_spiral_goddess error: %s", e, exc_info=True)
        return _json_error(f"Spiral shattered: {e}")