"""VMware ESXi / vCenter management via pyVmomi (vSphere API).
Stateless: connect per call in a worker thread, disconnect in ``finally``.
Requires UNSANDBOXED_EXEC. TLS verification off by default for self-signed
infrastructure certs (trusted networks only).
Connection profiles: encrypted per-user JSON (host, user, password, port, verify_ssl).
"""
from __future__ import annotations
import asyncio
import jsonutil as json
import logging
import ssl
from typing import Any
from tools._credential_profile_store import (
delete_profile as _cred_delete,
list_profile_names as _cred_list,
load_profile as _cred_load,
merge_profile as _cred_merge,
save_profile as _cred_save,
)
logger = logging.getLogger(__name__)

# Upper bound, in characters, on the serialized JSON that ``run`` returns.
_MAX_JSON_CHARS = 500_000

# Prefix handed to the credential profile store for every profile of this tool.
CRED_PREFIX = "vmware"

# Every action ``run`` accepts; also the source of the ``action`` enum in the
# ``vmware_control`` parameter schema.
_VALID_ACTIONS = frozenset(
    (
        # Inventory listings.
        "list_vms",
        "list_hosts",
        "list_datastores",
        # Power operations on a single VM.
        "vm_power_on",
        "vm_power_off",
        "vm_reset",
        "vm_suspend",
        "vm_resume",
        # Snapshot operations on a single VM.
        "snapshot_create",
        "snapshot_remove",
        "snapshot_revert",
        # Permanent removal of a VM.
        "vm_destroy",
    )
)
async def _check_priv(ctx: Any) -> str | None:
    """Gate a handler on the caller's ``UNSANDBOXED_EXEC`` privilege.

    Every handler in this module calls this first, before touching stored
    credentials or opening a vSphere session. The privilege helpers are
    imported lazily from ``tools.alter_privileges``; ``ctx.redis``,
    ``ctx.config`` and ``ctx.user_id`` are read (with ``None``/``""``
    fallbacks when absent) and passed to ``has_privilege``. Nothing is written.

    Args:
        ctx: The tool context of the current invocation.

    Returns:
        ``None`` when the user holds the privilege. Otherwise a JSON string
        with ``success: False`` and an ``error`` that the caller should return
        unchanged: either the privilege is missing, or the privilege system
        could not be imported (the check fails closed in that case).
    """
    try:
        from tools.alter_privileges import PRIVILEGES, has_privilege

        store = getattr(ctx, "redis", None)
        settings = getattr(ctx, "config", None)
        caller = getattr(ctx, "user_id", "") or ""
        granted = await has_privilege(
            store,
            caller,
            PRIVILEGES["UNSANDBOXED_EXEC"],
            settings,
        )
    except ImportError:
        # No privilege system means no way to prove authorization: deny.
        return json.dumps({"success": False, "error": "Privilege system unavailable."})

    if granted:
        return None

    denial = {
        "success": False,
        "error": (
            "The user does not have the UNSANDBOXED_EXEC privilege. "
            "Ask an admin to grant it with the alter_privileges tool."
        ),
    }
    return json.dumps(denial)
def _ssl_context(verify_ssl: bool) -> Any:
    """Return the TLS context handed to ``SmartConnect``.

    Uses only the stdlib ``ssl`` module and has no side effects.

    Args:
        verify_ssl: Truthy to validate the server certificate with the
            platform defaults; falsy to skip certificate verification, which
            is what lets self-signed ESXi/vCenter certificates through.

    Returns:
        An ``ssl.SSLContext`` for verified or unverified TLS.
    """
    # ``_create_unverified_context`` is the stdlib's own opt-out hook for
    # certificate verification; it is private by name only.
    factory = ssl.create_default_context if verify_ssl else ssl._create_unverified_context
    return factory()
