# Source code for tools.vmware_tools

"""VMware ESXi / vCenter management via pyVmomi (vSphere API).

Stateless: connect per call in a worker thread, disconnect in ``finally``.
Requires UNSANDBOXED_EXEC. TLS verification off by default for self-signed
infrastructure certs (trusted networks only).

Connection profiles: encrypted per-user JSON (host, user, password, port, verify_ssl).
"""

from __future__ import annotations

import asyncio
import jsonutil as json
import logging
import ssl
from typing import Any

from tools._credential_profile_store import (
    delete_profile as _cred_delete,
    list_profile_names as _cred_list,
    load_profile as _cred_load,
    merge_profile as _cred_merge,
    save_profile as _cred_save,
)

logger = logging.getLogger(__name__)

# Largest serialized JSON response (in characters) the tool hands back.
_MAX_JSON_CHARS = 500_000
# Prefix passed to the credential profile store for this tool's profiles.
CRED_PREFIX = "vmware"

# Every action name the ``vmware_control`` handler accepts.
_VALID_ACTIONS = frozenset(
    (
        # Inventory listings.
        "list_vms",
        "list_hosts",
        "list_datastores",
        # Power-state operations on one VM.
        "vm_power_on",
        "vm_power_off",
        "vm_reset",
        "vm_suspend",
        "vm_resume",
        # Snapshot operations on one VM.
        "snapshot_create",
        "snapshot_remove",
        "snapshot_revert",
        # Permanent removal of one VM.
        "vm_destroy",
    )
)


async def _check_priv(ctx: Any) -> str | None:
    """Authorize the caller against the ``UNSANDBOXED_EXEC`` privilege.

    Every public handler in this module calls this first. The privilege
    helpers are imported lazily from ``tools.alter_privileges`` and
    ``has_privilege`` is awaited with values read off ``ctx`` (``redis``,
    ``user_id`` and ``config``; missing attributes fall back to ``None`` /
    ``""``). Nothing is written.

    Args:
        ctx: The tool context of the current invocation.

    Returns:
        ``None`` when the privilege is held. Otherwise a JSON string with
        ``success: False`` and an ``error`` explaining either that the
        privilege is missing or that the privilege system could not be
        imported.
    """
    try:
        from tools.alter_privileges import PRIVILEGES, has_privilege

        store = getattr(ctx, "redis", None)
        settings = getattr(ctx, "config", None)
        caller = getattr(ctx, "user_id", "") or ""
        granted = await has_privilege(
            store,
            caller,
            PRIVILEGES["UNSANDBOXED_EXEC"],
            settings,
        )
    except ImportError:
        return json.dumps({"success": False, "error": "Privilege system unavailable."})

    if granted:
        return None

    denial = {
        "success": False,
        "error": (
            "The user does not have the UNSANDBOXED_EXEC privilege. "
            "Ask an admin to grant it with the alter_privileges tool."
        ),
    }
    return json.dumps(denial)


def _ssl_context(verify_ssl: bool) -> Any:
    """Create the TLS context handed to the vSphere connection.

    Args:
        verify_ssl: Truthy to get the stdlib default (certificate-verifying)
            context; falsy to get an unverified context, which accepts
            self-signed certificates.

    Returns:
        A freshly built ``ssl.SSLContext``.
    """
    make_context = (
        ssl.create_default_context if verify_ssl else ssl._create_unverified_context
    )
    return make_context()


