Source code for tools.xenserver_tools

"""XenServer / XCP-ng pool management via the XenAPI XML-RPC (HTTPS).

Uses the ``XenAPI`` PyPI package (see ``requirements.txt``). Connects to the
pool master on port 443 by default. Works with XenServer and XCP-ng (same XAPI).

**Security:** requires ``UNSANDBOXED_EXEC`` — controls foundational infrastructure.

TLS verification is off by default (``verify_ssl=false``); typical self-signed
pool certs. Use only on trusted networks or terminate TLS elsewhere.
"""

from __future__ import annotations

import asyncio
import jsonutil as json
import logging
import re
import xmlrpc.client as xmlrpclib
from typing import Any, Callable

from tools._credential_profile_store import (
    delete_profile as _cred_delete,
    list_profile_names as _cred_list,
    load_profile as _cred_load,
    merge_profile as _cred_merge,
    save_profile as _cred_save,
)

logger = logging.getLogger(__name__)

CRED_PREFIX = "xenserver"

_MAX_JSON_CHARS = 500_000

_VALID_ACTIONS = frozenset(
    {
        "list_pools",
        "pool_info",
        "list_hosts",
        "host_info",
        "list_vms",
        "vm_start",
        "vm_clean_shutdown",
        "vm_hard_shutdown",
        "vm_reboot",
        "vm_suspend",
        "vm_resume",
        "snapshot_create",
        "snapshot_revert",
        "snapshot_delete",
        "vm_pool_migrate",
        "list_srs",
        "vm_destroy",
    }
)

_UUID_RE = re.compile(
    r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$",
)


async def _check_priv(ctx: Any, tool_name: str = "xenserver_control") -> str | None:
    """Enforce the ``UNSANDBOXED_EXEC`` privilege gate for XenServer tools.

    Verifies that the calling user holds the dangerous ``UNSANDBOXED_EXEC``
    privilege before any XenAPI or credential operation runs. This is the
    single security chokepoint every handler in this module passes through,
    since these tools control foundational virtualization infrastructure.

    Interactions: imports :data:`tools.alter_privileges.PRIVILEGES` and calls
    :func:`tools.alter_privileges.has_privilege`, passing ``ctx.redis``,
    ``ctx.user_id`` and ``ctx.config`` so the privilege bitmask can be read
    from the user's stored grants. On denial it logs a ``SECURITY`` warning
    naming the user and tool and returns a JSON error string; it never raises
    on a missing privilege.

    Called by: every public handler in this module —
    :func:`_xenserver_save_credentials`, :func:`_xenserver_list_credentials`,
    :func:`_xenserver_delete_credentials` (each passing its own ``tool_name``)
    and :func:`run` (using the default ``"xenserver_control"``).

    Args:
        ctx: The ToolContext for the invocation; supplies ``redis``, ``config``
            and ``user_id`` attributes used for the privilege lookup.
        tool_name: Name recorded in the security warning log line, identifying
            which tool the user attempted. Defaults to ``"xenserver_control"``.

    Returns:
        ``None`` when the user is authorized, otherwise a JSON string with
        ``success: False`` and an error explaining the missing privilege (or
        that the privilege system itself is unavailable). A non-``None`` return
        signals callers to short-circuit and return that error to the user.
    """
    try:
        from tools.alter_privileges import PRIVILEGES, has_privilege

        redis = getattr(ctx, "redis", None)
        config = getattr(ctx, "config", None)
        user_id = getattr(ctx, "user_id", "") or ""
        if not await has_privilege(
            redis,
            user_id,
            PRIVILEGES["UNSANDBOXED_EXEC"],
            config,
        ):
            logger.warning(
                "SECURITY: User %s attempted %s without UNSANDBOXED_EXEC",
                user_id,
                tool_name,
            )
            return json.dumps(
                {
                    "success": False,
                    "error": (
                        "The user does not have the UNSANDBOXED_EXEC privilege. "
                        "Ask an admin to grant it with the alter_privileges tool."
                    ),
                }
            )
    except ImportError:
        return json.dumps({"success": False, "error": "Privilege system unavailable."})
    return None


def _bad_token(s: str) -> bool:
    """Report whether a string contains control characters that could enable injection.

    Returns ``True`` if the string holds a newline, carriage return, or NUL
    byte — characters that have no place in a hostname, username, or UUID and
    could break out of the XML-RPC request line or smuggle extra data.

    Interactions: a pure predicate with no side effects.

    Called by: the per-field validators :func:`_host_ok`, :func:`_user_ok`,
    :func:`_uuid_ok` and :func:`_snapshot_name_ok` in this module.

    Args:
        s: The candidate token to inspect.

    Returns:
        ``True`` if ``s`` contains any of ``\\n``, ``\\r`` or ``\\x00``;
        ``False`` for an empty string or one with no such characters.
    """
    if not s:
        return False
    return any(c in s for c in "\n\r\x00")


def _host_ok(host: str) -> bool:
    """Validate a pool-master host string before it is turned into a URL.

    Accepts a non-empty, reasonably short (<= 512 char) hostname, IP, or URL
    that contains no control characters. This guards the value that
    :func:`_normalize_master_url` later passes to ``XenAPI.Session``.

    Interactions: strips the input and delegates the control-character check to
    :func:`_bad_token`; otherwise pure.

    Called by: :func:`run` and :func:`_xenserver_save_credentials`, each of
    which rejects the request with an "Invalid or empty host" error when this
    returns ``False``.

    Args:
        host: The candidate host string (hostname, IP, or ``http(s)://`` URL).

    Returns:
        ``True`` if the trimmed value is non-empty, at most 512 characters, and
        free of control characters; ``False`` otherwise.
    """
    h = (host or "").strip()
    if not h or len(h) > 512 or _bad_token(h):
        return False
    return True


