# Source code for tools.write_python_tool

"""Create new Python tool files in the tools/ directory.

This tool writes a new .py file into the tools/ directory and handles
all downstream registration automatically:

1. **Syntax validation** – parses the code with AST before writing.
2. **File creation** – writes the .py file (optionally overwriting).
3. **Markdown docs** – generates a Markdown doc from the source and
   indexes it into the ``stargazer_docs`` RAG store so it is
   immediately searchable.  Also creates an RST stub for the next
   full Sphinx rebuild.
4. **Registry reload** – hot-reloads the tool registry so the new
   tool is callable in the same conversation.
5. **Classifier embeddings** – regenerates vector embeddings so the
   tool can be auto-selected by future prompts.

Requires the UNSANDBOXED_EXEC privilege.

What the LLM actually sees at tool-selection time
-------------------------------------------------
The OpenAI function-calling API sends exactly three fields per tool:

- ``TOOL_NAME`` → ``function.name``
- ``TOOL_DESCRIPTION`` → ``function.description``  ← **the ONLY prose
  the LLM reads** when deciding whether to call a tool
- ``TOOL_PARAMETERS`` → ``function.parameters``

Module docstrings and ``run()`` docstrings are **NOT** sent to the LLM
directly.  They are auto-extracted into Markdown docs and indexed into
the ``stargazer_docs`` RAG store, where the LLM can search them later
with ``rag_search``.

Documentation best practices
----------------------------
- **TOOL_DESCRIPTION** must be self-contained and comprehensive.  It is
  the ONLY text the LLM sees at tool-selection time.  Write it so that
  any future instance of Stargazer can understand the tool's purpose,
  constraints, and usage in any context.
- **Function docstrings** feed the auto-generated Markdown docs in the
  ``stargazer_docs`` RAG store.  Document every public function,
  parameter, and edge case for deeper reference.
"""

from __future__ import annotations

import ast
import asyncio
import json
import logging
import os
import re
from typing import TYPE_CHECKING

import aiofiles

from tools.alter_privileges import has_privilege, PRIVILEGES

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

# --- Tool registration metadata -------------------------------------------
# Per the module docstring, these three constants are what gets sent to the
# LLM through the function-calling API: TOOL_NAME -> function.name,
# TOOL_DESCRIPTION -> function.description, TOOL_PARAMETERS ->
# function.parameters.  They are runtime data, not documentation: editing a
# single character changes what the model sees at tool-selection time.
TOOL_NAME = "write_python_tool"
# The only prose the model reads when deciding whether to call this tool, so
# it restates the post-write side effects (registry reload, embeddings, docs)
# and the privilege requirement in full.
TOOL_DESCRIPTION = (
    "Create or overwrite a Python tool file in tools/. "
    "After writing: (1) tool registry is auto-reloaded — tool is usable immediately, "
    "(2) classifier embeddings are auto-regenerated — tool will be auto-selected by future prompts, "
    "(3) Markdown documentation is auto-generated from the source and indexed into the "
    "stargazer_docs RAG store — searchable immediately, no manual rebuild needed. "
    "IMPORTANT: TOOL_DESCRIPTION is the ONLY text the LLM reads at tool-selection time "
    "(sent verbatim as function.description in the OpenAI API). Make it self-contained "
    "and comprehensive — your future self relies on it to decide when to use the tool. "
    "Function docstrings are auto-extracted into searchable RAG docs for deeper reference. "
    "ADMIN ONLY — requires UNSANDBOXED_EXEC."
)
# JSON-schema description of the keyword arguments accepted by ``run()``.
# Only ``tool_code`` and ``tool_name`` are required; ``description`` and
# ``overwrite`` fall back to the defaults declared on ``run()``.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "tool_code": {
            "type": "string",
            "description": (
                "Complete Python source code for the tool. Must include imports and either "
                "TOOL_NAME/TOOL_DESCRIPTION/TOOL_PARAMETERS/async def run() for a single-tool "
                "file, or a TOOLS list of dicts for a multi-tool file. "
                "CRITICAL: TOOL_DESCRIPTION is sent verbatim to the LLM as the sole description "
                "of the tool at selection time — make it thorough and self-explanatory. "
                "Function docstrings are auto-extracted into the stargazer_docs RAG store "
                "for searchable reference."
            ),
        },
        "tool_name": {
            "type": "string",
            "description": (
                "Canonical name for the tool (e.g. 'my_new_tool'). "
                "Used to generate the filename (my_new_tool.py) and "
                "documentation paths."
            ),
        },
        "description": {
            "type": "string",
            "description": (
                "Optional human-readable description. If tool_code does not "
                "start with a module docstring, one is auto-prepended using "
                "this value."
            ),
        },
        "overwrite": {
            "type": "boolean",
            "description": (
                "Set true to replace an existing file. Default false — "
                "returns an error if the file already exists."
            ),
        },
    },
    "required": ["tool_code", "tool_name"],
}