def _find_vm(content: Any, vm_uuid: str | None, vm_name: str | None) -> Any:
    """Resolve exactly one VM managed object from a UUID or display name.

    A non-blank ``vm_uuid`` always wins and is looked up with
    ``content.searchIndex.FindByUuid`` in VM / instance-UUID mode. Otherwise
    all VMs under ``content.rootFolder`` are scanned through a container view
    (always destroyed afterwards) and compared by display name.

    Display names are not guaranteed unique, and this lookup feeds destructive
    actions (power off, snapshot revert, destroy). A name that matches more
    than one VM is therefore rejected instead of silently picking the first
    hit.

    Args:
        content: vSphere ServiceInstance content exposing ``searchIndex``,
            ``viewManager`` and ``rootFolder``.
        vm_uuid: VM instance UUID; takes precedence when non-blank.
        vm_name: VM display name, used only when no UUID is supplied.

    Returns:
        The matching VM managed object.

    Raises:
        ValueError: If no VM matches the UUID, no VM matches the name, several
            VMs share the name, or neither identifier was provided.
    """
    uuid_text = str(vm_uuid).strip() if vm_uuid else ""
    if uuid_text:
        # Positional args: datacenter=None (search everywhere), uuid,
        # vmSearch=True, instanceUuid=True.
        vm = content.searchIndex.FindByUuid(None, uuid_text, True, True)
        if vm is None:
            raise ValueError(f"No VM with instance UUID '{uuid_text}'.")
        return vm

    name = str(vm_name).strip() if vm_name else ""
    if not name:
        raise ValueError("vm_uuid or vm_name is required.")

    # Imported only on the path that needs the type filter for the view.
    from pyVmomi import vim

    view = content.viewManager.CreateContainerView(
        content.rootFolder,
        [vim.VirtualMachine],
        True,
    )
    try:
        matches = [candidate for candidate in view.view if candidate.name == name]
    finally:
        view.DestroyView()

    if not matches:
        raise ValueError(f"No VM with name '{name}'.")
    if len(matches) > 1:
        # Surface the instance UUIDs so the caller can retry unambiguously.
        uuids = ", ".join(
            str(getattr(getattr(m, "config", None), "instanceUuid", None))
            for m in matches
        )
        raise ValueError(
            f"{len(matches)} VMs are named '{name}'; pass vm_uuid to select one "
            f"(instance UUIDs: {uuids})."
        )
    return matches[0]


def _error_text(exc: BaseException) -> str:
    """Return a concise message for ``exc``.

    vSphere faults raised by pyVmomi carry the server's human-readable text in
    ``msg``, while ``str()`` of such a fault is a multi-line object dump. A
    non-blank string ``msg`` is preferred; anything else falls back to
    ``str(exc)``.
    """
    msg = getattr(exc, "msg", None)
    if isinstance(msg, str) and msg.strip():
        return msg.strip()
    return str(exc)


def _collect_inventory(content: Any, vim_type: Any, describe: Any) -> list[dict[str, Any]]:
    """Describe every object of ``vim_type`` under the root folder.

    Builds a recursive container view, maps ``describe`` over its members and
    always destroys the view afterwards.
    """
    view = content.viewManager.CreateContainerView(
        content.rootFolder,
        [vim_type],
        True,
    )
    try:
        return [describe(obj) for obj in view.view]
    finally:
        view.DestroyView()


def _vm_row(vm: Any) -> dict[str, Any]:
    """Build the ``list_vms`` row for one VM.

    The instance and BIOS UUIDs are read from ``vm.summary.config``. They are
    not properties of the VM object itself, so reading them straight off
    ``vm`` always produced ``None``. Either value is ``None`` when the summary
    or its config is unavailable.
    """
    config = getattr(getattr(vm, "summary", None), "config", None)
    return {
        "name": vm.name,
        "instance_uuid": getattr(config, "instanceUuid", None),
        "bios_uuid": getattr(config, "uuid", None),
        "power_state": str(vm.runtime.powerState),
    }


def _host_row(host_obj: Any) -> dict[str, Any]:
    """Build the ``list_hosts`` row for one host."""
    return {
        "name": host_obj.name,
        "connection_state": str(host_obj.runtime.connectionState),
    }


def _datastore_row(ds: Any) -> dict[str, Any]:
    """Build the ``list_datastores`` row for one datastore."""
    summary = ds.summary  # read once instead of once per field
    return {
        "name": ds.name,
        "type": summary.type,
        "url": getattr(summary, "url", None),
    }


def _iter_snapshot_tree(nodes: Any) -> Any:
    """Yield snapshot tree nodes depth-first, parents before children."""
    for node in nodes or []:
        yield node
        yield from _iter_snapshot_tree(getattr(node, "childSnapshotList", None))


def _find_snapshot(vm: Any, snapshot_name: str) -> Any:
    """Return the snapshot object named ``snapshot_name`` on ``vm``, or ``None``.

    When several snapshots share the name, the last one in depth-first
    pre-order wins; this keeps the behaviour of the former inline search.
    """
    roots = vm.snapshot.rootSnapshotList if vm.snapshot else []
    found = None
    for node in _iter_snapshot_tree(roots):
        if node.name == snapshot_name:
            found = node.snapshot
    return found