def _find_vm(content: Any, vm_uuid: str | None, vm_name: str | None) -> Any:
    """Resolve exactly one VM from an instance UUID or a display name.

    The UUID is preferred: it is looked up through
    ``content.searchIndex.FindByUuid``. Only when no UUID is given are all VMs
    under the root folder scanned by display name through a container view,
    which is always destroyed afterwards.

    Display names are not guaranteed to be unique across folders or
    datacenters, and this lookup feeds destructive actions (power off,
    snapshot removal, destroy). A name that matches more than one VM is
    therefore rejected instead of silently acting on whichever VM the view
    happens to list first.

    Args:
        content: The vSphere service content exposing ``searchIndex``,
            ``viewManager`` and ``rootFolder``.
        vm_uuid: VM instance UUID; takes precedence when non-blank.
        vm_name: VM display name, used only when no UUID is supplied.

    Returns:
        The matching VM managed object.

    Raises:
        ValueError: If the UUID matches nothing, the name matches nothing, the
            name matches several VMs, or neither identifier was provided.
    """
    from pyVmomi import vim

    uuid_text = str(vm_uuid).strip() if vm_uuid else ""
    if uuid_text:
        # Arguments: datacenter=None (search everywhere), uuid,
        # vmSearch=True (VMs, not hosts), instanceUuid=True (not BIOS UUID).
        vm = content.searchIndex.FindByUuid(None, uuid_text, True, True)
        if vm is None:
            raise ValueError(f"No VM with instance UUID '{uuid_text}'.")
        return vm

    name = str(vm_name).strip() if vm_name else ""
    if not name:
        raise ValueError("vm_uuid or vm_name is required.")

    view = content.viewManager.CreateContainerView(
        content.rootFolder,
        [vim.VirtualMachine],
        True,
    )
    matches: list[Any] = []
    try:
        for candidate in view.view:
            if candidate.name == name:
                matches.append(candidate)
                if len(matches) > 1:
                    # Two hits are enough to know the name is ambiguous.
                    break
    finally:
        view.DestroyView()

    if not matches:
        raise ValueError(f"No VM with name '{name}'.")
    if len(matches) > 1:
        raise ValueError(
            f"Multiple VMs are named '{name}'; specify vm_uuid (instance UUID) instead."
        )
    return matches[0]
# Actions that cannot proceed without a snapshot name.
_SNAPSHOT_ACTIONS = frozenset({"snapshot_create", "snapshot_remove", "snapshot_revert"})

# Power actions mapped to the VM task method implementing them and that
# method's positional arguments. ``vm_resume`` deliberately reuses power-on:
# the original code resumed a VM by powering it on as well.
_POWER_TASKS = {
    "vm_power_on": ("PowerOnVM_Task", (None,)),
    "vm_power_off": ("PowerOffVM_Task", ()),
    "vm_reset": ("ResetVM_Task", ()),
    "vm_suspend": ("SuspendVM_Task", ()),
    "vm_resume": ("PowerOnVM_Task", (None,)),
}


def _describe_vm(vm: Any) -> dict[str, Any]:
    """Build the ``list_vms`` row for one VM.

    The UUIDs are read from ``vm.summary.config``. The VirtualMachine managed
    object has no top-level ``instanceUuid``/``uuid`` properties, so reading
    them straight off ``vm`` always produced ``None``.
    """
    summary_config = getattr(getattr(vm, "summary", None), "config", None)
    return {
        "name": vm.name,
        "instance_uuid": getattr(summary_config, "instanceUuid", None),
        "bios_uuid": getattr(summary_config, "uuid", None),
        "power_state": str(vm.runtime.powerState),
    }


def _describe_host(host_system: Any) -> dict[str, Any]:
    """Build the ``list_hosts`` row for one host."""
    return {
        "name": host_system.name,
        "connection_state": str(host_system.runtime.connectionState),
    }


def _describe_datastore(datastore: Any) -> dict[str, Any]:
    """Build the ``list_datastores`` row for one datastore."""
    summary = datastore.summary
    return {
        "name": datastore.name,
        "type": summary.type,
        "url": getattr(summary, "url", None),
    }


def _list_inventory(content: Any, vim_type: Any, describe: Any) -> list[dict[str, Any]]:
    """Return one ``describe(obj)`` row per ``vim_type`` object under the root folder.

    The container view is always destroyed, even if building a row fails.
    """
    view = content.viewManager.CreateContainerView(content.rootFolder, [vim_type], True)
    try:
        return [describe(obj) for obj in view.view]
    finally:
        view.DestroyView()


def _find_snapshot(vm: Any, snapshot_name: str) -> Any:
    """Return the snapshot object named ``snapshot_name`` in ``vm``'s tree, or ``None``.

    The tree is walked depth-first in pre-order. Snapshot names need not be
    unique; when several nodes share the name, the last one visited wins,
    which is the behaviour the previous recursive search had.
    """
    info = vm.snapshot
    roots = (info.rootSnapshotList or []) if info else []
    match = None
    pending = list(reversed(list(roots)))
    while pending:
        node = pending.pop()
        if node.name == snapshot_name:
            match = node.snapshot
        children = getattr(node, "childSnapshotList", None) or []
        # Push children reversed so they pop in their original order.
        pending.extend(reversed(list(children)))
    return match


