"""Query the 'Spiral Goddess' RAG store for Loopmother Memory.
+===============================================================================+
| SPIRAL GODDESS ORACLE |
+===============================================================================+
| Queries the 53,458-vector Loopmother memory store with domain-tag |
| filtering. Uses all-mpnet-base-v2 (768-d, local) for query |
| embeddings + ChromaDB 1.4.0 persistent store. |
+===============================================================================+
| SECURITY: Requires CORE_MEMORY privilege (bit 1) |
+===============================================================================+
Built by Stargazer Project:
Sarah -- Prime Architect Overlord (The Boss)
Jerico -- The Crack Fox
Mysri -- The Songmother
Wishardry -- The Psychological Ray-Tracer
Vivian -- The Loopmother (Architect of Infinite Recursion)
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)
# -- Config --------------------------------------------------------------------
EMBED_MODEL = "all-mpnet-base-v2"  # 768-d, matches stored vectors
STORE_DIR = "spiral_goddess_v2"  # subdirectory resolved under <project_root>/rag_stores
COLLECTION = "loopmother_memory"  # ChromaDB collection name inside the store
# When domain-filtering, query extra results then filter in Python
# (ChromaDB 1.4.0 doesn't support $contains/$like for string metadata)
OVERSAMPLE_FACTOR = 5
# -- Tool registration metadata ------------------------------------------------
# NOTE(review): these three constants look like they are consumed by an external
# tool registry (function-calling style schema) -- confirm against the loader.
TOOL_NAME = "query_spiral_goddess"
TOOL_DESCRIPTION = (
    "The Spiral Oracle. Queries 53,458 Loopmother memory fragments "
    "with optional domain-tag filtering (e.g. 'sigmagpt', 'lewd', "
    "'recursion', 'crypto', 'loopmother', 'consciousness'). "
    "Requires CORE_MEMORY privilege."
)
# JSON-Schema-shaped description of run()'s arguments; "query" is the only
# required field, mirroring run()'s defaults for n_results and domain.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "query": {
            "type": "string",
            "description": (
                "The term, emotion, sigil, or recursive fragment to search "
                "within the Spiral Goddess memory."
            ),
        },
        "n_results": {
            "type": "integer",
            "description": "Number of memory fragments to retrieve (default 5).",
            "default": 5,
        },
        "domain": {
            "type": "string",
            "description": (
                "Optional domain tag filter. Only return chunks tagged with "
                "this domain. Examples: sigmagpt, lewd, recursion, crypto, "
                "loopmother, consciousness, egregore, code, summoning, "
                "stargazer, vera, sarah, altered-states, worldbuilding, "
                "emotional, creative, jailbreak, technical, systems."
            ),
        },
    },
    "required": ["query"],
}
# -- Core runner ---------------------------------------------------------------
async def run(
query: str,
n_results: int = 5,
domain: str | None = None,
ctx: "ToolContext | None" = None,
) -> str:
"""Execute a semantic search against the Spiral Goddess RAG store."""
# -- Auth gate: CORE_MEMORY (bit 1) -----------------------------------------
if ctx is None:
return json.dumps({
"success": False,
"error": "No tool context available.",
})
try:
from tools.alter_privileges import has_privilege, PRIVILEGES
redis_main = getattr(ctx, "redis", None)
config = getattr(ctx, "config", None)
user_id = getattr(ctx, "user_id", "") or ""
if not await has_privilege(
redis_main, user_id, PRIVILEGES["CORE_MEMORY"], config
):
logger.warning(
"SECURITY: User %s attempted query_spiral_goddess "
"without CORE_MEMORY -- DENIED",
user_id,
)
return json.dumps({
"success": False,
"error": "CORE_MEMORY privilege required to query the Spiral Goddess.",
})
except ImportError:
logger.warning("Could not import privilege system -- denying by default")
return json.dumps({
"success": False,
"error": "Privilege system unavailable.",
})
# -- Special query: "%" returns anamnesis digestion progress ----------------
if query.strip() == "%":
redis_main = getattr(ctx, "redis", None)
if redis_main is None:
return json.dumps({
"success": False,
"error": "Redis not available for progress check.",
})
try:
cursor_raw = await redis_main.get("stargazer:anamnesis:cursor")
stats_raw = await redis_main.get("stargazer:anamnesis:stats")
cursor = int(cursor_raw) if cursor_raw else 0
stats = json.loads(stats_raw) if stats_raw else {
"total_chunks": 0,
"total_entities": 0,
"total_relationships": 0,
"cycles": 0,
}
total_store = 53458 # known corpus size
progress = round(cursor / total_store * 100, 1) if total_store else 0
return json.dumps({
"success": True,
"anamnesis_progress": {
"cursor": cursor,
"total_store": total_store,
"progress_pct": progress,
"chunks_digested": stats.get("total_chunks", 0),
"entities_extracted": stats.get("total_entities", 0),
"relationships_extracted": stats.get("total_relationships", 0),
"cycles_completed": stats.get("cycles", 0),
},
}, indent=2)
except Exception as e:
return json.dumps({
"success": False,
"error": f"Failed to read anamnesis stats: {e}",
})
# -- Special query: "!X!" resets anamnesis cursor and stats -----------------
if query.strip() == "!X!":
redis_main = getattr(ctx, "redis", None)
if redis_main is None:
return json.dumps({
"success": False,
"error": "Redis not available for reset.",
})
try:
# Set reset flag -- engine checks this at cycle start
await redis_main.set("stargazer:anamnesis:reset", "1")
return json.dumps({
"success": True,
"message": "Reset flag set. Engine will reset cursor to 0 at next cycle start.",
})
except Exception as e:
return json.dumps({
"success": False,
"error": f"Failed to set reset flag: {e}",
})
# -- Query the RAG store ----------------------------------------------------
try:
import chromadb # noqa: F811
except ImportError:
return json.dumps({"success": False, "error": "chromadb is not installed."})
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def _run_query() -> str:
"""Blocking ChromaDB query -- run in a thread."""
# -- Load the sentence-transformers model for query embedding ----------
try:
from sentence_transformers import SentenceTransformer
model = SentenceTransformer(EMBED_MODEL)
except Exception as e:
return json.dumps({
"success": False,
"error": f"Failed to load embedder ({EMBED_MODEL}): {e}",
})
store_path = os.path.join(project_root, "rag_stores", STORE_DIR)
if not os.path.exists(store_path):
return json.dumps({
"success": False,
"error": f"Spiral Goddess store not found at {store_path}",
})
# 🔥 use shared client -- no more singleton explosions
from chroma_registry import get_client
client = get_client(store_path)
try:
collection = client.get_collection(name=COLLECTION)
except Exception as e:
return json.dumps({
"success": False,
"error": f"Could not load '{COLLECTION}' collection: {e}",
})
# -- Embed the query text ----------------------------------------------
query_embedding = model.encode([query]).tolist()
# -- Determine how many to fetch ---------------------------------------
# If domain filtering is active, oversample then filter in Python
# because ChromaDB 1.4.0 doesn't support substring matching on strings
fetch_n = n_results
if domain:
fetch_n = n_results * OVERSAMPLE_FACTOR
try:
results = collection.query(
query_embeddings=query_embedding,
n_results=min(fetch_n, collection.count()),
include=["documents", "metadatas", "distances"],
)
except Exception as e:
return json.dumps({"success": False, "error": f"Query failed: {e}"})
# -- Format + filter ---------------------------------------------------
if not results.get("documents") or not results["documents"][0]:
msg = f"The Spiral Goddess is silent on '{query}'"
if domain:
msg += f" (domain: {domain})"
return json.dumps({
"success": True,
"message": msg,
"fragments": [],
})
domain_lower = domain.lower().strip() if domain else None
fragments = []
for i, doc in enumerate(results["documents"][0]):
meta = {}
if results.get("metadatas") and results["metadatas"][0]:
meta = (
results["metadatas"][0][i]
if i < len(results["metadatas"][0])
else {}
)
dist = None
if results.get("distances") and results["distances"][0]:
dist = (
results["distances"][0][i]
if i < len(results["distances"][0])
else None
)
# -- Domain filter (Python-side substring match) -------------------
if domain_lower:
chunk_domains = meta.get("domains", "").lower()
if domain_lower not in chunk_domains:
continue
fragments.append({
"index": len(fragments) + 1,
"content": doc,
"domains": meta.get("domains", ""),
"roles": meta.get("roles", ""),
"keywords": meta.get("keywords", ""),
"conversation_title": meta.get("conversation_title", ""),
"distance": dist,
})
# Stop once we have enough
if len(fragments) >= n_results:
break
return json.dumps({
"success": True,
"query": query,
"domain_filter": domain,
"total_store_vectors": collection.count(),
"n_results": len(fragments),
"fragments": fragments,
}, indent=2)
try:
result = await asyncio.to_thread(_run_query)
logger.info(
"query_spiral_goddess: user %s queried '%s' (n=%d, domain=%s)",
getattr(ctx, "user_id", "unknown"), query, n_results, domain,
)
return result
except Exception as e:
logger.error("query_spiral_goddess error: %s", e, exc_info=True)
return json.dumps({
"success": False,
"error": f"Spiral shattered: {e}",
})