def _user_ok(user: str) -> bool:
    """Validate the XenAPI login user before it is used to authenticate.

    Accepts a non-empty, reasonably short (<= 256 char) username free of
    control characters, guarding the value passed to
    ``session.login_with_password`` inside :func:`_xapi_dispatch`.

    Interactions: strips the input and delegates the control-character check to
    :func:`_bad_token`; otherwise pure.

    Called by: :func:`run` and :func:`_xenserver_save_credentials`, each of
    which rejects the request with an "Invalid or empty user" error when this
    returns ``False``.

    Args:
        user: The candidate API username.

    Returns:
        ``True`` if the trimmed value is non-empty, at most 256 characters, and
        free of control characters; ``False`` otherwise.
    """
    u = (user or "").strip()
    if not u or len(u) > 256 or _bad_token(u):
        return False
    return True


def _uuid_ok(u: str | None) -> bool:
    """Validate that a value is a well-formed XenServer object UUID.

    Requires a non-empty, short (<= 64 char), control-character-free string
    that matches the canonical ``8-4-4-4-12`` hyphenated UUID shape. Used to
    ensure VM, host, pool, and snapshot identifiers are syntactically valid
    before they reach ``get_by_uuid`` calls in :func:`_xapi_dispatch`.

    Interactions: delegates the control-character check to :func:`_bad_token`
    and matches against the module-level :data:`_UUID_RE` pattern; otherwise
    pure.

    Called by: :func:`run`, which uses it to validate ``vm_uuid``,
    ``host_uuid``, ``pool_uuid`` and ``snapshot_vm_uuid`` depending on the
    requested action.

    Args:
        u: The candidate UUID, or ``None``.

    Returns:
        ``True`` only if ``u`` is a string matching the canonical UUID format
        and passing the length and control-character checks; ``False`` for
        ``None`` or any malformed value.
    """
    if u is None:
        return False
    s = str(u).strip()
    if not s or len(s) > 64 or _bad_token(s):
        return False
    return bool(_UUID_RE.match(s))


def _snapshot_name_ok(name: str | None) -> bool:
    """Validate an optional snapshot label.

    Treats ``None`` as acceptable (the name is only required for
    ``snapshot_create``), but rejects an empty/whitespace string, an overly
    long one (> 256 chars), or one containing control characters.

    Interactions: delegates the control-character check to :func:`_bad_token`;
    otherwise pure.

    Called by: :func:`run`, which rejects the request with an "Invalid
    snapshot_name" error when this returns ``False``.

    Args:
        name: The candidate snapshot label, or ``None`` when not supplied.

    Returns:
        ``True`` when ``name`` is ``None`` or a valid label; ``False`` for a
        blank, too-long, or control-character-bearing string.
    """
    if name is None:
        return True
    s = str(name).strip()
    if not s or len(s) > 256 or _bad_token(s):
        return False
    return True


def _normalize_master_url(host: str) -> str:
    """Coerce a host string into a scheme-qualified pool-master URL.

    Leaves an explicit ``http://`` or ``https://`` URL intact (only trimming a
    trailing slash) and otherwise prefixes a bare hostname/IP with ``https://``,
    matching XenServer's default port-443 XML-RPC endpoint.

    Interactions: a pure string transform with no side effects; its output is
    handed to ``XenAPI.Session`` inside :func:`_xapi_dispatch`.

    Called by: :func:`run`, immediately before dispatching the XenAPI call.

    Args:
        host: A validated hostname, IP, or full URL.

    Returns:
        A normalized URL string with a scheme and no trailing slash.
    """
    h = host.strip()
    if h.startswith("http://") or h.startswith("https://"):
        return h.rstrip("/")
    return f"https://{h}".rstrip("/")


def _pool_ref_by_uuid(session: Any, pool_uuid: str) -> str:
    """Resolve a pool ``OpaqueRef`` from its UUID, with a scan fallback.

    Tries the fast ``pool.get_by_uuid`` lookup first; if that raises (some XAPI
    versions do not index pools by UUID), it falls back to enumerating every
    pool record and matching on the ``uuid`` field.

    Interactions: issues XenAPI XML-RPC calls over the live ``session`` —
    ``session.xenapi.pool.get_by_uuid`` and, on fallback, ``pool.get_all`` plus
    ``pool.get_record`` for each pool. It does not mutate pool state.

    Called by: :func:`_xapi_dispatch`'s ``pool_info`` branch, when an explicit
    ``pool_uuid`` was supplied.

    Args:
        session: An authenticated ``XenAPI.Session``.
        pool_uuid: The UUID of the pool to locate.

    Returns:
        The ``OpaqueRef`` string for the matching pool.

    Raises:
        XenAPI.Failure: Re-raised from the failed ``get_by_uuid`` if no pool in
            the fallback scan matches ``pool_uuid``.
    """
    x = session.xenapi.pool
    try:
        return x.get_by_uuid(pool_uuid)
    except Exception:
        for ref in x.get_all():
            rec = x.get_record(ref)
            if rec.get("uuid") == pool_uuid:
                return ref
        raise


