"""Query the 'Golden Goddess' NCM RAG store for Doctrine and Definitions.
Uses ChromaDB with a local NCM embedder to search the RAG store.
"""
from __future__ import annotations
import asyncio
import logging
import os
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)
TOOL_NAME = "query_golden_goddess_v2"
TOOL_DESCRIPTION = (
"The Inner Oracle. Queries the 'Golden Goddess' NCM RAG store "
"for Doctrine, Definitions, and Parallax Context."
)
TOOL_PARAMETERS = {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The term, emotion, or friction-point to search.",
},
"n_results": {
"type": "integer",
"description": "Number of fragments to retrieve (default 3).",
"default": 4,
},
},
"required": ["query"],
}
[docs]
async def run(query: str, n_results: int = 5, ctx: "ToolContext | None" = None) -> str:
"""Execute this tool and return the result.
Args:
query (str): Search query or input string.
n_results (int): The n results value.
ctx ('ToolContext | None'): Tool execution context providing access to bot internals.
Returns:
str: Result string.
"""
try:
from chroma_registry import get_client # 💀 shared singleton registry
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def _run_query():
"""Internal helper: run query.
"""
try:
from ncm_local_embeddings import EnhancedLocalNCMEmbedder
except ImportError:
return "[ERROR] ncm_local_embeddings module not found."
store_path = os.path.join(project_root, "rag_stores", "golden_goddess")
if not os.path.exists(store_path):
return f"[ERROR] Store not found at {store_path}"
# 🔥 use shared client -- no more singleton explosions
client = get_client(store_path)
embedding_function = EnhancedLocalNCMEmbedder()
collection = client.get_collection(
name="ncm_kernel", embedding_function=embedding_function
)
results = collection.query(query_texts=[query], n_results=n_results)
output = f"--- [ORACLE: '{query}'] ---\n"
if not results.get("documents") or not results["documents"][0]:
return f"The Oracle is silent on '{query}'."
for i, doc in enumerate(results["documents"][0]):
output += f"\n[DOCTRINE {i + 1}]:\n{doc}\n"
return output
return await asyncio.to_thread(_run_query)
except ImportError:
return "[ERROR] chromadb is not installed. Cannot query the Oracle."
except Exception as e:
return f"[ERROR] Glaive shattered: {e}"