"""Tools to install, configure, enumerate, document, and call MCP servers via mcpo (OpenAPI HTTP).
URLs are built only from configured ``mcpo.base_url`` plus validated path segments
(not arbitrary user URLs), so loopback/private targets are allowed.
"""
from __future__ import annotations
import asyncio
import json as _json
import logging
import re
import subprocess
import sys
from pathlib import Path
from typing import TYPE_CHECKING, Any
from urllib.parse import quote
import httpx
import jsonutil as json
from tools.alter_privileges import PRIVILEGES, has_privilege
if TYPE_CHECKING:
    # Needed for annotations only; never imported at runtime.
    from config import Config
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

# Allow-list for server names: ASCII letters, digits, ``_`` and ``-`` only.
# Server names become a URL path segment and a key in the mcpo config file.
# The pattern ends in ``\Z`` rather than ``$`` because ``$`` also matches just
# before a trailing newline, which would let "name\n" pass a ``.match()`` check.
_SERVER_RE = re.compile(r"^[a-zA-Z0-9_-]+\Z")
# Same rule, presumably applied to tool names (its users are not shown here).
_TOOL_RE = re.compile(r"^[a-zA-Z0-9_-]+\Z")
def _json_size(obj: Any) -> int:
    """Return the approximate serialized length of ``obj`` in characters.

    The value is dumped with ``json.dumps(obj, default=str)``, so anything the
    encoder cannot handle natively is stringified first. If dumping still
    raises ``TypeError`` or ``ValueError``, the length of ``str(obj)`` is used
    instead, so the function never fails on those errors.

    Args:
        obj: Any value whose serialized size is wanted.

    Returns:
        int: Length of the JSON text, or of ``str(obj)`` on fallback.
    """
    try:
        rendered = json.dumps(obj, default=str)
    except (TypeError, ValueError):
        # Not serializable even with ``default=str``; size the plain repr.
        rendered = str(obj)
    return len(rendered)
def _truncate_value(
    obj: Any,
    max_chars: int,
) -> tuple[Any, bool]:
    """Limit a value to a serialized character budget.

    ``obj`` is dumped to JSON (``default=str``). If the text fits within
    ``max_chars`` the original object is returned untouched. Otherwise a
    stand-in ``{"_truncated": True, "preview": <head of the JSON> +
    "…(truncated)"}`` is returned so the caller can still show a readable
    excerpt.

    The preview is shortened until the *serialized stand-in itself* fits the
    budget. A fixed allowance is not enough: the wrapper keys, the suffix, and
    the escaping of quotes and backslashes inside the preview all add
    characters. Only when the budget is smaller than the empty wrapper can the
    result still exceed ``max_chars``; the preview is then just the suffix.

    Args:
        obj: The value to size-limit (typically a JSON-Schema dict).
        max_chars: Maximum allowed serialized length in characters.

    Returns:
        tuple[Any, bool]: ``(obj, False)`` when it fits, ``(stand_in, True)``
        when it was replaced, or ``(None, True)`` for a non-positive budget.
    """
    if max_chars <= 0:
        return None, True
    raw = json.dumps(obj, default=str)
    if len(raw) <= max_chars:
        return obj, False

    # Start from the historical 40-character allowance, then shrink by the
    # measured overshoot. Dropping k preview characters removes at least k
    # serialized characters, so this converges in a couple of passes.
    cut = max(0, max_chars - 40)
    while True:
        stand_in = {"_truncated": True, "preview": raw[:cut] + "…(truncated)"}
        excess = len(json.dumps(stand_in)) - max_chars
        if excess <= 0 or cut == 0:
            return stand_in, True
        cut = max(0, cut - excess)
def _auth_headers(api_key: str) -> dict[str, str]:
    """Build the bearer-token header mapping for mcpo HTTP requests.

    Surrounding whitespace is stripped from ``api_key``. A missing, empty or
    whitespace-only key produces an empty mapping, so requests are sent
    without an ``Authorization`` header.

    Args:
        api_key: The configured mcpo API key; may be empty or ``None``.

    Returns:
        dict[str, str]: ``{"Authorization": "Bearer <key>"}`` or ``{}``.
    """
    token = api_key.strip() if api_key else ""
    return {"Authorization": f"Bearer {token}"} if token else {}
