Source code for tools.proxmox_tools

"""Proxmox VE hypervisor control via the Proxmox REST API.

Uses the ``proxmoxer`` library (see ``requirements.txt``). Authenticate with
either a password (``user`` like ``root@pam``) or an API token (``user`` like
``root@pam!tokenid`` and ``password`` as the token secret).

**Security:** requires ``UNSANDBOXED_EXEC`` — controls foundational infrastructure.

TLS verification is disabled by default (typical self-signed Proxmox certs); use
only on trusted networks or terminate TLS elsewhere.
"""

from __future__ import annotations

import asyncio
import jsonutil as json
import logging
from typing import Any

from tools._credential_profile_store import (
    delete_profile as _cred_delete,
    list_profile_names as _cred_list,
    load_profile as _cred_load,
    merge_profile as _cred_merge,
    save_profile as _cred_save,
)

# Module logger: carries the SECURITY denial warning and dispatch failure traces.
logger = logging.getLogger(__name__)

# Prefix handed to the credential-profile store helpers for every Proxmox profile.
CRED_PREFIX = "proxmox"

# Maximum length (in characters) of the serialised JSON response; ``run`` replaces
# anything longer with a short "narrow the request" error payload.
_MAX_JSON_CHARS = 500_000

# Closed set of actions ``run`` accepts; also used to build the ``action`` enum
# of the tool parameter schema.
_VALID_ACTIONS = frozenset(
    {
        "list_vms",
        "start_vm",
        "stop_vm",
        "reset_vm",
        "create_snapshot",
        "rollback_snapshot",
        "destroy_vm",
    }
)


async def _check_priv(ctx: Any) -> str | None:
    """Gate a Proxmox tool call on the ``UNSANDBOXED_EXEC`` privilege.

    Every handler in this module calls this right after its context check.
    The privilege helpers are imported lazily from ``tools.alter_privileges``
    and ``has_privilege`` is awaited with the context's ``redis``, ``user_id``
    and ``config`` attributes (each read with ``getattr`` and a safe default).

    Args:
        ctx: The tool context object; only ``redis``, ``config`` and
            ``user_id`` are read from it.

    Returns:
        ``None`` when the caller holds the privilege. Otherwise a JSON string
        of the form ``{"success": False, "error": ...}`` that the handler
        should return unchanged: either the denial message (a ``SECURITY``
        warning is logged first) or "Privilege system unavailable." when an
        ``ImportError`` is raised while importing or using the helpers.
    """
    try:
        from tools.alter_privileges import PRIVILEGES, has_privilege

        store = getattr(ctx, "redis", None)
        settings = getattr(ctx, "config", None)
        caller = getattr(ctx, "user_id", "") or ""

        allowed = await has_privilege(
            store,
            caller,
            PRIVILEGES["UNSANDBOXED_EXEC"],
            settings,
        )
        if allowed:
            return None

        logger.warning(
            "SECURITY: User %s attempted proxmox_control without UNSANDBOXED_EXEC",
            caller,
        )
        denial = {
            "success": False,
            "error": (
                "The user does not have the UNSANDBOXED_EXEC privilege. "
                "Ask an admin to grant it with the alter_privileges tool."
            ),
        }
        return json.dumps(denial)
    except ImportError:
        unavailable = {"success": False, "error": "Privilege system unavailable."}
        return json.dumps(unavailable)


def _bad_token(s: str) -> bool:
    """Tell whether ``s`` carries a newline, carriage return, or NUL byte.

    The field validators in this module use this to keep those three
    characters out of values that end up in a Proxmox API request.

    Args:
        s: The string to inspect; empty or falsy values are treated as clean.

    Returns:
        ``True`` when at least one of ``"\\n"``, ``"\\r"`` or ``"\\x00"``
        occurs in ``s``; ``False`` otherwise.
    """
    if s:
        for forbidden in ("\n", "\r", "\x00"):
            if forbidden in s:
                return True
    return False