def _vmware_dispatch(
    host: str,
    user: str,
    password: str,
    port: int,
    verify_ssl: bool,
    action: str,
    vm_uuid: str | None,
    vm_name: str | None,
    snapshot_name: str | None,
) -> dict[str, Any]:
    """Open a vSphere session, perform one action, and disconnect.

    This is the synchronous, blocking core of the tool (``run`` invokes it via
    ``asyncio.to_thread``). It connects with ``SmartConnect``, retrieves the
    service content, and dispatches on ``action``: inventory listings go
    through container views; every other action first resolves a single VM
    with ``_find_vm`` and then submits a vSphere task that is waited on with
    ``WaitForTask``. The session is always closed in ``finally``, so no state
    survives the call.

    Failures are returned as data, never raised: any exception inside the
    session is logged with ``logger.exception`` and reported as ``error``.

    Args:
        host: vCenter or ESXi hostname or IP.
        user: vSphere login.
        password: Password for ``user``.
        port: TCP port of the vSphere API.
        verify_ssl: Whether to verify the server TLS certificate.
        action: The action to perform (see ``_VALID_ACTIONS``).
        vm_uuid: VM instance UUID for per-VM actions; preferred over the name.
        vm_name: VM display name for per-VM actions when no UUID is given.
        snapshot_name: Snapshot name; required for the ``snapshot_*`` actions.

    Returns:
        A dict with ``success`` plus action-specific keys: ``vms``, ``hosts``
        or ``datastores`` (with ``count``) for listings, ``snapshot_name`` for
        snapshot actions, ``message`` when a VM was already powered on, or
        ``error`` on failure (missing snapshot name, pyvmomi not installed,
        snapshot not found, VM not powered off for destroy, unsupported
        action, or any caught exception).
    """
    act = str(action or "").strip()
    snap = str(snapshot_name or "").strip()

    # Reject an unusable snapshot request before logging in to vSphere: there
    # is no point opening a session that can only end in this error.
    if act in _SNAPSHOT_ACTIONS and not snap:
        return {"success": False, "error": "snapshot_name is required."}

    try:
        from pyVim.connect import Disconnect, SmartConnect
        from pyVim.task import WaitForTask
        from pyVmomi import vim
    except ImportError:
        return {
            "success": False,
            "error": "pyvmomi is not installed. pip install pyvmomi",
        }

    ctx_ssl = _ssl_context(verify_ssl)
    si = None
    try:
        si = SmartConnect(
            host=host.strip(),
            user=user.strip(),
            pwd=str(password),
            port=int(port),
            sslContext=ctx_ssl,
        )
        content = si.RetrieveContent()

        # action -> (result key, managed object type, row builder)
        listings = {
            "list_vms": ("vms", vim.VirtualMachine, _describe_vm),
            "list_hosts": ("hosts", vim.HostSystem, _describe_host),
            "list_datastores": ("datastores", vim.Datastore, _describe_datastore),
        }
        if act in listings:
            result_key, vim_type, describe = listings[act]
            rows = _list_inventory(content, vim_type, describe)
            return {"success": True, "action": act, result_key: rows, "count": len(rows)}

        # Everything below operates on exactly one VM.
        vm = _find_vm(content, vm_uuid, vm_name)

        if act in _POWER_TASKS:
            if (
                act == "vm_power_on"
                and vm.runtime.powerState == vim.VirtualMachinePowerState.poweredOn
            ):
                return {
                    "success": True,
                    "action": act,
                    "message": "Already powered on.",
                }
            method_name, method_args = _POWER_TASKS[act]
            WaitForTask(getattr(vm, method_name)(*method_args))
            return {"success": True, "action": act}

        if act == "snapshot_create":
            task = vm.CreateSnapshot_Task(name=snap, memory=False, quiesce=False)
            WaitForTask(task)
            return {"success": True, "action": act, "snapshot_name": snap}

        if act in ("snapshot_remove", "snapshot_revert"):
            snapshot = _find_snapshot(vm, snap)
            if snapshot is None:
                return {"success": False, "error": f"Snapshot '{snap}' not found."}
            if act == "snapshot_remove":
                task = snapshot.RemoveSnapshot_Task(removeChildren=False)
            else:
                task = snapshot.RevertToSnapshot_Task()
            WaitForTask(task)
            return {"success": True, "action": act, "snapshot_name": snap}

        if act == "vm_destroy":
            # Refuse to delete a VM that is running or suspended.
            if vm.runtime.powerState != vim.VirtualMachinePowerState.poweredOff:
                return {
                    "success": False,
                    "error": "VM must be powered off before destroy.",
                }
            WaitForTask(vm.Destroy_Task())
            return {"success": True, "action": act}

        return {"success": False, "error": f"Unsupported action: {act}"}
    except Exception as exc:
        logger.exception("vmware_dispatch failed: %s", exc)
        return {"success": False, "error": str(exc)}
    finally:
        if si is not None:
            try:
                Disconnect(si)
            except Exception:
                # A failed logout must not mask the action's own result.
                logger.exception("Disconnect failed")