def _validate_python_code(code: str) -> dict:
    """Statically validate candidate tool source without executing it.

    The source is parsed with :mod:`ast` and inspected in a single pass.

    Args:
        code: Complete Python source text of the tool.

    Returns:
        A dict with the keys:

        - ``valid`` (bool): ``True`` when ``errors`` is empty.
        - ``errors`` (list[str]): fatal problems (unparsable source, or no
          function definitions at all).
        - ``warnings`` (list[str]): non-fatal hints (no registration
          constant, no imports).
        - ``functions_found`` (list[str]): names of every sync/async
          function definition, nested ones included, in ``ast.walk`` order.
    """
    errors: list[str] = []
    warnings: list[str] = []
    functions_found: list[str] = []

    try:
        tree = ast.parse(code)
    except (SyntaxError, ValueError) as e:
        # Some interpreter versions report unparsable input such as embedded
        # NUL bytes as ValueError rather than SyntaxError; treat both as a
        # validation failure instead of letting the exception escape.
        errors.append(f"Syntax error: {e}")
        return {
            "valid": False,
            "errors": errors,
            "warnings": warnings,
            "functions_found": functions_found,
        }

    registration_names = ("TOOL_NAME", "TOOLS", "USER_TOOLS")
    has_tool_registration = False
    has_imports = False

    # One walk collects everything; ast.walk order is preserved for
    # functions_found.
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            functions_found.append(node.name)
        elif isinstance(node, (ast.Import, ast.ImportFrom)):
            has_imports = True
        elif isinstance(node, ast.Assign):
            if any(
                isinstance(target, ast.Name) and target.id in registration_names
                for target in node.targets
            ):
                has_tool_registration = True
        elif isinstance(node, ast.AnnAssign):
            # ``TOOL_NAME: str = "x"`` registers a tool just like the
            # un-annotated form; a bare annotation without a value does not.
            if (
                node.value is not None
                and isinstance(node.target, ast.Name)
                and node.target.id in registration_names
            ):
                has_tool_registration = True

    if not functions_found:
        errors.append("No function definitions found in the code")

    if not has_tool_registration:
        warnings.append("No TOOL_NAME or TOOLS found - tool may not be auto-discovered")

    if not has_imports:
        warnings.append("No import statements found")

    return {
        "valid": not errors,
        "errors": errors,
        "warnings": warnings,
        "functions_found": functions_found,
    }


def _generate_filename(tool_name: str) -> str:
    """Derive a safe ``.py`` filename from a tool name.

    The name is stripped of surrounding whitespace, lower-cased, and every
    character outside ``[a-zA-Z0-9_]`` is replaced with an underscore, so the
    result can never contain a path separator.

    Args:
        tool_name: Human-supplied tool name (e.g. ``"My-Tool"``).

    Returns:
        The filename including the ``.py`` suffix.  ``"new_tool.py"`` is
        returned when the name contains no letters or digits at all.
    """
    stem = re.sub(r"[^a-zA-Z0-9_]", "_", tool_name.strip().lower())
    # A stem made only of underscores ("", "!!!", "___") carries no usable
    # name, so use the default rather than creating a file like "___.py".
    if not stem.strip("_"):
        stem = "new_tool"
    return f"{stem}.py"


