# Source code for tools.proxmox_tools

"""Proxmox VE hypervisor control via the Proxmox REST API.

Uses the ``proxmoxer`` library (see ``requirements.txt``). Authenticate with
either a password (``user`` like ``root@pam``) or an API token (``user`` like
``root@pam!tokenid`` and ``password`` as the token secret).

**Security:** requires ``UNSANDBOXED_EXEC`` — controls foundational infrastructure.

TLS verification is disabled by default (typical self-signed Proxmox certs); use
only on trusted networks or terminate TLS elsewhere.
"""

from __future__ import annotations

import asyncio
import json
import logging
from typing import Any

logger = logging.getLogger(__name__)

_MAX_JSON_CHARS = 500_000

_VALID_ACTIONS = frozenset({
    "list_vms",
    "start_vm",
    "stop_vm",
    "reset_vm",
    "create_snapshot",
    "rollback_snapshot",
    "destroy_vm",
})


async def _check_priv(ctx: Any) -> str | None:
    try:
        from tools.alter_privileges import PRIVILEGES, has_privilege

        redis = getattr(ctx, "redis", None)
        config = getattr(ctx, "config", None)
        user_id = getattr(ctx, "user_id", "") or ""
        if not await has_privilege(
            redis, user_id, PRIVILEGES["UNSANDBOXED_EXEC"], config,
        ):
            logger.warning(
                "SECURITY: User %s attempted proxmox_control without UNSANDBOXED_EXEC",
                user_id,
            )
            return json.dumps({
                "success": False,
                "error": (
                    "The user does not have the UNSANDBOXED_EXEC privilege. "
                    "Ask an admin to grant it with the alter_privileges tool."
                ),
            })
    except ImportError:
        return json.dumps({"success": False, "error": "Privilege system unavailable."})
    return None


def _bad_token(s: str) -> bool:
    if not s:
        return False
    return any(c in s for c in "\n\r\x00")


def _host_ok(host: str) -> bool:
    """Accept a host that is non-blank, at most 512 characters after
    trimming, and free of the characters rejected by ``_bad_token``."""
    candidate = (host or "").strip()
    return bool(candidate) and len(candidate) <= 512 and not _bad_token(candidate)


def _node_ok(node: str) -> bool:
    n = (node or "").strip()
    if not n or len(n) > 256 or _bad_token(n):
        return False
    return all(c.isalnum() or c in "-_." for c in n)


def _user_ok(user: str) -> bool:
    """Accept a user string that is non-blank, at most 256 characters after
    trimming, and free of the characters rejected by ``_bad_token``."""
    login = (user or "").strip()
    if login and len(login) <= 256:
        return not _bad_token(login)
    return False


def _snapshot_name_ok(name: str | None) -> bool:
    if name is None:
        return True
    s = str(name).strip()
    if not s or len(s) > 256 or _bad_token(s):
        return False
    return True


def _proxmox_dispatch(
    host: str,
    user: str,
    password: str,
    node: str,
    action: str,
    vmid: int | None,
    snapshot_name: str | None,
) -> dict[str, Any]:
    try:
        from proxmoxer import ProxmoxAPI
    except ImportError:
        return {
            "success": False,
            "error": (
                "The proxmoxer package is not installed. "
                "Add it to the environment: pip install proxmoxer"
            ),
        }

    try:
        proxmox = ProxmoxAPI(
            host.strip(),
            user=user.strip(),
            password=str(password),
            verify_ssl=False,
        )
        n = node.strip()
        qemu = proxmox.nodes(n).qemu

        if action == "list_vms":
            vms = qemu.get()
            return {"success": True, "action": action, "node": n, "vms": vms}

        if vmid is None:
            return {
                "success": False,
                "error": f"Parameter vmid is required for action '{action}'.",
            }

        vm = qemu(vmid)

        if action == "start_vm":
            vm.status.start.post()
            return {"success": True, "action": action, "node": n, "vmid": vmid}

        if action == "stop_vm":
            vm.status.stop.post()
            return {"success": True, "action": action, "node": n, "vmid": vmid}

        if action == "reset_vm":
            vm.status.reset.post()
            return {"success": True, "action": action, "node": n, "vmid": vmid}

        if action == "create_snapshot":
            snap = (snapshot_name or "").strip()
            if not snap:
                return {
                    "success": False,
                    "error": "snapshot_name is required for create_snapshot.",
                }
            vm.snapshot.post(snapname=snap)
            return {
                "success": True,
                "action": action,
                "node": n,
                "vmid": vmid,
                "snapshot_name": snap,
            }

        if action == "rollback_snapshot":
            snap = (snapshot_name or "").strip()
            if not snap:
                return {
                    "success": False,
                    "error": "snapshot_name is required for rollback_snapshot.",
                }
            vm.snapshot(snap).rollback.post()
            return {
                "success": True,
                "action": action,
                "node": n,
                "vmid": vmid,
                "snapshot_name": snap,
            }

        if action == "destroy_vm":
            vm.delete(purge=1)
            return {"success": True, "action": action, "node": n, "vmid": vmid}

        return {"success": False, "error": f"Unsupported action: {action}"}

    except Exception as exc:
        logger.exception("proxmox_control failed: %s", exc)
        return {
            "success": False,
            "error": str(exc),
            "action": action,
        }