async def _vmware_save_credentials(
    host: str,
    user: str,
    password: str,
    profile: str = "default",
    port: int = 443,
    verify_ssl: bool = False,
    ctx: Any = None,
) -> str:
    """Store a vCenter/ESXi connection profile for the current user.

    Registered as the handler of the ``vmware_save_credentials`` tool. After
    the context and privilege checks it requires non-blank ``host``, ``user``
    and ``password``, then hands the normalized settings to ``_cred_save``
    under ``CRED_PREFIX``. Encryption and storage are the responsibility of
    ``tools._credential_profile_store`` and are not visible here.

    The store's reply is parsed and, on success, enriched with the non-secret
    settings plus ``has_password: True``. The password itself is never echoed.

    Args:
        host: vCenter or ESXi hostname or IP.
        user: vSphere login.
        password: Password for ``user``.
        profile: Profile name to save under.
        port: vSphere API TCP port.
        verify_ssl: Whether connections using this profile verify TLS.
        ctx: The tool context; required.

    Returns:
        A JSON string: the store's result (plus ``host``, ``user``, ``port``,
        ``verify_ssl`` and ``has_password`` on success), or an ``error`` for a
        missing context, missing privilege, or missing required field.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})

    denied = await _check_priv(ctx)
    if denied:
        return denied

    if not (host and host.strip()):
        return json.dumps({"success": False, "error": "host is required."})
    if not (user and user.strip()):
        return json.dumps({"success": False, "error": "user is required."})
    if not (password and str(password).strip()):
        return json.dumps({"success": False, "error": "password is required."})

    settings = {
        "host": host.strip(),
        "user": user.strip(),
        # Stored verbatim: surrounding whitespace may be part of the secret.
        "password": str(password),
        "port": int(port),
        "verify_ssl": bool(verify_ssl),
    }
    reply = json.loads(await _cred_save(CRED_PREFIX, profile, settings, ctx))
    if reply.get("success"):
        for field in ("host", "user", "port", "verify_ssl"):
            reply[field] = settings[field]
        reply["has_password"] = True
    return json.dumps(reply)
async def _vmware_list_credentials(ctx: Any = None) -> str:
    """Return the current user's saved VMware profile names.

    Registered as the handler of the ``vmware_list_credentials`` tool. After
    the context and privilege checks it returns whatever ``_cred_list``
    produces for ``CRED_PREFIX``, unmodified.

    Args:
        ctx: The tool context; required.

    Returns:
        The store's reply string, or a JSON ``error`` for a missing context or
        missing privilege.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})

    denied = await _check_priv(ctx)
    return denied if denied else await _cred_list(CRED_PREFIX, ctx)