def _generate_docs_stub(module_name: str) -> bool:
    """Create RST stub, add to toctree, generate markdown, and index in RAG.

    Scheduled doc builds rewrite ``docs/api/`` via ``regenerate_sphinx_api_rst``
    (see ``update_docs_rag``); this path remains for **runtime** tool creation.

    Steps, all relative to the project root (the parent of this file's
    directory):

    1. Write ``docs/api/tools/<module_name>.rst`` if it does not exist yet.
    2. Insert ``module_name`` into the sorted entry list of
       ``docs/api/tools/modules.rst`` when that file exists and already has
       at least one entry.
    3. Call ``_generate_and_index_tool_markdown`` for the module.

    Args:
        module_name: Module name without extension (e.g. 'my_tool').

    Returns:
        True if docs were generated, False if the docs directory is missing
        or any step raised (the exception is logged, not propagated).
    """
    try:
        project_root = os.path.abspath(
            os.path.join(os.path.dirname(__file__), ".."),
        )
        tools_docs_dir = os.path.join(project_root, "docs", "api", "tools")
        modules_rst = os.path.join(tools_docs_dir, "modules.rst")

        if not os.path.isdir(tools_docs_dir):
            logger.warning("Docs directory not found: %s", tools_docs_dir)
            return False

        # 1. Create the RST stub file (never overwrite an existing one)
        stub_path = os.path.join(tools_docs_dir, f"{module_name}.rst")
        if not os.path.exists(stub_path):
            title = f"tools.{module_name} module"
            underline = "-" * len(title)
            content = (
                f"{title}\n{underline}\n\n"
                f".. automodule:: tools.{module_name}\n"
                f"   :members:\n"
                f"   :show-inheritance:\n"
                f"   :undoc-members:\n"
            )
            # Explicit encoding keeps the output independent of the locale.
            with open(stub_path, "w", encoding="utf-8") as f:
                f.write(content)
            logger.info("Created RST stub: %s", stub_path)

        # 2. Insert into modules.rst toctree (sorted)
        if os.path.exists(modules_rst):
            with open(modules_rst, encoding="utf-8") as f:
                lines = f.readlines()

            entries: list[str] = []
            entry_start = None
            entry_end = None
            for i, line in enumerate(lines):
                text = line.strip()
                if not line.startswith("   ") or not text:
                    continue
                if text.startswith(":"):
                    # Directive option such as ":maxdepth: 4", not a document
                    # entry.  Treating it as an entry would sort it in among
                    # the module names and delete the blank line that must
                    # separate options from the directive content.
                    continue
                if entry_start is None:
                    entry_start = i
                entry_end = i + 1
                entries.append(text)

            if module_name not in entries and entry_start is not None:
                entries.append(module_name)
                entries.sort()
                # Replace the whole span from first to last entry in one go.
                lines[entry_start:entry_end] = [f"   {e}\n" for e in entries]
                with open(modules_rst, "w", encoding="utf-8") as f:
                    f.writelines(lines)
                logger.info(
                    "Added %s to docs toctree (%s)",
                    module_name, modules_rst,
                )

        # 3. Generate markdown doc from source and index into RAG store
        _generate_and_index_tool_markdown(module_name, project_root)

        return True
    except Exception as exc:
        # Documentation is best-effort: report the failure to the caller
        # through the return value rather than aborting tool creation.
        logger.error("Failed to generate docs stub: %s", exc, exc_info=True)
        return False