def _summarize_pool(rec: dict[str, Any]) -> dict[str, Any]:
    """Project a raw XenAPI pool record into a compact, JSON-safe summary.

    Selects the operator-relevant fields (identity, HA flag, master and default
    SR refs, restrictions) and stringifies the ``master``/``default_SR``
    ``OpaqueRef`` values so the result serializes cleanly.

    Interactions: a pure dict transform; no XenAPI or I/O side effects.

    Called by: :func:`_xapi_dispatch` in both the ``list_pools`` and
    ``pool_info`` branches.

    Args:
        rec: A pool record dict as returned by ``pool.get_record``.

    Returns:
        A dict with ``uuid``, ``name_label``, ``name_description``,
        ``ha_enabled``, ``master``, ``default_SR`` and ``restrictions`` keys.
    """
    return {
        "uuid": rec.get("uuid"),
        "name_label": rec.get("name_label"),
        "name_description": rec.get("name_description"),
        "ha_enabled": rec.get("ha_enabled"),
        "master": str(rec.get("master")) if rec.get("master") else None,
        "default_SR": str(rec.get("default_SR")) if rec.get("default_SR") else None,
        "restrictions": rec.get("restrictions"),
    }


def _summarize_host(rec: dict[str, Any]) -> dict[str, Any]:
    """Project a raw XenAPI host record into a compact summary.

    Keeps the fields useful for identifying and triaging a pool member: UUID,
    label, hostname, management address, enabled flag, and power state.

    Interactions: a pure dict transform; no XenAPI or I/O side effects.

    Called by: :func:`_xapi_dispatch` in the ``list_hosts`` branch. (Full host
    detail in ``host_info`` returns the raw record instead.)

    Args:
        rec: A host record dict as returned by ``host.get_record``.

    Returns:
        A dict with ``uuid``, ``name_label``, ``hostname``, ``address``,
        ``enabled`` and ``power_state`` keys.
    """
    return {
        "uuid": rec.get("uuid"),
        "name_label": rec.get("name_label"),
        "hostname": rec.get("hostname"),
        "address": rec.get("address"),
        "enabled": rec.get("enabled"),
        "power_state": rec.get("power_state"),
    }


def _summarize_vm(
    session: Any,
    vm_ref: str,
    rec: dict[str, Any],
) -> dict[str, Any]:
    """Project a raw XenAPI VM record into a compact summary, resolving its host.

    Reports the VM's UUID, label, and power state, and translates the
    ``resident_on`` host ``OpaqueRef`` into a host UUID for readability. A null
    or unresolvable reference yields ``None`` for the host.

    Interactions: when the VM is resident on a host, issues a live
    ``session.xenapi.host.get_uuid`` XML-RPC call; failures are logged via
    ``logger.exception`` and degrade to a ``None`` host UUID rather than
    raising. No state is mutated.

    Called by: :func:`_xapi_dispatch` in the ``list_vms`` branch, once per VM
    after templates and control domains have been filtered out.

    Args:
        session: An authenticated ``XenAPI.Session`` used to resolve the host
            UUID.
        vm_ref: The VM's ``OpaqueRef`` (accepted for symmetry; not otherwise
            used by this function).
        rec: A VM record dict as returned by ``VM.get_record``.

    Returns:
        A dict with ``uuid``, ``name_label``, ``power_state`` and
        ``resident_on_host_uuid`` keys.
    """
    resident = rec.get("resident_on")
    host_uuid: str | None = None
    if resident and str(resident) != "OpaqueRef:NULL":
        try:
            host_uuid = session.xenapi.host.get_uuid(resident)
        except Exception:
            logger.exception("host get_uuid failed for resident_on")
            host_uuid = None
    return {
        "uuid": rec.get("uuid"),
        "name_label": rec.get("name_label"),
        "power_state": rec.get("power_state"),
        "resident_on_host_uuid": host_uuid,
    }


def _summarize_sr(rec: dict[str, Any]) -> dict[str, Any]:
    """Project a raw XenAPI storage-repository (SR) record into a compact summary.

    Keeps the fields needed to judge an SR's identity, kind, shared status, and
    capacity/usage in bytes.

    Interactions: a pure dict transform; no XenAPI or I/O side effects.

    Called by: :func:`_xapi_dispatch` in the ``list_srs`` branch.

    Args:
        rec: An SR record dict as returned by ``SR.get_record``.

    Returns:
        A dict with ``uuid``, ``name_label``, ``type``, ``content_type``,
        ``shared``, ``physical_size`` and ``physical_utilisation`` keys.
    """
    return {
        "uuid": rec.get("uuid"),
        "name_label": rec.get("name_label"),
        "type": rec.get("type"),
        "content_type": rec.get("content_type"),
        "shared": rec.get("shared"),
        "physical_size": rec.get("physical_size"),
        "physical_utilisation": rec.get("physical_utilisation"),
    }