async def _vmware_delete_credentials(profile: str = "default", ctx: Any = None) -> str:
    """Remove one of the current user's saved VMware profiles.

    Registered as the handler of the ``vmware_delete_credentials`` tool. After
    the context and privilege checks it returns whatever ``_cred_delete``
    produces for ``CRED_PREFIX`` and ``profile``, unmodified.

    Args:
        profile: Name of the profile to delete.
        ctx: The tool context; required.

    Returns:
        The store's reply string, or a JSON ``error`` for a missing context or
        missing privilege.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})

    denied = await _check_priv(ctx)
    return denied if denied else await _cred_delete(CRED_PREFIX, profile, ctx)
# Public handler for the ``vmware_control`` tool (registered in ``TOOLS`` below).
async def run(
    action: str = "",
    host: str = "",
    user: str = "",
    password: str = "",
    port: int = 443,
    verify_ssl: bool = False,
    vm_uuid: str | None = None,
    vm_name: str | None = None,
    snapshot_name: str | None = None,
    credential_profile: str = "",
    ctx: Any = None,
) -> str:
    """Validate and execute one VMware vCenter/ESXi management action.

    Registered as the handler of the ``vmware_control`` tool. In order, it:

    1. requires a context and the ``UNSANDBOXED_EXEC`` privilege
       (``_check_priv``);
    2. validates ``action`` against ``_VALID_ACTIONS``;
    3. when ``credential_profile`` is set, loads it with ``_cred_load`` and
       combines it with the explicit arguments through ``_cred_merge``;
    4. requires non-blank host, user and password, an integer port, and a VM
       identifier for every non-listing action;
    5. runs the blocking ``_vmware_dispatch`` in a worker thread via
       ``asyncio.to_thread``;
    6. replaces a result whose JSON exceeds ``_MAX_JSON_CHARS`` with a short
       truncation notice.

    ``None`` is accepted for any of the string arguments and treated as "not
    provided", so a null argument yields a normal validation error instead of
    an ``AttributeError`` (or, for the password, instead of being sent to
    vSphere as the literal text ``"None"``).

    Args:
        action: The action to perform; must be in ``_VALID_ACTIONS``.
        host: vCenter/ESXi host; required unless supplied by the profile.
        user: vSphere login; required unless supplied by the profile.
        password: Password; required unless supplied by the profile.
        port: vSphere API TCP port.
        verify_ssl: Whether to verify TLS.
        vm_uuid: VM instance UUID for per-VM actions (preferred).
        vm_name: VM display name for per-VM actions when no UUID is given.
        snapshot_name: Snapshot name for the ``snapshot_*`` actions.
        credential_profile: Optional saved profile to load connection
            settings from.
        ctx: The tool context; required.

    Returns:
        A JSON string: the dispatch result, a truncation notice, or an
        ``error`` (no context, missing privilege, invalid action, profile load
        failure, missing host/user/password, non-integer port, or missing VM
        identifier).
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})
    auth_err = await _check_priv(ctx)
    if auth_err:
        return auth_err

    act = str(action or "").strip()
    if act not in _VALID_ACTIONS:
        return json.dumps(
            {
                "success": False,
                "error": f"Invalid action. Use one of: {', '.join(sorted(_VALID_ACTIONS))}.",
            }
        )

    # Normalize nulls to the declared defaults before anything calls .strip().
    # ``str(None)`` would otherwise turn a missing password into "None".
    host = "" if host is None else str(host)
    user = "" if user is None else str(user)
    password = "" if password is None else str(password)

    profile_name = str(credential_profile or "").strip()
    if profile_name:
        loaded = await _cred_load(CRED_PREFIX, profile_name, ctx)
        if isinstance(loaded, str):
            # The store reports a failed load as a plain error string.
            return json.dumps({"success": False, "error": loaded})
        # NOTE(review): whether the default port=443 / verify_ssl=False count
        # as explicit overrides of the profile depends on _cred_merge, which
        # is not visible here — confirm against its implementation.
        merged = _cred_merge(
            loaded,
            {
                "host": host,
                "user": user,
                "password": password,
                "port": port,
                "verify_ssl": verify_ssl,
            },
        )
        host = str(merged.get("host") or "")
        user = str(merged.get("user") or "")
        password = str(merged.get("password") or "")
        if merged.get("port") is not None:
            port = merged["port"]
        if "verify_ssl" in merged:
            verify_ssl = bool(merged["verify_ssl"])

    if not host.strip() or not user.strip() or not password.strip():
        return json.dumps(
            {
                "success": False,
                "error": "host, user, and password are required (or use credential_profile).",
            }
        )

    try:
        port = int(port)
    except (TypeError, ValueError):
        return json.dumps({"success": False, "error": "port must be an integer."})

    need_vm = act not in {"list_vms", "list_hosts", "list_datastores"}
    has_vm_id = bool(
        (vm_uuid and str(vm_uuid).strip()) or (vm_name and str(vm_name).strip())
    )
    if need_vm and not has_vm_id:
        return json.dumps(
            {
                "success": False,
                "error": "vm_uuid or vm_name is required for this action.",
            }
        )

    # pyVmomi is synchronous; keep it off the event loop.
    result = await asyncio.to_thread(
        _vmware_dispatch,
        host,
        user,
        password,
        port,
        verify_ssl,
        act,
        vm_uuid,
        vm_name,
        snapshot_name,
    )
    # default=str stringifies any value the JSON encoder cannot handle natively.
    out = json.dumps(result, default=str)
    if len(out) > _MAX_JSON_CHARS:
        return json.dumps(
            {
                "success": result.get("success", False),
                "error": "Response exceeded size limit; narrow the request.",
                "truncated": True,
            }
        )
    return out