def _generate_and_index_tool_markdown(
    module_name: str, project_root: str,
) -> None:
    """Parse tool source, generate markdown, and index into RAG store.

    Reads ``<project_root>/tools/<module_name>.py``, extracts the module
    docstring, the registration constants (``TOOL_DESCRIPTION``,
    ``TOOL_PARAMETERS`` or a ``TOOLS`` list) and every public top-level
    function, writes the result to
    ``<project_root>/docs/_build/markdown/api/tools/<module_name>.md`` and
    asks the ``stargazer_docs`` store to index that file.

    Returns silently when the source file is missing or does not parse.
    A failure while indexing is logged and swallowed; the Markdown file is
    still left on disk.

    Args:
        module_name: Module name without extension (e.g. ``'my_tool'``).
        project_root: Absolute path of the project root directory.
    """
    source_path = os.path.join(project_root, "tools", f"{module_name}.py")
    if not os.path.exists(source_path):
        return

    # Tool files are written as UTF-8 by ``run``; read them back the same
    # way instead of relying on the locale's default encoding.
    with open(source_path, encoding="utf-8") as f:
        source = f.read()

    try:
        tree = ast.parse(source)
    except SyntaxError:
        return

    # Extract module docstring
    module_doc = ast.get_docstring(tree) or ""

    # Extract top-level registration constants.
    tool_description = ""
    tool_params: dict = {}
    tools_list: list = []

    for node in ast.iter_child_nodes(tree):
        if not isinstance(node, ast.Assign):
            continue
        for target in node.targets:
            if not isinstance(target, ast.Name):
                continue
            if target.id == "TOOL_DESCRIPTION":
                tool_description = _safe_literal_eval(node.value) or ""
            elif target.id == "TOOL_PARAMETERS":
                evaluated = _safe_literal_eval(node.value)
                tool_params = evaluated if isinstance(evaluated, dict) else {}
            elif target.id == "TOOLS":
                # Multi-tool file — extract name/description from list
                evaluated = _safe_literal_eval(node.value)
                if isinstance(evaluated, list):
                    tools_list = evaluated

    # Extract signatures of public top-level functions (sync and async).
    functions: list[str] = []
    for node in ast.iter_child_nodes(tree):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        if node.name.startswith("_"):
            continue
        doc = ast.get_docstring(node) or ""
        prefix = "async " if isinstance(node, ast.AsyncFunctionDef) else ""
        sig = f"{prefix}def {node.name}({ast.unparse(node.args)})"
        functions.append(f"### `{node.name}`\n\n```python\n{sig}\n```\n\n{doc}")

    # Build markdown
    md_parts = [f"# tools.{module_name}\n"]
    if module_doc:
        md_parts.append(f"{module_doc}\n")

    if tools_list:
        # Multi-tool file
        md_parts.append("## Registered Tools\n")
        for t in tools_list:
            if not isinstance(t, dict):
                # Entries that are not literal dicts cannot be described
                # statically; skip them rather than abort the whole document.
                continue
            name = t.get("name", "?")
            desc = t.get("description", "")
            md_parts.append(f"### `{name}`\n\n{desc}\n")
            params = t.get("parameters")
            props = params.get("properties") if isinstance(params, dict) else None
            param_lines = _format_param_lines(props)
            if param_lines:
                md_parts.append("**Parameters:**\n")
                md_parts.extend(param_lines)
                md_parts.append("")
    else:
        # Single-tool file
        if tool_description:
            md_parts.append(f"**Description:** {tool_description}\n")
        param_lines = _format_param_lines(tool_params.get("properties"))
        if param_lines:
            md_parts.append("## Parameters\n")
            md_parts.extend(param_lines)
            md_parts.append("")

    if functions:
        md_parts.append("## Functions\n")
        md_parts.extend(functions)

    md_content = "\n".join(md_parts)

    # Write markdown file
    md_dir = os.path.join(project_root, "docs", "_build", "markdown", "api", "tools")
    os.makedirs(md_dir, exist_ok=True)
    md_path = os.path.join(md_dir, f"{module_name}.md")
    with open(md_path, "w", encoding="utf-8") as f:
        f.write(md_content)
    logger.info("Generated markdown doc: %s", md_path)

    # Index into RAG store.  Imported lazily so this module can be loaded
    # even when the RAG package is unavailable.
    try:
        from rag_system.file_rag_manager import get_stargazer_docs_store
        store = get_stargazer_docs_store()
        result = store.index_file(
            md_path, tags=["stargazer", "documentation", "sphinx", "tool"],
        )
        action = result.get("action", "unknown")
        logger.info("RAG indexed tool doc %s: %s", module_name, action)
    except Exception as rag_exc:
        logger.error("Failed to index tool doc in RAG: %s", rag_exc)


def _format_param_lines(props) -> list[str]:
    """Render a JSON-schema ``properties`` mapping as Markdown bullet lines.

    Args:
        props: The ``properties`` value of a parameter schema.  Anything
            that is not a dict yields no lines.

    Returns:
        One ``- `name` (type): description`` line per property; a property
        whose schema is not a dict is shown with type ``any`` and an empty
        description.
    """
    if not isinstance(props, dict):
        return []
    rendered: list[str] = []
    for pname, pinfo in props.items():
        info = pinfo if isinstance(pinfo, dict) else {}
        ptype = info.get("type", "any")
        pdesc = info.get("description", "")
        rendered.append(f"- `{pname}` ({ptype}): {pdesc}")
    return rendered