# Name under which this tool is exposed.
TOOL_NAME = "proxmox_control"

# Human-readable summary; the sentences are joined with single spaces.
TOOL_DESCRIPTION = " ".join(
    [
        "Control Proxmox VE QEMU/KVM guests: list VMs, start/stop/reset, "
        "snapshots, rollback, or destroy a VM.",
        "Uses the Proxmox API (proxmoxer).",
        "Requires UNSANDBOXED_EXEC.",
        "Auth: password (user e.g. root@pam) or API token "
        "(user e.g. root@pam!tokenid, password = token secret).",
    ]
)
# JSON-Schema-style description of the arguments accepted by ``run``.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        # Connection and credentials.
        "host": {
            "type": "string",
            "description": (
                "Proxmox host IP or hostname "
                "(HTTPS API, default port 8006)."
            ),
        },
        "user": {
            "type": "string",
            "description": (
                "API user, e.g. root@pam, "
                "or user@realm!tokenid for API tokens."
            ),
        },
        "password": {
            "type": "string",
            "description": "Account password or API token secret.",
        },
        # Target selection.
        "node": {
            "type": "string",
            "description": "Proxmox node name (cluster member) to target.",
        },
        "action": {
            "type": "string",
            "description": (
                "Operation: list_vms; start_vm; stop_vm; reset_vm; "
                "create_snapshot; rollback_snapshot; destroy_vm."
            ),
            # Sorted so the advertised enum has a stable order.
            "enum": sorted(_VALID_ACTIONS),
        },
        "vmid": {
            "type": "integer",
            "description": (
                "QEMU guest VM ID. Not used for list_vms. "
                "Required for all other actions."
            ),
        },
        "snapshot_name": {
            "type": "string",
            "description": (
                "Snapshot name — required for create_snapshot "
                "and rollback_snapshot."
            ),
        },
    },
    # vmid and snapshot_name are optional at the schema level.
    "required": ["host", "user", "password", "node", "action"],
}


[docs] async def run( host: str, user: str, password: str, node: str, action: str, vmid: int | None = None, snapshot_name: str | None = None, ctx: Any = None, ) -> str: """Execute Proxmox API action and return JSON.""" if ctx is None: return json.dumps({"success": False, "error": "No context."}) auth_err = await _check_priv(ctx) if auth_err: return auth_err act = (action or "").strip() if act not in _VALID_ACTIONS: return json.dumps({ "success": False, "error": f"Invalid action. Must be one of: {', '.join(sorted(_VALID_ACTIONS))}.", }) if not _host_ok(host): return json.dumps({"success": False, "error": "Invalid or empty host."}) if not _user_ok(user): return json.dumps({"success": False, "error": "Invalid or empty user."}) if password is None or str(password) == "": return json.dumps({"success": False, "error": "password is required."}) if not _node_ok(node): return json.dumps({"success": False, "error": "Invalid node name."}) if not _snapshot_name_ok(snapshot_name): return json.dumps({"success": False, "error": "Invalid snapshot_name."}) if act != "list_vms" and vmid is None: return json.dumps({ "success": False, "error": f"vmid is required for action '{act}'.", }) result = await asyncio.to_thread( _proxmox_dispatch, host, user, password, node, act, vmid, snapshot_name, ) out = json.dumps(result, default=str) if len(out) > _MAX_JSON_CHARS: return json.dumps({ "success": result.get("success", False), "error": "Response exceeded size limit; narrow the request (e.g. list_vms).", "truncated": True, }) return out