def _host_ok(host: str) -> bool:
    """Check that a Proxmox API host value is usable.

    The value is stripped first. It must then be non-empty, no longer than
    512 characters, and pass :func:`_bad_token` (no newline, carriage return
    or NUL). No further hostname/IP shape checking is done here.

    Args:
        host: Candidate host string; ``None`` or empty counts as invalid.

    Returns:
        ``True`` when all three conditions hold, ``False`` otherwise.
    """
    candidate = (host or "").strip()
    if not candidate:
        return False
    if len(candidate) > 512:
        return False
    return not _bad_token(candidate)


def _node_ok(node: str) -> bool:
    """Validate a Proxmox cluster node name before it is used as a path segment.

    The node name is passed to ``proxmox.nodes(node)`` in
    :func:`_proxmox_dispatch`, so it must not be able to alter the request
    path. After stripping surrounding whitespace the name must:

    * be non-empty and at most 256 characters long;
    * not be a dot-segment (``.`` or ``..``), which a URL normaliser could
      collapse into a different path;
    * consist solely of ASCII letters, ASCII digits, ``-``, ``_`` and ``.``.

    ``str.isalnum()`` on its own also accepts non-ASCII letters and digits,
    hence the explicit ``isascii()`` check. The ASCII allowlist already
    excludes newline, carriage return and NUL, so no separate
    control-character check is needed.

    Args:
        node: Candidate node name; ``None`` or empty counts as invalid.

    Returns:
        ``True`` when the stripped name satisfies every rule above,
        ``False`` otherwise.
    """
    n = (node or "").strip()
    if not n or len(n) > 256:
        return False
    # Dot-segments are made only of allowed characters but are not real names.
    if n in (".", ".."):
        return False
    # Reject anything outside ASCII before the per-character allowlist.
    if not n.isascii():
        return False
    return all(c.isalnum() or c in "-_." for c in n)


def _user_ok(user: str) -> bool:
    """Check that a Proxmox auth user string is usable.

    The stripped value must be non-empty, at most 256 characters, and pass
    :func:`_bad_token`. Characters such as ``@`` and ``!`` are deliberately
    not restricted, since the field carries both ``root@pam`` style logins
    and ``root@pam!tokenid`` style token identities.

    Args:
        user: Candidate user/realm string; ``None`` or empty is invalid.

    Returns:
        ``True`` when the value is acceptable, ``False`` otherwise.
    """
    ident = (user or "").strip()
    acceptable = bool(ident) and len(ident) <= 256 and not _bad_token(ident)
    return acceptable


def _snapshot_name_ok(name: str | None) -> bool:
    """Check an optional snapshot name.

    ``None`` is accepted because most actions do not take a snapshot name;
    whether one is *required* is decided per action in
    :func:`_proxmox_dispatch`. A supplied value is converted with ``str()``,
    stripped, and must then be non-empty, at most 256 characters, and pass
    :func:`_bad_token`.

    Args:
        name: The snapshot name, or ``None`` when the caller gave none.

    Returns:
        ``True`` for ``None`` or an acceptable name, ``False`` otherwise.
    """
    if name is not None:
        text = str(name).strip()
        if not text:
            return False
        if len(text) > 256:
            return False
        return not _bad_token(text)
    return True


def _proxmox_auth_kwargs(user: str, password: str) -> dict[str, Any]:
    """Build the ``ProxmoxAPI`` authentication keyword arguments.

    A ``user`` of the form ``account@realm!tokenid`` selects API-token
    authentication: the part before ``!`` becomes ``user``, the part after
    becomes ``token_name`` and ``password`` is sent as ``token_value``.
    Any other value, including one with an empty side of the ``!``, is
    passed through unchanged as ``user``/``password``.
    """
    login = user.strip()
    account, bang, token_name = login.partition("!")
    if bang and account and token_name:
        return {
            "user": account,
            "token_name": token_name,
            "token_value": str(password),
        }
    return {"user": login, "password": str(password)}


