Source code for tools.mcpo_proxy_tools

"""Tools to install, configure, enumerate, document, and call MCP servers via mcpo (OpenAPI HTTP).

URLs are built only from configured ``mcpo.base_url`` plus validated path segments
(not arbitrary user URLs), so loopback/private targets are allowed.
"""

from __future__ import annotations

import asyncio
import json as _json
import logging
import re
import subprocess
import sys
from pathlib import Path
from typing import TYPE_CHECKING, Any
from urllib.parse import quote

import httpx
import jsonutil as json

from tools.alter_privileges import PRIVILEGES, has_privilege

if TYPE_CHECKING:
    from config import Config
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

_SERVER_RE = re.compile(r"^[a-zA-Z0-9_-]+$")
_TOOL_RE = re.compile(r"^[a-zA-Z0-9_-]+$")


def _json_size(obj: Any) -> int:
    """Estimate the serialized size of an object in characters.

    Serializes ``obj`` to JSON (coercing non-serializable values via ``str``)
    and returns the length of the resulting string, falling back to the length
    of ``str(obj)`` if serialization fails. Used as a cheap byte-budget proxy.

    This helper has no callers within this module or elsewhere in the repository
    (it is retained as an unused size-estimation utility).

    Args:
        obj: Any value whose JSON-serialized length is wanted.

    Returns:
        int: Approximate character length of the JSON representation.
    """
    try:
        return len(json.dumps(obj, default=str))
    except (TypeError, ValueError):
        return len(str(obj))


def _truncate_value(
    obj: Any,
    max_chars: int,
) -> tuple[Any, bool]:
    """Truncate a value to a character budget, returning it and a truncated flag.

    Serializes ``obj`` to JSON and, if it fits within ``max_chars``, returns the
    original object unchanged. If it exceeds the budget, returns a stand-in dict
    ``{"_truncated": True, "preview": <head of the JSON> + "…(truncated)"}`` so
    the caller can still surface a readable preview. A non-positive budget yields
    ``(None, True)``.

    This is called by :func:`handle_list_tools` (to cap per-tool request and
    response schemas) and by :func:`handle_get_tool_schema` (to shrink an
    oversized schema payload before returning it to the model).

    Args:
        obj: The value to size-limit (typically a JSON-Schema dict).
        max_chars: Maximum allowed serialized length in characters.

    Returns:
        tuple[Any, bool]: The (possibly replaced) value and a boolean that is
        ``True`` when truncation occurred.
    """
    if max_chars <= 0:
        return None, True
    raw = json.dumps(obj, default=str)
    if len(raw) <= max_chars:
        return obj, False
    cut = max(0, max_chars - 40)
    preview = raw[:cut] + "…(truncated)"
    return {"_truncated": True, "preview": preview}, True


def _auth_headers(api_key: str) -> dict[str, str]:
    """Build the bearer-auth headers for mcpo HTTP requests.

    Returns an ``Authorization: Bearer <key>`` header dict when a non-blank
    ``api_key`` is supplied, or an empty dict otherwise (mcpo may run without
    auth). The key is read from ``cfg.mcpo_api_key`` by the callers.

    Called by every outbound HTTP handler in this module
    (:func:`handle_list_servers`, :func:`handle_list_tools`,
    :func:`handle_get_tool_schema`, :func:`handle_call_tool`) to authenticate
    the request to the mcpo service.

    Args:
        api_key: The configured mcpo API key, possibly empty or ``None``.

    Returns:
        dict[str, str]: Header mapping with a bearer token, or empty if no key.
    """
    k = (api_key or "").strip()
    if not k:
        return {}
    return {"Authorization": f"Bearer {k}"}


def _require_cfg(ctx: ToolContext | None) -> Config | None:
    """Extract the bot :class:`Config` from a tool context, if present.

    Safely pulls ``ctx.config`` out of the per-invocation ``ToolContext``,
    returning ``None`` when the context itself or the attribute is missing so
    callers can short-circuit with a ``no_config`` error.

    Called at the top of every tool handler in this module to obtain mcpo
    settings (``mcpo_base_url``, ``mcpo_config_path``, ``mcpo_api_key``).

    Args:
        ctx: The tool invocation context, or ``None``.

    Returns:
        Config | None: The bot configuration object, or ``None`` if unavailable.
    """
    if ctx is None:
        return None
    return getattr(ctx, "config", None)


def _mcpo_base(cfg: Config | None) -> str:
    """Return the normalized mcpo base URL from config.

    Reads ``cfg.mcpo_base_url``, stripping surrounding whitespace and any
    trailing slash so it can be concatenated with path segments. Returns an
    empty string when no config or no base URL is set, which callers treat as
    ``mcpo_base_url_not_configured``.

    Called by :func:`handle_list_servers`, :func:`handle_list_tools`,
    :func:`handle_get_tool_schema`, and :func:`handle_call_tool` before building
    per-server URLs via :func:`_server_url`.

    Args:
        cfg: The bot configuration, or ``None``.

    Returns:
        str: The cleaned base URL, or ``""`` if not configured.
    """
    if cfg is None:
        return ""
    return (getattr(cfg, "mcpo_base_url", "") or "").strip().rstrip("/")