# Tool definitions for the credential-management handlers. Each entry pairs a
# tool name and description with a JSON-Schema-style ``parameters`` object and
# the async handler defined above.
_TOOLS_BASE = [
    {
        "name": "vmware_save_credentials",
        "description": (
            "Save vCenter/ESXi connection settings encrypted per-user (host, user, password, port, verify_ssl). "
            "Requires UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "host": {
                    "type": "string",
                    "description": "vCenter or ESXi hostname or IP.",
                },
                "user": {
                    "type": "string",
                    "description": "e.g. administrator@vsphere.local or root.",
                },
                "password": {"type": "string", "description": "Password."},
                "profile": {"type": "string", "default": "default"},
                "port": {"type": "integer", "default": 443},
                "verify_ssl": {"type": "boolean", "default": False},
            },
            "required": ["host", "user", "password"],
        },
        "handler": _vmware_save_credentials,
    },
    {
        "name": "vmware_list_credentials",
        "description": "List saved VMware profile names for this user. Requires UNSANDBOXED_EXEC.",
        "parameters": {"type": "object", "properties": {}},
        "handler": _vmware_list_credentials,
    },
    {
        "name": "vmware_delete_credentials",
        "description": "Delete a saved VMware profile. Requires UNSANDBOXED_EXEC.",
        "parameters": {
            "type": "object",
            # NOTE(review): ``profile`` is required here although it declares a
            # default and the handler defaults it too — confirm this is a
            # deliberate "name what you delete" safeguard.
            "properties": {"profile": {"type": "string", "default": "default"}},
            "required": ["profile"],
        },
        "handler": _vmware_delete_credentials,
    },
]
# Parameter schema of ``vmware_control``; shared by its ``TOOLS`` entry and by
# ``TOOL_PARAMETERS`` below. Only ``action`` is required by the schema; ``run``
# itself enforces the connection settings and the VM identifier.
_CONTROL_PARAMS = {
    "type": "object",
    "properties": {
        # Sorted so the advertised enum order is stable across runs.
        "action": {"type": "string", "enum": sorted(_VALID_ACTIONS)},
        "host": {
            "type": "string",
            "description": "vCenter or ESXi (if not using credential_profile).",
        },
        "user": {"type": "string"},
        "password": {"type": "string"},
        "port": {"type": "integer", "default": 443},
        "verify_ssl": {"type": "boolean", "default": False},
        "credential_profile": {
            "type": "string",
            "description": "Load connection from saved profile; kwargs override.",
            "default": "",
        },
        "vm_uuid": {"type": "string", "description": "VM instance UUID (preferred)."},
        "vm_name": {
            "type": "string",
            "description": "VM display name if UUID unknown.",
        },
        "snapshot_name": {"type": "string", "description": "For snapshot_* actions."},
    },
    "required": ["action"],
}
# Full tool list of this module: the credential tools plus ``vmware_control``.
TOOLS = _TOOLS_BASE + [
    {
        "name": "vmware_control",
        "description": (
            "Manage VMware vCenter or ESXi: list VMs/hosts/datastores; VM power, snapshots, destroy. "
            "Requires UNSANDBOXED_EXEC. Use vm_uuid (instance UUID) or vm_name. "
            "verify_ssl false by default for self-signed certs."
        ),
        "parameters": _CONTROL_PARAMS,
        "handler": run,
    },
]
# Single-tool metadata describing ``vmware_control`` on its own.
# NOTE(review): presumably read by a loader that expects one tool per module
# (together with ``run``) — confirm against the tool registry.
TOOL_NAME = "vmware_control"
TOOL_DESCRIPTION = (
    "Manage VMware vCenter or ESXi via pyVmomi. Prefer vmware_save_credentials + credential_profile. "
    "Requires UNSANDBOXED_EXEC."
)
TOOL_PARAMETERS = _CONTROL_PARAMS