def _proxmox_dispatch(
    host: str,
    user: str,
    password: str,
    node: str,
    action: str,
    vmid: int | None,
    snapshot_name: str | None,
) -> dict[str, Any]:
    """Run one blocking Proxmox API action and describe the outcome as a dict.

    All ``proxmoxer`` calls for this tool happen here. A ``ProxmoxAPI`` client
    is built with ``verify_ssl=False``, ``proxmox.nodes(node).qemu`` is
    resolved, and ``action`` selects the call:

    * ``list_vms``: ``qemu.get()``
    * ``start_vm`` / ``stop_vm`` / ``reset_vm``: POST to the matching
      ``status`` endpoint of the VM
    * ``create_snapshot``: ``vm.snapshot.post(snapname=...)``
    * ``rollback_snapshot``: ``vm.snapshot(name).rollback.post()``
    * ``destroy_vm``: ``vm.delete(purge=1)``

    Authentication follows the ``user`` value: an identity containing
    ``!`` (``root@pam!tokenid``) is sent as an API token via
    ``token_name``/``token_value``; anything else uses password login.

    The function is synchronous, so async callers should run it in a worker
    thread. Inputs are expected to be validated already; only the per-action
    ``vmid`` / ``snapshot_name`` requirements are re-checked here.

    Args:
        host: Proxmox API host; stripped before use.
        user: Login such as ``root@pam`` or token identity such as
            ``root@pam!tokenid``.
        password: Account password, or the token secret for token identities.
        node: Cluster node name; stripped and used as a path segment.
        action: One of the action names listed above.
        vmid: Target VM id; required for everything except ``list_vms``.
        snapshot_name: Required for ``create_snapshot`` and
            ``rollback_snapshot``; ignored otherwise.

    Returns:
        A dict with ``success`` plus action-specific keys (``vms`` for
        ``list_vms``; ``node``/``vmid`` and, for snapshot actions,
        ``snapshot_name``). On failure the dict holds ``success: False`` and
        an ``error`` message: missing ``proxmoxer`` package, missing
        ``vmid``/``snapshot_name``, unsupported action, or any exception
        raised while talking to the API (logged with ``logger.exception``
        and reported together with ``action`` instead of being re-raised).
    """
    try:
        from proxmoxer import ProxmoxAPI
    except ImportError:
        return {
            "success": False,
            "error": (
                "The proxmoxer package is not installed. "
                "Add it to the environment: pip install proxmoxer"
            ),
        }

    try:
        proxmox = ProxmoxAPI(
            host.strip(),
            verify_ssl=False,
            # Password login or API-token auth, depending on the user value.
            **_proxmox_auth_kwargs(user, password),
        )
        n = node.strip()
        qemu = proxmox.nodes(n).qemu

        if action == "list_vms":
            vms = qemu.get()
            return {"success": True, "action": action, "node": n, "vms": vms}

        if vmid is None:
            return {
                "success": False,
                "error": f"Parameter vmid is required for action '{action}'.",
            }

        vm = qemu(vmid)

        if action == "start_vm":
            vm.status.start.post()
            return {"success": True, "action": action, "node": n, "vmid": vmid}

        if action == "stop_vm":
            vm.status.stop.post()
            return {"success": True, "action": action, "node": n, "vmid": vmid}

        if action == "reset_vm":
            vm.status.reset.post()
            return {"success": True, "action": action, "node": n, "vmid": vmid}

        if action in ("create_snapshot", "rollback_snapshot"):
            snap = (snapshot_name or "").strip()
            if not snap:
                return {
                    "success": False,
                    "error": f"snapshot_name is required for {action}.",
                }
            if action == "create_snapshot":
                vm.snapshot.post(snapname=snap)
            else:
                vm.snapshot(snap).rollback.post()
            return {
                "success": True,
                "action": action,
                "node": n,
                "vmid": vmid,
                "snapshot_name": snap,
            }

        if action == "destroy_vm":
            vm.delete(purge=1)
            return {"success": True, "action": action, "node": n, "vmid": vmid}

        return {"success": False, "error": f"Unsupported action: {action}"}

    except Exception as exc:
        # Boundary handler: surface API/network failures as a result dict.
        logger.exception("proxmox_control failed: %s", exc)
        return {
            "success": False,
            "error": str(exc),
            "action": action,
        }


