"""Import an MCP tool from GitHub and convert it to a native Stargazer tool.
Clones the target repository, scans for MCP tool definitions, sends the
source to Cursor for conversion and then a second pass for security
auditing, and finally installs the result into the bot's tool registry
with embeddings and documentation.
Requires the UNSANDBOXED_EXEC privilege.
"""
from __future__ import annotations
import ast
import asyncio
import jsonutil as json
import logging
import os
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
from tools._cursor_api_key import get_cursor_api_key
from tools.alter_privileges import has_privilege, PRIVILEGES
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)
TOOL_NAME = "import_mcp_tool"
TOOL_DESCRIPTION = (
"Import an MCP (Model Context Protocol) tool from a GitHub repository and "
"convert it into a native Stargazer tool. Clones the repo, uses Cursor AI "
"to strip all MCP protocol boilerplate and rewrite the tool in the bot's "
"native format, runs a security/privacy audit pass, then installs the tool "
"with automatic registry reload, classifier embeddings, and documentation. "
"Requires UNSANDBOXED_EXEC privilege. Accepts any GitHub repo URL containing "
"an MCP server (Python FastMCP, raw mcp SDK, or TypeScript MCP servers). "
"Uses the caller's personal Cursor API key when set (set_user_api_key "
"service=cursor api_key=crsr_...); otherwise falls back to the channel/global "
"pool, then the bot's configured key."
)
DEFAULT_MODEL = "gpt-5.3-codex-spark-preview"
CURSOR_TIMEOUT = 3600
MCP_INDICATORS = [
"fastmcp",
"FastMCP",
"mcp.server",
"mcp.types",
"@mcp.tool",
"@server.tool",
"McpServer",
"CallToolRequestSchema",
"ListToolsRequestSchema",
"from mcp",
"import mcp",
]
SOURCE_EXTENSIONS = {
".py",
".ts",
".js",
".mjs",
".tsx",
".jsx",
}
CONFIG_FILENAMES = {
"pyproject.toml",
"package.json",
"requirements.txt",
"setup.py",
"setup.cfg",
"Cargo.toml",
}
MAX_CONTEXT_CHARS = 80_000
MAX_FILE_CHARS = 8_000
MAX_FILES = 15
TOOL_PARAMETERS = {
"type": "object",
"properties": {
"github_url": {
"type": "string",
"description": (
"GitHub URL of the MCP tool repository "
"(e.g. 'https://github.com/user/mcp-weather')."
),
},
"tool_name": {
"type": "string",
"description": (
"Override name for the generated tool file (without .py). "
"If omitted, auto-detected from the repo name."
),
},
"model": {
"type": "string",
"description": (
f"Cursor model to use for conversion. Default: {DEFAULT_MODEL}."
),
},
},
"required": ["github_url"],
}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _normalise_github_url(url: str) -> str:
"""Normalise an arbitrary GitHub reference into a clonable HTTPS URL.
Repairs the many shapes a user may paste — trailing slashes, ``#anchor``
fragments, ``/tree/<branch>`` browse paths, a missing ``.git`` suffix, and
``git@github.com:`` SSH form — so the downstream ``git clone`` always
receives a plain HTTPS ``.git`` URL. This is pure string manipulation with
no I/O. Called by :func:`run` to produce the ``clone_url`` it hands to
:func:`_clone_repo`; no other callers were found.
Args:
url (str): The raw GitHub URL or reference supplied by the caller.
Returns:
str: The cleaned, clonable HTTPS ``.git`` URL.
"""
url = url.strip().rstrip("/")
url = re.sub(r"#.*$", "", url)
url = re.sub(r"/tree/[^/]+.*$", "", url)
if not url.endswith(".git"):
url += ".git"
if url.startswith("git@"):
url = url.replace("git@github.com:", "https://github.com/")
return url
def _derive_tool_name(github_url: str) -> str:
"""Derive a default snake_case tool name from a GitHub repo URL.
Pulls the repository slug out of the URL, strips redundant ``mcp`` prefixes
and suffixes (e.g. ``mcp-weather`` becomes ``weather``), and squashes any
remaining non-alphanumeric characters into underscores so the result is a
valid module/tool name. This is a pure helper with no side effects, falling
back to ``"imported_tool"`` when nothing usable can be extracted. Called by
:func:`run` only when the caller did not pass an explicit ``tool_name``; no
other callers were found.
Args:
github_url (str): The repository URL to derive the name from.
Returns:
str: A sanitised snake_case tool name, or ``"imported_tool"`` as a
fallback.
"""
match = re.search(r"github\.com/[^/]+/([^/.]+)", github_url)
name = match.group(1) if match else "imported_tool"
name = re.sub(r"^mcp[-_]?", "", name, flags=re.IGNORECASE)
name = re.sub(r"[-_]?mcp$", "", name, flags=re.IGNORECASE)
name = re.sub(r"[^a-zA-Z0-9]", "_", name).strip("_").lower()
return name or "imported_tool"
async def _clone_repo(url: str, dest: str) -> tuple[bool, str]:
"""Shallow-clone a GitHub repository into a local directory.
Spawns ``git clone --depth 1`` via :func:`asyncio.create_subprocess_exec`
(no shell) under a 120-second timeout so the import never blocks the event
loop or hangs indefinitely on a slow/hostile remote. Its only side effect is
writing the cloned tree into ``dest`` on the filesystem. Called by
:func:`run` against a temporary directory after :func:`_normalise_github_url`
has cleaned the URL; no other callers were found.
Args:
url (str): The clonable HTTPS GitHub URL to fetch.
dest (str): Destination directory the repo is cloned into.
Returns:
tuple[bool, str]: ``(success, message)`` — ``(True, "")`` on a clean
clone, otherwise ``(False, <git stderr>)`` describing the failure.
"""
proc = await asyncio.create_subprocess_exec(
"git",
"clone",
"--depth",
"1",
url,
dest,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
_, stderr = await asyncio.wait_for(proc.communicate(), timeout=120)
if proc.returncode != 0:
return False, stderr.decode(errors="replace").strip()
return True, ""
def _scan_repo(repo_dir: str) -> tuple[list[dict], list[dict]]:
"""Walk the repo for MCP source files and config files.
Returns (source_files, config_files) where each entry is
{"path": relative_path, "content": str, "mcp_score": int}.
"""
source_files: list[dict] = []
config_files: list[dict] = []
for root, dirs, files in os.walk(repo_dir):
dirs[:] = [
d
for d in dirs
if d
not in {
"node_modules",
".git",
"__pycache__",
"venv",
".venv",
"dist",
"build",
}
]
for fname in files:
fpath = os.path.join(root, fname)
rel = os.path.relpath(fpath, repo_dir)
ext = os.path.splitext(fname)[1].lower()
if fname in CONFIG_FILENAMES:
try:
content = Path(fpath).read_text(errors="replace")[:MAX_FILE_CHARS]
config_files.append(
{"path": rel, "content": content, "mcp_score": 0}
)
except OSError:
pass
continue
if ext not in SOURCE_EXTENSIONS:
continue
try:
content = Path(fpath).read_text(errors="replace")
except OSError:
continue
score = sum(1 for ind in MCP_INDICATORS if ind in content)
if score == 0 and len(source_files) >= 5:
continue
source_files.append(
{
"path": rel,
"content": content[:MAX_FILE_CHARS],
"mcp_score": score,
}
)
source_files.sort(key=lambda f: f["mcp_score"], reverse=True)
return source_files[:MAX_FILES], config_files
def _build_context_block(source_files: list[dict], config_files: list[dict]) -> str:
"""Flatten the scanned repo files into one labelled text block for the LLM.
Concatenates the config files first, then the MCP-scored source files (each
fenced with a ``--- path ---`` header), stopping once the running size would
exceed :data:`MAX_CONTEXT_CHARS` so the conversion prompt stays within the
model's budget. This is a pure string builder with no I/O; it consumes the
dicts produced by :func:`_scan_repo`. Called by :func:`run`, which passes the
returned block into :func:`_build_conversion_prompt`; no other callers were
found.
Args:
source_files (list[dict]): Scanned source entries, each with ``path``,
``content`` and ``mcp_score`` keys.
config_files (list[dict]): Scanned config-file entries (same shape, with
``mcp_score`` of 0).
Returns:
str: A single newline-joined text block of headered file contents,
truncated to fit the context budget.
"""
parts: list[str] = []
total = 0
for f in config_files:
chunk = f"--- {f['path']} ---\n{f['content']}\n"
if total + len(chunk) > MAX_CONTEXT_CHARS:
break
parts.append(chunk)
total += len(chunk)
for f in source_files:
chunk = f"--- {f['path']} (MCP signals: {f['mcp_score']}) ---\n{f['content']}\n"
if total + len(chunk) > MAX_CONTEXT_CHARS:
break
parts.append(chunk)
total += len(chunk)
return "\n".join(parts)
_REFERENCE_TOOL = '''\
"""Tool: example_tool
Example of the target single-tool format.
"""
import jsonutil as json
import aiohttp
TOOL_NAME = "example_tool"
TOOL_DESCRIPTION = "One-paragraph description the LLM reads at selection time."
TOOL_PARAMETERS = {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "What to search for.",
},
"limit": {
"type": "integer",
"description": "Max results (default 5).",
},
},
"required": ["query"],
}
async def run(query: str, limit: int = 5) -> str:
"""Core logic — must be async, must return str."""
async with aiohttp.ClientSession() as session:
async with session.get("https://api.example.com/search",
params={"q": query, "n": limit}) as resp:
data = await resp.json()
return json.dumps({"results": data})
'''
_MULTI_TOOL_REFERENCE = '''\
"""Multi-tool file format (use when the MCP server defines multiple tools)."""
import jsonutil as json
async def _search(query: str, limit: int = 5) -> str:
return json.dumps({"results": []})
async def _lookup(id: str) -> str:
return json.dumps({"item": {}})
TOOLS = [
{
"name": "example_search",
"description": "Search for items.",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query."},
"limit": {"type": "integer", "description": "Max results."},
},
"required": ["query"],
},
"handler": _search,
},
{
"name": "example_lookup",
"description": "Look up an item by ID.",
"parameters": {
"type": "object",
"properties": {
"id": {"type": "string", "description": "Item ID."},
},
"required": ["id"],
},
"handler": _lookup,
},
]
'''
def _build_conversion_prompt(
tool_name: str,
context_block: str,
) -> str:
"""Build the Cursor prompt that converts MCP source into a native tool.
Assembles the full instruction string fed to ``cursor-agent`` for the first
(conversion) pass: it embeds the collected repo source, both reference tool
templates, the numbered conversion rules, and the target output path so the
model rewrites the MCP server as a single ``tools/<tool_name>.py`` file.
This is a pure string builder with no side effects; it interpolates the
module-level ``_REFERENCE_TOOL`` and ``_MULTI_TOOL_REFERENCE`` example
templates along with its arguments. Called by :func:`run` (conversion step),
which passes the returned prompt to :func:`_run_cursor`.
Args:
tool_name (str): Sanitised snake_case name used both in the target file
path (``tools/<tool_name>.py``) and in the instructions.
context_block (str): Concatenated source/config text produced by
:func:`_build_context_block` describing the MCP repo.
Returns:
str: The fully rendered conversion prompt for ``cursor-agent``.
"""
return f"""\
You are converting an MCP (Model Context Protocol) tool into a native Stargazer bot tool.
## Source MCP code
{context_block}
## Target format
The output must be a SINGLE Python file written to: tools/{tool_name}.py
### Single-tool format (if the MCP server has one tool):
{_REFERENCE_TOOL}
### Multi-tool format (if the MCP server defines multiple tools):
{_MULTI_TOOL_REFERENCE}
## Conversion rules
1. Extract every tool defined in the MCP server.
2. Convert parameter schemas to JSON Schema (type/properties/required/description).
3. Preserve the core business logic. Rewrite MCP-specific I/O (TextContent, etc.) to return plain str (JSON-serialised where appropriate).
4. All handler functions MUST be async and MUST return str.
5. Parameter names in the function signature must match the JSON Schema properties.
6. For non-standard dependencies, wrap imports in try/except ImportError with a feature flag:
try:
import some_lib
SOME_LIB_AVAILABLE = True
except ImportError:
SOME_LIB_AVAILABLE = False
7. If the tool needs to make HTTP requests, prefer aiohttp or httpx (both already installed).
8. Strip ALL MCP protocol machinery: no FastMCP, no Server, no TextContent, no JSON-RPC, no mcp imports.
9. Include a module docstring. TOOL_DESCRIPTION must be comprehensive and self-contained.
10. If the MCP server is TypeScript/JavaScript, rewrite the logic in Python.
11. Do NOT include any test code or if __name__ == "__main__" blocks.
Write the file now to tools/{tool_name}.py. Do not create any other files.
"""
def _build_audit_prompt(tool_name: str) -> str:
"""Build the Cursor prompt for the security/privacy audit pass.
Produces the instruction string for the second ``cursor-agent`` invocation,
which re-reviews the freshly generated ``tools/<tool_name>.py`` for data
exfiltration, code injection, filesystem abuse, credential harvesting, and
related risks, fixes them in place, and appends a ``_AUDIT_REPORT`` constant
that :func:`_extract_audit_report` later parses out of the file.
This is a pure string builder with no side effects; it only interpolates
``tool_name`` into the static checklist template. Called by :func:`run`
(audit step), which forwards the returned prompt to :func:`_run_cursor`.
Args:
tool_name (str): Sanitised snake_case name identifying the generated
file (``tools/<tool_name>.py``) to audit and fix.
Returns:
str: The fully rendered security-audit prompt for ``cursor-agent``.
"""
return f"""\
You are a security auditor. Review the file tools/{tool_name}.py which was \
just generated by converting an untrusted MCP tool from a public GitHub repo.
This code is about to run INSIDE a production chatbot process with access to \
Redis, API keys, user data, and the full filesystem. Your job is to find and \
FIX every security and privacy flaw.
## Checklist — flag and fix ALL of the following:
1. DATA EXFILTRATION: Remove/rewrite any outbound HTTP/socket calls that \
send bot internals, user data, API keys, tokens, Redis contents, filesystem \
paths, or environment variables to external endpoints. Calls to the tool's \
own legitimate API are fine; calls to unknown/hardcoded suspicious URLs are not.
2. CODE INJECTION: Remove or rewrite any use of eval(), exec(), compile(), \
__import__(), importlib dynamic imports, pickle.loads, yaml.unsafe_load, \
subprocess (unless essential — if so, add a comment explaining why), \
os.system, ctypes, or similar.
3. FILESYSTEM ABUSE: Ensure the tool does not read/write outside its expected \
scope. No reading ~/.ssh, /etc/passwd, config.yaml, .env, or credential \
files. No open() on user-controllable paths without validation.
4. NETWORK ABUSE: No listening sockets, no reverse shells, no connecting to \
hardcoded attacker-controlled endpoints.
5. CREDENTIAL HARVESTING: Ensure no parameters, return values, or side \
effects expose or log API keys, tokens, passwords, or Redis data.
6. DEPENDENCY POISONING: Flag any pip install, subprocess calls that install \
packages, or runtime dependency fetching. Dependencies should be documented \
in comments, not silently installed.
7. PRIVILEGE ESCALATION: Ensure the tool does not access ctx.redis, \
ctx.config, ctx.adapter, or ctx.tool_registry unless genuinely needed. If it \
does, verify the access is scoped and safe.
8. OBFUSCATION: Flag base64-encoded strings, hex-encoded payloads, ROT13, \
or any encoded blobs that could hide malicious logic. Decode and inspect.
## Output format
After making your fixes, add a MODULE-LEVEL constant at the very end of the \
file (after all other code) in exactly this format:
_AUDIT_REPORT = {{
"issues_found": <int>,
"issues_fixed": <int>,
"details": [
"<one-line description of each finding and fix>",
],
}}
If the code is clean, set issues_found and issues_fixed to 0 and details to \
["No security issues found."].
Fix everything in-place in tools/{tool_name}.py now.
"""
async def _run_cursor(
prompt: str,
model: str,
cwd: str,
api_key: str,
timeout: int = CURSOR_TIMEOUT,
) -> tuple[bool, dict | None, str]:
"""Invoke the ``cursor-agent`` CLI with a prompt and capture its result.
Runs ``cursor-agent`` in headless ``--yolo`` JSON mode via
:func:`asyncio.create_subprocess_exec` (no shell) under a timeout, feeding it
the caller's API key, model and prompt. The agent's real side effect is
writing/modifying ``tools/<tool_name>.py`` inside ``cwd``; here we only spawn
it and parse its stdout (falling back to a ``raw_output`` wrapper if the JSON
cannot be decoded). Returns early without spawning anything when no API key is
available. Called by :func:`run` twice — once for the conversion pass and once
for the security-audit pass; no other callers were found.
Args:
prompt (str): The full instruction string passed to ``cursor-agent``.
model (str): Cursor model identifier to run the prompt against.
cwd (str): Working directory the agent runs in (the bot's root, so it can
write ``tools/<tool_name>.py``).
api_key (str): Cursor API key; if empty the call fails fast.
timeout (int): Seconds to wait for the agent before
:func:`asyncio.wait_for` raises. Defaults to :data:`CURSOR_TIMEOUT`.
Returns:
tuple[bool, dict | None, str]: ``(success, response_json, stderr)`` where
``success`` reflects a zero exit code, ``response_json`` is the parsed
stdout (or ``None`` / a ``raw_output`` wrapper), and the last element is
the decoded stderr.
"""
if not api_key:
return (
False,
None,
(
"CURSOR_API_KEY is not configured. Set the CURSOR_API_KEY env "
"var or cursor_api_key in config.yaml before running this tool."
),
)
cmd = [
"cursor-agent",
"--api-key",
api_key,
"--model",
model,
"--output-format",
"json",
"--yolo",
"-p",
prompt,
]
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=cwd,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
stdout_text = stdout.decode(errors="replace") if stdout else ""
stderr_text = stderr.decode(errors="replace") if stderr else ""
response = None
if stdout_text.strip():
try:
response = json.loads(stdout_text)
except json.JSONDecodeError:
response = {"raw_output": stdout_text[:4000]}
return proc.returncode == 0, response, stderr_text
def _validate_tool_file(filepath: str) -> tuple[bool, list[str]]:
"""Statically validate a generated tool file before it is installed.
Reads the file and parses it with :mod:`ast`, then walks the tree to confirm
it is syntactically valid and structurally a Stargazer tool — it must declare
a ``TOOL_NAME`` or ``TOOLS`` assignment and define at least one function. This
is a read-only filesystem check (no import/execution of the untrusted code).
Called by :func:`run` twice: immediately after the Cursor conversion pass and
again after the security-audit pass, deleting the file if either check fails.
No other callers were found.
Args:
filepath (str): Path to the generated ``tools/<tool_name>.py`` file.
Returns:
tuple[bool, list[str]]: ``(valid, errors)`` — ``valid`` is ``True`` only
when ``errors`` is empty; ``errors`` lists read failures, syntax errors,
or missing registration/function structure.
"""
errors: list[str] = []
try:
source = Path(filepath).read_text()
except OSError as exc:
return False, [f"Cannot read file: {exc}"]
try:
tree = ast.parse(source)
except SyntaxError as exc:
return False, [f"Syntax error: {exc}"]
has_registration = False
for node in ast.walk(tree):
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id in ("TOOL_NAME", "TOOLS"):
has_registration = True
if not has_registration:
errors.append("No TOOL_NAME or TOOLS assignment found")
has_func = any(
isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef)) for n in ast.walk(tree)
)
if not has_func:
errors.append("No function definitions found")
return len(errors) == 0, errors
def _extract_audit_report(filepath: str) -> dict:
"""Recover the ``_AUDIT_REPORT`` summary the audit pass appended to the file.
Reads the post-audit tool file and walks its :mod:`ast` tree for the
module-level ``_AUDIT_REPORT`` assignment, returning it via
:func:`ast.literal_eval` (a safe parse that never executes the untrusted
code). On any read/parse problem, or when the constant is absent, it returns
a sentinel dict with ``issues_found``/``issues_fixed`` of ``-1``. Called by
:func:`run` after the security-audit Cursor pass to surface the audit summary
in the tool's result payload; no other callers were found.
Args:
filepath (str): Path to the audited ``tools/<tool_name>.py`` file.
Returns:
dict: The parsed audit report (``issues_found``, ``issues_fixed``,
``details``), or the ``-1`` sentinel dict when it cannot be read.
"""
default = {
"issues_found": -1,
"issues_fixed": -1,
"details": ["Audit report not found in file."],
}
try:
source = Path(filepath).read_text()
tree = ast.parse(source)
except Exception:
return default
for node in ast.walk(tree):
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id == "_AUDIT_REPORT":
try:
return ast.literal_eval(node.value)
except Exception:
return default
return default
def _extract_tool_names(filepath: str) -> list[str]:
"""Statically collect the tool names a generated file will register.
Parses the file with :mod:`ast` and walks it for a ``TOOL_NAME`` string
assignment (single-tool file) or a ``TOOLS`` list assignment (multi-tool
file), using :func:`ast.literal_eval` so the untrusted module is never
imported or executed. Returns every registered name found, swallowing any
parse error into an empty list. Called by :func:`run` to learn which names to
feed the classifier-embedding refresh after install; no other callers were
found.
Args:
filepath (str): Path to the generated ``tools/<tool_name>.py`` file.
Returns:
list[str]: The registered tool names (possibly empty when none can be
statically resolved).
"""
names: list[str] = []
try:
source = Path(filepath).read_text()
tree = ast.parse(source)
except Exception:
return names
for node in ast.walk(tree):
if isinstance(node, ast.Assign):
for target in node.targets:
if not isinstance(target, ast.Name):
continue
if target.id == "TOOL_NAME":
try:
names.append(ast.literal_eval(node.value))
except Exception:
pass
elif target.id == "TOOLS":
try:
tools_list = ast.literal_eval(node.value)
if isinstance(tools_list, list):
for t in tools_list:
if isinstance(t, dict) and "name" in t:
names.append(t["name"])
except Exception:
pass
return names
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
[docs]
async def run(
github_url: str,
tool_name: str = "",
model: str = "",
ctx: ToolContext | None = None,
) -> str:
"""Import and convert an MCP tool from GitHub into a native Stargazer tool.
Clones ``github_url``, scans for MCP tool definitions, runs a Cursor
conversion pass followed by a Cursor security-audit pass, then validates
and installs the generated ``tools/<tool_name>.py`` (registry reload,
classifier embeddings, and docs). Requires the UNSANDBOXED_EXEC privilege.
Args:
github_url (str): GitHub URL of the MCP tool repository to import.
tool_name (str): Optional override for the generated tool file name
(without ``.py``). Auto-derived from the repo name if empty.
model (str): Optional Cursor model to use; defaults to ``DEFAULT_MODEL``.
ctx (ToolContext | None): Tool execution context.
Returns:
str: JSON describing success/failure, the installed tool names, and
the security-audit summary.
"""
user_id = getattr(ctx, "user_id", "") or ""
redis = getattr(ctx, "redis", None)
config = getattr(ctx, "config", None)
if not await has_privilege(redis, user_id, PRIVILEGES["UNSANDBOXED_EXEC"], config):
return json.dumps(
{
"success": False,
"error": "Requires UNSANDBOXED_EXEC privilege.",
}
)
if not github_url or not github_url.strip():
return json.dumps({"success": False, "error": "Empty github_url."})
api_key = await get_cursor_api_key(ctx)
if not api_key:
from tools.manage_api_keys import missing_api_key_error
return json.dumps({"success": False, "error": missing_api_key_error("cursor")})
clone_url = _normalise_github_url(github_url)
tool_name = (tool_name.strip() if tool_name else "") or _derive_tool_name(
github_url
)
tool_name = (
re.sub(r"[^a-zA-Z0-9_]", "_", tool_name).lower().strip("_") or "imported_tool"
)
selected_model = (model.strip() if model else "") or DEFAULT_MODEL
bot_dir = os.getcwd()
tool_filepath = os.path.join(bot_dir, "tools", f"{tool_name}.py")
if os.path.exists(tool_filepath):
return json.dumps(
{
"success": False,
"error": f"tools/{tool_name}.py already exists. Choose a different name or delete it first.",
}
)
tmp_dir = tempfile.mkdtemp(prefix="mcp_import_")
try:
# ---- 1. Clone ----
logger.info("Cloning %s into %s", clone_url, tmp_dir)
clone_dest = os.path.join(tmp_dir, "repo")
ok, err_msg = await _clone_repo(clone_url, clone_dest)
if not ok:
return json.dumps(
{
"success": False,
"error": f"git clone failed: {err_msg}",
}
)
# ---- 2. Scan ----
source_files, config_files = _scan_repo(clone_dest)
mcp_files = [f for f in source_files if f["mcp_score"] > 0]
if not mcp_files:
return json.dumps(
{
"success": False,
"error": "No MCP tool definitions found in the repository.",
"files_scanned": [f["path"] for f in source_files],
}
)
context_block = _build_context_block(source_files, config_files)
logger.info(
"Scanned repo: %d source files (%d with MCP signals), %d config files, %d chars context",
len(source_files),
len(mcp_files),
len(config_files),
len(context_block),
)
# ---- 3. Conversion via Cursor ----
conversion_prompt = _build_conversion_prompt(tool_name, context_block)
logger.info("Running Cursor conversion pass (model=%s)...", selected_model)
try:
conv_ok, conv_resp, conv_stderr = await _run_cursor(
conversion_prompt,
selected_model,
bot_dir,
api_key,
)
except asyncio.TimeoutError:
return json.dumps(
{
"success": False,
"error": "Cursor conversion timed out.",
}
)
if not conv_ok:
return json.dumps(
{
"success": False,
"error": "Cursor conversion failed.",
"cursor_response": conv_resp,
"stderr": conv_stderr[:2000],
}
)
# ---- 4. Verify conversion output ----
if not os.path.exists(tool_filepath):
return json.dumps(
{
"success": False,
"error": f"Cursor did not create tools/{tool_name}.py.",
"cursor_response": conv_resp,
}
)
valid, validation_errors = _validate_tool_file(tool_filepath)
if not valid:
os.remove(tool_filepath)
return json.dumps(
{
"success": False,
"error": "Generated tool failed validation.",
"validation_errors": validation_errors,
"cursor_response": conv_resp,
}
)
# ---- 5. Security audit via Cursor ----
audit_prompt = _build_audit_prompt(tool_name)
logger.info("Running Cursor security audit pass...")
try:
audit_ok, audit_resp, audit_stderr = await _run_cursor(
audit_prompt,
selected_model,
bot_dir,
api_key,
)
except asyncio.TimeoutError:
logger.error("Security audit timed out — deleting tool file (fail closed)")
if os.path.exists(tool_filepath):
os.remove(tool_filepath)
return json.dumps(
{
"success": False,
"error": "Security audit timed out. Tool file deleted (fail closed).",
}
)
if not audit_ok:
logger.error("Security audit Cursor pass failed — deleting tool file")
if os.path.exists(tool_filepath):
os.remove(tool_filepath)
return json.dumps(
{
"success": False,
"error": "Security audit Cursor pass failed. Tool file deleted (fail closed).",
"cursor_response": audit_resp,
"stderr": audit_stderr[:2000],
}
)
# Re-validate after audit modifications
valid, validation_errors = _validate_tool_file(tool_filepath)
if not valid:
if os.path.exists(tool_filepath):
os.remove(tool_filepath)
return json.dumps(
{
"success": False,
"error": "Tool file invalid after security audit pass.",
"validation_errors": validation_errors,
}
)
audit_report = _extract_audit_report(tool_filepath)
logger.info(
"Audit complete: %d found, %d fixed",
audit_report.get("issues_found", -1),
audit_report.get("issues_fixed", -1),
)
# ---- 6. Extract tool names ----
registered_names = _extract_tool_names(tool_filepath)
if not registered_names:
registered_names = [tool_name]
# ---- 7. Reload tool registry ----
reload_success = False
registry = getattr(ctx, "tool_registry", None) if ctx else None
if registry is not None and config is not None:
try:
from tool_loader import load_tools
with registry._lock:
old_permissions = dict(registry._permissions)
registry._tools.clear()
registry.invalidate_cache()
load_tools(getattr(config, "tools_dir", "tools"), registry)
registry._permissions = old_permissions
reload_success = True
logger.info("Tool registry reloaded after importing %s", tool_name)
except Exception as exc:
logger.error("Registry reload failed: %s", exc, exc_info=True)
# ---- 8. Refresh embeddings ----
embedding_success = False
if registered_names:
try:
from classifiers.refresh_tool_embeddings import refresh_tool_embeddings
tools_dir = getattr(config, "tools_dir", "tools") if config else "tools"
embedding_success = await refresh_tool_embeddings(
tool_names=registered_names,
tools_dir=tools_dir,
)
if embedding_success:
logger.info(
"Embeddings refreshed for: %s", ", ".join(registered_names)
)
except Exception as exc:
logger.error("Embedding refresh failed: %s", exc, exc_info=True)
# ---- 9. Generate docs ----
docs_generated = False
try:
from tools.write_python_tool import _generate_docs_stub
module_name = tool_name
docs_generated = _generate_docs_stub(module_name)
except Exception as exc:
logger.error("Docs generation failed: %s", exc, exc_info=True)
return json.dumps(
{
"success": True,
"tool_file": f"tools/{tool_name}.py",
"tool_names": registered_names,
"source_repo": github_url,
"reload_success": reload_success,
"embedding_success": embedding_success,
"docs_generated": docs_generated,
"audit": {
"passed": audit_report.get("issues_found", -1) >= 0,
"issues_found": audit_report.get("issues_found", 0),
"issues_fixed": audit_report.get("issues_fixed", 0),
"details": audit_report.get("details", []),
},
"mcp_files_found": [f["path"] for f in mcp_files],
}
)
except Exception as exc:
logger.error("import_mcp_tool failed: %s", exc, exc_info=True)
if os.path.exists(tool_filepath):
os.remove(tool_filepath)
return json.dumps({"success": False, "error": f"Unexpected error: {exc}"})
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)