def _vmware_dispatch(
    host: str,
    user: str,
    password: str,
    port: int,
    verify_ssl: bool,
    action: str,
    vm_uuid: str | None,
    vm_name: str | None,
    snapshot_name: str | None,
) -> dict[str, Any]:
    """Open a vSphere session, perform one action, and disconnect.

    Synchronous and blocking: connects with ``SmartConnect``, retrieves the
    service content and dispatches on ``action``. The list actions enumerate
    VMs, hosts or datastores through a container view. Every other action
    first resolves one VM via :func:`_find_vm`, then submits the matching
    vSphere task and blocks on it with ``WaitForTask``. The session is always
    closed in ``finally`` via ``Disconnect``.

    Failures are returned as data, never raised: any exception inside the
    session is logged with ``logger.exception`` and turned into an ``error``
    string (the fault's ``msg`` when it has one).

    Args:
        host: vCenter or ESXi hostname or IP (stripped before use).
        user: vSphere login (stripped before use).
        password: Password for ``user``.
        port: TCP port of the vSphere API; coerced with ``int``.
        verify_ssl: Whether the TLS context verifies the server certificate.
        action: Action name (see ``_VALID_ACTIONS``); stripped before use.
        vm_uuid: VM instance UUID for per-VM actions; preferred over name.
        vm_name: VM display name for per-VM actions when no UUID is given.
        snapshot_name: Snapshot name, required by the ``snapshot_*`` actions.

    Returns:
        A dict with ``success`` plus action-specific keys: ``vms``, ``hosts``
        or ``datastores`` with ``count`` for listings; ``snapshot_name`` for
        snapshot actions; ``message`` when a VM is already powered on; or
        ``error`` on failure (pyvmomi missing, blank snapshot name, snapshot
        not found, VM not powered off for destroy, unsupported action, or any
        caught exception).
    """
    try:
        from pyVim.connect import Disconnect, SmartConnect
        from pyVim.task import WaitForTask
        from pyVmomi import vim
    except ImportError:
        return {
            "success": False,
            "error": "pyvmomi is not installed. pip install pyvmomi",
        }

    ctx_ssl = _ssl_context(verify_ssl)
    si = None
    try:
        si = SmartConnect(
            host=host.strip(),
            user=user.strip(),
            pwd=str(password),
            port=int(port),
            sslContext=ctx_ssl,
        )
        content = si.RetrieveContent()
        act = (action or "").strip()

        # --- Inventory listings: no VM identifier needed. ---
        if act == "list_vms":
            rows = _collect_inventory(content, vim.VirtualMachine, _vm_row)
            return {"success": True, "action": act, "vms": rows, "count": len(rows)}

        if act == "list_hosts":
            rows = _collect_inventory(content, vim.HostSystem, _host_row)
            return {"success": True, "action": act, "hosts": rows, "count": len(rows)}

        if act == "list_datastores":
            rows = _collect_inventory(content, vim.Datastore, _datastore_row)
            return {
                "success": True,
                "action": act,
                "datastores": rows,
                "count": len(rows),
            }

        # --- Everything below operates on exactly one VM. ---
        vm = _find_vm(content, vm_uuid, vm_name)

        if act == "vm_power_on":
            if vm.runtime.powerState == vim.VirtualMachinePowerState.poweredOn:
                return {
                    "success": True,
                    "action": act,
                    "message": "Already powered on.",
                }
            WaitForTask(vm.PowerOnVM_Task(None))
            return {"success": True, "action": act}

        if act == "vm_power_off":
            WaitForTask(vm.PowerOffVM_Task())
            return {"success": True, "action": act}

        if act == "vm_reset":
            WaitForTask(vm.ResetVM_Task())
            return {"success": True, "action": act}

        if act == "vm_suspend":
            WaitForTask(vm.SuspendVM_Task())
            return {"success": True, "action": act}

        if act == "vm_resume":
            # Resuming is issued as a power-on task.
            WaitForTask(vm.PowerOnVM_Task(None))
            return {"success": True, "action": act}

        if act in ("snapshot_create", "snapshot_remove", "snapshot_revert"):
            snap = (snapshot_name or "").strip()
            if not snap:
                return {"success": False, "error": "snapshot_name is required."}

            if act == "snapshot_create":
                task = vm.CreateSnapshot_Task(
                    name=snap,
                    memory=False,
                    quiesce=False,
                )
            else:
                target = _find_snapshot(vm, snap)
                if target is None:
                    return {"success": False, "error": f"Snapshot '{snap}' not found."}
                if act == "snapshot_remove":
                    task = target.RemoveSnapshot_Task(removeChildren=False)
                else:
                    task = target.RevertToSnapshot_Task()
            WaitForTask(task)
            return {"success": True, "action": act, "snapshot_name": snap}

        if act == "vm_destroy":
            # Safety rail: never delete a VM that is running or suspended.
            if vm.runtime.powerState != vim.VirtualMachinePowerState.poweredOff:
                return {
                    "success": False,
                    "error": "VM must be powered off before destroy.",
                }
            WaitForTask(vm.Destroy_Task())
            return {"success": True, "action": act}

        return {"success": False, "error": f"Unsupported action: {act}"}

    except Exception as exc:
        logger.exception("vmware_dispatch failed: %s", exc)
        return {"success": False, "error": _error_text(exc)}
    finally:
        if si is not None:
            try:
                Disconnect(si)
            except Exception:
                # Best effort: a failed logout must not mask the real result.
                logger.exception("Disconnect failed")


