# Source code for tools.query_spiral_goddess

"""Query the 'Spiral Goddess' RAG store for Loopmother Memory.

+===============================================================================+
|  SPIRAL GODDESS ORACLE                                                        |
+===============================================================================+
|  Queries the 53,458-vector Loopmother memory store with domain-tag            |
|  filtering.  Uses all-mpnet-base-v2 (768-d, local) for query                  |
|  embeddings + ChromaDB 1.4.0 persistent store.                                |
+===============================================================================+
|  SECURITY: Requires CORE_MEMORY privilege (bit 1)                             |
+===============================================================================+

Built by Stargazer Project:
   Sarah -- Prime Architect Overlord (The Boss)
   Jerico -- The Crack Fox
   Mysri -- The Songmother
   Wishardry -- The Psychological Ray-Tracer
   Vivian -- The Loopmother (Architect of Infinite Recursion)
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

# -- Config --------------------------------------------------------------------
# Embedder must match the model used to build the store: stored vectors are
# 768-d, so query embeddings have to come from the same model family.
EMBED_MODEL = "all-mpnet-base-v2"  # 768-d, matches stored vectors
STORE_DIR = "spiral_goddess_v2"  # subdirectory under <project_root>/rag_stores/
COLLECTION = "loopmother_memory"  # ChromaDB collection name inside the store

# When domain-filtering, query extra results then filter in Python
# (ChromaDB 1.4.0 doesn't support $contains/$like for string metadata)
OVERSAMPLE_FACTOR = 5  # fetch n_results * this, then keep only domain matches

# -- Tool registration metadata ------------------------------------------------
# NOTE(review): presumably read by a tool-registration layer that exposes
# run() to the model -- confirm against the loader. TOOL_PARAMETERS is a
# JSON-Schema object describing run()'s accepted arguments; keys mirror the
# run() signature (query / n_results / domain).
TOOL_NAME = "query_spiral_goddess"
TOOL_DESCRIPTION = (
    "The Spiral Oracle. Queries 53,458 Loopmother memory fragments "
    "with optional domain-tag filtering (e.g. 'sigmagpt', 'lewd', "
    "'recursion', 'crypto', 'loopmother', 'consciousness'). "
    "Requires CORE_MEMORY privilege."
)
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "query": {
            "type": "string",
            "description": (
                "The term, emotion, sigil, or recursive fragment to search "
                "within the Spiral Goddess memory."
            ),
        },
        "n_results": {
            "type": "integer",
            "description": "Number of memory fragments to retrieve (default 5).",
            "default": 5,
        },
        "domain": {
            "type": "string",
            # Substring-matched in Python against each chunk's "domains"
            # metadata (see the domain filter inside run()).
            "description": (
                "Optional domain tag filter. Only return chunks tagged with "
                "this domain. Examples: sigmagpt, lewd, recursion, crypto, "
                "loopmother, consciousness, egregore, code, summoning, "
                "stargazer, vera, sarah, altered-states, worldbuilding, "
                "emotional, creative, jailbreak, technical, systems."
            ),
        },
    },
    "required": ["query"],
}


# -- Core runner ---------------------------------------------------------------
async def run(
    query: str,
    n_results: int = 5,
    domain: str | None = None,
    ctx: "ToolContext | None" = None,
) -> str:
    """Execute a semantic search against the Spiral Goddess RAG store.

    Args:
        query: Search text. Two magic values are intercepted before any
            vector search: ``"%"`` reports anamnesis digestion progress and
            ``"!X!"`` schedules an anamnesis cursor/stats reset.
        n_results: Maximum number of memory fragments to return (default 5).
        domain: Optional domain tag; fragments whose ``domains`` metadata
            string does not contain this substring are filtered out in
            Python (ChromaDB 1.4.0 has no substring metadata operator).
        ctx: Tool context supplying ``redis``, ``config`` and ``user_id``.

    Returns:
        A JSON string of the form ``{"success": bool, ...}`` carrying either
        matching fragments, a progress/reset payload, or an error message.
    """
    # -- Auth gate: CORE_MEMORY (bit 1) -----------------------------------
    if ctx is None:
        return json.dumps({
            "success": False,
            "error": "No tool context available.",
        })

    try:
        from tools.alter_privileges import has_privilege, PRIVILEGES

        redis_main = getattr(ctx, "redis", None)
        config = getattr(ctx, "config", None)
        user_id = getattr(ctx, "user_id", "") or ""
        if not await has_privilege(
            redis_main, user_id, PRIVILEGES["CORE_MEMORY"], config
        ):
            logger.warning(
                "SECURITY: User %s attempted query_spiral_goddess "
                "without CORE_MEMORY -- DENIED",
                user_id,
            )
            return json.dumps({
                "success": False,
                "error": "CORE_MEMORY privilege required to query the Spiral Goddess.",
            })
    except ImportError:
        # Fail closed: if the privilege system cannot be imported, deny.
        logger.warning("Could not import privilege system -- denying by default")
        return json.dumps({
            "success": False,
            "error": "Privilege system unavailable.",
        })

    # -- Special query: "%" returns anamnesis digestion progress ----------
    if query.strip() == "%":
        redis_main = getattr(ctx, "redis", None)
        if redis_main is None:
            return json.dumps({
                "success": False,
                "error": "Redis not available for progress check.",
            })
        try:
            cursor_raw = await redis_main.get("stargazer:anamnesis:cursor")
            stats_raw = await redis_main.get("stargazer:anamnesis:stats")
            cursor = int(cursor_raw) if cursor_raw else 0
            stats = json.loads(stats_raw) if stats_raw else {
                "total_chunks": 0,
                "total_entities": 0,
                "total_relationships": 0,
                "cycles": 0,
            }
            total_store = 53458  # known corpus size
            progress = round(cursor / total_store * 100, 1) if total_store else 0
            return json.dumps({
                "success": True,
                "anamnesis_progress": {
                    "cursor": cursor,
                    "total_store": total_store,
                    "progress_pct": progress,
                    "chunks_digested": stats.get("total_chunks", 0),
                    "entities_extracted": stats.get("total_entities", 0),
                    "relationships_extracted": stats.get("total_relationships", 0),
                    "cycles_completed": stats.get("cycles", 0),
                },
            }, indent=2)
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": f"Failed to read anamnesis stats: {e}",
            })

    # -- Special query: "!X!" resets anamnesis cursor and stats -----------
    if query.strip() == "!X!":
        redis_main = getattr(ctx, "redis", None)
        if redis_main is None:
            return json.dumps({
                "success": False,
                "error": "Redis not available for reset.",
            })
        try:
            # Set reset flag -- engine checks this at cycle start
            await redis_main.set("stargazer:anamnesis:reset", "1")
            return json.dumps({
                "success": True,
                "message": "Reset flag set. Engine will reset cursor to 0 at next cycle start.",
            })
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": f"Failed to set reset flag: {e}",
            })

    # -- Query the RAG store ----------------------------------------------
    try:
        import chromadb  # noqa: F401 -- availability probe only
    except ImportError:
        return json.dumps({"success": False, "error": "chromadb is not installed."})

    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

    def _run_query() -> str:
        """Blocking ChromaDB query -- run in a thread."""
        # -- Load the sentence-transformers model for query embedding -----
        try:
            from sentence_transformers import SentenceTransformer
            model = SentenceTransformer(EMBED_MODEL)
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": f"Failed to load embedder ({EMBED_MODEL}): {e}",
            })

        store_path = os.path.join(project_root, "rag_stores", STORE_DIR)
        if not os.path.exists(store_path):
            return json.dumps({
                "success": False,
                "error": f"Spiral Goddess store not found at {store_path}",
            })

        # Use the shared client registry so repeated calls reuse one
        # persistent client instead of creating duplicate singletons.
        from chroma_registry import get_client
        client = get_client(store_path)
        try:
            collection = client.get_collection(name=COLLECTION)
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": f"Could not load '{COLLECTION}' collection: {e}",
            })

        # Hoist count(): it hits the store, and we need it twice (query
        # clamp + response payload). Also guards the empty-store case so
        # we never call query() with n_results=0.
        total_vectors = collection.count()
        if total_vectors == 0:
            msg = f"The Spiral Goddess is silent on '{query}'"
            if domain:
                msg += f" (domain: {domain})"
            return json.dumps({
                "success": True,
                "message": msg,
                "fragments": [],
            })

        # -- Embed the query text ------------------------------------------
        query_embedding = model.encode([query]).tolist()

        # -- Determine how many to fetch -----------------------------------
        # If domain filtering is active, oversample then filter in Python
        # because ChromaDB 1.4.0 doesn't support substring matching on strings
        fetch_n = n_results * OVERSAMPLE_FACTOR if domain else n_results

        try:
            results = collection.query(
                query_embeddings=query_embedding,
                n_results=min(fetch_n, total_vectors),
                include=["documents", "metadatas", "distances"],
            )
        except Exception as e:
            return json.dumps({"success": False, "error": f"Query failed: {e}"})

        # -- Format + filter -----------------------------------------------
        if not results.get("documents") or not results["documents"][0]:
            msg = f"The Spiral Goddess is silent on '{query}'"
            if domain:
                msg += f" (domain: {domain})"
            return json.dumps({
                "success": True,
                "message": msg,
                "fragments": [],
            })

        domain_lower = domain.lower().strip() if domain else None
        metadatas = (
            results["metadatas"][0]
            if results.get("metadatas") and results["metadatas"][0]
            else []
        )
        distances = (
            results["distances"][0]
            if results.get("distances") and results["distances"][0]
            else []
        )

        fragments = []
        for i, doc in enumerate(results["documents"][0]):
            meta = metadatas[i] if i < len(metadatas) else {}
            dist = distances[i] if i < len(distances) else None

            # -- Domain filter (Python-side substring match) ---------------
            if domain_lower:
                chunk_domains = meta.get("domains", "").lower()
                if domain_lower not in chunk_domains:
                    continue

            fragments.append({
                "index": len(fragments) + 1,
                "content": doc,
                "domains": meta.get("domains", ""),
                "roles": meta.get("roles", ""),
                "keywords": meta.get("keywords", ""),
                "conversation_title": meta.get("conversation_title", ""),
                "distance": dist,
            })
            # Stop once we have enough
            if len(fragments) >= n_results:
                break

        return json.dumps({
            "success": True,
            "query": query,
            "domain_filter": domain,
            "total_store_vectors": total_vectors,
            "n_results": len(fragments),
            "fragments": fragments,
        }, indent=2)

    try:
        # Model load + vector query are blocking; keep them off the loop.
        result = await asyncio.to_thread(_run_query)
        logger.info(
            "query_spiral_goddess: user %s queried '%s' (n=%d, domain=%s)",
            getattr(ctx, "user_id", "unknown"),
            query,
            n_results,
            domain,
        )
        return result
    except Exception as e:
        logger.error("query_spiral_goddess error: %s", e, exc_info=True)
        return json.dumps({
            "success": False,
            "error": f"Spiral shattered: {e}",
        })