def _server_url(base: str, server_name: str) -> str:
    """Build the validated per-server mcpo URL for a given server name.

    Joins the configured ``base`` with a URL-encoded ``server_name`` only after
    confirming the name matches ``_SERVER_RE`` (letters, digits, ``_``, ``-``).
    This whitelist is the core SSRF guard: callers never pass arbitrary URLs, so
    only loopback/private mcpo routes derived from config are reachable.

    Called by :func:`handle_list_servers`, :func:`handle_list_tools`,
    :func:`handle_get_tool_schema`, and :func:`handle_call_tool`, which append
    ``/openapi.json`` or a tool path to the result.

    Args:
        base: The normalized mcpo base URL (see :func:`_mcpo_base`).
        server_name: The mount/route segment to validate and encode.

    Returns:
        str: The fully-qualified ``{base}/{server_name}`` URL.

    Raises:
        ValueError: If ``base`` is empty or ``server_name`` is invalid.
    """
    if not base:
        raise ValueError("mcpo_base_url is not configured")
    if not _SERVER_RE.match(server_name or ""):
        raise ValueError("Invalid server_name (use letters, digits, _ and -)")
    return f"{base}/{quote(server_name, safe='')}"


async def _require_unsandboxed(ctx: ToolContext | None) -> str | None:
    """Enforce the ``UNSANDBOXED_EXEC`` privilege for mutating mcpo actions.

    Verifies the calling user holds the ``UNSANDBOXED_EXEC`` privilege before a
    handler is allowed to install software or rewrite the mcpo config on disk.
    Returns ``None`` to signal "allowed"; otherwise returns a ready-to-send JSON
    error string so the handler can ``return`` it directly.

    Interacts with the privilege store via :func:`tools.alter_privileges.has_privilege`,
    passing ``ctx.redis``, ``ctx.user_id``, ``PRIVILEGES["UNSANDBOXED_EXEC"]``, and
    ``ctx.config`` (the privilege lookup is backed by Redis). No state is written.

    Called as a gate at the start of :func:`handle_install_runtime`,
    :func:`handle_upsert_server`, and :func:`handle_remove_server`.

    Args:
        ctx: The tool invocation context (must carry ``redis`` and ``config``).

    Returns:
        str | None: ``None`` when the privilege is granted; a JSON error string
        (``privilege_check_failed`` or ``forbidden``) when it is not.
    """
    if ctx is None or ctx.redis is None or ctx.config is None:
        return json.dumps(
            {
                "error": "privilege_check_failed",
                "details": "Context, Redis, or config missing",
            }
        )
    ok = await has_privilege(
        ctx.redis,
        ctx.user_id,
        PRIVILEGES["UNSANDBOXED_EXEC"],
        ctx.config,
    )
    if not ok:
        return json.dumps(
            {
                "error": "forbidden",
                "details": "Requires UNSANDBOXED_EXEC privilege",
            }
        )
    return None


def _extract_json_schema_from_content(content: dict[str, Any]) -> dict[str, Any] | None:
    """Pull the JSON Schema out of an OpenAPI ``content`` mapping.

    Scans an OpenAPI media-type ``content`` object and returns the ``schema`` of
    the ``application/json`` entry (tolerating media-type parameters such as
    ``application/json; charset=utf-8``), or ``None`` if no JSON schema is found.

    Called by :func:`_request_schema` and :func:`_response_schema_200` to dig the
    schema out of an operation's request body and response definitions.

    Args:
        content: The OpenAPI ``content`` mapping (media type -> media object).

    Returns:
        dict[str, Any] | None: The JSON Schema dict, or ``None`` if absent.
    """
    for key, block in (content or {}).items():
        if not isinstance(block, dict):
            continue
        lk = str(key).lower().split(";")[0].strip()
        if lk == "application/json":
            sch = block.get("schema")
            if isinstance(sch, dict):
                return sch
    return None


def _request_schema(op: dict[str, Any]) -> dict[str, Any] | None:
    """Extract the request-body JSON Schema from an OpenAPI operation.

    Navigates ``op["requestBody"]["content"]`` and delegates to
    :func:`_extract_json_schema_from_content` to return the ``application/json``
    request schema, or ``None`` when the operation declares no JSON body.

    Called by :func:`handle_list_tools` and :func:`handle_get_tool_schema` to
    surface each tool's input schema to the model.

    Args:
        op: An OpenAPI operation object (the ``post`` entry for a path).

    Returns:
        dict[str, Any] | None: The request JSON Schema, or ``None``.
    """
    rb = op.get("requestBody")
    if not isinstance(rb, dict):
        return None
    content = rb.get("content")
    if not isinstance(content, dict):
        return None
    return _extract_json_schema_from_content(content)


