"""Proxmox VE hypervisor control via the Proxmox REST API.
Uses the ``proxmoxer`` library (see ``requirements.txt``). Authenticate with
either a password (``user`` like ``root@pam``) or an API token (``user`` like
``root@pam!tokenid`` and ``password`` as the token secret).
**Security:** requires ``UNSANDBOXED_EXEC`` — controls foundational infrastructure.
TLS verification is disabled by default (typical self-signed Proxmox certs); use
only on trusted networks or terminate TLS elsewhere.
"""
from __future__ import annotations
import asyncio
import jsonutil as json
import logging
from typing import Any
from tools._credential_profile_store import (
delete_profile as _cred_delete,
list_profile_names as _cred_list,
load_profile as _cred_load,
merge_profile as _cred_merge,
save_profile as _cred_save,
)
logger = logging.getLogger(__name__)
CRED_PREFIX = "proxmox"
_MAX_JSON_CHARS = 500_000
_VALID_ACTIONS = frozenset(
{
"list_vms",
"start_vm",
"stop_vm",
"reset_vm",
"create_snapshot",
"rollback_snapshot",
"destroy_vm",
}
)
async def _check_priv(ctx: Any) -> str | None:
"""Authorize the caller for Proxmox control via the UNSANDBOXED_EXEC privilege.
Acts as the security gate shared by every handler in this module: only users
holding the ``UNSANDBOXED_EXEC`` bit may touch the hypervisor, since these
tools control foundational infrastructure (VM lifecycle, snapshots, destroy).
It imports ``PRIVILEGES`` and ``has_privilege`` from ``tools.alter_privileges``
and calls ``has_privilege(ctx.redis, ctx.user_id, PRIVILEGES["UNSANDBOXED_EXEC"],
ctx.config)``, which reads the user's privilege mask from Redis. On denial it
emits a ``SECURITY`` warning to the module logger and returns a ready-to-send
JSON error; if the privilege subsystem cannot be imported it returns a JSON
error instead of raising.
Called by every public entry point here — :func:`run`,
:func:`_proxmox_save_credentials`, :func:`_proxmox_list_credentials`, and
:func:`_proxmox_delete_credentials` — as their first step after the context
check.
Args:
ctx: The tool ``ToolContext``. Its ``redis``, ``config``, and ``user_id``
attributes are read (all via ``getattr`` with safe defaults).
Returns:
str | None: ``None`` when the user is authorized; otherwise a JSON string
``{"success": False, "error": ...}`` describing the denial or the missing
privilege system, which the caller should return to the user verbatim.
"""
try:
from tools.alter_privileges import PRIVILEGES, has_privilege
redis = getattr(ctx, "redis", None)
config = getattr(ctx, "config", None)
user_id = getattr(ctx, "user_id", "") or ""
if not await has_privilege(
redis,
user_id,
PRIVILEGES["UNSANDBOXED_EXEC"],
config,
):
logger.warning(
"SECURITY: User %s attempted proxmox_control without UNSANDBOXED_EXEC",
user_id,
)
return json.dumps(
{
"success": False,
"error": (
"The user does not have the UNSANDBOXED_EXEC privilege. "
"Ask an admin to grant it with the alter_privileges tool."
),
}
)
except ImportError:
return json.dumps({"success": False, "error": "Privilege system unavailable."})
return None
def _bad_token(s: str) -> bool:
"""Report whether a string contains characters that could enable injection.
Rejects newlines, carriage returns, and NUL bytes so that user-supplied
hosts, nodes, users, and snapshot names cannot smuggle control characters
into the Proxmox API request path.
Called by the field validators :func:`_host_ok`, :func:`_node_ok`,
:func:`_user_ok`, and :func:`_snapshot_name_ok`.
Args:
s: The candidate string to inspect.
Returns:
bool: ``True`` if ``s`` contains a newline, carriage return, or NUL byte;
``False`` for empty or clean strings.
"""
if not s:
return False
return any(c in s for c in "\n\r\x00")
def _host_ok(host: str) -> bool:
"""Validate a Proxmox API host string.
Accepts a non-empty host of at most 512 characters with no control
characters. Permissive about the actual hostname/IP/URL shape because
``proxmoxer`` handles host parsing.
Delegates the control-character check to :func:`_bad_token`, and is called by
:func:`run` and :func:`_proxmox_save_credentials` before any API call or
credential save.
Args:
host: The candidate host (hostname, IP, or URL fragment); stripped of
surrounding whitespace before validation.
Returns:
bool: ``True`` if the host is non-empty, within the length cap, and free of
control characters; ``False`` otherwise.
"""
h = (host or "").strip()
if not h or len(h) > 512 or _bad_token(h):
return False
return True
def _node_ok(node: str) -> bool:
"""Validate a Proxmox cluster node name.
Stricter than :func:`_host_ok`: in addition to the non-empty, length, and
control-character checks (the latter via :func:`_bad_token`), every character
must be alphanumeric or one of ``-``, ``_``, or ``.``. The node name is
interpolated into the ``proxmox.nodes(node)`` API path in
:func:`_proxmox_dispatch`, so this guards that path segment.
Called by :func:`run` before dispatching any action to the hypervisor.
Args:
node: The candidate node name; stripped of surrounding whitespace before
validation.
Returns:
bool: ``True`` if the node is non-empty, at most 256 characters, free of
control characters, and composed only of allowed characters; ``False``
otherwise.
"""
n = (node or "").strip()
if not n or len(n) > 256 or _bad_token(n):
return False
return all(c.isalnum() or c in "-_." for c in n)
def _user_ok(user: str) -> bool:
"""Validate a Proxmox auth user/realm string.
Accepts a non-empty value of at most 256 characters with no control
characters. Kept permissive about ``@`` and ``!`` because the user field
encodes both password realms (``root@pam``) and API-token identities
(``root@pam!tokenid``).
Delegates the control-character check to :func:`_bad_token`, and is called by
:func:`run` and :func:`_proxmox_save_credentials`.
Args:
user: The candidate user/realm string; stripped of surrounding whitespace
before validation.
Returns:
bool: ``True`` if the user is non-empty, within the length cap, and free of
control characters; ``False`` otherwise.
"""
u = (user or "").strip()
if not u or len(u) > 256 or _bad_token(u):
return False
return True
def _snapshot_name_ok(name: str | None) -> bool:
"""Validate an optional snapshot name.
Treats ``None`` as valid because snapshot names are only required for the
snapshot/rollback actions; when present, the name must be a non-empty string
of at most 256 characters with no control characters (checked via
:func:`_bad_token`).
Called by :func:`run` to vet ``snapshot_name`` before dispatch; the per-action
"required" enforcement happens later in :func:`_proxmox_dispatch`.
Args:
name: The candidate snapshot name, or ``None`` when not supplied.
Returns:
bool: ``True`` if ``name`` is ``None`` or a clean, non-empty, length-capped
string; ``False`` otherwise.
"""
if name is None:
return True
s = str(name).strip()
if not s or len(s) > 256 or _bad_token(s):
return False
return True
def _proxmox_dispatch(
host: str,
user: str,
password: str,
node: str,
action: str,
vmid: int | None,
snapshot_name: str | None,
) -> dict[str, Any]:
"""Perform one blocking Proxmox API action and return a result dict.
Contains all the synchronous ``proxmoxer`` I/O for this tool. It constructs a
``ProxmoxAPI`` client (with ``verify_ssl=False`` for typical self-signed
Proxmox certs), resolves ``proxmox.nodes(node).qemu``, and dispatches on
``action``: ``list_vms`` returns the node's QEMU guest list; ``start_vm`` /
``stop_vm`` / ``reset_vm`` POST to the VM's ``status`` endpoints;
``create_snapshot`` and ``rollback_snapshot`` POST to the snapshot endpoints;
and ``destroy_vm`` calls ``vm.delete(purge=1)``. These calls drive real
network requests to the hypervisor and mutate VM state, so they are inherently
side-effecting.
Because every ``proxmoxer`` call is blocking, this function is never awaited
directly: :func:`run` invokes it through ``asyncio.to_thread(...)`` so the
event loop is not blocked. It is the only internal caller. The credential and
parameter validation has already happened in :func:`run`; this function only
re-checks the ``vmid``/``snapshot_name`` requirements per action and reports a
missing ``proxmoxer`` install.
Args:
host: Proxmox API host (hostname, IP, or URL); stripped before use.
user: Auth user/realm, e.g. ``root@pam`` or ``root@pam!tokenid``.
password: The account password or API-token secret.
node: Cluster node name; stripped and used as the API path segment.
action: One of the members of :data:`_VALID_ACTIONS`.
vmid: Target QEMU VM id; required for every action except ``list_vms``.
snapshot_name: Snapshot name; required for ``create_snapshot`` and
``rollback_snapshot``, otherwise unused.
Returns:
dict[str, Any]: A result mapping with ``success`` plus action-specific keys
(e.g. ``vms`` for ``list_vms``, or ``node``/``vmid``/``snapshot_name``),
or ``{"success": False, "error": ...}`` when the package is missing, a
required parameter is absent, the action is unsupported, or the underlying
API call raises. Exceptions from ``proxmoxer`` are caught, logged via
``logger.exception``, and folded into the error dict rather than
propagated.
"""
try:
from proxmoxer import ProxmoxAPI
except ImportError:
return {
"success": False,
"error": (
"The proxmoxer package is not installed. "
"Add it to the environment: pip install proxmoxer"
),
}
try:
proxmox = ProxmoxAPI(
host.strip(),
user=user.strip(),
password=str(password),
verify_ssl=False,
)
n = node.strip()
qemu = proxmox.nodes(n).qemu
if action == "list_vms":
vms = qemu.get()
return {"success": True, "action": action, "node": n, "vms": vms}
if vmid is None:
return {
"success": False,
"error": f"Parameter vmid is required for action '{action}'.",
}
vm = qemu(vmid)
if action == "start_vm":
vm.status.start.post()
return {"success": True, "action": action, "node": n, "vmid": vmid}
if action == "stop_vm":
vm.status.stop.post()
return {"success": True, "action": action, "node": n, "vmid": vmid}
if action == "reset_vm":
vm.status.reset.post()
return {"success": True, "action": action, "node": n, "vmid": vmid}
if action == "create_snapshot":
snap = (snapshot_name or "").strip()
if not snap:
return {
"success": False,
"error": "snapshot_name is required for create_snapshot.",
}
vm.snapshot.post(snapname=snap)
return {
"success": True,
"action": action,
"node": n,
"vmid": vmid,
"snapshot_name": snap,
}
if action == "rollback_snapshot":
snap = (snapshot_name or "").strip()
if not snap:
return {
"success": False,
"error": "snapshot_name is required for rollback_snapshot.",
}
vm.snapshot(snap).rollback.post()
return {
"success": True,
"action": action,
"node": n,
"vmid": vmid,
"snapshot_name": snap,
}
if action == "destroy_vm":
vm.delete(purge=1)
return {"success": True, "action": action, "node": n, "vmid": vmid}
return {"success": False, "error": f"Unsupported action: {action}"}
except Exception as exc:
logger.exception("proxmox_control failed: %s", exc)
return {
"success": False,
"error": str(exc),
"action": action,
}
async def _proxmox_save_credentials(
host: str,
user: str,
password: str,
node: str = "",
profile: str = "default",
ctx: Any = None,
) -> str:
"""Save encrypted Proxmox connection credentials as a named per-user profile.
Handler for the ``proxmox_save_credentials`` tool. After authorizing the
caller and validating the inputs, it stores host, user, password, and default
node so later ``proxmox_control`` calls can reference them by
``credential_profile`` instead of repeating secrets.
It enforces authorization via :func:`_check_priv`, validates ``host`` and
``user`` with :func:`_host_ok` / :func:`_user_ok`, then delegates to
:func:`tools._credential_profile_store.save_profile` (imported as ``_cred_save``)
under the ``"proxmox"`` prefix. That call encrypts the JSON blob with the
per-user key (AES-GCM) and writes it into the Redis hash
``stargazer:proxmox_credentials:{user_id}`` at field ``profile``.
Registered in the module ``TOOLS`` list as the ``proxmox_save_credentials``
handler and dispatched by ``tool_loader.py``; no direct in-repo callers.
Args:
host: Proxmox API host to store.
user: Auth user/realm to store (e.g. ``root@pam`` or a token identity).
password: Account password or API-token secret to store (required).
node: Default cluster node name to store; defaults to ``""``.
profile: Profile name to save under; defaults to ``"default"``.
ctx: The tool ``ToolContext`` supplying ``redis``, ``config``, and
``user_id``; ``None`` yields a "No context." error.
Returns:
str: A JSON result string — the success/message payload from the
credential store on success, or a JSON error for missing context, denied
privilege, invalid host/user, or empty password.
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx)
if auth_err:
return auth_err
if not _host_ok(host) or not _user_ok(user):
return json.dumps({"success": False, "error": "Invalid host or user."})
if not password or str(password).strip() == "":
return json.dumps({"success": False, "error": "password is required."})
data = {
"host": host.strip(),
"user": user.strip(),
"password": str(password),
"node": (node or "").strip(),
}
return await _cred_save(CRED_PREFIX, profile, data, ctx)
async def _proxmox_list_credentials(ctx: Any = None) -> str:
"""List the names of this user's saved Proxmox credential profiles.
Handler for the ``proxmox_list_credentials`` tool. Returns only profile names
(never the encrypted secrets) so the caller can discover which profiles exist.
It authorizes via :func:`_check_priv`, then delegates to
:func:`tools._credential_profile_store.list_profile_names` (imported as
``_cred_list``) under the ``"proxmox"`` prefix, which reads the Redis hash
``stargazer:proxmox_credentials:{user_id}`` and returns its sorted field names.
Registered in the module ``TOOLS`` list as the ``proxmox_list_credentials``
handler and dispatched by ``tool_loader.py``; no direct in-repo callers.
Args:
ctx: The tool ``ToolContext`` supplying ``redis``, ``config``, and
``user_id``; ``None`` yields a "No context." error.
Returns:
str: A JSON string with ``profiles`` and ``count`` on success, or a JSON
error for missing context or denied privilege.
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx)
if auth_err:
return auth_err
return await _cred_list(CRED_PREFIX, ctx)
async def _proxmox_delete_credentials(profile: str = "default", ctx: Any = None) -> str:
"""Delete one of this user's saved Proxmox credential profiles.
Handler for the ``proxmox_delete_credentials`` tool. Removes a single named
profile so its stored host/user/password/node can no longer be referenced.
It authorizes via :func:`_check_priv`, then delegates to
:func:`tools._credential_profile_store.delete_profile` (imported as
``_cred_delete``) under the ``"proxmox"`` prefix, which issues an ``HDEL`` on
the field ``profile`` of the Redis hash
``stargazer:proxmox_credentials:{user_id}``.
Registered in the module ``TOOLS`` list as the ``proxmox_delete_credentials``
handler and dispatched by ``tool_loader.py``; no direct in-repo callers.
Args:
profile: Name of the profile to delete; defaults to ``"default"``.
ctx: The tool ``ToolContext`` supplying ``redis``, ``config``, and
``user_id``; ``None`` yields a "No context." error.
Returns:
str: A JSON string reporting whether a profile was deleted, or a JSON error
for missing context or denied privilege.
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx)
if auth_err:
return auth_err
return await _cred_delete(CRED_PREFIX, profile, ctx)
# Entry point registered in TOOLS as the "proxmox_control" handler.
async def run(
    host: str = "",
    user: str = "",
    password: str = "",
    node: str = "",
    action: str = "",
    vmid: int | None = None,
    snapshot_name: str | None = None,
    credential_profile: str = "",
    ctx: Any = None,
) -> str:
    """Authorize, validate and execute one Proxmox VE action; return JSON.

    Handler for the ``proxmox_control`` tool (see ``TOOLS``). The steps are,
    in order:

    1. Reject a missing context and enforce the :func:`_check_priv` gate.
    2. If ``credential_profile`` is non-blank, load it through the credential
       store (``_cred_load``) and combine it with the explicit
       ``host`` / ``user`` / ``password`` / ``node`` arguments via
       ``_cred_merge``. A string returned by the loader is treated as an
       error message.
    3. Check ``action`` against ``_VALID_ACTIONS`` and validate the fields
       with :func:`_host_ok`, :func:`_user_ok`, :func:`_node_ok` and
       :func:`_snapshot_name_ok`; require a password, and a ``vmid`` for
       everything except ``list_vms``.
    4. Run :func:`_proxmox_dispatch` in a worker thread with
       ``asyncio.to_thread`` so the blocking API calls stay off the event
       loop.
    5. Serialise the result; a payload longer than ``_MAX_JSON_CHARS`` is
       replaced by a short "narrow the request" error.

    Args:
        host: Proxmox API host; may come from ``credential_profile`` instead.
        user: Auth user/realm, e.g. ``root@pam`` or ``root@pam!tokenid``.
        password: Account password or API-token secret (required).
        node: Cluster node name to target.
        action: One of ``_VALID_ACTIONS``.
        vmid: Target QEMU VM id; required for every action except
            ``list_vms``.
        snapshot_name: Snapshot name for the snapshot and rollback actions.
        credential_profile: Name of a saved profile supplying
            host/user/password/node.
        ctx: The tool context; ``None`` yields a "No context." error.

    Returns:
        A JSON string: the dispatch result, or an error object for missing
        context, denied privilege, a failed profile load, an invalid
        action/host/user/node/snapshot name, a missing password or ``vmid``,
        or an over-limit response.
    """

    def _fail(message: str) -> str:
        # Uniform error envelope used by every validation failure below.
        return json.dumps({"success": False, "error": message})

    if ctx is None:
        return _fail("No context.")

    denial = await _check_priv(ctx)
    if denial:
        return denial

    if credential_profile and str(credential_profile).strip():
        stored = await _cred_load(CRED_PREFIX, credential_profile.strip(), ctx)
        if isinstance(stored, str):
            return _fail(stored)
        overrides = {
            "host": host,
            "user": user,
            "password": password,
            "node": node,
        }
        combined = _cred_merge(stored, overrides)
        host, user, password, node = (
            str(combined.get(field) or "")
            for field in ("host", "user", "password", "node")
        )

    act = (action or "").strip()
    if act not in _VALID_ACTIONS:
        return _fail(
            f"Invalid action. Must be one of: {', '.join(sorted(_VALID_ACTIONS))}."
        )

    if not _host_ok(host):
        return _fail("Invalid or empty host.")
    if not _user_ok(user):
        return _fail("Invalid or empty user.")
    if password is None or str(password) == "":
        return _fail("password is required.")
    if not _node_ok(node):
        return _fail("Invalid node name.")
    if not _snapshot_name_ok(snapshot_name):
        return _fail("Invalid snapshot_name.")
    if vmid is None and act != "list_vms":
        return _fail(f"vmid is required for action '{act}'.")

    # proxmoxer is synchronous; keep it off the event loop.
    result = await asyncio.to_thread(
        _proxmox_dispatch,
        host,
        user,
        password,
        node,
        act,
        vmid,
        snapshot_name,
    )

    payload = json.dumps(result, default=str)
    if len(payload) <= _MAX_JSON_CHARS:
        return payload

    oversized = {
        "success": result.get("success", False),
        "error": "Response exceeded size limit; narrow the request (e.g. list_vms).",
        "truncated": True,
    }
    return json.dumps(oversized)
# JSON-schema style parameter description for the ``proxmox_control`` tool.
# Only ``action`` is mandatory; connection fields may come from a saved profile.
_CONTROL_PARAMS = {
    "type": "object",
    "properties": {
        # Connection details.
        "host": {
            "type": "string",
            "description": "Proxmox API host (if not using credential_profile).",
        },
        "user": {
            "type": "string",
        },
        "password": {
            "type": "string",
        },
        "node": {
            "type": "string",
            "description": "Cluster node name.",
        },
        "credential_profile": {
            "type": "string",
            "description": "Load host/user/password/node from saved profile; kwargs override.",
            "default": "",
        },
        # What to do, and to which guest.
        "action": {
            "type": "string",
            "enum": sorted(_VALID_ACTIONS),
        },
        "vmid": {
            "type": "integer",
        },
        "snapshot_name": {
            "type": "string",
        },
    },
    "required": ["action"],
}
# Tool registry for this module: one entry per exposed tool, each pairing a
# name, description and parameter schema with its async handler.
TOOLS = [
    # --- credential profile management ---
    {
        "name": "proxmox_save_credentials",
        "description": (
            "Save Proxmox API credentials (host, user, password, default node) encrypted per-user. "
            "Requires UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "host": {
                    "type": "string",
                },
                "user": {
                    "type": "string",
                },
                "password": {
                    "type": "string",
                },
                "node": {
                    "type": "string",
                    "description": "Default node name",
                    "default": "",
                },
                "profile": {
                    "type": "string",
                    "default": "default",
                },
            },
            "required": ["host", "user", "password"],
        },
        "handler": _proxmox_save_credentials,
    },
    {
        "name": "proxmox_list_credentials",
        "description": "List saved Proxmox profile names. Requires UNSANDBOXED_EXEC.",
        "parameters": {
            "type": "object",
            "properties": {},
        },
        "handler": _proxmox_list_credentials,
    },
    {
        "name": "proxmox_delete_credentials",
        "description": "Delete a saved Proxmox profile. Requires UNSANDBOXED_EXEC.",
        "parameters": {
            "type": "object",
            "properties": {
                "profile": {
                    "type": "string",
                    "default": "default",
                },
            },
            "required": ["profile"],
        },
        "handler": _proxmox_delete_credentials,
    },
    # --- hypervisor control ---
    {
        "name": "proxmox_control",
        "description": (
            "Control Proxmox VE QEMU/KVM guests: list VMs, start/stop/reset, snapshots, "
            "rollback, or destroy a VM. Uses the Proxmox API (proxmoxer). "
            "Requires UNSANDBOXED_EXEC. "
            "Auth: password (user e.g. root@pam) or API token. "
            "Use credential_profile or pass host/user/password/node."
        ),
        "parameters": _CONTROL_PARAMS,
        "handler": run,
    },
]
# Single-tool metadata mirroring the ``proxmox_control`` entry in ``TOOLS``.
# NOTE(review): presumably read by a loader that expects one tool per module -
# confirm against the tool loader before changing or removing these.
TOOL_NAME = "proxmox_control"
TOOL_DESCRIPTION = "Control Proxmox VE (see proxmox_control tool)."
# Shares the same schema object as the ``proxmox_control`` entry in ``TOOLS``.
TOOL_PARAMETERS = _CONTROL_PARAMS