def _require_cfg(ctx: ToolContext | None) -> Config | None:
    """Return the ``config`` attribute of a tool context, if there is one.

    Handlers use the ``None`` result to short-circuit with a ``no_config``
    error instead of failing on attribute access.

    Args:
        ctx: The tool invocation context, or ``None``.

    Returns:
        Config | None: ``ctx.config`` when both the context and the attribute
        exist, otherwise ``None``.
    """
    return None if ctx is None else getattr(ctx, "config", None)
def _mcpo_base(cfg: Config | None) -> str:
    """Return the configured mcpo base URL, ready for path concatenation.

    Reads ``cfg.mcpo_base_url``, strips surrounding whitespace, and removes
    any trailing ``/`` characters. An absent config, a missing attribute, or an
    empty value all yield ``""``.

    Args:
        cfg: The bot configuration, or ``None``.

    Returns:
        str: The cleaned base URL, or ``""`` when none is configured.
    """
    if cfg is None:
        return ""
    configured = getattr(cfg, "mcpo_base_url", "")
    if not configured:
        return ""
    return configured.strip().rstrip("/")
def _server_url(base: str, server_name: str) -> str:
    """Build ``{base}/{server_name}`` after validating the server name.

    The name must consist solely of characters allowed by ``_SERVER_RE``
    (letters, digits, ``_``, ``-``). Because only such names are appended to
    the configured base, caller-supplied text cannot change the host or add
    extra path segments.

    ``fullmatch`` is used rather than ``match``. With a pattern ending in
    ``$``, ``match`` also accepts a name followed by a newline, because ``$``
    matches just before a trailing ``\\n``.

    Args:
        base: The normalized mcpo base URL (no trailing slash).
        server_name: The mount/route segment to validate and encode.

    Returns:
        str: The per-server URL.

    Raises:
        ValueError: If ``base`` is empty or ``server_name`` is invalid.
    """
    if not base:
        raise ValueError("mcpo_base_url is not configured")
    if not _SERVER_RE.fullmatch(server_name or ""):
        raise ValueError("Invalid server_name (use letters, digits, _ and -)")
    # quote() is a no-op for allow-listed names; kept as defense in depth.
    return f"{base}/{quote(server_name, safe='')}"
async def _require_unsandboxed(ctx: ToolContext | None) -> str | None:
    """Check that the caller holds the ``UNSANDBOXED_EXEC`` privilege.

    Used as a gate by the handlers that install software or rewrite the mcpo
    config (:func:`handle_install_runtime`, :func:`handle_upsert_server`,
    :func:`handle_remove_server`). The lookup is delegated to
    ``has_privilege`` with ``ctx.redis``, ``ctx.user_id``,
    ``PRIVILEGES["UNSANDBOXED_EXEC"]`` and ``ctx.config``.

    Args:
        ctx: The tool invocation context; it must provide ``redis`` and
            ``config`` for the check to run.

    Returns:
        str | None: ``None`` when the privilege is granted. Otherwise a JSON
        error string the handler can return directly:
        ``privilege_check_failed`` when the context, Redis or config is
        missing, or ``forbidden`` when the privilege is not held.
    """
    context_ready = (
        ctx is not None and ctx.redis is not None and ctx.config is not None
    )
    if not context_ready:
        return json.dumps(
            {
                "error": "privilege_check_failed",
                "details": "Context, Redis, or config missing",
            }
        )

    granted = await has_privilege(
        ctx.redis,
        ctx.user_id,
        PRIVILEGES["UNSANDBOXED_EXEC"],
        ctx.config,
    )
    if granted:
        return None
    return json.dumps(
        {
            "error": "forbidden",
            "details": "Requires UNSANDBOXED_EXEC privilege",
        }
    )
