"""XenServer / XCP-ng pool management via the XenAPI XML-RPC (HTTPS).
Uses the ``XenAPI`` PyPI package (see ``requirements.txt``). Connects to the
pool master on port 443 by default. Works with XenServer and XCP-ng (same XAPI).
**Security:** requires ``UNSANDBOXED_EXEC`` — controls foundational infrastructure.
TLS verification is off by default (``verify_ssl=false``) because pools typically
present self-signed certificates. Use only on trusted networks or terminate TLS elsewhere.
"""
from __future__ import annotations
import asyncio
import jsonutil as json
import logging
import re
import xmlrpc.client as xmlrpclib
from typing import Any, Callable
from tools._credential_profile_store import (
delete_profile as _cred_delete,
list_profile_names as _cred_list,
load_profile as _cred_load,
merge_profile as _cred_merge,
save_profile as _cred_save,
)
logger = logging.getLogger(__name__)
CRED_PREFIX = "xenserver"
_MAX_JSON_CHARS = 500_000
_VALID_ACTIONS = frozenset(
{
"list_pools",
"pool_info",
"list_hosts",
"host_info",
"list_vms",
"vm_start",
"vm_clean_shutdown",
"vm_hard_shutdown",
"vm_reboot",
"vm_suspend",
"vm_resume",
"snapshot_create",
"snapshot_revert",
"snapshot_delete",
"vm_pool_migrate",
"list_srs",
"vm_destroy",
}
)
_UUID_RE = re.compile(
r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$",
)
async def _check_priv(ctx: Any, tool_name: str = "xenserver_control") -> str | None:
"""Enforce the ``UNSANDBOXED_EXEC`` privilege gate for XenServer tools.
Verifies that the calling user holds the dangerous ``UNSANDBOXED_EXEC``
privilege before any XenAPI or credential operation runs. This is the
single security chokepoint every handler in this module passes through,
since these tools control foundational virtualization infrastructure.
Interactions: imports :data:`tools.alter_privileges.PRIVILEGES` and calls
:func:`tools.alter_privileges.has_privilege`, passing ``ctx.redis``,
``ctx.user_id`` and ``ctx.config`` so the privilege bitmask can be read
from the user's stored grants. On denial it logs a ``SECURITY`` warning
naming the user and tool and returns a JSON error string; it never raises
on a missing privilege.
Called by: every public handler in this module —
:func:`_xenserver_save_credentials`, :func:`_xenserver_list_credentials`,
:func:`_xenserver_delete_credentials` (each passing its own ``tool_name``)
and :func:`run` (using the default ``"xenserver_control"``).
Args:
ctx: The ToolContext for the invocation; supplies ``redis``, ``config``
and ``user_id`` attributes used for the privilege lookup.
tool_name: Name recorded in the security warning log line, identifying
which tool the user attempted. Defaults to ``"xenserver_control"``.
Returns:
``None`` when the user is authorized, otherwise a JSON string with
``success: False`` and an error explaining the missing privilege (or
that the privilege system itself is unavailable). A non-``None`` return
signals callers to short-circuit and return that error to the user.
"""
try:
from tools.alter_privileges import PRIVILEGES, has_privilege
redis = getattr(ctx, "redis", None)
config = getattr(ctx, "config", None)
user_id = getattr(ctx, "user_id", "") or ""
if not await has_privilege(
redis,
user_id,
PRIVILEGES["UNSANDBOXED_EXEC"],
config,
):
logger.warning(
"SECURITY: User %s attempted %s without UNSANDBOXED_EXEC",
user_id,
tool_name,
)
return json.dumps(
{
"success": False,
"error": (
"The user does not have the UNSANDBOXED_EXEC privilege. "
"Ask an admin to grant it with the alter_privileges tool."
),
}
)
except ImportError:
return json.dumps({"success": False, "error": "Privilege system unavailable."})
return None
def _bad_token(s: str) -> bool:
"""Report whether a string contains control characters that could enable injection.
Returns ``True`` if the string holds a newline, carriage return, or NUL
byte — characters that have no place in a hostname, username, or UUID and
could break out of the XML-RPC request line or smuggle extra data.
Interactions: a pure predicate with no side effects.
Called by: the per-field validators :func:`_host_ok`, :func:`_user_ok`,
:func:`_uuid_ok` and :func:`_snapshot_name_ok` in this module.
Args:
s: The candidate token to inspect.
Returns:
``True`` if ``s`` contains any of ``\\n``, ``\\r`` or ``\\x00``;
``False`` for an empty string or one with no such characters.
"""
if not s:
return False
return any(c in s for c in "\n\r\x00")
def _host_ok(host: str) -> bool:
"""Validate a pool-master host string before it is turned into a URL.
Accepts a non-empty, reasonably short (<= 512 char) hostname, IP, or URL
that contains no control characters. This guards the value that
:func:`_normalize_master_url` later passes to ``XenAPI.Session``.
Interactions: strips the input and delegates the control-character check to
:func:`_bad_token`; otherwise pure.
Called by: :func:`run` and :func:`_xenserver_save_credentials`, each of
which rejects the request with an "Invalid or empty host" error when this
returns ``False``.
Args:
host: The candidate host string (hostname, IP, or ``http(s)://`` URL).
Returns:
``True`` if the trimmed value is non-empty, at most 512 characters, and
free of control characters; ``False`` otherwise.
"""
h = (host or "").strip()
if not h or len(h) > 512 or _bad_token(h):
return False
return True
def _user_ok(user: str) -> bool:
"""Validate the XenAPI login user before it is used to authenticate.
Accepts a non-empty, reasonably short (<= 256 char) username free of
control characters, guarding the value passed to
``session.login_with_password`` inside :func:`_xapi_dispatch`.
Interactions: strips the input and delegates the control-character check to
:func:`_bad_token`; otherwise pure.
Called by: :func:`run` and :func:`_xenserver_save_credentials`, each of
which rejects the request with an "Invalid or empty user" error when this
returns ``False``.
Args:
user: The candidate API username.
Returns:
``True`` if the trimmed value is non-empty, at most 256 characters, and
free of control characters; ``False`` otherwise.
"""
u = (user or "").strip()
if not u or len(u) > 256 or _bad_token(u):
return False
return True
def _uuid_ok(u: str | None) -> bool:
"""Validate that a value is a well-formed XenServer object UUID.
Requires a non-empty, short (<= 64 char), control-character-free string
that matches the canonical ``8-4-4-4-12`` hyphenated UUID shape. Used to
ensure VM, host, pool, and snapshot identifiers are syntactically valid
before they reach ``get_by_uuid`` calls in :func:`_xapi_dispatch`.
Interactions: delegates the control-character check to :func:`_bad_token`
and matches against the module-level :data:`_UUID_RE` pattern; otherwise
pure.
Called by: :func:`run`, which uses it to validate ``vm_uuid``,
``host_uuid``, ``pool_uuid`` and ``snapshot_vm_uuid`` depending on the
requested action.
Args:
u: The candidate UUID, or ``None``.
Returns:
``True`` only if ``u`` is a string matching the canonical UUID format
and passing the length and control-character checks; ``False`` for
``None`` or any malformed value.
"""
if u is None:
return False
s = str(u).strip()
if not s or len(s) > 64 or _bad_token(s):
return False
return bool(_UUID_RE.match(s))
def _snapshot_name_ok(name: str | None) -> bool:
"""Validate an optional snapshot label.
Treats ``None`` as acceptable (the name is only required for
``snapshot_create``), but rejects an empty/whitespace string, an overly
long one (> 256 chars), or one containing control characters.
Interactions: delegates the control-character check to :func:`_bad_token`;
otherwise pure.
Called by: :func:`run`, which rejects the request with an "Invalid
snapshot_name" error when this returns ``False``.
Args:
name: The candidate snapshot label, or ``None`` when not supplied.
Returns:
``True`` when ``name`` is ``None`` or a valid label; ``False`` for a
blank, too-long, or control-character-bearing string.
"""
if name is None:
return True
s = str(name).strip()
if not s or len(s) > 256 or _bad_token(s):
return False
return True
def _normalize_master_url(host: str) -> str:
"""Coerce a host string into a scheme-qualified pool-master URL.
Leaves an explicit ``http://`` or ``https://`` URL intact (only trimming a
trailing slash) and otherwise prefixes a bare hostname/IP with ``https://``,
matching XenServer's default port-443 XML-RPC endpoint.
Interactions: a pure string transform with no side effects; its output is
handed to ``XenAPI.Session`` inside :func:`_xapi_dispatch`.
Called by: :func:`run`, immediately before dispatching the XenAPI call.
Args:
host: A validated hostname, IP, or full URL.
Returns:
A normalized URL string with a scheme and no trailing slash.
"""
h = host.strip()
if h.startswith("http://") or h.startswith("https://"):
return h.rstrip("/")
return f"https://{h}".rstrip("/")
def _pool_ref_by_uuid(session: Any, pool_uuid: str) -> str:
"""Resolve a pool ``OpaqueRef`` from its UUID, with a scan fallback.
Tries the fast ``pool.get_by_uuid`` lookup first; if that raises (some XAPI
versions do not index pools by UUID), it falls back to enumerating every
pool record and matching on the ``uuid`` field.
Interactions: issues XenAPI XML-RPC calls over the live ``session`` —
``session.xenapi.pool.get_by_uuid`` and, on fallback, ``pool.get_all`` plus
``pool.get_record`` for each pool. It does not mutate pool state.
Called by: :func:`_xapi_dispatch`'s ``pool_info`` branch, when an explicit
``pool_uuid`` was supplied.
Args:
session: An authenticated ``XenAPI.Session``.
pool_uuid: The UUID of the pool to locate.
Returns:
The ``OpaqueRef`` string for the matching pool.
Raises:
XenAPI.Failure: Re-raised from the failed ``get_by_uuid`` if no pool in
the fallback scan matches ``pool_uuid``.
"""
x = session.xenapi.pool
try:
return x.get_by_uuid(pool_uuid)
except Exception:
for ref in x.get_all():
rec = x.get_record(ref)
if rec.get("uuid") == pool_uuid:
return ref
raise
def _summarize_pool(rec: dict[str, Any]) -> dict[str, Any]:
"""Project a raw XenAPI pool record into a compact, JSON-safe summary.
Selects the operator-relevant fields (identity, HA flag, master and default
SR refs, restrictions) and stringifies the ``master``/``default_SR``
``OpaqueRef`` values so the result serializes cleanly.
Interactions: a pure dict transform; no XenAPI or I/O side effects.
Called by: :func:`_xapi_dispatch` in both the ``list_pools`` and
``pool_info`` branches.
Args:
rec: A pool record dict as returned by ``pool.get_record``.
Returns:
A dict with ``uuid``, ``name_label``, ``name_description``,
``ha_enabled``, ``master``, ``default_SR`` and ``restrictions`` keys.
"""
return {
"uuid": rec.get("uuid"),
"name_label": rec.get("name_label"),
"name_description": rec.get("name_description"),
"ha_enabled": rec.get("ha_enabled"),
"master": str(rec.get("master")) if rec.get("master") else None,
"default_SR": str(rec.get("default_SR")) if rec.get("default_SR") else None,
"restrictions": rec.get("restrictions"),
}
def _summarize_host(rec: dict[str, Any]) -> dict[str, Any]:
"""Project a raw XenAPI host record into a compact summary.
Keeps the fields useful for identifying and triaging a pool member: UUID,
label, hostname, management address, enabled flag, and power state.
Interactions: a pure dict transform; no XenAPI or I/O side effects.
Called by: :func:`_xapi_dispatch` in the ``list_hosts`` branch. (Full host
detail in ``host_info`` returns the raw record instead.)
Args:
rec: A host record dict as returned by ``host.get_record``.
Returns:
A dict with ``uuid``, ``name_label``, ``hostname``, ``address``,
``enabled`` and ``power_state`` keys.
"""
return {
"uuid": rec.get("uuid"),
"name_label": rec.get("name_label"),
"hostname": rec.get("hostname"),
"address": rec.get("address"),
"enabled": rec.get("enabled"),
"power_state": rec.get("power_state"),
}
def _summarize_vm(
session: Any,
vm_ref: str,
rec: dict[str, Any],
) -> dict[str, Any]:
"""Project a raw XenAPI VM record into a compact summary, resolving its host.
Reports the VM's UUID, label, and power state, and translates the
``resident_on`` host ``OpaqueRef`` into a host UUID for readability. A null
or unresolvable reference yields ``None`` for the host.
Interactions: when the VM is resident on a host, issues a live
``session.xenapi.host.get_uuid`` XML-RPC call; failures are logged via
``logger.exception`` and degrade to a ``None`` host UUID rather than
raising. No state is mutated.
Called by: :func:`_xapi_dispatch` in the ``list_vms`` branch, once per VM
after templates and control domains have been filtered out.
Args:
session: An authenticated ``XenAPI.Session`` used to resolve the host
UUID.
vm_ref: The VM's ``OpaqueRef`` (accepted for symmetry; not otherwise
used by this function).
rec: A VM record dict as returned by ``VM.get_record``.
Returns:
A dict with ``uuid``, ``name_label``, ``power_state`` and
``resident_on_host_uuid`` keys.
"""
resident = rec.get("resident_on")
host_uuid: str | None = None
if resident and str(resident) != "OpaqueRef:NULL":
try:
host_uuid = session.xenapi.host.get_uuid(resident)
except Exception:
logger.exception("host get_uuid failed for resident_on")
host_uuid = None
return {
"uuid": rec.get("uuid"),
"name_label": rec.get("name_label"),
"power_state": rec.get("power_state"),
"resident_on_host_uuid": host_uuid,
}
def _summarize_sr(rec: dict[str, Any]) -> dict[str, Any]:
"""Project a raw XenAPI storage-repository (SR) record into a compact summary.
Keeps the fields needed to judge an SR's identity, kind, shared status, and
capacity/usage in bytes.
Interactions: a pure dict transform; no XenAPI or I/O side effects.
Called by: :func:`_xapi_dispatch` in the ``list_srs`` branch.
Args:
rec: An SR record dict as returned by ``SR.get_record``.
Returns:
A dict with ``uuid``, ``name_label``, ``type``, ``content_type``,
``shared``, ``physical_size`` and ``physical_utilisation`` keys.
"""
return {
"uuid": rec.get("uuid"),
"name_label": rec.get("name_label"),
"type": rec.get("type"),
"content_type": rec.get("content_type"),
"shared": rec.get("shared"),
"physical_size": rec.get("physical_size"),
"physical_utilisation": rec.get("physical_utilisation"),
}
def _xapi_dispatch(
url: str,
user: str,
password: str,
ignore_ssl: bool,
action: str,
pool_uuid: str | None,
host_uuid: str | None,
vm_uuid: str | None,
snapshot_name: str | None,
snapshot_vm_uuid: str | None,
) -> dict[str, Any]:
"""Open a XenAPI session and execute one validated pool/VM action synchronously.
This is the blocking worker that performs the actual XML-RPC work: it logs
in to the pool master, dispatches on ``action`` to read pool/host/VM/SR data
or to drive a VM lifecycle/snapshot/migrate/destroy operation, and returns a
plain result dict. All inputs are assumed already validated by :func:`run`.
Interactions: imports the ``XenAPI`` package (returning a structured error if
absent) and opens a ``XenAPI.Session`` against ``url``, authenticating with
``session.login_with_password``. Read actions call ``get_all``/``get_record``
and the ``_summarize_*`` helpers (and :func:`_pool_ref_by_uuid` for
``pool_info``); mutating actions call XenAPI methods such as ``VM.start``,
``VM.clean_shutdown``, ``VM.hard_shutdown``, ``VM.clean_reboot``,
``VM.suspend``, ``VM.resume``, ``VM.snapshot``, ``VM.revert``, ``VM.destroy``
and ``VM.pool_migrate`` — these change live infrastructure state. XenAPI,
XML-RPC ``Fault``, network, and generic errors are caught and logged via
``logger.exception`` and converted to ``success: False`` dicts; the session
is always logged out in a ``finally`` block.
Called by: :func:`run`, which invokes it through ``asyncio.to_thread`` so the
blocking XML-RPC I/O does not stall the event loop. (Tests patch
``tools.xenserver_tools._xapi_dispatch`` directly.)
Args:
url: Normalized pool-master URL produced by :func:`_normalize_master_url`.
user: API username for ``login_with_password``.
password: Account password for ``login_with_password``.
ignore_ssl: When ``True``, TLS certificate verification is skipped
(``XenAPI.Session(..., ignore_ssl=ignore_ssl)``); this is the
inverse of the caller's ``verify_ssl``.
action: One of the supported action names in :data:`_VALID_ACTIONS`.
pool_uuid: Optional pool UUID for ``pool_info``.
host_uuid: Host UUID for ``host_info`` or the destination of
``vm_pool_migrate``.
vm_uuid: VM UUID for VM lifecycle, snapshot-create, migrate and destroy
actions.
snapshot_name: Label used when creating a snapshot.
snapshot_vm_uuid: UUID of the snapshot VM for ``snapshot_revert`` and
``snapshot_delete``.
Returns:
A result dict that always carries a ``success`` boolean and ``action``
(on success or error paths), plus action-specific payload keys such as
``pools``, ``hosts``, ``vms``, ``srs``, ``host``, ``pool`` or the
affected ``vm_uuid``/``snapshot_vm_uuid``. Errors are returned as data
rather than raised.
"""
try:
import XenAPI
except ImportError:
return {
"success": False,
"error": (
"The XenAPI package is not installed. "
"Add it to the environment: pip install XenAPI"
),
}
def with_session(fn: Callable[[Any], dict[str, Any]]) -> dict[str, Any]:
"""Run ``fn`` inside an authenticated XenAPI session, handling teardown and errors.
Opens and logs in to a ``XenAPI.Session``, invokes ``fn(session)``, and
guarantees logout in a ``finally`` block. All XenAPI/XML-RPC/network and
unexpected exceptions are caught, logged, and returned as error dicts so
the worker never propagates an exception to the caller.
Interactions: constructs ``XenAPI.Session(url, ignore_ssl=...)`` and
calls ``login_with_password``; on the way out calls ``session.logout``
when a session token exists. Logs failures via ``logger.exception``.
Called by: the enclosing :func:`_xapi_dispatch`, with the nested
:func:`go` closure as ``fn``.
Args:
fn: A callable that takes the live session and returns a result dict.
Returns:
``fn``\\ 's result dict on success, or a ``success: False`` error dict
(tagged with ``action``) when login or execution fails.
"""
session = XenAPI.Session(url, ignore_ssl=ignore_ssl)
try:
session.login_with_password(
user.strip(),
str(password),
"1.0",
"stargazer-v3-xenserver-tools",
)
return fn(session)
except XenAPI.Failure as exc:
logger.exception("xenserver_control XenAPI.Failure: %s", exc)
return {"success": False, "error": str(exc), "action": action}
except xmlrpclib.Fault as exc:
logger.exception("xenserver_control xmlrpc Fault: %s", exc)
return {"success": False, "error": str(exc), "action": action}
except OSError as exc:
logger.exception("xenserver_control network error: %s", exc)
return {"success": False, "error": str(exc), "action": action}
except Exception as exc:
logger.exception("xenserver_control failed: %s", exc)
return {"success": False, "error": str(exc), "action": action}
finally:
try:
if getattr(session, "_session", None):
session.logout()
except Exception:
logger.exception("xenserver session logout failed")
def go(session: Any) -> dict[str, Any]:
"""Dispatch the requested ``action`` against an open XenAPI session.
Contains the per-action body of the tool: branches on the enclosing
``action`` to either gather summarized read data or perform a mutating
VM/snapshot/migrate/destroy operation, validating that the
action-specific UUIDs are present before acting.
Interactions: drives ``session.xenapi`` directly — read branches call
``get_all``/``get_record`` and the ``_summarize_*`` helpers (plus
:func:`_pool_ref_by_uuid` for ``pool_info``); write branches call live
XenAPI methods (``VM.start``, ``VM.clean_shutdown``,
``VM.hard_shutdown``, ``VM.clean_reboot``, ``VM.suspend``,
``VM.resume``, ``VM.snapshot``, ``VM.revert``, ``VM.destroy``,
``VM.pool_migrate``) that change pool state. UUIDs are resolved to refs
via ``get_by_uuid``.
Called by: :func:`with_session`, which supplies the authenticated
``session`` and wraps this call in error handling and logout.
Args:
session: An authenticated ``XenAPI.Session``.
Returns:
A result dict carrying ``success`` and, where applicable, ``action``
plus the action-specific payload or an inline validation/error
message for an unsupported action or a missing required UUID.
"""
x = session.xenapi
if action == "list_pools":
out: list[dict[str, Any]] = []
for pref in x.pool.get_all():
rec = x.pool.get_record(pref)
out.append(_summarize_pool(rec))
return {"success": True, "action": action, "pools": out}
if action == "pool_info":
if pool_uuid:
pref = _pool_ref_by_uuid(session, pool_uuid)
else:
prefs = x.pool.get_all()
if not prefs:
return {
"success": False,
"error": "No pools found.",
"action": action,
}
pref = prefs[0]
rec = x.pool.get_record(pref)
return {
"success": True,
"action": action,
"pool": _summarize_pool(rec),
"pool_record_excerpt": {
k: rec.get(k)
for k in (
"ha_configuration",
"ha_host_failures_to_tolerate",
"ha_statefiles",
"tags",
"gui_config",
"cpu_info",
"ha_allow_overcommit",
)
if k in rec
},
}
if action == "list_hosts":
out = []
for href in x.host.get_all():
rec = x.host.get_record(href)
out.append(_summarize_host(rec))
return {"success": True, "action": action, "hosts": out}
if action == "host_info":
if not host_uuid:
return {
"success": False,
"error": "host_uuid is required for host_info.",
}
href = x.host.get_by_uuid(host_uuid)
rec = x.host.get_record(href)
return {"success": True, "action": action, "host": rec}
if action == "list_vms":
out = []
for vm_ref in x.VM.get_all():
rec = x.VM.get_record(vm_ref)
if rec.get("is_a_template") or rec.get("is_control_domain"):
continue
out.append(_summarize_vm(session, vm_ref, rec))
return {"success": True, "action": action, "vms": out}
if action == "list_srs":
out = []
for sr_ref in x.SR.get_all():
rec = x.SR.get_record(sr_ref)
out.append(_summarize_sr(rec))
return {"success": True, "action": action, "srs": out}
if action == "snapshot_revert":
if not snapshot_vm_uuid:
return {
"success": False,
"error": "snapshot_vm_uuid is required for snapshot_revert.",
}
snap_ref = x.VM.get_by_uuid(snapshot_vm_uuid)
x.VM.revert(snap_ref)
return {
"success": True,
"action": action,
"snapshot_vm_uuid": snapshot_vm_uuid,
}
if action == "snapshot_delete":
if not snapshot_vm_uuid:
return {
"success": False,
"error": "snapshot_vm_uuid is required for snapshot_delete.",
}
snap_ref = x.VM.get_by_uuid(snapshot_vm_uuid)
x.VM.destroy(snap_ref)
return {
"success": True,
"action": action,
"snapshot_vm_uuid": snapshot_vm_uuid,
}
if not vm_uuid:
return {
"success": False,
"error": f"vm_uuid is required for action '{action}'.",
}
vm_ref = x.VM.get_by_uuid(vm_uuid)
if action == "vm_start":
x.VM.start(vm_ref, False, False)
return {"success": True, "action": action, "vm_uuid": vm_uuid}
if action == "vm_clean_shutdown":
x.VM.clean_shutdown(vm_ref)
return {"success": True, "action": action, "vm_uuid": vm_uuid}
if action == "vm_hard_shutdown":
x.VM.hard_shutdown(vm_ref)
return {"success": True, "action": action, "vm_uuid": vm_uuid}
if action == "vm_reboot":
x.VM.clean_reboot(vm_ref)
return {"success": True, "action": action, "vm_uuid": vm_uuid}
if action == "vm_suspend":
x.VM.suspend(vm_ref)
return {"success": True, "action": action, "vm_uuid": vm_uuid}
if action == "vm_resume":
x.VM.resume(vm_ref, False, False)
return {"success": True, "action": action, "vm_uuid": vm_uuid}
if action == "snapshot_create":
snap = (snapshot_name or "").strip()
if not snap:
return {
"success": False,
"error": "snapshot_name is required for snapshot_create.",
}
snap_ref = x.VM.snapshot(vm_ref, snap)
snap_uuid = x.VM.get_uuid(snap_ref)
return {
"success": True,
"action": action,
"vm_uuid": vm_uuid,
"snapshot_vm_ref": str(snap_ref),
"snapshot_vm_uuid": snap_uuid,
}
if action == "vm_pool_migrate":
if not host_uuid:
return {
"success": False,
"error": "host_uuid (destination) is required for vm_pool_migrate.",
}
dest_ref = x.host.get_by_uuid(host_uuid)
x.VM.pool_migrate(vm_ref, dest_ref, {})
return {
"success": True,
"action": action,
"vm_uuid": vm_uuid,
"destination_host_uuid": host_uuid,
}
if action == "vm_destroy":
x.VM.destroy(vm_ref)
return {"success": True, "action": action, "vm_uuid": vm_uuid}
return {"success": False, "error": f"Unsupported action: {action}"}
return with_session(go)
async def _xenserver_save_credentials(
host: str,
user: str,
password: str,
profile: str = "default",
verify_ssl: bool = False,
ctx: Any = None,
) -> str:
"""Persist XenServer pool-master credentials as an encrypted per-user profile.
Backs the ``xenserver_save_credentials`` tool: after privilege and input
validation, it stores host, user, password and ``verify_ssl`` under a named
profile so later ``xenserver_control`` calls can reference them by
``credential_profile`` instead of re-sending secrets.
Interactions: gates on :func:`_check_priv` (``UNSANDBOXED_EXEC``), validates
inputs with :func:`_host_ok`/:func:`_user_ok`, then calls
:func:`tools._credential_profile_store.save_profile`, which AES-GCM-encrypts
the password and writes it into the Redis hash
``stargazer:xenserver_credentials:{user_id}`` under field ``profile``. The
returned JSON echoes host/user/verify_ssl and ``has_password`` but never the
plaintext password.
Called by: the tool dispatcher via the ``handler`` entry for
``xenserver_save_credentials`` in this module's :data:`TOOLS` list; no direct
internal callers.
Args:
host: Pool master hostname, IP, or https URL.
user: API username.
password: Account password (required; stored encrypted).
profile: Profile name to save under. Defaults to ``"default"``.
verify_ssl: Whether future connections using this profile should verify
TLS. Defaults to ``False``.
ctx: The ToolContext (supplies ``redis``, ``config``, ``user_id``);
required.
Returns:
A JSON string. On success, ``success: True`` plus the stored host, user,
``verify_ssl`` and ``has_password: True``; otherwise ``success: False``
with an error (missing context, missing privilege, or a validation
failure).
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx, "xenserver_save_credentials")
if auth_err:
return auth_err
if not _host_ok(host):
return json.dumps({"success": False, "error": "Invalid or empty host."})
if not _user_ok(user):
return json.dumps({"success": False, "error": "Invalid or empty user."})
if not password or str(password).strip() == "":
return json.dumps({"success": False, "error": "password is required."})
data = {
"host": host.strip(),
"user": user.strip(),
"password": str(password),
"verify_ssl": bool(verify_ssl),
}
raw = await _cred_save(CRED_PREFIX, profile, data, ctx)
out = json.loads(raw)
if out.get("success"):
out["host"] = data["host"]
out["user"] = data["user"]
out["verify_ssl"] = data["verify_ssl"]
out["has_password"] = True
return json.dumps(out)
async def _xenserver_list_credentials(ctx: Any = None) -> str:
"""List the saved XenServer credential profile names for the current user.
Backs the ``xenserver_list_credentials`` tool. Returns only profile names
(never secrets) so an operator can see which saved targets are available.
Interactions: gates on :func:`_check_priv` (``UNSANDBOXED_EXEC``), then calls
:func:`tools._credential_profile_store.list_profile_names`, which reads the
field names of the Redis hash ``stargazer:xenserver_credentials:{user_id}``.
Called by: the tool dispatcher via the ``handler`` entry for
``xenserver_list_credentials`` in this module's :data:`TOOLS` list; no direct
internal callers.
Args:
ctx: The ToolContext (supplies ``redis``/``user_id``); required.
Returns:
A JSON string: the profile-name listing on success, or ``success:
False`` with an error when context or the privilege is missing.
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx, "xenserver_list_credentials")
if auth_err:
return auth_err
return await _cred_list(CRED_PREFIX, ctx)
async def _xenserver_delete_credentials(
profile: str = "default", ctx: Any = None
) -> str:
"""Delete one saved XenServer credential profile for the current user.
Backs the ``xenserver_delete_credentials`` tool, removing a previously saved
profile so it can no longer be referenced by ``credential_profile``.
Interactions: gates on :func:`_check_priv` (``UNSANDBOXED_EXEC``), then calls
:func:`tools._credential_profile_store.delete_profile`, which removes field
``profile`` from the Redis hash ``stargazer:xenserver_credentials:{user_id}``.
Called by: the tool dispatcher via the ``handler`` entry for
``xenserver_delete_credentials`` in this module's :data:`TOOLS` list; no
direct internal callers.
Args:
profile: Name of the profile to delete. Defaults to ``"default"``.
ctx: The ToolContext (supplies ``redis``/``user_id``); required.
Returns:
A JSON string reporting the deletion outcome, or ``success: False`` when
context or the privilege is missing.
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx, "xenserver_delete_credentials")
if auth_err:
return auth_err
return await _cred_delete(CRED_PREFIX, profile, ctx)
[docs]
async def run(
host: str = "",
user: str = "",
password: str = "",
action: str = "",
verify_ssl: bool = False,
pool_uuid: str | None = None,
host_uuid: str | None = None,
vm_uuid: str | None = None,
sr_uuid: str | None = None,
snapshot_name: str | None = None,
snapshot_vm_uuid: str | None = None,
credential_profile: str = "",
ctx: Any = None,
) -> str:
"""Validate and execute one XenServer/XCP-ng pool action, returning JSON.
Entry point for the ``xenserver_control`` tool. It enforces the
``UNSANDBOXED_EXEC`` gate, optionally hydrates host/user/password from a saved
credential profile, validates the action and every supplied identifier, then
runs the blocking XenAPI work off the event loop and serialises the result. A
response that would exceed ``_MAX_JSON_CHARS`` is replaced with a "narrow the
request" error so a huge pool dump cannot blow up the message pipeline.
Interactions and side effects: gates via :func:`_check_priv`; when a
``credential_profile`` is given it loads and merges saved credentials through
:func:`tools._credential_profile_store.load_profile` /
:func:`tools._credential_profile_store.merge_profile` (kwargs override the
profile). It validates the action against :data:`_VALID_ACTIONS` and the
inputs with :func:`_host_ok`, :func:`_user_ok`, :func:`_uuid_ok`, and
:func:`_snapshot_name_ok`, normalises the URL with
:func:`_normalize_master_url`, and dispatches the actual XML-RPC call by
running :func:`_xapi_dispatch` in a worker thread via
:func:`asyncio.to_thread`. Mutating actions (VM start/shutdown/reboot/migrate/
destroy, snapshot create/revert/delete) change live infrastructure; read
actions only fetch data. Reads ``redis``/``config``/``user_id`` off ``ctx``.
Dispatched by the tool loader via the ``xenserver_control`` ``handler`` entry
in this module's :data:`TOOLS` list.
Args:
host (str): Pool master hostname, IP, or URL; optional when
``credential_profile`` supplies it.
user (str): XenAPI username; optional when supplied by a profile.
password (str): Account password; optional when supplied by a profile.
action (str): The operation to perform; must be one of
:data:`_VALID_ACTIONS`.
verify_ssl (bool): When ``True``, verify TLS; defaults to ``False`` for
self-signed pool certs (mapped to ``ignore_ssl`` for XenAPI).
pool_uuid (str | None): Optional pool UUID for ``pool_info``.
host_uuid (str | None): Host UUID for ``host_info`` or the
``vm_pool_migrate`` destination.
vm_uuid (str | None): VM UUID for VM lifecycle, snapshot-create, migrate,
and destroy actions.
sr_uuid (str | None): Reserved for future SR-scoped actions; currently
ignored.
snapshot_name (str | None): Label used by ``snapshot_create``.
snapshot_vm_uuid (str | None): Snapshot VM UUID for ``snapshot_revert`` and
``snapshot_delete``.
credential_profile (str): Name of a saved profile to load credentials
from; explicit kwargs override the loaded values.
ctx: The ToolContext supplying ``redis``, ``config``, and ``user_id``;
required.
Returns:
str: A JSON string with the action result (always carrying ``success``),
or a ``success: false`` error for a missing context, denied privilege,
invalid action/identifier, or an oversized (truncated) response.
"""
del sr_uuid # reserved for future SR-scoped actions
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx)
if auth_err:
return auth_err
if credential_profile and str(credential_profile).strip():
loaded = await _cred_load(CRED_PREFIX, credential_profile.strip(), ctx)
if isinstance(loaded, str):
return json.dumps({"success": False, "error": loaded})
merged = _cred_merge(
loaded,
{
"host": host,
"user": user,
"password": password,
"verify_ssl": verify_ssl,
},
)
host = str(merged.get("host") or "")
user = str(merged.get("user") or "")
password = str(merged.get("password") or "")
if "verify_ssl" in merged:
verify_ssl = bool(merged["verify_ssl"])
act = (action or "").strip()
if act not in _VALID_ACTIONS:
return json.dumps(
{
"success": False,
"error": f"Invalid action. Must be one of: {', '.join(sorted(_VALID_ACTIONS))}.",
}
)
if not _host_ok(host):
return json.dumps({"success": False, "error": "Invalid or empty host."})
if not _user_ok(user):
return json.dumps({"success": False, "error": "Invalid or empty user."})
if password is None or str(password) == "":
return json.dumps({"success": False, "error": "password is required."})
if not _snapshot_name_ok(snapshot_name):
return json.dumps({"success": False, "error": "Invalid snapshot_name."})
need_vm = act in {
"vm_start",
"vm_clean_shutdown",
"vm_hard_shutdown",
"vm_reboot",
"vm_suspend",
"vm_resume",
"snapshot_create",
"vm_pool_migrate",
"vm_destroy",
}
if need_vm and not _uuid_ok(vm_uuid):
return json.dumps(
{"success": False, "error": "vm_uuid is required and must be a valid UUID."}
)
if act == "host_info" and not _uuid_ok(host_uuid):
return json.dumps(
{
"success": False,
"error": "host_uuid is required and must be a valid UUID.",
}
)
if (
act == "pool_info"
and pool_uuid is not None
and str(pool_uuid).strip()
and not _uuid_ok(pool_uuid)
):
return json.dumps(
{"success": False, "error": "pool_uuid must be a valid UUID when provided."}
)
if act in {"snapshot_revert", "snapshot_delete"} and not _uuid_ok(snapshot_vm_uuid):
return json.dumps(
{
"success": False,
"error": "snapshot_vm_uuid is required and must be a valid UUID.",
}
)
if act == "vm_pool_migrate":
if not _uuid_ok(host_uuid):
return json.dumps(
{
"success": False,
"error": "host_uuid (destination) is required and must be a valid UUID.",
}
)
url = _normalize_master_url(host)
ignore_ssl = not verify_ssl
def _strip_opt(s: str | None) -> str | None:
"""Normalize an optional UUID argument to a trimmed string or ``None``.
Strips surrounding whitespace and collapses empty/whitespace-only values
to ``None`` so :func:`_xapi_dispatch` receives ``None`` rather than a
blank string for unset identifiers.
Interactions: pure; no side effects. Called by the enclosing :func:`run`
when marshalling ``pool_uuid``, ``host_uuid``, ``vm_uuid`` and
``snapshot_vm_uuid`` for the dispatch call.
Args:
s: The raw optional string, or ``None``.
Returns:
The trimmed string, or ``None`` if it was empty/whitespace/``None``.
"""
t = (s or "").strip()
return t or None
result = await asyncio.to_thread(
_xapi_dispatch,
url,
user,
password,
ignore_ssl,
act,
_strip_opt(pool_uuid),
_strip_opt(host_uuid),
_strip_opt(vm_uuid),
snapshot_name,
_strip_opt(snapshot_vm_uuid),
)
out = json.dumps(result, default=str)
if len(out) > _MAX_JSON_CHARS:
return json.dumps(
{
"success": result.get("success", False),
"error": (
"Response exceeded size limit; narrow the request "
"(e.g. filter by pool or use host_info / pool_info)."
),
"truncated": True,
}
)
return out
# JSON-schema style parameter spec for the ``xenserver_control`` tool (handler:
# ``run``). Only ``action`` is required: host/user/password may instead come
# from a saved ``credential_profile``, and the per-action UUID requirements are
# enforced inside ``run`` rather than by this schema.
_CONTROL_PARAMETERS = {
    "type": "object",
    "properties": {
        "host": {
            "type": "string",
            "description": (
                "Pool master hostname or IP, or full URL (optional if credential_profile set)."
            ),
        },
        "user": {
            "type": "string",
            "description": "API user (optional if credential_profile set).",
        },
        "password": {
            "type": "string",
            "description": "Account password (optional if credential_profile set).",
        },
        "credential_profile": {
            "type": "string",
            "description": "Load host/user/password from this saved profile; kwargs override.",
            "default": "",
        },
        "verify_ssl": {
            "type": "boolean",
            "description": "If true, verify TLS certificates; if false (default), allow self-signed.",
            # Mirrors run()'s signature and the save-credentials schema.
            "default": False,
        },
        "action": {
            "type": "string",
            "description": "Operation to perform.",
            "enum": sorted(_VALID_ACTIONS),
        },
        "pool_uuid": {
            "type": "string",
            "description": "Pool UUID — optional for pool_info (defaults to the only/first pool).",
        },
        "host_uuid": {
            "type": "string",
            "description": "Host UUID — required for host_info and vm_pool_migrate (destination).",
        },
        "vm_uuid": {
            "type": "string",
            # Only snapshot_create takes vm_uuid; snapshot_revert and
            # snapshot_delete are addressed via snapshot_vm_uuid.
            "description": "VM UUID — required for VM lifecycle, snapshot_create, migrate, destroy.",
        },
        "sr_uuid": {
            "type": "string",
            "description": "Reserved for future use (e.g. filtered SR ops).",
        },
        "snapshot_name": {
            "type": "string",
            "description": "Label for snapshot_create.",
        },
        "snapshot_vm_uuid": {
            "type": "string",
            "description": "UUID of the snapshot VM — for snapshot_revert and snapshot_delete.",
        },
    },
    "required": ["action"],
}
# Parameter schemas for the credential-management tools, kept as named
# constants alongside ``_CONTROL_PARAMETERS`` so ``TOOLS`` below reads as a
# flat registry of name / description / schema / handler.
_SAVE_CREDENTIALS_PARAMETERS = {
    "type": "object",
    "properties": {
        "host": {
            "type": "string",
            "description": "Pool master hostname or IP or https URL.",
        },
        "user": {"type": "string", "description": "API user."},
        "password": {"type": "string", "description": "Password."},
        "profile": {
            "type": "string",
            "description": "Profile name.",
            "default": "default",
        },
        "verify_ssl": {
            "type": "boolean",
            "description": "Verify TLS",
            "default": False,
        },
    },
    "required": ["host", "user", "password"],
}

_LIST_CREDENTIALS_PARAMETERS = {"type": "object", "properties": {}}

# NOTE(review): ``profile`` is listed as required even though it declares a
# default (and the handler defaults it too). This may be a deliberate guard for
# a destructive operation -- confirm before relaxing.
_DELETE_CREDENTIALS_PARAMETERS = {
    "type": "object",
    "properties": {
        "profile": {
            "type": "string",
            "description": "Profile name.",
            "default": "default",
        },
    },
    "required": ["profile"],
}

# Tool registry for this module: one entry per exposed tool, each pairing a
# public tool name with its description, parameter schema and async handler.
TOOLS = [
    {
        "name": "xenserver_save_credentials",
        "description": (
            "Save XenServer/XCP-ng pool master credentials (host, user, password, verify_ssl) "
            "encrypted per-user. Requires UNSANDBOXED_EXEC."
        ),
        "parameters": _SAVE_CREDENTIALS_PARAMETERS,
        "handler": _xenserver_save_credentials,
    },
    {
        "name": "xenserver_list_credentials",
        "description": (
            "List saved XenServer profile names for this user. Requires UNSANDBOXED_EXEC."
        ),
        "parameters": _LIST_CREDENTIALS_PARAMETERS,
        "handler": _xenserver_list_credentials,
    },
    {
        "name": "xenserver_delete_credentials",
        "description": "Delete a saved XenServer credential profile. Requires UNSANDBOXED_EXEC.",
        "parameters": _DELETE_CREDENTIALS_PARAMETERS,
        "handler": _xenserver_delete_credentials,
    },
    {
        "name": "xenserver_control",
        "description": (
            "Manage XenServer or XCP-ng pools via XenAPI (XML-RPC over HTTPS to the pool master): "
            "pools, hosts, SRs, VM lifecycle, snapshots, pool migrate, destroy VM. "
            "Requires UNSANDBOXED_EXEC. "
            "Use credential_profile to load saved auth or pass host/user/password. "
            "Target VMs and hosts by UUID. "
            "For snapshot_revert and snapshot_delete, use snapshot_vm_uuid (UUID of the snapshot VM). "
            "verify_ssl defaults to false for self-signed certificates."
        ),
        "parameters": _CONTROL_PARAMETERS,
        "handler": run,
    },
]