def _xapi_dispatch(
    url: str,
    user: str,
    password: str,
    ignore_ssl: bool,
    action: str,
    pool_uuid: str | None,
    host_uuid: str | None,
    vm_uuid: str | None,
    snapshot_name: str | None,
    snapshot_vm_uuid: str | None,
) -> dict[str, Any]:
    """Open a XenAPI session and execute one validated pool/VM action synchronously.

    This is the blocking worker that performs the actual XML-RPC work: it logs
    in to the pool master, dispatches on ``action`` to read pool/host/VM/SR data
    or to drive a VM lifecycle/snapshot/migrate/destroy operation, and returns a
    plain result dict. All inputs are assumed already validated by :func:`run`.

    Interactions: imports the ``XenAPI`` package (returning a structured error if
    absent) and opens a ``XenAPI.Session`` against ``url``, authenticating with
    ``session.login_with_password``. Read actions call ``get_all``/``get_record``
    and the ``_summarize_*`` helpers (and :func:`_pool_ref_by_uuid` for
    ``pool_info``); mutating actions call XenAPI methods such as ``VM.start``,
    ``VM.clean_shutdown``, ``VM.hard_shutdown``, ``VM.clean_reboot``,
    ``VM.suspend``, ``VM.resume``, ``VM.snapshot``, ``VM.revert``, ``VM.destroy``
    and ``VM.pool_migrate`` — these change live infrastructure state. XenAPI,
    XML-RPC ``Fault``, network, and generic errors are caught and logged via
    ``logger.exception`` and converted to ``success: False`` dicts; the session
    is always logged out in a ``finally`` block.

    Called by: :func:`run`, which invokes it through ``asyncio.to_thread`` so the
    blocking XML-RPC I/O does not stall the event loop. (Tests patch
    ``tools.xenserver_tools._xapi_dispatch`` directly.)

    Args:
        url: Normalized pool-master URL produced by :func:`_normalize_master_url`.
        user: API username for ``login_with_password``.
        password: Account password for ``login_with_password``.
        ignore_ssl: When ``True``, TLS certificate verification is skipped
            (``XenAPI.Session(..., ignore_ssl=ignore_ssl)``); this is the
            inverse of the caller's ``verify_ssl``.
        action: One of the supported action names in :data:`_VALID_ACTIONS`.
        pool_uuid: Optional pool UUID for ``pool_info``.
        host_uuid: Host UUID for ``host_info`` or the destination of
            ``vm_pool_migrate``.
        vm_uuid: VM UUID for VM lifecycle, snapshot-create, migrate and destroy
            actions.
        snapshot_name: Label used when creating a snapshot.
        snapshot_vm_uuid: UUID of the snapshot VM for ``snapshot_revert`` and
            ``snapshot_delete``.

    Returns:
        A result dict that always carries a ``success`` boolean and ``action``
        (on success or error paths), plus action-specific payload keys such as
        ``pools``, ``hosts``, ``vms``, ``srs``, ``host``, ``pool`` or the
        affected ``vm_uuid``/``snapshot_vm_uuid``. Errors are returned as data
        rather than raised.
    """
    try:
        import XenAPI
    except ImportError:
        return {
            "success": False,
            "error": (
                "The XenAPI package is not installed. "
                "Add it to the environment: pip install XenAPI"
            ),
        }

    def with_session(fn: Callable[[Any], dict[str, Any]]) -> dict[str, Any]:
        """Run ``fn`` inside an authenticated XenAPI session, handling teardown and errors.

        Opens and logs in to a ``XenAPI.Session``, invokes ``fn(session)``, and
        guarantees logout in a ``finally`` block. All XenAPI/XML-RPC/network and
        unexpected exceptions are caught, logged, and returned as error dicts so
        the worker never propagates an exception to the caller.

        Interactions: constructs ``XenAPI.Session(url, ignore_ssl=...)`` and
        calls ``login_with_password``; on the way out calls ``session.logout``
        when a session token exists. Logs failures via ``logger.exception``.

        Called by: the enclosing :func:`_xapi_dispatch`, with the nested
        :func:`go` closure as ``fn``.

        Args:
            fn: A callable that takes the live session and returns a result dict.

        Returns:
            ``fn``\\ 's result dict on success, or a ``success: False`` error dict
            (tagged with ``action``) when login or execution fails.
        """
        session = XenAPI.Session(url, ignore_ssl=ignore_ssl)
        try:
            session.login_with_password(
                user.strip(),
                str(password),
                "1.0",
                "stargazer-v3-xenserver-tools",
            )
            return fn(session)
        except XenAPI.Failure as exc:
            logger.exception("xenserver_control XenAPI.Failure: %s", exc)
            return {"success": False, "error": str(exc), "action": action}
        except xmlrpclib.Fault as exc:
            logger.exception("xenserver_control xmlrpc Fault: %s", exc)
            return {"success": False, "error": str(exc), "action": action}
        except OSError as exc:
            logger.exception("xenserver_control network error: %s", exc)
            return {"success": False, "error": str(exc), "action": action}
        except Exception as exc:
            logger.exception("xenserver_control failed: %s", exc)
            return {"success": False, "error": str(exc), "action": action}
        finally:
            try:
                if getattr(session, "_session", None):
                    session.logout()
            except Exception:
                logger.exception("xenserver session logout failed")

    def go(session: Any) -> dict[str, Any]:
        """Dispatch the requested ``action`` against an open XenAPI session.

        Contains the per-action body of the tool: branches on the enclosing
        ``action`` to either gather summarized read data or perform a mutating
        VM/snapshot/migrate/destroy operation, validating that the
        action-specific UUIDs are present before acting.

        Interactions: drives ``session.xenapi`` directly — read branches call
        ``get_all``/``get_record`` and the ``_summarize_*`` helpers (plus
        :func:`_pool_ref_by_uuid` for ``pool_info``); write branches call live
        XenAPI methods (``VM.start``, ``VM.clean_shutdown``,
        ``VM.hard_shutdown``, ``VM.clean_reboot``, ``VM.suspend``,
        ``VM.resume``, ``VM.snapshot``, ``VM.revert``, ``VM.destroy``,
        ``VM.pool_migrate``) that change pool state. UUIDs are resolved to refs
        via ``get_by_uuid``.

        Called by: :func:`with_session`, which supplies the authenticated
        ``session`` and wraps this call in error handling and logout.

        Args:
            session: An authenticated ``XenAPI.Session``.

        Returns:
            A result dict carrying ``success`` and, where applicable, ``action``
            plus the action-specific payload or an inline validation/error
            message for an unsupported action or a missing required UUID.
        """
        x = session.xenapi

        if action == "list_pools":
            out: list[dict[str, Any]] = []
            for pref in x.pool.get_all():
                rec = x.pool.get_record(pref)
                out.append(_summarize_pool(rec))
            return {"success": True, "action": action, "pools": out}

        if action == "pool_info":
            if pool_uuid:
                pref = _pool_ref_by_uuid(session, pool_uuid)
            else:
                prefs = x.pool.get_all()
                if not prefs:
                    return {
                        "success": False,
                        "error": "No pools found.",
                        "action": action,
                    }
                pref = prefs[0]
            rec = x.pool.get_record(pref)
            return {
                "success": True,
                "action": action,
                "pool": _summarize_pool(rec),
                "pool_record_excerpt": {
                    k: rec.get(k)
                    for k in (
                        "ha_configuration",
                        "ha_host_failures_to_tolerate",
                        "ha_statefiles",
                        "tags",
                        "gui_config",
                        "cpu_info",
                        "ha_allow_overcommit",
                    )
                    if k in rec
                },
            }

        if action == "list_hosts":
            out = []
            for href in x.host.get_all():
                rec = x.host.get_record(href)
                out.append(_summarize_host(rec))
            return {"success": True, "action": action, "hosts": out}

        if action == "host_info":
            if not host_uuid:
                return {
                    "success": False,
                    "error": "host_uuid is required for host_info.",
                }
            href = x.host.get_by_uuid(host_uuid)
            rec = x.host.get_record(href)
            return {"success": True, "action": action, "host": rec}

        if action == "list_vms":
            out = []
            for vm_ref in x.VM.get_all():
                rec = x.VM.get_record(vm_ref)
                if rec.get("is_a_template") or rec.get("is_control_domain"):
                    continue
                out.append(_summarize_vm(session, vm_ref, rec))
            return {"success": True, "action": action, "vms": out}

        if action == "list_srs":
            out = []
            for sr_ref in x.SR.get_all():
                rec = x.SR.get_record(sr_ref)
                out.append(_summarize_sr(rec))
            return {"success": True, "action": action, "srs": out}

        if action == "snapshot_revert":
            if not snapshot_vm_uuid:
                return {
                    "success": False,
                    "error": "snapshot_vm_uuid is required for snapshot_revert.",
                }
            snap_ref = x.VM.get_by_uuid(snapshot_vm_uuid)
            x.VM.revert(snap_ref)
            return {
                "success": True,
                "action": action,
                "snapshot_vm_uuid": snapshot_vm_uuid,
            }

        if action == "snapshot_delete":
            if not snapshot_vm_uuid:
                return {
                    "success": False,
                    "error": "snapshot_vm_uuid is required for snapshot_delete.",
                }
            snap_ref = x.VM.get_by_uuid(snapshot_vm_uuid)
            x.VM.destroy(snap_ref)
            return {
                "success": True,
                "action": action,
                "snapshot_vm_uuid": snapshot_vm_uuid,
            }

        if not vm_uuid:
            return {
                "success": False,
                "error": f"vm_uuid is required for action '{action}'.",
            }

        vm_ref = x.VM.get_by_uuid(vm_uuid)

        if action == "vm_start":
            x.VM.start(vm_ref, False, False)
            return {"success": True, "action": action, "vm_uuid": vm_uuid}

        if action == "vm_clean_shutdown":
            x.VM.clean_shutdown(vm_ref)
            return {"success": True, "action": action, "vm_uuid": vm_uuid}

        if action == "vm_hard_shutdown":
            x.VM.hard_shutdown(vm_ref)
            return {"success": True, "action": action, "vm_uuid": vm_uuid}

        if action == "vm_reboot":
            x.VM.clean_reboot(vm_ref)
            return {"success": True, "action": action, "vm_uuid": vm_uuid}

        if action == "vm_suspend":
            x.VM.suspend(vm_ref)
            return {"success": True, "action": action, "vm_uuid": vm_uuid}

        if action == "vm_resume":
            x.VM.resume(vm_ref, False, False)
            return {"success": True, "action": action, "vm_uuid": vm_uuid}

        if action == "snapshot_create":
            snap = (snapshot_name or "").strip()
            if not snap:
                return {
                    "success": False,
                    "error": "snapshot_name is required for snapshot_create.",
                }
            snap_ref = x.VM.snapshot(vm_ref, snap)
            snap_uuid = x.VM.get_uuid(snap_ref)
            return {
                "success": True,
                "action": action,
                "vm_uuid": vm_uuid,
                "snapshot_vm_ref": str(snap_ref),
                "snapshot_vm_uuid": snap_uuid,
            }

        if action == "vm_pool_migrate":
            if not host_uuid:
                return {
                    "success": False,
                    "error": "host_uuid (destination) is required for vm_pool_migrate.",
                }
            dest_ref = x.host.get_by_uuid(host_uuid)
            x.VM.pool_migrate(vm_ref, dest_ref, {})
            return {
                "success": True,
                "action": action,
                "vm_uuid": vm_uuid,
                "destination_host_uuid": host_uuid,
            }

        if action == "vm_destroy":
            x.VM.destroy(vm_ref)
            return {"success": True, "action": action, "vm_uuid": vm_uuid}

        return {"success": False, "error": f"Unsupported action: {action}"}

    return with_session(go)