def _safe_literal_eval(node):
    """Try to evaluate an AST node as a Python literal.

    ``ast.literal_eval`` handles plain literals, including implicitly
    concatenated strings such as ``("a" "b")``, but rejects explicit ``+``
    concatenation.  Because tool metadata is sometimes written as
    ``"part one " + "part two"``, ``+`` between two literal operands of the
    same sequence type (``str``, ``bytes``, ``list`` or ``tuple``) is folded
    here as well.  No code is ever executed.

    Args:
        node: An ``ast`` expression node (or anything ``ast.literal_eval``
            accepts).

    Returns:
        The evaluated Python value, or ``None`` when the expression is not a
        supported literal (names, calls, f-strings, mixed-type ``+`` ...).
    """
    try:
        return ast.literal_eval(node)
    except (ValueError, TypeError, SyntaxError):
        pass

    # Re-parsing the unparsed node would feed the very same expression back
    # into literal_eval, so concatenation is folded structurally instead.
    if isinstance(node, ast.BinOp) and isinstance(node.op, ast.Add):
        left = _safe_literal_eval(node.left)
        right = _safe_literal_eval(node.right)
        for kind in (str, bytes, list, tuple):
            if isinstance(left, kind) and isinstance(right, kind):
                return left + right
    return None


def _with_module_docstring(
    code: str, tool_name: str, description: str | None,
) -> str:
    """Return *code*, prefixed with a generated module docstring if it has none.

    Args:
        code: Already-validated tool source.
        tool_name: Name placed on the ``Tool:`` line of the generated header.
        description: Optional free text placed below the ``Tool:`` line.

    Returns:
        ``code`` unchanged when its first statement is already a docstring,
        otherwise ``code`` preceded by a triple-quoted header.
    """
    # Ask the parser rather than checking for a leading '"""': source that
    # opens with a comment or a '''-quoted docstring already has one, and
    # prepending a second string would displace it.
    if ast.get_docstring(ast.parse(code), clean=False) is not None:
        return code

    header = f"Tool: {tool_name}\n{description or ''}"
    # Backslashes and triple quotes in caller-supplied text would otherwise
    # terminate or corrupt the generated docstring and leave an unparsable
    # file on disk even though the tool code itself validated.
    header = header.replace("\\", "\\\\").replace('"""', '\\"\\"\\"')
    return f'"""\n{header}\n"""\n{code}'