async def _vmware_save_credentials(
    host: str,
    user: str,
    password: str,
    profile: str = "default",
    port: int = 443,
    verify_ssl: bool = False,
    ctx: Any = None,
) -> str:
    """Save a vCenter/ESXi connection profile for the current user.

    Handler for the ``vmware_save_credentials`` tool. After the privilege gate
    (:func:`_check_priv`), host, user, password and port are validated and the
    settings are handed to ``_cred_save`` under ``CRED_PREFIX``. On success
    the store's JSON result is echoed back with the non-secret fields and
    ``has_password`` added; the password itself is never returned.

    All validation failures, including a malformed or out-of-range ``port``
    and non-string ``host``/``user`` values, are reported as JSON errors
    rather than raised.

    Args:
        host: vCenter or ESXi hostname or IP.
        user: vSphere login (e.g. ``administrator@vsphere.local`` or ``root``).
        password: Password for ``user``; stored as given (not stripped).
        profile: Profile name to save under (defaults to ``"default"``).
        port: vSphere API TCP port, 1-65535 (defaults to 443).
        verify_ssl: Whether to verify TLS for this profile (defaults to False).
        ctx: The tool context; required for the privilege check and the store.

    Returns:
        A JSON string. On success it contains the store's result plus
        ``host``, ``user``, ``port``, ``verify_ssl`` and ``has_password: True``;
        on failure an ``error`` (no context, missing privilege, missing or
        invalid field, or whatever the store reported).
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})
    auth_err = await _check_priv(ctx)
    if auth_err:
        return auth_err

    # Coerce before stripping so a non-string value cannot raise AttributeError.
    host_text = str(host or "").strip()
    if not host_text:
        return json.dumps({"success": False, "error": "host is required."})
    user_text = str(user or "").strip()
    if not user_text:
        return json.dumps({"success": False, "error": "user is required."})
    if not password or str(password).strip() == "":
        return json.dumps({"success": False, "error": "password is required."})

    try:
        port_num = int(port)
    except (TypeError, ValueError):
        return json.dumps({"success": False, "error": "port must be an integer."})
    if not 1 <= port_num <= 65535:
        return json.dumps(
            {"success": False, "error": "port must be between 1 and 65535."}
        )

    data = {
        "host": host_text,
        "user": user_text,
        "password": str(password),
        "port": port_num,
        "verify_ssl": bool(verify_ssl),
    }
    raw = await _cred_save(CRED_PREFIX, profile, data, ctx)
    out = json.loads(raw)
    if out.get("success"):
        # Echo only non-secret fields back to the caller.
        out["host"] = data["host"]
        out["user"] = data["user"]
        out["port"] = data["port"]
        out["verify_ssl"] = data["verify_ssl"]
        out["has_password"] = True
    return json.dumps(out)


async def _vmware_list_credentials(ctx: Any = None) -> str:
    """Return the current user's saved VMware profile names.

    Handler for the ``vmware_list_credentials`` tool. Requires a context and
    the privilege checked by :func:`_check_priv`; the listing itself is
    delegated to ``_cred_list`` with ``CRED_PREFIX``.

    Args:
        ctx: The tool context; required for the privilege check and the store.

    Returns:
        Whatever ``_cred_list`` returns on the happy path, otherwise a JSON
        error string (no context, or the privilege check's own error).
    """
    if ctx is None:
        no_context = {"success": False, "error": "No context."}
        return json.dumps(no_context)

    denial = await _check_priv(ctx)
    if not denial:
        return await _cred_list(CRED_PREFIX, ctx)
    return denial


async def _vmware_delete_credentials(profile: str = "default", ctx: Any = None) -> str:
    """Remove one of the current user's saved VMware profiles.

    Handler for the ``vmware_delete_credentials`` tool. Requires a context and
    the privilege checked by :func:`_check_priv`; the deletion itself is
    delegated to ``_cred_delete`` with ``CRED_PREFIX`` and ``profile``.

    Args:
        profile: Name of the profile to delete (defaults to ``"default"``).
        ctx: The tool context; required for the privilege check and the store.

    Returns:
        Whatever ``_cred_delete`` returns on the happy path, otherwise a JSON
        error string (no context, or the privilege check's own error).
    """
    if ctx is None:
        no_context = {"success": False, "error": "No context."}
        return json.dumps(no_context)

    denial = await _check_priv(ctx)
    if not denial:
        return await _cred_delete(CRED_PREFIX, profile, ctx)
    return denial


async def run(
    action: str = "",
    host: str = "",
    user: str = "",
    password: str = "",
    port: int = 443,
    verify_ssl: bool = False,
    vm_uuid: str | None = None,
    vm_name: str | None = None,
    snapshot_name: str | None = None,
    credential_profile: str = "",
    ctx: Any = None,
) -> str:
    """Validate and execute one VMware vCenter/ESXi management action.

    Handler for the ``vmware_control`` tool. In order, it:

    1. requires a context and the privilege checked by :func:`_check_priv`;
    2. validates ``action`` against ``_VALID_ACTIONS``;
    3. when ``credential_profile`` is given, loads it with ``_cred_load`` and
       combines it with the explicit arguments through ``_cred_merge``;
    4. checks that host, user and password are present, that per-VM actions
       have ``vm_uuid`` or ``vm_name``, that ``snapshot_*`` actions have a
       ``snapshot_name``, and that ``port`` is an integer, all before any
       connection is opened;
    5. runs :func:`_vmware_dispatch` via ``asyncio.to_thread`` so the blocking
       session stays off the event loop;
    6. replaces results whose JSON exceeds ``_MAX_JSON_CHARS`` with a short
       truncation notice.

    Args:
        action: The action to perform; must be in ``_VALID_ACTIONS``.
        host: vCenter/ESXi host; required unless supplied by the profile.
        user: vSphere login; required unless supplied by the profile.
        password: Password; required unless supplied by the profile.
        port: vSphere API TCP port (defaults to 443).
        verify_ssl: Whether to verify TLS (defaults to False).
        vm_uuid: VM instance UUID for per-VM actions (preferred).
        vm_name: VM display name for per-VM actions when no UUID is given.
        snapshot_name: Snapshot name, required for the ``snapshot_*`` actions.
        credential_profile: Optional saved profile name to load connection
            settings from.
        ctx: The tool context; required for the privilege check and profile
            loading.

    Returns:
        A JSON string with the dispatch result, or a JSON error (no context,
        missing privilege, invalid action, profile load failure, missing
        host/user/password, missing VM identifier, missing snapshot name,
        invalid port, or the size-limit notice).
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})
    auth_err = await _check_priv(ctx)
    if auth_err:
        return auth_err

    act = (action or "").strip()
    if act not in _VALID_ACTIONS:
        return json.dumps(
            {
                "success": False,
                "error": f"Invalid action. Use one of: {', '.join(sorted(_VALID_ACTIONS))}.",
            }
        )

    profile_name = str(credential_profile or "").strip()
    if profile_name:
        loaded = await _cred_load(CRED_PREFIX, profile_name, ctx)
        if isinstance(loaded, str):
            # A string result from the loader is its error message.
            return json.dumps({"success": False, "error": loaded})
        # NOTE(review): port and verify_ssl are always passed here with their
        # defaults (443 / False) when the caller omitted them; whether those
        # override the profile's values depends on _cred_merge. Confirm.
        merged = _cred_merge(
            loaded,
            {
                "host": host,
                "user": user,
                "password": password,
                "port": port,
                "verify_ssl": verify_ssl,
            },
        )
        host = merged.get("host")
        user = merged.get("user")
        password = merged.get("password")
        if merged.get("port") is not None:
            port = merged["port"]
        if "verify_ssl" in merged:
            verify_ssl = bool(merged["verify_ssl"])

    # Normalise so a None/non-string argument yields the validation error
    # below instead of an AttributeError on .strip().
    host = str(host or "")
    user = str(user or "")
    password = str(password or "")
    if not host.strip() or not user.strip() or not password.strip():
        return json.dumps(
            {
                "success": False,
                "error": "host, user, and password are required (or use credential_profile).",
            }
        )

    need_vm = act not in {"list_vms", "list_hosts", "list_datastores"}
    if need_vm and not (
        (vm_uuid and str(vm_uuid).strip()) or (vm_name and str(vm_name).strip())
    ):
        return json.dumps(
            {
                "success": False,
                "error": "vm_uuid or vm_name is required for this action.",
            }
        )

    # Fail fast: do not log in to vSphere only to reject a blank snapshot name.
    if act.startswith("snapshot_") and not (snapshot_name or "").strip():
        return json.dumps({"success": False, "error": "snapshot_name is required."})

    try:
        port = int(port)
    except (TypeError, ValueError):
        return json.dumps({"success": False, "error": "port must be an integer."})

    result = await asyncio.to_thread(
        _vmware_dispatch,
        host,
        user,
        password,
        port,
        verify_ssl,
        act,
        vm_uuid,
        vm_name,
        snapshot_name,
    )
    out = json.dumps(result, default=str)
    if len(out) > _MAX_JSON_CHARS:
        return json.dumps(
            {
                "success": result.get("success", False),
                "error": "Response exceeded size limit; narrow the request.",
                "truncated": True,
            }
        )
    return out