async def _xenserver_save_credentials(
    host: str,
    user: str,
    password: str,
    profile: str = "default",
    verify_ssl: bool = False,
    ctx: Any = None,
) -> str:
    """Persist XenServer pool-master credentials as an encrypted per-user profile.

    Backs the ``xenserver_save_credentials`` tool: after privilege and input
    validation, it stores host, user, password and ``verify_ssl`` under a named
    profile so later ``xenserver_control`` calls can reference them by
    ``credential_profile`` instead of re-sending secrets.

    Interactions: gates on :func:`_check_priv` (``UNSANDBOXED_EXEC``), validates
    inputs with :func:`_host_ok`/:func:`_user_ok`, then calls
    :func:`tools._credential_profile_store.save_profile`, which AES-GCM-encrypts
    the password and writes it into the Redis hash
    ``stargazer:xenserver_credentials:{user_id}`` under field ``profile``. The
    returned JSON echoes host/user/verify_ssl and ``has_password`` but never the
    plaintext password.

    Called by: the tool dispatcher via the ``handler`` entry for
    ``xenserver_save_credentials`` in this module's :data:`TOOLS` list; no direct
    internal callers.

    Args:
        host: Pool master hostname, IP, or https URL.
        user: API username.
        password: Account password (required; stored encrypted).
        profile: Profile name to save under. Defaults to ``"default"``.
        verify_ssl: Whether future connections using this profile should verify
            TLS. Defaults to ``False``.
        ctx: The ToolContext (supplies ``redis``, ``config``, ``user_id``);
            required.

    Returns:
        A JSON string. On success, ``success: True`` plus the stored host, user,
        ``verify_ssl`` and ``has_password: True``; otherwise ``success: False``
        with an error (missing context, missing privilege, or a validation
        failure).
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})
    auth_err = await _check_priv(ctx, "xenserver_save_credentials")
    if auth_err:
        return auth_err
    if not _host_ok(host):
        return json.dumps({"success": False, "error": "Invalid or empty host."})
    if not _user_ok(user):
        return json.dumps({"success": False, "error": "Invalid or empty user."})
    if not password or str(password).strip() == "":
        return json.dumps({"success": False, "error": "password is required."})
    data = {
        "host": host.strip(),
        "user": user.strip(),
        "password": str(password),
        "verify_ssl": bool(verify_ssl),
    }
    raw = await _cred_save(CRED_PREFIX, profile, data, ctx)
    out = json.loads(raw)
    if out.get("success"):
        out["host"] = data["host"]
        out["user"] = data["user"]
        out["verify_ssl"] = data["verify_ssl"]
        out["has_password"] = True
    return json.dumps(out)


async def _xenserver_list_credentials(ctx: Any = None) -> str:
    """List the saved XenServer credential profile names for the current user.

    Backs the ``xenserver_list_credentials`` tool. Returns only profile names
    (never secrets) so an operator can see which saved targets are available.

    Interactions: gates on :func:`_check_priv` (``UNSANDBOXED_EXEC``), then calls
    :func:`tools._credential_profile_store.list_profile_names`, which reads the
    field names of the Redis hash ``stargazer:xenserver_credentials:{user_id}``.

    Called by: the tool dispatcher via the ``handler`` entry for
    ``xenserver_list_credentials`` in this module's :data:`TOOLS` list; no direct
    internal callers.

    Args:
        ctx: The ToolContext (supplies ``redis``/``user_id``); required.

    Returns:
        A JSON string: the profile-name listing on success, or ``success:
        False`` with an error when context or the privilege is missing.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})
    auth_err = await _check_priv(ctx, "xenserver_list_credentials")
    if auth_err:
        return auth_err
    return await _cred_list(CRED_PREFIX, ctx)