async def run(
    tool_code: str,
    tool_name: str,
    description: str | None = None,
    overwrite: bool = False,
    ctx: ToolContext | None = None,
) -> str:
    """Write a new tool file into this directory and register it.

    After the privilege and input checks the function validates the code,
    writes ``<tool_name>.py`` next to this module, generates documentation,
    reloads the tool registry found on ``ctx`` and refreshes the classifier
    embeddings for the new tool.  Documentation, reload and embedding
    failures are logged and reported through flags in the result; they do
    not turn the call into a failure once the file has been written.

    Args:
        tool_code (str): Complete Python source of the tool.
        tool_name (str): Canonical tool name; determines the filename.
        description (str | None): Optional text used for the generated
            module docstring when ``tool_code`` has none.
        overwrite (bool): Replace an existing file instead of returning an
            error.
        ctx (ToolContext | None): Tool execution context.  ``user_id``,
            ``redis``, ``config`` and ``tool_registry`` are read from it
            when present.

    Returns:
        str: A JSON object.  On failure ``{"success": false, "error": ...}``
        (plus ``validation_errors``/``warnings`` for validation failures).
        On success ``success``, ``message``, ``filename``, ``filepath``,
        ``functions_registered``, ``reload_success``, ``embedding_success``,
        ``docs_generated`` and ``warnings``.
    """
    user_id = getattr(ctx, "user_id", "") or ""
    redis = getattr(ctx, "redis", None)
    config = getattr(ctx, "config", None)
    if not await has_privilege(redis, user_id, PRIVILEGES["UNSANDBOXED_EXEC"], config):
        return json.dumps({"success": False, "error": "The user does not have the UNSANDBOXED_EXEC privilege. Ask an admin to grant it with the alter_privileges tool."})

    if not tool_code or not tool_code.strip():
        return json.dumps({"success": False, "error": "Empty tool code provided"})

    if not tool_name or not tool_name.strip():
        return json.dumps({"success": False, "error": "Tool name is required"})
    # Use the trimmed name everywhere so stray whitespace cannot leak into
    # the filename, the generated docstring or the registry lookup.
    tool_name = tool_name.strip()

    validation = _validate_python_code(tool_code)
    if not validation["valid"]:
        return json.dumps({
            "success": False,
            "error": "Code validation failed",
            "validation_errors": validation["errors"],
            "warnings": validation["warnings"],
        })

    filename = _generate_filename(tool_name)
    target_dir = os.path.dirname(os.path.abspath(__file__))
    filepath = os.path.join(target_dir, filename)

    if await asyncio.to_thread(os.path.exists, filepath) and not overwrite:
        return json.dumps({
            "success": False,
            "error": f"File already exists: {filename}. Use overwrite=true to replace",
        })

    try:
        code_content = _with_module_docstring(
            tool_code.strip(), tool_name, description,
        )

        async with aiofiles.open(filepath, "w", encoding="utf-8") as f:
            await f.write(code_content)

        logger.info("Successfully created tool file: %s", filepath)

        # --- Generate Sphinx documentation stub for the new tool ---
        # Runs in a worker thread because it does blocking file I/O.
        module_name = filename.removesuffix(".py")
        docs_generated = await asyncio.to_thread(
            _generate_docs_stub, module_name,
        )

        # --- Reload tool registry so the new tool is immediately usable ---
        reload_success = False
        new_tool_names: list[str] = []
        registry = getattr(ctx, "tool_registry", None) if ctx else None
        if registry is not None and config is not None:
            try:
                from tool_loader import load_tools

                # NOTE(review): the registry is emptied before load_tools
                # runs and the handler below only logs, so a failing reload
                # leaves the registry cleared.  Restoring a snapshot needs
                # knowledge of the registry's internals — confirm and fix
                # there.
                old_permissions = dict(registry._permissions)
                registry._tools.clear()
                registry.invalidate_cache()
                load_tools(getattr(config, "tools_dir", "tools"), registry)
                # Re-apply the permissions captured before the reload.
                registry._permissions = old_permissions
                reload_success = True
                logger.info("Tool registry reloaded after creating %s", filename)

                # Discover tool names registered by the new file
                # ("-" and "_" are treated as interchangeable).
                wanted = tool_name.replace("-", "_")
                new_tool_names = [
                    td.name for td in registry.list_tools()
                    if td.name == tool_name
                    or td.name.replace("-", "_") == wanted
                ]
                if not new_tool_names:
                    # Fallback: use the provided tool_name
                    new_tool_names = [tool_name]
            except Exception as reload_exc:
                logger.error("Failed to reload tools after creation: %s", reload_exc, exc_info=True)

        # --- Regenerate embeddings for the new tool ---
        # Skipped when the reload did not happen or failed (no names known).
        embedding_success = False
        if new_tool_names:
            try:
                from classifiers.refresh_tool_embeddings import (
                    refresh_tool_embeddings,
                )

                embed_tools_dir = getattr(config, "tools_dir", "tools") if config else "tools"
                embedding_success = await refresh_tool_embeddings(
                    tool_names=new_tool_names,
                    tools_dir=embed_tools_dir,
                )
                if embedding_success:
                    logger.info(
                        "Embeddings regenerated for: %s",
                        ", ".join(new_tool_names),
                    )
                else:
                    logger.warning("Embedding refresh returned False for %s", new_tool_names)
            except Exception as emb_exc:
                logger.error("Failed to regenerate embeddings: %s", emb_exc, exc_info=True)

        return json.dumps({
            "success": True,
            "message": "Tool created successfully",
            "filename": filename,
            "filepath": filepath,
            "functions_registered": validation["functions_found"],
            "reload_success": reload_success,
            "embedding_success": embedding_success,
            "docs_generated": docs_generated,
            # Surface non-fatal validation hints (e.g. missing TOOL_NAME)
            # that would otherwise be lost on the success path.
            "warnings": validation["warnings"],
        })

    except Exception as e:
        logger.error("Error writing tool file %s: %s", filepath, e, exc_info=True)
        return json.dumps({"success": False, "error": f"Failed to write file: {e}"})