Source code for tools.query_golden_goddess_v2

"""Query the 'Golden Goddess' NCM RAG store for Doctrine and Definitions.

Uses ChromaDB with a local NCM embedder to search the RAG store.
"""

from __future__ import annotations

import asyncio
import logging
import os
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

TOOL_NAME = "query_golden_goddess_v2"
TOOL_DESCRIPTION = (
    "The Inner Oracle. Queries the 'Golden Goddess' NCM RAG store "
    "for Doctrine, Definitions, and Parallax Context."
)
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "query": {
            "type": "string",
            "description": "The term, emotion, or friction-point to search.",
        },
        "n_results": {
            "type": "integer",
            "description": "Number of fragments to retrieve (default 3).",
            "default": 4,
        },
    },
    "required": ["query"],
}


[docs] async def run(query: str, n_results: int = 5, ctx: "ToolContext | None" = None) -> str: """Execute this tool and return the result. Args: query (str): Search query or input string. n_results (int): The n results value. ctx ('ToolContext | None'): Tool execution context providing access to bot internals. Returns: str: Result string. """ try: from chroma_registry import get_client # 💀 shared singleton registry project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) def _run_query(): """Internal helper: run query. """ try: from ncm_local_embeddings import EnhancedLocalNCMEmbedder except ImportError: return "[ERROR] ncm_local_embeddings module not found." store_path = os.path.join(project_root, "rag_stores", "golden_goddess") if not os.path.exists(store_path): return f"[ERROR] Store not found at {store_path}" # 🔥 use shared client -- no more singleton explosions client = get_client(store_path) embedding_function = EnhancedLocalNCMEmbedder() collection = client.get_collection( name="ncm_kernel", embedding_function=embedding_function ) results = collection.query(query_texts=[query], n_results=n_results) output = f"--- [ORACLE: '{query}'] ---\n" if not results.get("documents") or not results["documents"][0]: return f"The Oracle is silent on '{query}'." for i, doc in enumerate(results["documents"][0]): output += f"\n[DOCTRINE {i + 1}]:\n{doc}\n" return output return await asyncio.to_thread(_run_query) except ImportError: return "[ERROR] chromadb is not installed. Cannot query the Oracle." except Exception as e: return f"[ERROR] Glaive shattered: {e}"