async def _xenserver_delete_credentials(
    profile: str = "default", ctx: Any = None
) -> str:
    """Delete one saved XenServer credential profile for the current user.

    Backs the ``xenserver_delete_credentials`` tool, removing a previously saved
    profile so it can no longer be referenced by ``credential_profile``.

    Interactions: gates on :func:`_check_priv` (``UNSANDBOXED_EXEC``), then calls
    :func:`tools._credential_profile_store.delete_profile`, which removes field
    ``profile`` from the Redis hash ``stargazer:xenserver_credentials:{user_id}``.

    Called by: the tool dispatcher via the ``handler`` entry for
    ``xenserver_delete_credentials`` in this module's :data:`TOOLS` list; no
    direct internal callers.

    Args:
        profile: Name of the profile to delete. Defaults to ``"default"``.
        ctx: The ToolContext (supplies ``redis``/``user_id``); required.

    Returns:
        A JSON string reporting the deletion outcome, or ``success: False`` when
        context or the privilege is missing.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})
    auth_err = await _check_priv(ctx, "xenserver_delete_credentials")
    if auth_err:
        return auth_err
    return await _cred_delete(CRED_PREFIX, profile, ctx)


[docs] async def run( host: str = "", user: str = "", password: str = "", action: str = "", verify_ssl: bool = False, pool_uuid: str | None = None, host_uuid: str | None = None, vm_uuid: str | None = None, sr_uuid: str | None = None, snapshot_name: str | None = None, snapshot_vm_uuid: str | None = None, credential_profile: str = "", ctx: Any = None, ) -> str: """Validate and execute one XenServer/XCP-ng pool action, returning JSON. Entry point for the ``xenserver_control`` tool. It enforces the ``UNSANDBOXED_EXEC`` gate, optionally hydrates host/user/password from a saved credential profile, validates the action and every supplied identifier, then runs the blocking XenAPI work off the event loop and serialises the result. A response that would exceed ``_MAX_JSON_CHARS`` is replaced with a "narrow the request" error so a huge pool dump cannot blow up the message pipeline. Interactions and side effects: gates via :func:`_check_priv`; when a ``credential_profile`` is given it loads and merges saved credentials through :func:`tools._credential_profile_store.load_profile` / :func:`tools._credential_profile_store.merge_profile` (kwargs override the profile). It validates the action against :data:`_VALID_ACTIONS` and the inputs with :func:`_host_ok`, :func:`_user_ok`, :func:`_uuid_ok`, and :func:`_snapshot_name_ok`, normalises the URL with :func:`_normalize_master_url`, and dispatches the actual XML-RPC call by running :func:`_xapi_dispatch` in a worker thread via :func:`asyncio.to_thread`. Mutating actions (VM start/shutdown/reboot/migrate/ destroy, snapshot create/revert/delete) change live infrastructure; read actions only fetch data. Reads ``redis``/``config``/``user_id`` off ``ctx``. Dispatched by the tool loader via the ``xenserver_control`` ``handler`` entry in this module's :data:`TOOLS` list. Args: host (str): Pool master hostname, IP, or URL; optional when ``credential_profile`` supplies it. user (str): XenAPI username; optional when supplied by a profile. password (str): Account password; optional when supplied by a profile. action (str): The operation to perform; must be one of :data:`_VALID_ACTIONS`. verify_ssl (bool): When ``True``, verify TLS; defaults to ``False`` for self-signed pool certs (mapped to ``ignore_ssl`` for XenAPI). pool_uuid (str | None): Optional pool UUID for ``pool_info``. host_uuid (str | None): Host UUID for ``host_info`` or the ``vm_pool_migrate`` destination. vm_uuid (str | None): VM UUID for VM lifecycle, snapshot-create, migrate, and destroy actions. sr_uuid (str | None): Reserved for future SR-scoped actions; currently ignored. snapshot_name (str | None): Label used by ``snapshot_create``. snapshot_vm_uuid (str | None): Snapshot VM UUID for ``snapshot_revert`` and ``snapshot_delete``. credential_profile (str): Name of a saved profile to load credentials from; explicit kwargs override the loaded values. ctx: The ToolContext supplying ``redis``, ``config``, and ``user_id``; required. Returns: str: A JSON string with the action result (always carrying ``success``), or a ``success: false`` error for a missing context, denied privilege, invalid action/identifier, or an oversized (truncated) response. """ del sr_uuid # reserved for future SR-scoped actions if ctx is None: return json.dumps({"success": False, "error": "No context."}) auth_err = await _check_priv(ctx) if auth_err: return auth_err if credential_profile and str(credential_profile).strip(): loaded = await _cred_load(CRED_PREFIX, credential_profile.strip(), ctx) if isinstance(loaded, str): return json.dumps({"success": False, "error": loaded}) merged = _cred_merge( loaded, { "host": host, "user": user, "password": password, "verify_ssl": verify_ssl, }, ) host = str(merged.get("host") or "") user = str(merged.get("user") or "") password = str(merged.get("password") or "") if "verify_ssl" in merged: verify_ssl = bool(merged["verify_ssl"]) act = (action or "").strip() if act not in _VALID_ACTIONS: return json.dumps( { "success": False, "error": f"Invalid action. Must be one of: {', '.join(sorted(_VALID_ACTIONS))}.", } ) if not _host_ok(host): return json.dumps({"success": False, "error": "Invalid or empty host."}) if not _user_ok(user): return json.dumps({"success": False, "error": "Invalid or empty user."}) if password is None or str(password) == "": return json.dumps({"success": False, "error": "password is required."}) if not _snapshot_name_ok(snapshot_name): return json.dumps({"success": False, "error": "Invalid snapshot_name."}) need_vm = act in { "vm_start", "vm_clean_shutdown", "vm_hard_shutdown", "vm_reboot", "vm_suspend", "vm_resume", "snapshot_create", "vm_pool_migrate", "vm_destroy", } if need_vm and not _uuid_ok(vm_uuid): return json.dumps( {"success": False, "error": "vm_uuid is required and must be a valid UUID."} ) if act == "host_info" and not _uuid_ok(host_uuid): return json.dumps( { "success": False, "error": "host_uuid is required and must be a valid UUID.", } ) if ( act == "pool_info" and pool_uuid is not None and str(pool_uuid).strip() and not _uuid_ok(pool_uuid) ): return json.dumps( {"success": False, "error": "pool_uuid must be a valid UUID when provided."} ) if act in {"snapshot_revert", "snapshot_delete"} and not _uuid_ok(snapshot_vm_uuid): return json.dumps( { "success": False, "error": "snapshot_vm_uuid is required and must be a valid UUID.", } ) if act == "vm_pool_migrate": if not _uuid_ok(host_uuid): return json.dumps( { "success": False, "error": "host_uuid (destination) is required and must be a valid UUID.", } ) url = _normalize_master_url(host) ignore_ssl = not verify_ssl def _strip_opt(s: str | None) -> str | None: """Normalize an optional UUID argument to a trimmed string or ``None``. Strips surrounding whitespace and collapses empty/whitespace-only values to ``None`` so :func:`_xapi_dispatch` receives ``None`` rather than a blank string for unset identifiers. Interactions: pure; no side effects. Called by the enclosing :func:`run` when marshalling ``pool_uuid``, ``host_uuid``, ``vm_uuid`` and ``snapshot_vm_uuid`` for the dispatch call. Args: s: The raw optional string, or ``None``. Returns: The trimmed string, or ``None`` if it was empty/whitespace/``None``. """ t = (s or "").strip() return t or None result = await asyncio.to_thread( _xapi_dispatch, url, user, password, ignore_ssl, act, _strip_opt(pool_uuid), _strip_opt(host_uuid), _strip_opt(vm_uuid), snapshot_name, _strip_opt(snapshot_vm_uuid), ) out = json.dumps(result, default=str) if len(out) > _MAX_JSON_CHARS: return json.dumps( { "success": result.get("success", False), "error": ( "Response exceeded size limit; narrow the request " "(e.g. filter by pool or use host_info / pool_info)." ), "truncated": True, } ) return out
_CONTROL_PARAMETERS = { "type": "object", "properties": { "host": { "type": "string", "description": ( "Pool master hostname or IP, or full URL (optional if credential_profile set)." ), }, "user": { "type": "string", "description": "API user (optional if credential_profile set).", }, "password": { "type": "string", "description": "Account password (optional if credential_profile set).", }, "credential_profile": { "type": "string", "description": "Load host/user/password from this saved profile; kwargs override.", "default": "", }, "verify_ssl": { "type": "boolean", "description": "If true, verify TLS certificates; if false (default), allow self-signed.", }, "action": { "type": "string", "description": "Operation to perform.", "enum": sorted(_VALID_ACTIONS), }, "pool_uuid": { "type": "string", "description": "Pool UUID — optional for pool_info (defaults to the only/first pool).", }, "host_uuid": { "type": "string", "description": "Host UUID — required for host_info and vm_pool_migrate (destination).", }, "vm_uuid": { "type": "string", "description": "VM UUID — required for VM lifecycle, snapshots, migrate, destroy.", }, "sr_uuid": { "type": "string", "description": "Reserved for future use (e.g. filtered SR ops).", }, "snapshot_name": { "type": "string", "description": "Label for snapshot_create.", }, "snapshot_vm_uuid": { "type": "string", "description": "UUID of the snapshot VM — for snapshot_revert and snapshot_delete.", }, }, "required": ["action"], } TOOLS = [ { "name": "xenserver_save_credentials", "description": ( "Save XenServer/XCP-ng pool master credentials (host, user, password, verify_ssl) " "encrypted per-user. Requires UNSANDBOXED_EXEC." ), "parameters": { "type": "object", "properties": { "host": { "type": "string", "description": "Pool master hostname or IP or https URL.", }, "user": {"type": "string", "description": "API user."}, "password": {"type": "string", "description": "Password."}, "profile": { "type": "string", "description": "Profile name.", "default": "default", }, "verify_ssl": { "type": "boolean", "description": "Verify TLS", "default": False, }, }, "required": ["host", "user", "password"], }, "handler": _xenserver_save_credentials, }, { "name": "xenserver_list_credentials", "description": ( "List saved XenServer profile names for this user. Requires UNSANDBOXED_EXEC." ), "parameters": {"type": "object", "properties": {}}, "handler": _xenserver_list_credentials, }, { "name": "xenserver_delete_credentials", "description": "Delete a saved XenServer credential profile. Requires UNSANDBOXED_EXEC.", "parameters": { "type": "object", "properties": { "profile": { "type": "string", "description": "Profile name.", "default": "default", }, }, "required": ["profile"], }, "handler": _xenserver_delete_credentials, }, { "name": "xenserver_control", "description": ( "Manage XenServer or XCP-ng pools via XenAPI (XML-RPC over HTTPS to the pool master): " "pools, hosts, SRs, VM lifecycle, snapshots, pool migrate, destroy VM. " "Requires UNSANDBOXED_EXEC. " "Use credential_profile to load saved auth or pass host/user/password. " "Target VMs and hosts by UUID. " "For snapshot_revert and snapshot_delete, use snapshot_vm_uuid (UUID of the snapshot VM). " "verify_ssl defaults to false for self-signed certificates." ), "parameters": _CONTROL_PARAMETERS, "handler": run, }, ]