async def _proxmox_save_credentials(
    host: str,
    user: str,
    password: str,
    node: str = "",
    profile: str = "default",
    ctx: Any = None,
) -> str:
    """Store Proxmox connection details as a named credential profile.

    Handler for the ``proxmox_save_credentials`` tool. The steps are, in
    order: require a context, require the ``UNSANDBOXED_EXEC`` privilege via
    :func:`_check_priv`, validate ``host`` and ``user`` with
    :func:`_host_ok` / :func:`_user_ok`, require a non-blank ``password``,
    then hand a ``host``/``user``/``password``/``node`` mapping to the
    credential store's ``save_profile`` (imported as ``_cred_save``) under
    :data:`CRED_PREFIX`.

    Args:
        host: Proxmox API host; stored stripped.
        user: Auth user/realm or token identity; stored stripped.
        password: Password or token secret; stored as ``str(password)``.
        node: Default node name; stored stripped, ``""`` when omitted.
        profile: Name to save the profile under.
        ctx: Tool context; ``None`` produces a "No context." error.

    Returns:
        The JSON string returned by the credential store on success, or a
        JSON error string for a missing context, a denied privilege, an
        invalid host/user, or a missing password.
    """

    def _fail(message: str) -> str:
        # All validation failures share the same JSON envelope.
        return json.dumps({"success": False, "error": message})

    if ctx is None:
        return _fail("No context.")

    denied = await _check_priv(ctx)
    if denied:
        return denied

    if not (_host_ok(host) and _user_ok(user)):
        return _fail("Invalid host or user.")

    if not password or not str(password).strip():
        return _fail("password is required.")

    payload = {
        "host": host.strip(),
        "user": user.strip(),
        "password": str(password),
        "node": (node or "").strip(),
    }
    return await _cred_save(CRED_PREFIX, profile, payload, ctx)


async def _proxmox_list_credentials(ctx: Any = None) -> str:
    """Return the names of the caller's saved Proxmox credential profiles.

    Handler for the ``proxmox_list_credentials`` tool. After the context and
    :func:`_check_priv` checks, the work is delegated to the credential
    store's ``list_profile_names`` (imported as ``_cred_list``) under
    :data:`CRED_PREFIX`.

    Args:
        ctx: Tool context; ``None`` produces a "No context." error.

    Returns:
        The JSON string produced by the credential store, or a JSON error
        string when the context is missing or the privilege check fails.
    """
    if ctx is not None:
        denial = await _check_priv(ctx)
        if not denial:
            return await _cred_list(CRED_PREFIX, ctx)
        return denial
    return json.dumps({"success": False, "error": "No context."})


async def _proxmox_delete_credentials(profile: str = "default", ctx: Any = None) -> str:
    """Remove one of the caller's saved Proxmox credential profiles.

    Handler for the ``proxmox_delete_credentials`` tool. After the context
    and :func:`_check_priv` checks, deletion is delegated to the credential
    store's ``delete_profile`` (imported as ``_cred_delete``) under
    :data:`CRED_PREFIX`.

    Args:
        profile: Name of the profile to remove.
        ctx: Tool context; ``None`` produces a "No context." error.

    Returns:
        The JSON string produced by the credential store, or a JSON error
        string when the context is missing or the privilege check fails.
    """
    if ctx is None:
        missing_ctx = {"success": False, "error": "No context."}
        return json.dumps(missing_ctx)

    rejection = await _check_priv(ctx)
    # A truthy rejection is returned as-is; otherwise the store's reply is.
    return rejection or await _cred_delete(CRED_PREFIX, profile, ctx)