# JSON-schema parameters of the ``vmware_control`` tool; shared by the TOOLS
# entry and the TOOL_PARAMETERS alias.
_CONTROL_PARAMS = {
    "type": "object",
    "properties": {
        "action": {"type": "string", "enum": sorted(_VALID_ACTIONS)},
        "host": {
            "type": "string",
            "description": "vCenter or ESXi (if not using credential_profile).",
        },
        "user": {"type": "string"},
        "password": {"type": "string"},
        "port": {"type": "integer", "default": 443},
        "verify_ssl": {"type": "boolean", "default": False},
        "credential_profile": {
            "type": "string",
            "description": "Load connection from saved profile; kwargs override.",
            "default": "",
        },
        "vm_uuid": {"type": "string", "description": "VM instance UUID (preferred)."},
        "vm_name": {
            "type": "string",
            "description": "VM display name if UUID unknown.",
        },
        "snapshot_name": {"type": "string", "description": "For snapshot_* actions."},
    },
    "required": ["action"],
}

# Credential-management tools (save / list / delete profiles).
_TOOLS_BASE = [
    {
        "name": "vmware_save_credentials",
        "description": (
            "Save vCenter/ESXi connection settings encrypted per-user (host, user, password, port, verify_ssl). "
            "Requires UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "host": {
                    "type": "string",
                    "description": "vCenter or ESXi hostname or IP.",
                },
                "user": {
                    "type": "string",
                    "description": "e.g. administrator@vsphere.local or root.",
                },
                "password": {"type": "string", "description": "Password."},
                "profile": {"type": "string", "default": "default"},
                "port": {"type": "integer", "default": 443},
                "verify_ssl": {"type": "boolean", "default": False},
            },
            "required": ["host", "user", "password"],
        },
        "handler": _vmware_save_credentials,
    },
    {
        "name": "vmware_list_credentials",
        "description": "List saved VMware profile names for this user. Requires UNSANDBOXED_EXEC.",
        "parameters": {"type": "object", "properties": {}},
        "handler": _vmware_list_credentials,
    },
    {
        "name": "vmware_delete_credentials",
        "description": "Delete a saved VMware profile. Requires UNSANDBOXED_EXEC.",
        "parameters": {
            "type": "object",
            "properties": {"profile": {"type": "string", "default": "default"}},
            "required": ["profile"],
        },
        "handler": _vmware_delete_credentials,
    },
]

# Full tool list: the credential tools followed by the main control tool.
TOOLS = [
    *_TOOLS_BASE,
    {
        "name": "vmware_control",
        "description": (
            "Manage VMware vCenter or ESXi: list VMs/hosts/datastores; VM power, snapshots, destroy. "
            "Requires UNSANDBOXED_EXEC. Use vm_uuid (instance UUID) or vm_name. "
            "verify_ssl false by default for self-signed certs."
        ),
        "parameters": _CONTROL_PARAMS,
        "handler": run,
    },
]

# Single-tool aliases describing ``vmware_control``.
TOOL_NAME = "vmware_control"
TOOL_DESCRIPTION = (
    "Manage VMware vCenter or ESXi via pyVmomi. Prefer vmware_save_credentials + credential_profile. "
    "Requires UNSANDBOXED_EXEC."
)
TOOL_PARAMETERS = _CONTROL_PARAMS