def _response_schema_200(op: dict[str, Any]) -> dict[str, Any] | None:
    """Extract a success-response JSON Schema from an OpenAPI operation.

    Inspects ``op["responses"]`` in priority order (``200``, ``201``, ``202``,
    then ``default``) and returns the first ``application/json`` schema found via
    :func:`_extract_json_schema_from_content`, or ``None`` if none is declared.

    Called by :func:`handle_list_tools` and :func:`handle_get_tool_schema` to
    include the expected response shape (``response_schema_200``) alongside the
    request schema.

    Args:
        op: An OpenAPI operation object (the ``post`` entry for a path).

    Returns:
        dict[str, Any] | None: The success-response JSON Schema, or ``None``.
    """
    responses = op.get("responses")
    if not isinstance(responses, dict):
        return None
    for code in ("200", "201", "202", "default"):
        block = responses.get(code)
        if not isinstance(block, dict):
            continue
        content = block.get("content")
        if not isinstance(content, dict):
            continue
        sch = _extract_json_schema_from_content(content)
        if sch is not None:
            return sch
    return None


[docs] async def handle_install_runtime( package_spec: str = "mcpo>=0.0.20", ctx: ToolContext | None = None, ) -> str: """Install or upgrade the ``mcpo`` PyPI package into this Python environment. Backs the ``mcpo_install_runtime`` tool. Intended for dev use without Docker (production runs mcpo as a Compose service). Gates on the ``UNSANDBOXED_EXEC`` privilege and a Python 3.11+ interpreter, then shells out to ``pip install``. Interacts with :func:`_require_unsandboxed` for the privilege gate and runs ``{sys.executable} -m pip install <package_spec>`` via ``subprocess.run`` on a worker thread (``asyncio.to_thread``) with a 600s timeout — a real side effect that mutates the local environment. After install it reads the resolved version through ``importlib.metadata.version("mcpo")``. No Redis/KG/event-bus interaction. Called indirectly by the tool registry (registered in ``TOOLS`` as ``mcpo_install_runtime`` and dispatched by ``tool_loader.py``); also called directly in ``tests/test_mcpo_proxy_tools.py``. Args: package_spec: A pip requirement string (default ``"mcpo>=0.0.20"``). ctx: The tool invocation context, used for the privilege check. Returns: str: A JSON string. On the privilege/version gates, an ``error`` payload; otherwise ``{"ok", "exit_code", "mcpo_version", "stdout_tail", "stderr_tail"}`` (output tails capped at 4000 chars). pip timeout or failure yields ``{"error": "pip_timeout"}`` / ``{"error": "pip_failed"}``. """ err = await _require_unsandboxed(ctx) if err: return err if sys.version_info < (3, 11): return json.dumps( { "error": "python_version", "details": "mcpo requires Python 3.11+", "python": sys.version, } ) def _run() -> tuple[int, str, str]: """Run the blocking ``pip install`` subprocess and capture its output. Closure over the enclosing ``package_spec``; executes ``{sys.executable} -m pip install <package_spec>`` synchronously with a 600-second timeout and captured text streams. Offloaded to a thread by the enclosing :func:`handle_install_runtime` via ``asyncio.to_thread``. Returns: tuple[int, str, str]: The process exit code, stdout, and stderr. Raises: subprocess.TimeoutExpired: If pip exceeds the 600-second timeout (caught and reported by the caller). """ r = subprocess.run( [sys.executable, "-m", "pip", "install", package_spec], capture_output=True, text=True, timeout=600, ) return r.returncode, r.stdout or "", r.stderr or "" try: code, out, err = await asyncio.to_thread(_run) except subprocess.TimeoutExpired: return json.dumps({"error": "pip_timeout"}) except Exception as exc: logger.exception("mcpo install failed") return json.dumps({"error": "pip_failed", "details": str(exc)}) ver = "" try: from importlib.metadata import version as pkg_version ver = pkg_version("mcpo") except Exception: pass return json.dumps( { "ok": code == 0, "exit_code": code, "mcpo_version": ver or None, "stdout_tail": (out or "")[-4000:], "stderr_tail": (err or "")[-4000:], } )
[docs] async def handle_upsert_server( server_name: str, server_config_json: str, ctx: ToolContext | None = None, ) -> str: """Add or replace one server entry in the Claude-style mcpo JSON config. Backs the ``mcpo_upsert_server`` tool. Parses ``server_config_json`` (a JSON object describing a single MCP server, e.g. ``command``/``args`` for stdio or ``type``/``url``/``headers`` for sse/streamable-http) and writes it under the ``mcpServers`` map in the on-disk mcpo config, creating the file if needed. Gated by :func:`_require_unsandboxed` (``UNSANDBOXED_EXEC``); reads config via :func:`_require_cfg` and validates ``server_name`` against ``_SERVER_RE``. The side effect is filesystem: it creates ``cfg.mcpo_config_path`` (and parent dirs), merges the new entry into any existing ``mcpServers``, and rewrites the file as pretty-printed JSON. That path is bind-mounted into the mcpo Docker container, so with ``--hot-reload`` the change applies without a restart. No Redis/KG/event-bus writes. Called indirectly through the tool registry (``mcpo_upsert_server`` in ``TOOLS``); also exercised directly by ``tests/test_mcpo_proxy_tools.py``. Args: server_name: Mount/route segment (validated by ``_SERVER_RE``). server_config_json: JSON object string for this one server's config. ctx: The tool invocation context (privilege check and config source). Returns: str: A JSON string — ``{"ok": True, "path", "server", "hint"}`` on success, or an ``error`` payload (``forbidden``, ``no_config``, ``invalid_server_name``, ``invalid_json``, ``server_config_must_be_object``, or ``existing_config_invalid_json``). """ err = await _require_unsandboxed(ctx) if err: return err cfg = _require_cfg(ctx) if cfg is None: return json.dumps({"error": "no_config"}) if not _SERVER_RE.match(server_name or ""): return json.dumps({"error": "invalid_server_name"}) try: entry = json.loads(server_config_json) except _json.JSONDecodeError as e: return json.dumps({"error": "invalid_json", "details": str(e)}) if not isinstance(entry, dict) or not entry: return json.dumps({"error": "server_config_must_be_object"}) path = Path(cfg.mcpo_config_path) path.parent.mkdir(parents=True, exist_ok=True) if path.exists(): try: data = json.loads(path.read_text(encoding="utf-8")) except _json.JSONDecodeError: return json.dumps({"error": "existing_config_invalid_json"}) else: data = {} servers = data.get("mcpServers") if not isinstance(servers, dict): servers = {} servers[server_name] = entry data["mcpServers"] = servers path.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8") return json.dumps( { "ok": True, "path": str(path.resolve()), "server": server_name, "hint": ( "Docker Compose mcpo uses --hot-reload; edits apply without restart. " "Otherwise restart the mcpo container." ), } )
[docs] async def handle_remove_server( server_name: str, ctx: ToolContext | None = None, ) -> str: """Remove one named server from the mcpo JSON config. Backs the ``mcpo_remove_server`` tool. Loads the on-disk mcpo config, deletes the named entry from its ``mcpServers`` map, and rewrites the file. With Docker ``--hot-reload``, mcpo drops the corresponding mount automatically. Gated by :func:`_require_unsandboxed` (``UNSANDBOXED_EXEC``); config comes from :func:`_require_cfg` and the name is validated against ``_SERVER_RE``. The side effect is filesystem: it reads and rewrites ``cfg.mcpo_config_path`` as pretty-printed JSON. No Redis/KG/event-bus writes. Called indirectly via the tool registry (``mcpo_remove_server`` in ``TOOLS``); also called directly in ``tests/test_mcpo_proxy_tools.py``. Args: server_name: The mount name to remove (validated by ``_SERVER_RE``). ctx: The tool invocation context (privilege check and config source). Returns: str: A JSON string — ``{"ok": True, "removed", "path", "hint"}`` on success, or an ``error``/``ok: False`` payload (``forbidden``, ``no_config``, ``invalid_server_name``, ``config_missing``, ``config_invalid_json``, ``no_mcp_servers_in_config``, or ``server_not_found``). """ err = await _require_unsandboxed(ctx) if err: return err cfg = _require_cfg(ctx) if cfg is None: return json.dumps({"error": "no_config"}) if not _SERVER_RE.match(server_name or ""): return json.dumps({"error": "invalid_server_name"}) path = Path(cfg.mcpo_config_path) if not path.exists(): return json.dumps( { "ok": False, "error": "config_missing", "path": str(path), } ) try: data = json.loads(path.read_text(encoding="utf-8")) except _json.JSONDecodeError: return json.dumps({"error": "config_invalid_json", "path": str(path)}) servers = data.get("mcpServers") if not isinstance(servers, dict): return json.dumps({"error": "no_mcp_servers_in_config"}) if server_name not in servers: return json.dumps( { "ok": False, "error": "server_not_found", "server": server_name, } ) del servers[server_name] data["mcpServers"] = servers path.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8") return json.dumps( { "ok": True, "removed": server_name, "path": str(path.resolve()), "hint": ( "With Docker Compose + --hot-reload, mcpo unmounts the server automatically." ), } )
[docs] async def handle_list_servers( probe_openapi: bool = True, ctx: ToolContext | None = None, ) -> str: """List MCP server names from the mcpo config, optionally probing OpenAPI. Backs the ``mcpo_list_servers`` tool. Reads the ``mcpServers`` keys from the on-disk mcpo config and returns them sorted. When ``probe_openapi`` is true and a base URL is configured, it additionally GETs each server's ``openapi.json`` to report its API title/version and HTTP status. Reads ``cfg.mcpo_config_path`` (via :func:`_require_cfg`) and resolves the base URL with :func:`_mcpo_base`. The OpenAPI probe issues authenticated HTTP GETs (headers from :func:`_auth_headers`, URLs from :func:`_server_url`) through ``httpx.AsyncClient`` with a 15s/5s connect timeout; per-server failures are captured as ``openapi_error`` rather than raised. No Redis/KG/event-bus interaction. Called indirectly via the tool registry (``mcpo_list_servers`` in ``TOOLS``); no direct internal callers outside the test suite. Args: probe_openapi: When ``True`` (default), fetch each server's ``openapi.json`` for title/version metadata. ctx: The tool invocation context (config source). Returns: str: A JSON string with ``mcpo_base_url``, ``config_path``, and a ``servers`` list (names plus optional ``openapi_status``/``api_title``/ ``api_version``/``openapi_error``); or an ``error`` payload (``no_config`` / ``config_invalid_json``), or a note when the config file is missing. """ cfg = _require_cfg(ctx) if cfg is None: return json.dumps({"error": "no_config"}) path = Path(cfg.mcpo_config_path) if not path.exists(): return json.dumps( { "config_path": str(path), "servers": [], "note": "Config file missing; use mcpo_upsert_server or create it.", } ) try: data = json.loads(path.read_text(encoding="utf-8")) except _json.JSONDecodeError: return json.dumps({"error": "config_invalid_json", "path": str(path)}) servers = data.get("mcpServers") names = sorted(servers.keys()) if isinstance(servers, dict) else [] base = _mcpo_base(cfg) out_servers: list[dict[str, Any]] = [] timeout = httpx.Timeout(15.0, connect=5.0) headers = _auth_headers(getattr(cfg, "mcpo_api_key", "")) if probe_openapi and base: async with httpx.AsyncClient(timeout=timeout) as client: for name in names: row: dict[str, Any] = {"name": name} url = _server_url(base, name) + "/openapi.json" try: r = await client.get(url, headers=headers) row["openapi_status"] = r.status_code if r.status_code == 200: spec = r.json() row["api_title"] = spec.get("info", {}).get("title") row["api_version"] = spec.get("info", {}).get("version") except Exception as exc: row["openapi_error"] = str(exc) out_servers.append(row) else: out_servers = [{"name": n} for n in names] return json.dumps( { "mcpo_base_url": base or None, "config_path": str(path.resolve()), "servers": out_servers, } )
[docs] async def handle_list_tools( server_name: str, include_schemas: bool = True, max_total_chars: int = 120_000, max_schema_chars_per_tool: int = 24_000, ctx: ToolContext | None = None, ) -> str: """List the tools exposed by one mcpo server, with their JSON Schemas. Backs the ``mcpo_list_tools`` tool. Fetches the server's ``openapi.json`` and derives one entry per ``POST`` path (the mcpo tool-call convention), each with its name, summary/description, and optionally the request and ``200`` response JSON Schemas. Applies a two-level character budget: per-tool schemas are capped via :func:`_truncate_value`, and if the whole payload still exceeds ``max_total_chars`` it drops schemas from the tail until it fits. Resolves config/base URL via :func:`_require_cfg` / :func:`_mcpo_base`, builds the URL with :func:`_server_url`, and issues an authenticated ``httpx`` GET (headers from :func:`_auth_headers`, 30s/8s connect timeout). Schemas are extracted with :func:`_request_schema` and :func:`_response_schema_200`. The docs/openapi.json/redoc paths are skipped. No Redis/KG/event-bus interaction. Called indirectly via the tool registry (``mcpo_list_tools`` in ``TOOLS``); also called directly in ``tests/test_mcpo_proxy_tools.py``. Args: server_name: The mcpo server whose tools to enumerate. include_schemas: When ``True`` (default), include request and ``200`` response schemas per tool. max_total_chars: Soft budget (default 120000) across the whole payload; schemas are progressively dropped to stay under it. max_schema_chars_per_tool: Per-schema character cap (default 24000); the response schema is additionally capped at 8000. ctx: The tool invocation context (config source). Returns: str: A JSON string with ``server``, a ``tools`` list, a ``truncated`` flag, and a ``hint_truncated`` message when anything was dropped; or an ``error`` payload (``no_config``, ``mcpo_base_url_not_configured``, ``bad_request``, ``http_error``, ``openapi_fetch_failed``, ``openapi_not_json``, or ``no_paths_in_openapi``). """ cfg = _require_cfg(ctx) if cfg is None: return json.dumps({"error": "no_config"}) base = _mcpo_base(cfg) if not base: return json.dumps({"error": "mcpo_base_url_not_configured"}) try: url = _server_url(base, server_name) + "/openapi.json" except ValueError as e: return json.dumps({"error": "bad_request", "details": str(e)}) headers = _auth_headers(getattr(cfg, "mcpo_api_key", "")) timeout = httpx.Timeout(30.0, connect=8.0) try: async with httpx.AsyncClient(timeout=timeout) as client: r = await client.get(url, headers=headers) except httpx.RequestError as exc: return json.dumps({"error": "http_error", "details": str(exc)}) if r.status_code != 200: return json.dumps( { "error": "openapi_fetch_failed", "status": r.status_code, "body_preview": (r.text or "")[:2000], } ) try: whole: dict[str, Any] = r.json() except _json.JSONDecodeError: return json.dumps({"error": "openapi_not_json"}) paths = whole.get("paths") if not isinstance(paths, dict): return json.dumps({"error": "no_paths_in_openapi"}) tools_out: list[dict[str, Any]] = [] per_cap = max(0, int(max_schema_chars_per_tool)) resp_cap = min(per_cap, 8000) if per_cap else 0 for path_key, item in sorted(paths.items()): if not isinstance(path_key, str) or not path_key.startswith("/"): continue if not isinstance(item, dict): continue op = item.get("post") if not isinstance(op, dict): continue sk = path_key.strip("/") if sk in ("docs", "openapi.json", "redoc"): continue name = sk.split("/")[-1] if sk else "" if not name: continue entry: dict[str, Any] = { "tool_name": name, "path": path_key, "summary": op.get("summary"), "description": op.get("description"), } if include_schemas and per_cap > 0: req = _request_schema(op) resp = _response_schema_200(op) t1 = t2 = False if req is not None: req_out, t1 = _truncate_value(req, per_cap) entry["request_schema"] = req_out if resp is not None and resp_cap > 0: resp_out, t2 = _truncate_value(resp, resp_cap) entry["response_schema_200"] = resp_out entry["schema_truncated"] = bool(t1 or t2) elif include_schemas: entry["schema_truncated"] = False tools_out.append(entry) truncated_global = False max_tot = max(0, int(max_total_chars)) if max_tot > 0 and tools_out: payload = {"server": server_name, "tools": tools_out} while len(json.dumps(payload, default=str)) > max_tot: stripped = False for t in reversed(tools_out): if t.pop("request_schema", None) is not None: t["schema_truncated"] = True stripped = True break if t.pop("response_schema_200", None) is not None: t["schema_truncated"] = True stripped = True break if not stripped: truncated_global = True break truncated_global = True payload = {"server": server_name, "tools": tools_out} any_schema_trunc = any(t.get("schema_truncated") for t in tools_out) return json.dumps( { "server": server_name, "tools": tools_out, "truncated": truncated_global or any_schema_trunc, "hint_truncated": ( "Schemas truncated or dropped for size; use mcpo_get_tool_schema per tool." if truncated_global or any_schema_trunc else None ), } )
[docs] async def handle_get_tool_schema( server_name: str, tool_name: str, max_chars: int = 200_000, ctx: ToolContext | None = None, ) -> str: """Fetch the full OpenAPI schema for one specific mcpo tool. Backs the ``mcpo_get_tool_schema`` tool — the per-tool fallback when :func:`handle_list_tools` truncated. Fetches the server's ``openapi.json``, locates the operation matching ``tool_name`` (trying ``/tool_name``, the URL-encoded variant, then the last path segment of each path), and returns its summary, description, request schema, and ``200`` response schema. If the serialized result exceeds ``max_chars`` it re-truncates the schemas (request at ``cap/2``, response at ``cap/4``) via :func:`_truncate_value`. Resolves config/base URL via :func:`_require_cfg` / :func:`_mcpo_base`, validates both names against ``_SERVER_RE`` / ``_TOOL_RE``, builds the URL with :func:`_server_url`, and issues an authenticated ``httpx`` GET (headers from :func:`_auth_headers`, 30s/8s connect timeout). Schemas come from :func:`_request_schema` and :func:`_response_schema_200`. No Redis/KG/event-bus interaction. Called indirectly via the tool registry (``mcpo_get_tool_schema`` in ``TOOLS``); no direct internal callers found. Args: server_name: The mcpo server hosting the tool (validated). tool_name: The tool/operation to fetch (validated). max_chars: Approximate cap on the serialized output (default 200000; floored at 1000). ctx: The tool invocation context (config source). Returns: str: A JSON string with ``server``, ``tool``, ``summary``, ``description``, ``request_schema``, ``response_schema_200`` (and a ``truncated`` flag if shrunk); or an ``error`` payload (``no_config``, ``mcpo_base_url_not_configured``, ``invalid_name``, ``bad_request``, ``http_error``, ``openapi_fetch_failed``, ``openapi_not_json``, ``no_paths``, ``tool_not_found``, or ``no_post_for_tool``). """ cfg = _require_cfg(ctx) if cfg is None: return json.dumps({"error": "no_config"}) base = _mcpo_base(cfg) if not base: return json.dumps({"error": "mcpo_base_url_not_configured"}) if not _SERVER_RE.match(server_name or "") or not _TOOL_RE.match( tool_name or "", ): return json.dumps({"error": "invalid_name"}) try: url = _server_url(base, server_name) + "/openapi.json" except ValueError as e: return json.dumps({"error": "bad_request", "details": str(e)}) cap = max(1000, int(max_chars)) headers = _auth_headers(getattr(cfg, "mcpo_api_key", "")) timeout = httpx.Timeout(30.0, connect=8.0) try: async with httpx.AsyncClient(timeout=timeout) as client: r = await client.get(url, headers=headers) except httpx.RequestError as exc: return json.dumps({"error": "http_error", "details": str(exc)}) if r.status_code != 200: return json.dumps({"error": "openapi_fetch_failed", "status": r.status_code}) try: whole = r.json() except _json.JSONDecodeError: return json.dumps({"error": "openapi_not_json"}) paths = whole.get("paths") if not isinstance(paths, dict): return json.dumps({"error": "no_paths"}) candidates = [ f"/{tool_name}", f"/{quote(tool_name, safe='')}", ] item = None for ck in candidates: if ck in paths: item = paths[ck] break if item is None: for pk, val in paths.items(): if not isinstance(pk, str) or not isinstance(val, dict): continue sk = pk.strip("/").split("/")[-1] if sk == tool_name: item = val break if not isinstance(item, dict): return json.dumps({"error": "tool_not_found", "tool": tool_name}) op = item.get("post") if not isinstance(op, dict): return json.dumps({"error": "no_post_for_tool", "tool": tool_name}) req = _request_schema(op) resp = _response_schema_200(op) payload = { "server": server_name, "tool": tool_name, "summary": op.get("summary"), "description": op.get("description"), "request_schema": req, "response_schema_200": resp, } raw = json.dumps(payload, default=str) truncated = len(raw) > cap if truncated: req2, _ = _truncate_value(req, cap // 2) if req else (None, False) resp2, _ = _truncate_value(resp, cap // 4) if resp else (None, False) payload = { "server": server_name, "tool": tool_name, "summary": op.get("summary"), "description": op.get("description"), "request_schema": req2, "response_schema_200": resp2, "truncated": True, } return json.dumps(payload, default=str)
[docs] async def handle_call_tool( server_name: str, tool_name: str, arguments_json: str = "{}", timeout_seconds: float = 120.0, ctx: ToolContext | None = None, ) -> str: """Invoke one MCP tool through mcpo via an HTTP ``POST`` with a JSON body. Backs the ``mcpo_call_tool`` tool — the actual execution path. Parses ``arguments_json`` into the request body and POSTs it to the tool's mcpo endpoint, returning the upstream status code and (parsed JSON or text-capped) body. This is powerful and unrestricted by design; callers should constrain it with tool permissions where appropriate. Resolves config/base URL via :func:`_require_cfg` / :func:`_mcpo_base`, validates both names against ``_SERVER_RE`` / ``_TOOL_RE``, builds the tool URL with :func:`_server_url`, and POSTs through ``httpx.AsyncClient`` with bearer auth from :func:`_auth_headers`, a JSON content type, and a caller-supplied total timeout (10s connect). Whatever side effects occur live in the downstream MCP server; this function itself does no Redis/KG/event-bus work. Called indirectly via the tool registry (``mcpo_call_tool`` in ``TOOLS``); no direct internal callers found. Args: server_name: The mcpo server hosting the tool (validated). tool_name: The tool/operation to invoke (validated). arguments_json: JSON object string of tool arguments (default ``"{}"``). timeout_seconds: Total HTTP timeout for the call (default 120.0). ctx: The tool invocation context (config source). Returns: str: A JSON string ``{"status_code", "body"}`` where ``body`` is the parsed JSON response or the raw text capped at 500000 chars; or an ``error`` payload (``no_config``, ``mcpo_base_url_not_configured``, ``invalid_name``, ``invalid_arguments_json``, ``arguments_must_be_object``, ``bad_request``, or ``http_error``). """ cfg = _require_cfg(ctx) if cfg is None: return json.dumps({"error": "no_config"}) base = _mcpo_base(cfg) if not base: return json.dumps({"error": "mcpo_base_url_not_configured"}) if not _SERVER_RE.match(server_name or "") or not _TOOL_RE.match( tool_name or "", ): return json.dumps({"error": "invalid_name"}) try: body = json.loads(arguments_json or "{}") except _json.JSONDecodeError as e: return json.dumps({"error": "invalid_arguments_json", "details": str(e)}) if not isinstance(body, dict): return json.dumps({"error": "arguments_must_be_object"}) try: surl = _server_url(base, server_name) + "/" + quote(tool_name, safe="") except ValueError as e: return json.dumps({"error": "bad_request", "details": str(e)}) headers = _auth_headers(getattr(cfg, "mcpo_api_key", "")) headers["Content-Type"] = "application/json" timeout = httpx.Timeout(float(timeout_seconds), connect=10.0) try: async with httpx.AsyncClient(timeout=timeout) as client: r = await client.post(surl, headers=headers, json=body) except httpx.RequestError as exc: return json.dumps({"error": "http_error", "details": str(exc)}) ct = (r.headers.get("content-type") or "").lower() text = r.text or "" parsed: Any = None if "json" in ct: try: parsed = r.json() except _json.JSONDecodeError: parsed = None return json.dumps( { "status_code": r.status_code, "body": parsed if parsed is not None else text[:500_000], } )
TOOLS = [ { "name": "mcpo_install_runtime", "description": ( "Install or upgrade the mcpo PyPI package in this Python environment via pip " "(optional; for dev without Docker). Production uses the mcpo Docker Compose " "service. Requires Python 3.11+ and UNSANDBOXED_EXEC." ), "parameters": { "type": "object", "properties": { "package_spec": { "type": "string", "description": "pip requirement, default mcpo>=0.0.20", }, }, }, "handler": "handle_install_runtime", }, { "name": "mcpo_upsert_server", "description": ( "Add or replace one entry in the Claude-style mcpo JSON config " "(mcpServers). server_config_json must be a JSON object with fields " "like command/args (stdio), or type+url+headers (sse/streamable-http). " "Requires UNSANDBOXED_EXEC. Persists to mcpo.config_path (bind-mounted into " "the mcpo container when using docker-compose.yml)." ), "parameters": { "type": "object", "properties": { "server_name": { "type": "string", "description": "Mount name / route segment (letters, digits, _, -).", }, "server_config_json": { "type": "string", "description": "JSON object string for this server only.", }, }, "required": ["server_name", "server_config_json"], }, "handler": "handle_upsert_server", }, { "name": "mcpo_remove_server", "description": ( "Remove one named server from the mcpo JSON config (mcpServers). " "Requires UNSANDBOXED_EXEC. With Docker + --hot-reload, mcpo drops the mount " "without restarting the container." ), "parameters": { "type": "object", "properties": { "server_name": { "type": "string", "description": "Same mount name used in mcpo_upsert_server.", }, }, "required": ["server_name"], }, "handler": "handle_remove_server", }, { "name": "mcpo_list_servers", "description": ( "List MCP server names from mcpo config file. Optionally probes " "each server's OpenAPI ( openapi.json ) for title/version." ), "parameters": { "type": "object", "properties": { "probe_openapi": { "type": "boolean", "description": "If true (default), GET openapi.json per server.", }, }, }, "handler": "handle_list_servers", }, { "name": "mcpo_list_tools", "description": ( "List tools exposed by one mcpo server with request (and optional 200) " "JSON Schemas from OpenAPI. Supports truncation; use mcpo_get_tool_schema " "if truncated. Requires mcpo.base_url in bot config." ), "parameters": { "type": "object", "properties": { "server_name": {"type": "string"}, "include_schemas": { "type": "boolean", "description": "Include request_schema and response_schema_200 per tool.", }, "max_total_chars": { "type": "integer", "description": "Soft budget across all tools (default 120000).", }, "max_schema_chars_per_tool": { "type": "integer", "description": "Max chars per request/response schema (default 24000).", }, }, "required": ["server_name"], }, "handler": "handle_list_tools", }, { "name": "mcpo_get_tool_schema", "description": ( "Fetch full OpenAPI schema for one mcpo tool (request and optional 200 " "response). Use when mcpo_list_tools truncated." ), "parameters": { "type": "object", "properties": { "server_name": {"type": "string"}, "tool_name": {"type": "string"}, "max_chars": { "type": "integer", "description": "Approx cap on serialized output (default 200000).", }, }, "required": ["server_name", "tool_name"], }, "handler": "handle_get_tool_schema", }, { "name": "mcpo_call_tool", "description": ( "Invoke one MCP tool via mcpo HTTP POST with JSON body. Powerful: " "restrict with tool_permissions if needed. Requires mcpo.base_url." ), "parameters": { "type": "object", "properties": { "server_name": {"type": "string"}, "tool_name": {"type": "string"}, "arguments_json": { "type": "string", "description": "JSON object string for the tool arguments.", }, "timeout_seconds": { "type": "number", "description": "HTTP timeout (default 120).", }, }, "required": ["server_name", "tool_name"], }, "handler": "handle_call_tool", }, ] for _tool in TOOLS: _hname = _tool.pop("handler") _tool["handler"] = globals()[_hname]