def _extract_json_schema_from_content(content: dict[str, Any]) -> dict[str, Any] | None:
    """Return the JSON Schema of the ``application/json`` entry, if any.

    Entries of the OpenAPI ``content`` mapping are examined in order. A media
    type counts as JSON when the part before any ``;`` parameter, lower-cased
    and stripped, equals ``application/json``. The first such entry whose
    ``schema`` is a dict wins. Entries that are not dicts, or whose schema is
    not a dict, are skipped.

    Args:
        content: Mapping of media type to media object; may be empty or
            ``None``.

    Returns:
        dict[str, Any] | None: The schema dict, or ``None`` if none is found.
    """
    json_schemas = (
        media.get("schema")
        for media_type, media in (content or {}).items()
        if isinstance(media, dict)
        and str(media_type).lower().partition(";")[0].strip() == "application/json"
    )
    return next((schema for schema in json_schemas if isinstance(schema, dict)), None)
def _request_schema(op: dict[str, Any]) -> dict[str, Any] | None:
    """Return the JSON request-body schema declared by an OpenAPI operation.

    Looks up ``op["requestBody"]["content"]`` and hands that mapping to
    :func:`_extract_json_schema_from_content`. A missing or non-dict
    ``requestBody`` or ``content`` yields ``None``.

    Args:
        op: An OpenAPI operation object.

    Returns:
        dict[str, Any] | None: The request schema, or ``None`` when the
        operation declares no JSON body.
    """
    body = op.get("requestBody")
    media_map = body.get("content") if isinstance(body, dict) else None
    if isinstance(media_map, dict):
        return _extract_json_schema_from_content(media_map)
    return None
def _response_schema_200(op: dict[str, Any]) -> dict[str, Any] | None:
    """Return the first JSON success-response schema of an OpenAPI operation.

    ``op["responses"]`` is checked under the keys ``"200"``, ``"201"``,
    ``"202"`` and ``"default"``, in that order. For each one that is a dict
    with a dict ``content``, :func:`_extract_json_schema_from_content` is
    consulted, and the first schema it returns is the result.

    Args:
        op: An OpenAPI operation object.

    Returns:
        dict[str, Any] | None: The response schema, or ``None`` if none of the
        checked responses declares a JSON schema.
    """
    responses = op.get("responses")
    if not isinstance(responses, dict):
        return None
    for status in ("200", "201", "202", "default"):
        entry = responses.get(status)
        media_map = entry.get("content") if isinstance(entry, dict) else None
        if isinstance(media_map, dict):
            found = _extract_json_schema_from_content(media_map)
            if found is not None:
                return found
    return None