def _coerce_vmid(vmid: Any) -> int | None:
    """Normalise a caller-supplied VM id to a positive ``int``, or ``None`` if invalid."""
    # bool is a subclass of int; True/False are never meaningful VM ids.
    if isinstance(vmid, bool):
        return None
    if isinstance(vmid, int):
        return vmid if vmid > 0 else None
    if isinstance(vmid, float):
        return int(vmid) if vmid.is_integer() and vmid > 0 else None
    if isinstance(vmid, str):
        digits = vmid.strip()
        if digits.isascii() and digits.isdigit() and int(digits) > 0:
            return int(digits)
    return None


async def run(
    host: str = "",
    user: str = "",
    password: str = "",
    node: str = "",
    action: str = "",
    vmid: int | None = None,
    snapshot_name: str | None = None,
    credential_profile: str = "",
    ctx: Any = None,
) -> str:
    """Authorize, validate, and execute one Proxmox VE control action.

    Handler for the ``proxmox_control`` tool. The steps are, in order:

    1. Require a context and the ``UNSANDBOXED_EXEC`` privilege
       (:func:`_check_priv`).
    2. If ``credential_profile`` is non-blank, load it with the credential
       store's ``load_profile`` and combine it with the explicit
       ``host``/``user``/``password``/``node`` arguments via
       ``merge_profile``. A string result from the load is treated as an
       error message and returned.
    3. Validate ``action`` against :data:`_VALID_ACTIONS`, then host, user,
       password, node and snapshot name.
    4. For every action except ``list_vms``, require ``vmid`` and coerce it
       to a positive integer. The id becomes a URL path segment, so booleans,
       non-numeric strings and non-positive numbers are rejected.
    5. Run :func:`_proxmox_dispatch` in a worker thread with
       ``asyncio.to_thread`` and serialise its result. Output longer than
       :data:`_MAX_JSON_CHARS` is replaced by a short error payload.

    Args:
        host: Proxmox API host; may come from ``credential_profile`` instead.
        user: Auth user/realm, e.g. ``root@pam`` or ``root@pam!tokenid``.
        password: Account password or API-token secret.
        node: Cluster node name to target.
        action: One of :data:`_VALID_ACTIONS`.
        vmid: Target VM id; required for every action except ``list_vms``.
            Integer-valued strings and floats are accepted and converted.
        snapshot_name: Snapshot name for the snapshot and rollback actions.
        credential_profile: Name of a saved profile to load connection
            details from.
        ctx: Tool context; ``None`` produces a "No context." error.

    Returns:
        A JSON string: the dispatch result on success, or a
        ``{"success": false, "error": ...}`` payload for a missing context,
        denied privilege, failed profile load, invalid argument, or an
        over-limit response (which also carries ``"truncated": true``).
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})
    auth_err = await _check_priv(ctx)
    if auth_err:
        return auth_err

    # Normalise once so a non-string profile value cannot raise on .strip().
    profile_name = str(credential_profile or "").strip()
    if profile_name:
        loaded = await _cred_load(CRED_PREFIX, profile_name, ctx)
        if isinstance(loaded, str):
            return json.dumps({"success": False, "error": loaded})
        merged = _cred_merge(
            loaded,
            {
                "host": host,
                "user": user,
                "password": password,
                "node": node,
            },
        )
        host = str(merged.get("host") or "")
        user = str(merged.get("user") or "")
        password = str(merged.get("password") or "")
        node = str(merged.get("node") or "")

    act = (action or "").strip()
    if act not in _VALID_ACTIONS:
        return json.dumps(
            {
                "success": False,
                "error": f"Invalid action. Must be one of: {', '.join(sorted(_VALID_ACTIONS))}.",
            }
        )

    if not _host_ok(host):
        return json.dumps({"success": False, "error": "Invalid or empty host."})
    if not _user_ok(user):
        return json.dumps({"success": False, "error": "Invalid or empty user."})
    if password is None or str(password) == "":
        return json.dumps({"success": False, "error": "password is required."})
    if not _node_ok(node):
        return json.dumps({"success": False, "error": "Invalid node name."})
    if not _snapshot_name_ok(snapshot_name):
        return json.dumps({"success": False, "error": "Invalid snapshot_name."})

    if act != "list_vms":
        if vmid is None:
            return json.dumps(
                {
                    "success": False,
                    "error": f"vmid is required for action '{act}'.",
                }
            )
        # vmid is interpolated into the API path; only a positive int may pass.
        vmid = _coerce_vmid(vmid)
        if vmid is None:
            return json.dumps(
                {
                    "success": False,
                    "error": "Invalid vmid; expected a positive integer.",
                }
            )

    result = await asyncio.to_thread(
        _proxmox_dispatch,
        host,
        user,
        password,
        node,
        act,
        vmid,
        snapshot_name,
    )
    out = json.dumps(result, default=str)
    if len(out) > _MAX_JSON_CHARS:
        return json.dumps(
            {
                "success": result.get("success", False),
                "error": "Response exceeded size limit; narrow the request (e.g. list_vms).",
                "truncated": True,
            }
        )
    return out
# Parameter schema for the ``proxmox_control`` tool (JSON-Schema-shaped dict).
# Only ``action`` is required; connection fields may instead come from a
# saved ``credential_profile``.
_CONTROL_PARAMS = {
    "type": "object",
    "properties": {
        "host": {
            "type": "string",
            "description": "Proxmox API host (if not using credential_profile).",
        },
        "user": {"type": "string"},
        "password": {"type": "string"},
        "node": {"type": "string", "description": "Cluster node name."},
        "credential_profile": {
            "type": "string",
            "description": "Load host/user/password/node from saved profile; kwargs override.",
            "default": "",
        },
        "action": {
            "type": "string",
            # Sorted so the enum order is deterministic despite the frozenset.
            "enum": sorted(_VALID_ACTIONS),
        },
        "vmid": {"type": "integer"},
        "snapshot_name": {"type": "string"},
    },
    "required": ["action"],
}

# Tool registry for this module: each entry pairs a tool name, description and
# parameter schema with the coroutine that handles it.
TOOLS = [
    {
        "name": "proxmox_save_credentials",
        "description": (
            "Save Proxmox API credentials (host, user, password, default node) encrypted per-user. "
            "Requires UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "host": {"type": "string"},
                "user": {"type": "string"},
                "password": {"type": "string"},
                "node": {
                    "type": "string",
                    "description": "Default node name",
                    "default": "",
                },
                "profile": {"type": "string", "default": "default"},
            },
            "required": ["host", "user", "password"],
        },
        "handler": _proxmox_save_credentials,
    },
    {
        "name": "proxmox_list_credentials",
        "description": "List saved Proxmox profile names. Requires UNSANDBOXED_EXEC.",
        "parameters": {"type": "object", "properties": {}},
        "handler": _proxmox_list_credentials,
    },
    {
        "name": "proxmox_delete_credentials",
        "description": "Delete a saved Proxmox profile. Requires UNSANDBOXED_EXEC.",
        "parameters": {
            "type": "object",
            "properties": {"profile": {"type": "string", "default": "default"}},
            "required": ["profile"],
        },
        "handler": _proxmox_delete_credentials,
    },
    {
        "name": "proxmox_control",
        "description": (
            "Control Proxmox VE QEMU/KVM guests: list VMs, start/stop/reset, snapshots, "
            "rollback, or destroy a VM. Uses the Proxmox API (proxmoxer). "
            "Requires UNSANDBOXED_EXEC. "
            "Auth: password (user e.g. root@pam) or API token. "
            "Use credential_profile or pass host/user/password/node."
        ),
        "parameters": _CONTROL_PARAMS,
        "handler": run,
    },
]

# NOTE(review): presumably single-tool metadata kept for a loader that reads
# module-level TOOL_* names; confirm against the tool loader before removing.
TOOL_NAME = "proxmox_control"
TOOL_DESCRIPTION = "Control Proxmox VE (see proxmox_control tool)."
TOOL_PARAMETERS = _CONTROL_PARAMS