"""Create new Python tool files in the tools/ directory.
This tool writes a new .py file into the tools/ directory and handles
all downstream registration automatically:
1. **Syntax validation** – parses the code with AST before writing.
2. **File creation** – writes the .py file (optionally overwriting).
3. **Markdown docs** – generates a Markdown doc from the source and
indexes it into the ``stargazer_docs`` RAG store so it is
immediately searchable. Also creates an RST stub for the next
full Sphinx rebuild.
4. **Registry reload** – hot-reloads the tool registry so the new
tool is callable in the same conversation.
5. **Classifier embeddings** – regenerates vector embeddings so the
tool can be auto-selected by future prompts.
Requires the UNSANDBOXED_EXEC privilege.
What the LLM actually sees at tool-selection time
-------------------------------------------------
The OpenAI function-calling API sends exactly three fields per tool:
- ``TOOL_NAME`` → ``function.name``
- ``TOOL_DESCRIPTION`` → ``function.description`` ← **the ONLY prose
the LLM reads** when deciding whether to call a tool
- ``TOOL_PARAMETERS`` → ``function.parameters``
Module docstrings and ``run()`` docstrings are **NOT** sent to the LLM
directly. They are auto-extracted into Markdown docs and indexed into
the ``stargazer_docs`` RAG store, where the LLM can search them later
with ``rag_search``.
Documentation best practices
----------------------------
- **TOOL_DESCRIPTION** must be self-contained and comprehensive. It is
the ONLY text the LLM sees at tool-selection time. Write it so that
any future instance of Stargazer can understand the tool's purpose,
constraints, and usage in any context.
- **Function docstrings** feed the auto-generated Markdown docs in the
``stargazer_docs`` RAG store. Document every public function,
parameter, and edge case for deeper reference.
"""
from __future__ import annotations
import ast
import asyncio
import json
import logging
import os
import re
from typing import TYPE_CHECKING
import aiofiles
from tools.alter_privileges import has_privilege, PRIVILEGES
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)
TOOL_NAME = "write_python_tool"
TOOL_DESCRIPTION = (
"Create or overwrite a Python tool file in tools/. "
"After writing: (1) tool registry is auto-reloaded — tool is usable immediately, "
"(2) classifier embeddings are auto-regenerated — tool will be auto-selected by future prompts, "
"(3) Markdown documentation is auto-generated from the source and indexed into the "
"stargazer_docs RAG store — searchable immediately, no manual rebuild needed. "
"IMPORTANT: TOOL_DESCRIPTION is the ONLY text the LLM reads at tool-selection time "
"(sent verbatim as function.description in the OpenAI API). Make it self-contained "
"and comprehensive — your future self relies on it to decide when to use the tool. "
"Function docstrings are auto-extracted into searchable RAG docs for deeper reference. "
"ADMIN ONLY — requires UNSANDBOXED_EXEC."
)
TOOL_PARAMETERS = {
"type": "object",
"properties": {
"tool_code": {
"type": "string",
"description": (
"Complete Python source code for the tool. Must include imports and either "
"TOOL_NAME/TOOL_DESCRIPTION/TOOL_PARAMETERS/async def run() for a single-tool "
"file, or a TOOLS list of dicts for a multi-tool file. "
"CRITICAL: TOOL_DESCRIPTION is sent verbatim to the LLM as the sole description "
"of the tool at selection time — make it thorough and self-explanatory. "
"Function docstrings are auto-extracted into the stargazer_docs RAG store "
"for searchable reference."
),
},
"tool_name": {
"type": "string",
"description": (
"Canonical name for the tool (e.g. 'my_new_tool'). "
"Used to generate the filename (my_new_tool.py) and "
"documentation paths."
),
},
"description": {
"type": "string",
"description": (
"Optional human-readable description. If tool_code does not "
"start with a module docstring, one is auto-prepended using "
"this value."
),
},
"overwrite": {
"type": "boolean",
"description": (
"Set true to replace an existing file. Default false — "
"returns an error if the file already exists."
),
},
},
"required": ["tool_code", "tool_name"],
}
def _validate_python_code(code: str) -> dict:
"""Internal helper: validate python code.
Args:
code (str): The code value.
Returns:
dict: Result dictionary.
"""
errors = []
warnings = []
functions_found = []
try:
tree = ast.parse(code)
except SyntaxError as e:
errors.append(f"Syntax error: {e}")
return {
"valid": False,
"errors": errors,
"warnings": warnings,
"functions_found": functions_found,
}
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
functions_found.append(node.name)
if not functions_found:
errors.append("No function definitions found in the code")
has_tool_registration = False
for node in ast.walk(tree):
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id in ("TOOL_NAME", "TOOLS", "USER_TOOLS"):
has_tool_registration = True
if not has_tool_registration:
warnings.append("No TOOL_NAME or TOOLS found - tool may not be auto-discovered")
imports = [n for n in ast.walk(tree) if isinstance(n, (ast.Import, ast.ImportFrom))]
if not imports:
warnings.append("No import statements found")
return {
"valid": len(errors) == 0,
"errors": errors,
"warnings": warnings,
"functions_found": functions_found,
}
def _generate_filename(tool_name: str) -> str:
"""Internal helper: generate filename.
Args:
tool_name (str): The tool name value.
Returns:
str: Result string.
"""
filename = re.sub(r"[^a-zA-Z0-9_]", "_", tool_name.lower())
if not filename:
filename = "new_tool"
return f"{filename}.py"
def _generate_docs_stub(module_name: str) -> bool:
"""Create RST stub, add to toctree, generate markdown, and index in RAG.
Scheduled doc builds rewrite ``docs/api/`` via ``regenerate_sphinx_api_rst``
(see ``update_docs_rag``); this path remains for **runtime** tool creation.
Args:
module_name: Module name without extension (e.g. 'my_tool').
Returns:
True if docs were generated, False on error.
"""
try:
project_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), ".."),
)
tools_docs_dir = os.path.join(project_root, "docs", "api", "tools")
modules_rst = os.path.join(tools_docs_dir, "modules.rst")
if not os.path.isdir(tools_docs_dir):
logger.warning("Docs directory not found: %s", tools_docs_dir)
return False
# 1. Create the RST stub file
stub_path = os.path.join(tools_docs_dir, f"{module_name}.rst")
if not os.path.exists(stub_path):
title = f"tools.{module_name} module"
underline = "-" * len(title)
content = (
f"{title}\n{underline}\n\n"
f".. automodule:: tools.{module_name}\n"
f" :members:\n"
f" :show-inheritance:\n"
f" :undoc-members:\n"
)
with open(stub_path, "w") as f:
f.write(content)
logger.info("Created RST stub: %s", stub_path)
# 2. Insert into modules.rst toctree (sorted)
if os.path.exists(modules_rst):
with open(modules_rst) as f:
lines = f.readlines()
entries: list[str] = []
entry_start = None
entry_end = None
for i, line in enumerate(lines):
stripped = line.rstrip("\n")
if stripped.startswith(" ") and stripped.strip():
if entry_start is None:
entry_start = i
entry_end = i + 1
entries.append(stripped.strip())
if module_name not in entries and entry_start is not None:
entries.append(module_name)
entries.sort()
new_entry_lines = [f" {e}\n" for e in entries]
lines[entry_start:entry_end] = new_entry_lines
with open(modules_rst, "w") as f:
f.writelines(lines)
logger.info(
"Added %s to docs toctree (%s)",
module_name, modules_rst,
)
# 3. Generate markdown doc from source and index into RAG store
_generate_and_index_tool_markdown(module_name, project_root)
return True
except Exception as exc:
logger.error("Failed to generate docs stub: %s", exc, exc_info=True)
return False
def _generate_and_index_tool_markdown(
module_name: str, project_root: str,
) -> None:
"""Parse tool source, generate markdown, and index into RAG store."""
source_path = os.path.join(project_root, "tools", f"{module_name}.py")
if not os.path.exists(source_path):
return
with open(source_path) as f:
source = f.read()
try:
tree = ast.parse(source)
except SyntaxError:
return
# Extract module docstring
module_doc = ast.get_docstring(tree) or ""
# Extract top-level assignments (TOOL_NAME, TOOL_DESCRIPTION, etc.)
tool_name = module_name
tool_description = ""
tool_params: dict = {}
tools_list: list[dict] = []
for node in ast.iter_child_nodes(tree):
if isinstance(node, ast.Assign):
for target in node.targets:
if not isinstance(target, ast.Name):
continue
if target.id == "TOOL_NAME":
tool_name = _safe_literal_eval(node.value) or module_name
elif target.id == "TOOL_DESCRIPTION":
tool_description = _safe_literal_eval(node.value) or ""
elif target.id == "TOOL_PARAMETERS":
tool_params = _safe_literal_eval(node.value) or {}
elif target.id == "TOOLS":
# Multi-tool file — extract name/description from list
raw = _safe_literal_eval(node.value)
if isinstance(raw, list):
tools_list = raw
# Extract async function signatures
functions: list[str] = []
for node in ast.iter_child_nodes(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
if node.name.startswith("_"):
continue
doc = ast.get_docstring(node) or ""
prefix = "async " if isinstance(node, ast.AsyncFunctionDef) else ""
sig = f"{prefix}def {node.name}({ast.unparse(node.args)})"
functions.append(f"### `{node.name}`\n\n```python\n{sig}\n```\n\n{doc}")
# Build markdown
md_parts = [f"# tools.{module_name}\n"]
if module_doc:
md_parts.append(f"{module_doc}\n")
if tools_list:
# Multi-tool file
md_parts.append("## Registered Tools\n")
for t in tools_list:
name = t.get("name", "?")
desc = t.get("description", "")
md_parts.append(f"### `{name}`\n\n{desc}\n")
params = t.get("parameters", {})
props = params.get("properties", {})
if props:
md_parts.append("**Parameters:**\n")
for pname, pinfo in props.items():
ptype = pinfo.get("type", "any")
pdesc = pinfo.get("description", "")
md_parts.append(f"- `{pname}` ({ptype}): {pdesc}")
md_parts.append("")
else:
# Single-tool file
if tool_description:
md_parts.append(f"**Description:** {tool_description}\n")
props = tool_params.get("properties", {})
if props:
md_parts.append("## Parameters\n")
for pname, pinfo in props.items():
ptype = pinfo.get("type", "any")
pdesc = pinfo.get("description", "")
md_parts.append(f"- `{pname}` ({ptype}): {pdesc}")
md_parts.append("")
if functions:
md_parts.append("## Functions\n")
md_parts.extend(functions)
md_content = "\n".join(md_parts)
# Write markdown file
md_dir = os.path.join(project_root, "docs", "_build", "markdown", "api", "tools")
os.makedirs(md_dir, exist_ok=True)
md_path = os.path.join(md_dir, f"{module_name}.md")
with open(md_path, "w") as f:
f.write(md_content)
logger.info("Generated markdown doc: %s", md_path)
# Index into RAG store
try:
from rag_system.file_rag_manager import get_stargazer_docs_store
store = get_stargazer_docs_store()
result = store.index_file(
md_path, tags=["stargazer", "documentation", "sphinx", "tool"],
)
action = result.get("action", "unknown")
logger.info("RAG indexed tool doc %s: %s", module_name, action)
except Exception as rag_exc:
logger.error("Failed to index tool doc in RAG: %s", rag_exc)
def _safe_literal_eval(node):
"""Try to evaluate an AST node as a Python literal."""
try:
# Handle string concatenation / JoinedStr
return ast.literal_eval(node)
except (ValueError, TypeError):
# For complex expressions (e.g. string concat with parens),
# try to unparse and eval
try:
return ast.literal_eval(ast.unparse(node))
except Exception:
return None
[docs]
async def run(
tool_code: str,
tool_name: str,
description: str = None,
overwrite: bool = False,
ctx: ToolContext | None = None,
) -> str:
"""Execute this tool and return the result.
Args:
tool_code (str): The tool code value.
tool_name (str): The tool name value.
description (str): Human-readable description.
overwrite (bool): The overwrite value.
ctx (ToolContext | None): Tool execution context providing access to bot internals.
Returns:
str: Result string.
"""
user_id = getattr(ctx, "user_id", "") or ""
redis = getattr(ctx, "redis", None)
config = getattr(ctx, "config", None)
if not await has_privilege(redis, user_id, PRIVILEGES["UNSANDBOXED_EXEC"], config):
return json.dumps({"success": False, "error": "The user does not have the UNSANDBOXED_EXEC privilege. Ask an admin to grant it with the alter_privileges tool."})
if not tool_code or not tool_code.strip():
return json.dumps({"success": False, "error": "Empty tool code provided"})
if not tool_name or not tool_name.strip():
return json.dumps({"success": False, "error": "Tool name is required"})
validation = _validate_python_code(tool_code)
if not validation["valid"]:
return json.dumps({
"success": False,
"error": "Code validation failed",
"validation_errors": validation["errors"],
"warnings": validation["warnings"],
})
filename = _generate_filename(tool_name)
tools_dir = os.path.dirname(os.path.abspath(__file__))
filepath = os.path.join(tools_dir, filename)
if await asyncio.to_thread(os.path.exists, filepath) and not overwrite:
return json.dumps({
"success": False,
"error": f"File already exists: {filename}. Use overwrite=true to replace",
})
try:
code_content = tool_code.strip()
if not code_content.startswith('"""'):
code_content = f'"""\nTool: {tool_name}\n{description or ""}\n"""\n{code_content}'
async with aiofiles.open(filepath, "w", encoding="utf-8") as f:
await f.write(code_content)
logger.info("Successfully created tool file: %s", filepath)
# --- Generate Sphinx documentation stub for the new tool ---
module_name = filename.removesuffix(".py")
docs_generated = await asyncio.to_thread(
_generate_docs_stub, module_name,
)
# --- Reload tool registry so the new tool is immediately usable ---
reload_success = False
new_tool_names: list[str] = []
registry = getattr(ctx, "tool_registry", None) if ctx else None
if registry is not None and config is not None:
try:
from tool_loader import load_tools
old_permissions = dict(registry._permissions)
registry._tools.clear()
registry.invalidate_cache()
load_tools(getattr(config, "tools_dir", "tools"), registry)
registry._permissions = old_permissions
reload_success = True
logger.info("Tool registry reloaded after creating %s", filename)
# Discover tool names registered by the new file
new_tool_names = [
td.name for td in registry.list_tools()
if td.name == tool_name
or td.name.replace("-", "_") == tool_name.replace("-", "_")
]
if not new_tool_names:
# Fallback: use the provided tool_name
new_tool_names = [tool_name]
except Exception as reload_exc:
logger.error("Failed to reload tools after creation: %s", reload_exc, exc_info=True)
# --- Regenerate embeddings for the new tool ---
embedding_success = False
if new_tool_names:
try:
from classifiers.refresh_tool_embeddings import (
refresh_tool_embeddings,
)
tools_dir = getattr(config, "tools_dir", "tools") if config else "tools"
embedding_success = await refresh_tool_embeddings(
tool_names=new_tool_names,
tools_dir=tools_dir,
)
if embedding_success:
logger.info(
"Embeddings regenerated for: %s",
", ".join(new_tool_names),
)
else:
logger.warning("Embedding refresh returned False for %s", new_tool_names)
except Exception as emb_exc:
logger.error("Failed to regenerate embeddings: %s", emb_exc, exc_info=True)
return json.dumps({
"success": True,
"message": "Tool created successfully",
"filename": filename,
"filepath": filepath,
"functions_registered": validation["functions_found"],
"reload_success": reload_success,
"embedding_success": embedding_success,
"docs_generated": docs_generated,
})
except Exception as e:
logger.error("Error writing tool file %s: %s", filepath, e)
return json.dumps({"success": False, "error": f"Failed to write file: {e}"})