# [docs]  (appears to be Sphinx "viewcode" link residue; kept as a comment so it is not evaluated)
async def handle_install_runtime(
    package_spec: str = "mcpo>=0.0.20",
    ctx: ToolContext | None = None,
) -> str:
    """Install the ``mcpo`` PyPI package into this Python environment via pip.

    Backs the ``mcpo_install_runtime`` entry in ``TOOLS``. The call is gated
    by :func:`_require_unsandboxed` and by a Python 3.11+ check. It then runs
    ``{sys.executable} -m pip install <package_spec>`` in a worker thread
    (``asyncio.to_thread``) with a 600-second timeout, which modifies the
    local environment. Afterwards the installed ``mcpo`` version is read with
    ``importlib.metadata.version``.

    ``package_spec`` is placed directly on pip's command line, so it is
    validated first: it must be a non-empty string that does not start with
    ``-``. Otherwise pip would interpret it as an option (for example
    ``--index-url=...``) rather than a requirement.

    Args:
        package_spec: A pip requirement string (default ``"mcpo>=0.0.20"``).
        ctx: The tool invocation context, used for the privilege check.

    Returns:
        str: A JSON string. On success or pip failure:
        ``{"ok", "exit_code", "mcpo_version", "stdout_tail", "stderr_tail"}``
        with the output tails capped at 4000 characters. Otherwise an
        ``error`` payload: the privilege-gate errors,
        ``invalid_package_spec``, ``python_version``, ``pip_timeout`` or
        ``pip_failed``.
    """
    denied = await _require_unsandboxed(ctx)
    if denied:
        return denied

    spec = package_spec.strip() if isinstance(package_spec, str) else ""
    if not spec or spec.startswith("-"):
        return json.dumps(
            {
                "error": "invalid_package_spec",
                "details": (
                    "package_spec must be a non-empty pip requirement "
                    "and must not start with '-'"
                ),
            }
        )

    if sys.version_info < (3, 11):
        return json.dumps(
            {
                "error": "python_version",
                "details": "mcpo requires Python 3.11+",
                "python": sys.version,
            }
        )

    def _run() -> tuple[int, str, str]:
        """Run the blocking ``pip install`` and return (exit code, stdout, stderr).

        Raises:
            subprocess.TimeoutExpired: If pip exceeds the 600-second timeout
                (handled by the enclosing function).
        """
        proc = subprocess.run(
            [sys.executable, "-m", "pip", "install", spec],
            capture_output=True,
            text=True,
            timeout=600,
        )
        return proc.returncode, proc.stdout or "", proc.stderr or ""

    try:
        code, out, err_text = await asyncio.to_thread(_run)
    except subprocess.TimeoutExpired:
        return json.dumps({"error": "pip_timeout"})
    except Exception as exc:
        logger.exception("mcpo install failed")
        return json.dumps({"error": "pip_failed", "details": str(exc)})

    # Version lookup is best-effort: a missing package is expected when pip
    # failed or the spec named something else. Anything unexpected is logged
    # rather than silently discarded.
    from importlib.metadata import PackageNotFoundError
    from importlib.metadata import version as pkg_version

    try:
        ver = pkg_version("mcpo")
    except PackageNotFoundError:
        ver = ""
    except Exception:
        logger.debug("Could not determine installed mcpo version", exc_info=True)
        ver = ""

    return json.dumps(
        {
            "ok": code == 0,
            "exit_code": code,
            "mcpo_version": ver or None,
            "stdout_tail": out[-4000:],
            "stderr_tail": err_text[-4000:],
        }
    )
# [docs]  (appears to be Sphinx "viewcode" link residue; kept as a comment so it is not evaluated)
async def handle_upsert_server(
    server_name: str,
    server_config_json: str,
    ctx: ToolContext | None = None,
) -> str:
    """Add or replace one server entry in the mcpo JSON config file.

    Backs the ``mcpo_upsert_server`` entry in ``TOOLS``. After the
    :func:`_require_unsandboxed` gate, ``server_config_json`` is parsed. It
    must be a non-empty JSON object, which is stored under
    ``mcpServers[server_name]`` in the file at ``cfg.mcpo_config_path``.
    Parent directories and the file itself are created if needed, and the
    file is rewritten as indented JSON.

    Two inputs are rejected that previously slipped through or crashed:

    * A ``server_name`` with a trailing newline. ``fullmatch`` is used because
      ``match`` with a ``$``-terminated pattern accepts ``"name\\n"``, which
      would then be written as a config key.
    * An existing config whose top-level JSON value is not an object (for
      example ``[]``). It is reported as ``existing_config_invalid_json``
      instead of raising ``AttributeError``.

    Args:
        server_name: Mount/route segment (letters, digits, ``_``, ``-``).
        server_config_json: JSON object string for this one server's config.
        ctx: The tool invocation context (privilege check and config source).

    Returns:
        str: A JSON string. ``{"ok": True, "path", "server", "hint"}`` on
        success, or an ``error`` payload: the privilege-gate errors,
        ``no_config``, ``invalid_server_name``, ``invalid_json``,
        ``server_config_must_be_object`` or ``existing_config_invalid_json``.
    """
    denied = await _require_unsandboxed(ctx)
    if denied:
        return denied
    cfg = _require_cfg(ctx)
    if cfg is None:
        return json.dumps({"error": "no_config"})
    if not _SERVER_RE.fullmatch(server_name or ""):
        return json.dumps({"error": "invalid_server_name"})

    try:
        entry = json.loads(server_config_json)
    except _json.JSONDecodeError as e:
        return json.dumps({"error": "invalid_json", "details": str(e)})
    if not isinstance(entry, dict) or not entry:
        return json.dumps({"error": "server_config_must_be_object"})

    path = Path(cfg.mcpo_config_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    if path.exists():
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except _json.JSONDecodeError:
            return json.dumps({"error": "existing_config_invalid_json"})
        if not isinstance(data, dict):
            # Valid JSON but not an object: refuse rather than overwrite it.
            return json.dumps(
                {
                    "error": "existing_config_invalid_json",
                    "details": "top-level JSON value must be an object",
                }
            )
    else:
        data = {}

    servers = data.get("mcpServers")
    if not isinstance(servers, dict):
        servers = {}
    servers[server_name] = entry
    data["mcpServers"] = servers
    # NOTE(review): written in place on purpose. A temp-file + rename would
    # swap the inode, which presumably breaks a single-file bind mount into
    # the mcpo container. Confirm before making this atomic.
    path.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")

    return json.dumps(
        {
            "ok": True,
            "path": str(path.resolve()),
            "server": server_name,
            "hint": (
                "Docker Compose mcpo uses --hot-reload; edits apply without restart. "
                "Otherwise restart the mcpo container."
            ),
        }
    )
# [docs]  (appears to be Sphinx "viewcode" link residue; kept as a comment so it is not evaluated)
async def handle_remove_server(
    server_name: str,
    ctx: ToolContext | None = None,
) -> str:
    """Remove one named server from the mcpo JSON config file.

    Backs the ``mcpo_remove_server`` entry in ``TOOLS``. After the
    :func:`_require_unsandboxed` gate, the file at ``cfg.mcpo_config_path`` is
    loaded, ``server_name`` is deleted from its ``mcpServers`` map, and the
    file is rewritten as indented JSON.

    The name is checked with ``fullmatch``, because ``match`` with a
    ``$``-terminated pattern would accept a trailing newline. A config whose
    top-level JSON value is not an object is reported as
    ``no_mcp_servers_in_config`` instead of raising ``AttributeError``.

    Args:
        server_name: The mount name to remove (letters, digits, ``_``, ``-``).
        ctx: The tool invocation context (privilege check and config source).

    Returns:
        str: A JSON string. ``{"ok": True, "removed", "path", "hint"}`` on
        success, or an error payload: the privilege-gate errors,
        ``no_config``, ``invalid_server_name``, ``config_missing``,
        ``config_invalid_json``, ``no_mcp_servers_in_config`` or
        ``server_not_found``.
    """
    denied = await _require_unsandboxed(ctx)
    if denied:
        return denied
    cfg = _require_cfg(ctx)
    if cfg is None:
        return json.dumps({"error": "no_config"})
    if not _SERVER_RE.fullmatch(server_name or ""):
        return json.dumps({"error": "invalid_server_name"})

    path = Path(cfg.mcpo_config_path)
    if not path.exists():
        return json.dumps(
            {
                "ok": False,
                "error": "config_missing",
                "path": str(path),
            }
        )
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except _json.JSONDecodeError:
        return json.dumps({"error": "config_invalid_json", "path": str(path)})

    # A non-object top level (e.g. a list) has no ``mcpServers`` map either.
    servers = data.get("mcpServers") if isinstance(data, dict) else None
    if not isinstance(servers, dict):
        return json.dumps({"error": "no_mcp_servers_in_config"})
    if server_name not in servers:
        return json.dumps(
            {
                "ok": False,
                "error": "server_not_found",
                "server": server_name,
            }
        )

    del servers[server_name]
    data["mcpServers"] = servers
    path.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")
    return json.dumps(
        {
            "ok": True,
            "removed": server_name,
            "path": str(path.resolve()),
            "hint": (
                "With Docker Compose + --hot-reload, mcpo unmounts the server automatically."
            ),
        }
    )
# [docs]  (appears to be Sphinx "viewcode" link residue; kept as a comment so it is not evaluated)
async def handle_list_servers(
    probe_openapi: bool = True,
    ctx: ToolContext | None = None,
) -> str:
    """List MCP server names from the mcpo config, optionally probing OpenAPI.

    Backs the ``mcpo_list_servers`` entry in ``TOOLS``. The sorted keys of
    ``mcpServers`` are read from the file at ``cfg.mcpo_config_path``. When
    ``probe_openapi`` is true and a base URL is configured, each server's
    ``openapi.json`` is fetched with ``httpx.AsyncClient`` (15 s timeout, 5 s
    for connect, headers from :func:`_auth_headers`). Its HTTP status and
    ``info.title`` / ``info.version`` are then reported.

    Every per-server failure is recorded on that server's row as
    ``openapi_error``. This includes a name the config contains but
    :func:`_server_url` rejects, which previously aborted the whole listing
    with ``ValueError``. A config whose top-level JSON value is not an object
    is treated as having no servers.

    Args:
        probe_openapi: When ``True`` (default), fetch each server's
            ``openapi.json`` for title/version metadata.
        ctx: The tool invocation context (config source).

    Returns:
        str: A JSON string with ``mcpo_base_url``, ``config_path`` and a
        ``servers`` list. Each row has ``name`` plus, when probed,
        ``openapi_status``, ``api_title``, ``api_version`` or
        ``openapi_error``. Errors are ``no_config`` and
        ``config_invalid_json``. When the config file is missing the result
        is an empty list with a ``note``.
    """
    cfg = _require_cfg(ctx)
    if cfg is None:
        return json.dumps({"error": "no_config"})

    path = Path(cfg.mcpo_config_path)
    if not path.exists():
        return json.dumps(
            {
                "config_path": str(path),
                "servers": [],
                "note": "Config file missing; use mcpo_upsert_server or create it.",
            }
        )
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except _json.JSONDecodeError:
        return json.dumps({"error": "config_invalid_json", "path": str(path)})

    servers = data.get("mcpServers") if isinstance(data, dict) else None
    names = sorted(servers) if isinstance(servers, dict) else []
    base = _mcpo_base(cfg)

    out_servers: list[dict[str, Any]] = []
    if probe_openapi and base:
        timeout = httpx.Timeout(15.0, connect=5.0)
        headers = _auth_headers(getattr(cfg, "mcpo_api_key", ""))
        async with httpx.AsyncClient(timeout=timeout) as client:
            for name in names:
                row: dict[str, Any] = {"name": name}
                try:
                    # Inside the try: a hand-edited config may hold a name the
                    # URL builder rejects, and that must not abort the listing.
                    url = _server_url(base, name) + "/openapi.json"
                    r = await client.get(url, headers=headers)
                    row["openapi_status"] = r.status_code
                    if r.status_code == 200:
                        spec = r.json()
                        row["api_title"] = spec.get("info", {}).get("title")
                        row["api_version"] = spec.get("info", {}).get("version")
                except Exception as exc:
                    # Best-effort probe: report the failure on this row only.
                    row["openapi_error"] = str(exc)
                out_servers.append(row)
    else:
        out_servers = [{"name": n} for n in names]

    return json.dumps(
        {
            "mcpo_base_url": base or None,
            "config_path": str(path.resolve()),
            "servers": out_servers,
        }
    )
# Tool registry for this module. Each entry carries:
#   "name"        - the tool name
#   "description" - free-text description of the tool
#   "parameters"  - JSON-Schema object describing the tool's arguments
#   "handler"     - the handler's function name as a string; the loop below
#                   this list replaces it with the function object itself
TOOLS = [
    {
        "name": "mcpo_install_runtime",
        "description": (
            "Install or upgrade the mcpo PyPI package in this Python environment via pip "
            "(optional; for dev without Docker). Production uses the mcpo Docker Compose "
            "service. Requires Python 3.11+ and UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "package_spec": {
                    "type": "string",
                    "description": "pip requirement, default mcpo>=0.0.20",
                },
            },
        },
        "handler": "handle_install_runtime",
    },
    {
        "name": "mcpo_upsert_server",
        "description": (
            "Add or replace one entry in the Claude-style mcpo JSON config "
            "(mcpServers). server_config_json must be a JSON object with fields "
            "like command/args (stdio), or type+url+headers (sse/streamable-http). "
            "Requires UNSANDBOXED_EXEC. Persists to mcpo.config_path (bind-mounted into "
            "the mcpo container when using docker-compose.yml)."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "server_name": {
                    "type": "string",
                    "description": "Mount name / route segment (letters, digits, _, -).",
                },
                "server_config_json": {
                    "type": "string",
                    "description": "JSON object string for this server only.",
                },
            },
            "required": ["server_name", "server_config_json"],
        },
        "handler": "handle_upsert_server",
    },
    {
        "name": "mcpo_remove_server",
        "description": (
            "Remove one named server from the mcpo JSON config (mcpServers). "
            "Requires UNSANDBOXED_EXEC. With Docker + --hot-reload, mcpo drops the mount "
            "without restarting the container."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "server_name": {
                    "type": "string",
                    "description": "Same mount name used in mcpo_upsert_server.",
                },
            },
            "required": ["server_name"],
        },
        "handler": "handle_remove_server",
    },
    {
        "name": "mcpo_list_servers",
        "description": (
            "List MCP server names from mcpo config file. Optionally probes "
            "each server's OpenAPI ( openapi.json ) for title/version."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "probe_openapi": {
                    "type": "boolean",
                    "description": "If true (default), GET openapi.json per server.",
                },
            },
        },
        "handler": "handle_list_servers",
    },
    {
        "name": "mcpo_list_tools",
        "description": (
            "List tools exposed by one mcpo server with request (and optional 200) "
            "JSON Schemas from OpenAPI. Supports truncation; use mcpo_get_tool_schema "
            "if truncated. Requires mcpo.base_url in bot config."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "server_name": {"type": "string"},
                "include_schemas": {
                    "type": "boolean",
                    "description": "Include request_schema and response_schema_200 per tool.",
                },
                "max_total_chars": {
                    "type": "integer",
                    "description": "Soft budget across all tools (default 120000).",
                },
                "max_schema_chars_per_tool": {
                    "type": "integer",
                    "description": "Max chars per request/response schema (default 24000).",
                },
            },
            "required": ["server_name"],
        },
        "handler": "handle_list_tools",
    },
    {
        "name": "mcpo_get_tool_schema",
        "description": (
            "Fetch full OpenAPI schema for one mcpo tool (request and optional 200 "
            "response). Use when mcpo_list_tools truncated."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "server_name": {"type": "string"},
                "tool_name": {"type": "string"},
                "max_chars": {
                    "type": "integer",
                    "description": "Approx cap on serialized output (default 200000).",
                },
            },
            "required": ["server_name", "tool_name"],
        },
        "handler": "handle_get_tool_schema",
    },
    {
        "name": "mcpo_call_tool",
        "description": (
            "Invoke one MCP tool via mcpo HTTP POST with JSON body. Powerful: "
            "restrict with tool_permissions if needed. Requires mcpo.base_url."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "server_name": {"type": "string"},
                "tool_name": {"type": "string"},
                "arguments_json": {
                    "type": "string",
                    "description": "JSON object string for the tool arguments.",
                },
                "timeout_seconds": {
                    "type": "number",
                    "description": "HTTP timeout (default 120).",
                },
            },
            "required": ["server_name", "tool_name"],
        },
        "handler": "handle_call_tool",
    },
]

# Resolve each "handler" name to the module-level function of that name.
# This runs at import time and mutates the TOOLS entries in place.
# NOTE(review): every handler named above must already be defined when this
# loop runs; a missing one raises KeyError during import.
for _tool in TOOLS:
    _hname = _tool.pop("handler")
    _tool["handler"] = globals()[_hname]