# Source code for tools.bmc_vendor_tools

"""Vendor BMC tools: Redfish/HTTPS, Dell racadm, Supermicro SMCIPMITool.

**Security:** all handlers require ``UNSANDBOXED_EXEC``. No shell; argv lists only
for subprocess tools. Redfish uses enum actions mapped to fixed paths.

Remote HPE iLO is accessed via Redfish (not ``hponcfg``, which is local OS only).
"""

from __future__ import annotations

import asyncio
import jsonutil as json
import logging
import shutil
from typing import Any
from urllib.parse import quote

import httpx

from tools._safe_http import safe_http_request, safe_httpx_client
from tools._credential_profile_store import (
    delete_profile as _cred_delete,
    list_profile_names as _cred_list,
    load_profile as _cred_load,
    merge_profile as _cred_merge,
    save_profile as _cred_save,
)
from tools.ipmi_tools import _check_priv, _host_ok, _user_ok

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Redis: stargazer:{prefix}_credentials:{user_id}
# The values below fill the ``{prefix}`` slot of the key pattern above.
# NOTE(review): presumably one prefix per vendor tool family (Redfish, racadm,
# Supermicro); the code that uses them is outside this view — confirm there.
_PREFIX_BMC_REDFISH = "bmc_redfish"
_PREFIX_IDRAC_RACADM = "idrac_racadm"
_PREFIX_SMC_SUPERMICRO = "smc_supermicro"

# Size caps. ``_MAX_JSON_BODY`` bounds the response text kept by
# ``_redfish_json``; it is applied by slicing ``r.text``, so the unit is
# characters of decoded text.
# NOTE(review): ``_MAX_STDOUT`` / ``_MAX_STDERR`` look like caps on captured
# subprocess output, but their use is not visible here — confirm.
_MAX_STDOUT = 200_000
_MAX_STDERR = 20_000
_MAX_JSON_BODY = 2_000_000

# --- Redfish -----------------------------------------------------------------

# Allowlist of ``action`` names for the Redfish tool. The names are grouped
# below for readers only; the resulting frozenset is unordered.
_REDFISH_ACTIONS = frozenset(
    (
        # Service root, top-level collections, and service-wide resources.
        "service_root",
        "systems_collection",
        "chassis_collection",
        "managers_collection",
        "software_inventory",
        "update_service",
        # Actions that need a Systems member id.
        "system_detail",
        "processors_memory",
        "network_adapters",
        "log_services",
        "log_entries",
        "boot_properties",
        "boot_override",
        "system_reset",
        # Actions that need a Chassis member id.
        "chassis_detail",
        "thermal",
        "chassis_power",
        # Actions that need a Managers member id.
        "manager_detail",
        "virtual_media_collection",
    )
)

# Allowlist of reset-type strings.
# NOTE(review): presumably checked against the ``reset_type`` argument of
# ``run_bmc_redfish`` before a reset is posted — that code is outside this
# view, confirm there.
_RESET_TYPES = frozenset(
    (
        "On",
        "PowerOn",
        "ForceOff",
        "GracefulShutdown",
        "GracefulRestart",
        "ForceRestart",
        "PowerCycle",
        "ForcePowerCycle",
        "ColdReset",
        "WarmReset",
        "Nmi",
    )
)


def _bmc_base_url(host: str, port: int | None) -> str:
    """Build the HTTPS base URL for a BMC's Redfish endpoint.

    Normalises a host that may already carry an ``http(s)://`` scheme, otherwise
    defaults to ``https://`` and appends the port when one is supplied. Trailing
    slashes are stripped so callers can concatenate ``/redfish/v1/...`` paths.

    Called by :func:`run_bmc_redfish` to derive ``base`` before any Redfish
    request; no other callers exist.

    Args:
        host: BMC hostname or IP, optionally prefixed with a scheme.
        port: Optional HTTPS port; ignored when ``None`` or not positive.

    Returns:
        str: The normalised base URL without a trailing slash, or ``""`` when
        ``host`` is empty/whitespace.
    """
    h = (host or "").strip()
    if not h:
        return ""
    if h.startswith("https://") or h.startswith("http://"):
        return h.rstrip("/")
    if port is not None and port > 0:
        return f"https://{h}:{int(port)}"
    return f"https://{h}"


def _port_ok(port: int | None) -> bool:
    """Validate that an optional TCP port is within the legal range.

    Treats ``None`` as valid (meaning "use the default"); otherwise requires a
    value in ``1..65535``.

    Called by :func:`run_bmc_redfish` during input validation; no other callers.

    Args:
        port: Port number to check, or ``None`` to accept the default.

    Returns:
        bool: ``True`` if ``None`` or a port in ``1..65535``, else ``False``.
    """
    if port is None:
        return True
    return 1 <= int(port) <= 65535


def _bad_pw(s: str | None) -> bool:
    """Report whether a password contains characters that must be rejected.

    Rejects ``None`` outright and any string containing a newline, carriage
    return, or NUL byte. These could break header/credential framing when the
    password is passed to ``httpx`` BasicAuth or to subprocess argv, so callers
    refuse such values.

    Called by :func:`run_bmc_redfish` while validating the supplied password; no
    other callers.

    Args:
        s: Candidate password, or ``None``.

    Returns:
        bool: ``True`` if the value is unusable (``None`` or contains a
        control character), else ``False``.
    """
    if s is None:
        return True
    return any(c in str(s) for c in "\n\r\x00")


def _bad_token(s: str) -> bool:
    """Tell whether an identifier token carries a forbidden control character.

    The check looks only for line feed, carriage return, and NUL. A falsy
    token (empty string or ``None``) is reported as acceptable; deciding what
    to do with an empty id is left to the caller.

    Called by :func:`_resolve_system_id`, :func:`_resolve_chassis_id`, and
    :func:`_resolve_manager_id` on caller-supplied member ids.

    Args:
        s: Identifier token to inspect.

    Returns:
        bool: ``True`` when ``s`` is non-empty and contains ``"\\n"``,
        ``"\\r"``, or ``"\\x00"``; ``False`` otherwise.
    """
    if not s:
        return False
    return "\n" in s or "\r" in s or "\x00" in s


async def _redfish_json(
    client: httpx.AsyncClient,
    method: str,
    url: str,
    *,
    json_body: dict[str, Any] | None = None,
) -> tuple[int, Any]:
    """Send one Redfish request through ``safe_http_request`` and decode it.

    Only ``GET``, ``POST`` and ``PATCH`` are accepted (matched after
    upper-casing ``method``). Every request is sent with
    ``allow_private=True`` and ``max_redirects=5``; ``POST`` and ``PATCH``
    additionally send ``json_body`` as the JSON payload, substituting ``{}``
    when it is ``None`` or empty.

    The response text is cut to ``_MAX_JSON_BODY`` characters before anything
    else is done with it. It is then parsed as JSON when the ``Content-Type``
    header contains ``json`` or the stripped text starts with ``{``; a body
    that is blank yields ``{}``. If parsing fails, or the response does not
    look like JSON, the (cut) text is returned under a ``raw`` key.

    Args:
        client: ``httpx.AsyncClient`` handed straight to ``safe_http_request``.
        method: HTTP verb, case-insensitive.
        url: Full URL to request.
        json_body: Optional JSON payload for ``POST``/``PATCH``.

    Returns:
        tuple[int, Any]: ``(status, body)``. ``status`` is the HTTP status
        code, ``400`` for an unsupported verb (no request is sent), or ``-1``
        when the request raised ``ValueError`` or ``httpx.HTTPError``; in the
        last two cases ``body`` is ``{"error": message}``.
    """
    verb = method.upper()
    if verb not in ("GET", "POST", "PATCH"):
        return 400, {"error": f"unsupported method {method}"}

    request_kwargs: dict[str, Any] = {"allow_private": True, "max_redirects": 5}
    if verb != "GET":
        # Writes always carry a JSON body, even if it is just an empty object.
        request_kwargs["json"] = json_body or {}

    try:
        response = await safe_http_request(client, verb, url, **request_kwargs)
    except (ValueError, httpx.HTTPError) as exc:
        return -1, {"error": str(exc)}

    content_type = (response.headers.get("content-type") or "").lower()
    body_text = response.text[:_MAX_JSON_BODY]
    stripped = body_text.strip()

    looks_like_json = "json" in content_type or stripped.startswith("{")
    if not looks_like_json:
        return response.status_code, {"raw": body_text}

    try:
        parsed = json.loads(body_text) if stripped else {}
    except Exception:
        # Undecodable (possibly cut-off) JSON: hand back the text instead.
        return response.status_code, {"raw": body_text}
    return response.status_code, parsed


async def _first_odata_member_id(
    client: httpx.AsyncClient,
    base: str,
    collection_path: str,
) -> str | None:
    """Fetch a Redfish collection and return the id of its first member.

    GETs ``base + collection_path`` via :func:`_redfish_json` and takes the
    last path segment of ``Members[0]["@odata.id"]`` (for example ``"1"`` from
    ``/redfish/v1/Systems/1``).

    Every unexpected shape yields ``None`` rather than an exception: a
    non-200 status, a body that is not an object, a ``Members`` value that is
    missing, empty, or not a list, a first member that is not an object, an
    ``@odata.id`` that is missing or not a string, or an ``@odata.id`` whose
    last segment is empty (such as ``"/"``).

    Called by :func:`_resolve_system_id`, :func:`_resolve_chassis_id`, and
    :func:`_resolve_manager_id` when no explicit id was supplied.

    Args:
        client: ``httpx.AsyncClient`` passed through to :func:`_redfish_json`.
        base: Base URL the collection path is appended to.
        collection_path: Collection path such as ``/redfish/v1/Systems``.

    Returns:
        str | None: The first member's id, or ``None`` when it cannot be
        determined.
    """
    code, data = await _redfish_json(client, "GET", f"{base}{collection_path}")
    if code != 200 or not isinstance(data, dict):
        return None
    members = data.get("Members")
    # Require a real list: indexing a JSON object with [0] would raise KeyError.
    if not isinstance(members, list) or not members:
        return None
    first = members[0]
    if not isinstance(first, dict):
        return None
    oid = first.get("@odata.id")
    if not isinstance(oid, str) or not oid:
        return None
    # An id made only of slashes leaves an empty segment; report "not found".
    return oid.rstrip("/").split("/")[-1] or None


async def _resolve_system_id(
    client: httpx.AsyncClient,
    base: str,
    explicit: str | None,
) -> tuple[str | None, str | None]:
    """Pick the Systems member id for a request.

    A non-blank ``explicit`` value wins: it is stripped, checked with
    :func:`_bad_token`, and returned without contacting the BMC. Otherwise the
    first member of ``/redfish/v1/Systems`` is looked up through
    :func:`_first_odata_member_id`.

    Used by :func:`run_bmc_redfish`.

    Args:
        client: ``httpx.AsyncClient`` used only for auto-discovery.
        base: BMC base URL.
        explicit: Caller-supplied system id; ``None`` or blank to auto-detect.

    Returns:
        tuple[str | None, str | None]: ``(system_id, None)`` on success, or
        ``(None, error_message)`` when the supplied id is rejected or no
        member could be discovered.
    """
    requested = (explicit or "").strip()
    if requested:
        if _bad_token(requested):
            return None, "Invalid system_id."
        return requested, None

    discovered = await _first_odata_member_id(client, base, "/redfish/v1/Systems")
    if discovered:
        return discovered, None
    return None, "Could not discover a Systems member id."


async def _resolve_chassis_id(
    client: httpx.AsyncClient,
    base: str,
    explicit: str | None,
) -> tuple[str | None, str | None]:
    """Pick the Chassis member id for a request.

    A non-blank ``explicit`` value wins: it is stripped, checked with
    :func:`_bad_token`, and returned without contacting the BMC. Otherwise the
    first member of ``/redfish/v1/Chassis`` is looked up through
    :func:`_first_odata_member_id`.

    Used by :func:`run_bmc_redfish`.

    Args:
        client: ``httpx.AsyncClient`` used only for auto-discovery.
        base: BMC base URL.
        explicit: Caller-supplied chassis id; ``None`` or blank to auto-detect.

    Returns:
        tuple[str | None, str | None]: ``(chassis_id, None)`` on success, or
        ``(None, error_message)`` when the supplied id is rejected or no
        member could be discovered.
    """
    requested = (explicit or "").strip()
    if requested:
        if _bad_token(requested):
            return None, "Invalid chassis_id."
        return requested, None

    discovered = await _first_odata_member_id(client, base, "/redfish/v1/Chassis")
    if discovered:
        return discovered, None
    return None, "Could not discover a Chassis member id."


async def _resolve_manager_id(
    client: httpx.AsyncClient,
    base: str,
    explicit: str | None,
) -> tuple[str | None, str | None]:
    """Pick the Managers member id for a request.

    A non-blank ``explicit`` value wins: it is stripped, checked with
    :func:`_bad_token`, and returned without contacting the BMC. Otherwise the
    first member of ``/redfish/v1/Managers`` is looked up through
    :func:`_first_odata_member_id`.

    Used by :func:`run_bmc_redfish`.

    Args:
        client: ``httpx.AsyncClient`` used only for auto-discovery.
        base: BMC base URL.
        explicit: Caller-supplied manager id; ``None`` or blank to auto-detect.

    Returns:
        tuple[str | None, str | None]: ``(manager_id, None)`` on success, or
        ``(None, error_message)`` when the supplied id is rejected or no
        member could be discovered.
    """
    requested = (explicit or "").strip()
    if requested:
        if _bad_token(requested):
            return None, "Invalid manager_id."
        return requested, None

    discovered = await _first_odata_member_id(client, base, "/redfish/v1/Managers")
    if discovered:
        return discovered, None
    return None, "Could not discover a Managers member id."


def _truncate_payload(obj: Any) -> Any:
    """Return a size-bounded copy of a decoded JSON value.

    Limits are applied at every nesting level: a dict keeps its first 500
    items, a list keeps its first 2000 elements, and a string longer than
    50,000 characters is cut to 50,000 and suffixed with ``…(truncated)``.
    Values kept inside dicts and lists are bounded recursively.

    Args:
        obj: Any JSON-decoded value (dict, list, str, or scalar).

    Returns:
        Any: A new dict/list for containers, a shortened string for
        over-long strings, and ``obj`` itself for everything else.
    """
    if isinstance(obj, str):
        if len(obj) > 50_000:
            return obj[:50_000] + "…(truncated)"
        return obj
    if isinstance(obj, list):
        return list(map(_truncate_payload, obj[:2000]))
    if isinstance(obj, dict):
        bounded: dict[Any, Any] = {}
        for position, (key, value) in enumerate(obj.items()):
            if position >= 500:
                break
            bounded[key] = _truncate_payload(value)
        return bounded
    return obj


[docs] async def run_bmc_redfish( host: str, user: str, password: str, action: str, ctx: Any = None, *, credential_profile: str = "", system_id: str = "", chassis_id: str = "", manager_id: str = "", reset_type: str = "ForceRestart", boot_override_enabled: str = "Once", boot_override_target: str = "Pxe", verify_ssl: bool = False, port: int | None = None, timeout: float = 120.0, ) -> str: """Perform one allowlisted Redfish operation over HTTPS against a BMC. Backing implementation of the ``bmc_redfish`` tool, covering HPE iLO, Dell iDRAC (Redfish mode), Lenovo XCC, and many Supermicro/Cisco controllers. Dispatches on ``action`` — an enum drawn from ``_REDFISH_ACTIONS`` — to a fixed Redfish path (service root, systems/chassis/managers inventory, thermal and power readings, log services and entries, firmware/software inventory, virtual-media listing, boot properties, a boot-override PATCH, or a ComputerSystem.Reset POST). Member ids are auto-discovered when not supplied. Only enum actions reach fixed paths; arbitrary URLs are never issued. When ``credential_profile`` is set it loads saved connection fields via :func:`tools._credential_profile_store.load_profile` (aliased ``_cred_load``) and merges them with ``_cred_merge`` before validating the host, user, password, and port with :func:`_host_ok`, :func:`_user_ok`, :func:`_bad_pw`, and :func:`_port_ok`. Requires ``UNSANDBOXED_EXEC`` via :func:`tools.ipmi_tools._check_priv`. Builds the base URL with :func:`_bmc_base_url`, opens an SSRF-guarded client from :func:`tools._safe_http.safe_httpx_client` carrying ``httpx.BasicAuth``, resolves ids through :func:`_resolve_system_id`, :func:`_resolve_chassis_id`, and :func:`_resolve_manager_id`, issues each request via :func:`_redfish_json`, and bounds every response with :func:`_truncate_payload`. Side effects: outbound HTTPS to the BMC management network, and (for ``boot_override``/``system_reset``) a state change on the target server. Errors are returned as JSON, not raised. 
Called by the ``tool_loader`` dispatcher as the registered ``handler`` for the ``bmc_redfish`` entry in ``TOOLS`` (and directly in ``tests/test_credential_profile_merges.py``). Args: host: BMC hostname or IP, optionally with an ``http(s)://`` scheme. user: BMC username for BasicAuth. password: BMC password for BasicAuth. action: Redfish operation to perform; must be in ``_REDFISH_ACTIONS``. ctx: Tool context providing ``redis``/``user_id`` for the privilege and credential lookups; required. credential_profile: Optional saved profile name supplying host, user, password, and TLS options. system_id: Redfish Systems member id; empty auto-detects the first. chassis_id: Chassis member id; empty auto-detects the first. manager_id: Managers member id; empty auto-detects the first. reset_type: ResetType for ``system_reset`` (must be in ``_RESET_TYPES``; default ``"ForceRestart"``). boot_override_enabled: BootSourceOverrideEnabled for ``boot_override`` (default ``"Once"``). boot_override_target: BootSourceOverrideTarget for ``boot_override`` (default ``"Pxe"``). verify_ssl: Whether to verify the BMC's TLS certificate (default ``False`` for self-signed BMC certs). port: Optional HTTPS port when not 443 and not embedded in ``host``. timeout: HTTP timeout in seconds (default 120). Returns: str: A JSON object carrying ``success``, ``http_status``, ``action``, the resolved id(s), and the truncated ``data`` (or per-action payload keys), or ``{"success": False, "error": ...}`` on a validation, auth, or request failure. 
""" if ctx is None: return json.dumps({"success": False, "error": "No context."}) auth_err = await _check_priv(ctx, "bmc_redfish") if auth_err: return auth_err if credential_profile and str(credential_profile).strip(): loaded = await _cred_load(_PREFIX_BMC_REDFISH, credential_profile.strip(), ctx) if isinstance(loaded, str): return json.dumps({"success": False, "error": loaded}) merged = _cred_merge( loaded, { "host": host, "user": user, "password": password, "verify_ssl": verify_ssl, "port": port, "timeout": timeout, }, ) host = str(merged.get("host") or "") user = str(merged.get("user") or "") password = str(merged.get("password") or "") if "verify_ssl" in merged: verify_ssl = bool(merged["verify_ssl"]) if "port" in merged: port = merged["port"] if "timeout" in merged: timeout = float(merged["timeout"]) if not _host_ok(host): return json.dumps({"success": False, "error": "Invalid or empty host."}) if not _user_ok(user): return json.dumps({"success": False, "error": "Invalid or empty user."}) if password is None or str(password) == "" or _bad_pw(str(password)): return json.dumps({"success": False, "error": "password is required."}) if not _port_ok(port): return json.dumps({"success": False, "error": "Invalid port."}) act = (action or "").strip() if act not in _REDFISH_ACTIONS: return json.dumps( { "success": False, "error": f"Invalid action. 
Must be one of: {', '.join(sorted(_REDFISH_ACTIONS))}.", } ) base = _bmc_base_url(host, port) auth = httpx.BasicAuth(user.strip(), str(password)) async with safe_httpx_client( auth=auth, verify=verify_ssl, timeout=httpx.Timeout(timeout), ) as client: if act == "service_root": code, body = await _redfish_json(client, "GET", f"{base}/redfish/v1/") return json.dumps( { "success": code == 200, "http_status": code, "action": act, "data": _truncate_payload(body), }, default=str, ) if act == "systems_collection": code, body = await _redfish_json( client, "GET", f"{base}/redfish/v1/Systems" ) return json.dumps( { "success": code == 200, "http_status": code, "action": act, "data": _truncate_payload(body), }, default=str, ) if act == "chassis_collection": code, body = await _redfish_json( client, "GET", f"{base}/redfish/v1/Chassis" ) return json.dumps( { "success": code == 200, "http_status": code, "action": act, "data": _truncate_payload(body), }, default=str, ) if act == "managers_collection": code, body = await _redfish_json( client, "GET", f"{base}/redfish/v1/Managers" ) return json.dumps( { "success": code == 200, "http_status": code, "action": act, "data": _truncate_payload(body), }, default=str, ) if act == "update_service": code, body = await _redfish_json( client, "GET", f"{base}/redfish/v1/UpdateService", ) return json.dumps( { "success": code == 200, "http_status": code, "action": act, "data": _truncate_payload(body), }, default=str, ) sid, serr = await _resolve_system_id(client, base, system_id or None) if serr and act in { "system_detail", "processors_memory", "network_adapters", "log_services", "log_entries", "boot_properties", "boot_override", "system_reset", }: return json.dumps({"success": False, "error": serr}) if act == "system_detail" and sid: code, body = await _redfish_json( client, "GET", f"{base}/redfish/v1/Systems/{quote(sid, safe='')}", ) return json.dumps( { "success": code == 200, "http_status": code, "action": act, "system_id": sid, "data": 
_truncate_payload(body), }, default=str, ) if act == "processors_memory" and sid: proc, mem = await asyncio.gather( _redfish_json( client, "GET", f"{base}/redfish/v1/Systems/{quote(sid, safe='')}/Processors", ), _redfish_json( client, "GET", f"{base}/redfish/v1/Systems/{quote(sid, safe='')}/Memory", ), ) return json.dumps( { "success": proc[0] == 200 and mem[0] == 200, "http_status": {"processors": proc[0], "memory": mem[0]}, "action": act, "system_id": sid, "processors": _truncate_payload(proc[1]), "memory": _truncate_payload(mem[1]), }, default=str, ) if act == "network_adapters" and sid: code, body = await _redfish_json( client, "GET", f"{base}/redfish/v1/Systems/{quote(sid, safe='')}/NetworkAdapters", ) return json.dumps( { "success": code == 200, "http_status": code, "action": act, "system_id": sid, "data": _truncate_payload(body), }, default=str, ) if act == "log_services" and sid: code, body = await _redfish_json( client, "GET", f"{base}/redfish/v1/Systems/{quote(sid, safe='')}/LogServices", ) return json.dumps( { "success": code == 200, "http_status": code, "action": act, "system_id": sid, "data": _truncate_payload(body), }, default=str, ) if act == "log_entries" and sid: code, body = await _redfish_json( client, "GET", f"{base}/redfish/v1/Systems/{quote(sid, safe='')}/LogServices", ) entries: list[Any] = [] if code == 200 and isinstance(body, dict): for m in (body.get("Members") or [])[:50]: if not isinstance(m, dict): continue oid = m.get("@odata.id") if not oid or not isinstance(oid, str): continue ec, ed = await _redfish_json(client, "GET", f"{base}{oid}/Entries") if ec == 200: entries.append( {"log_service": oid, "entries": _truncate_payload(ed)} ) return json.dumps( { "success": bool(entries) or code == 200, "http_status": code, "action": act, "system_id": sid, "data": entries if entries else _truncate_payload(body), }, default=str, ) if act == "software_inventory": paths = [ "/redfish/v1/UpdateService/FirmwareInventory", 
"/redfish/v1/UpdateService/SoftwareInventory", ] results: list[dict[str, Any]] = [] for p in paths: code, body = await _redfish_json(client, "GET", f"{base}{p}") results.append( {"path": p, "http_status": code, "data": _truncate_payload(body)} ) ok = any(r["http_status"] == 200 for r in results) return json.dumps( { "success": ok, "action": act, "results": results, }, default=str, ) cid, cerr = await _resolve_chassis_id(client, base, chassis_id or None) if cerr and act in {"chassis_detail", "thermal", "chassis_power"}: return json.dumps({"success": False, "error": cerr}) if act == "chassis_detail" and cid: code, body = await _redfish_json( client, "GET", f"{base}/redfish/v1/Chassis/{quote(cid, safe='')}", ) return json.dumps( { "success": code == 200, "http_status": code, "action": act, "chassis_id": cid, "data": _truncate_payload(body), }, default=str, ) if act == "thermal" and cid: code, body = await _redfish_json( client, "GET", f"{base}/redfish/v1/Chassis/{quote(cid, safe='')}/Thermal", ) return json.dumps( { "success": code == 200, "http_status": code, "action": act, "chassis_id": cid, "data": _truncate_payload(body), }, default=str, ) if act == "chassis_power" and cid: code, body = await _redfish_json( client, "GET", f"{base}/redfish/v1/Chassis/{quote(cid, safe='')}/Power", ) return json.dumps( { "success": code == 200, "http_status": code, "action": act, "chassis_id": cid, "data": _truncate_payload(body), }, default=str, ) mid, merr = await _resolve_manager_id(client, base, manager_id or None) if merr and act == "manager_detail": return json.dumps({"success": False, "error": merr}) if act == "manager_detail" and mid: code, body = await _redfish_json( client, "GET", f"{base}/redfish/v1/Managers/{quote(mid, safe='')}", ) return json.dumps( { "success": code == 200, "http_status": code, "action": act, "manager_id": mid, "data": _truncate_payload(body), }, default=str, ) if act == "virtual_media_collection": mid2, merr2 = await _resolve_manager_id(client, base, 
manager_id or None) if merr2: return json.dumps({"success": False, "error": merr2}) code, body = await _redfish_json( client, "GET", f"{base}/redfish/v1/Managers/{quote(mid2, safe='')}/VirtualMedia", ) return json.dumps( { "success": code == 200, "http_status": code, "action": act, "manager_id": mid2, "data": _truncate_payload(body), }, default=str, ) if act == "boot_properties" and sid: code, body = await _redfish_json( client, "GET", f"{base}/redfish/v1/Systems/{quote(sid, safe='')}", ) boot = {} if isinstance(body, dict): boot = body.get("Boot") or {} return json.dumps( { "success": code == 200, "http_status": code, "action": act, "system_id": sid, "boot": _truncate_payload(boot), }, default=str, ) if act == "boot_override" and sid: patch_body = { "Boot": { "BootSourceOverrideEnabled": boot_override_enabled, "BootSourceOverrideTarget": boot_override_target, }, } code, body = await _redfish_json( client, "PATCH", f"{base}/redfish/v1/Systems/{quote(sid, safe='')}", json_body=patch_body, ) return json.dumps( { "success": code in (200, 204), "http_status": code, "action": act, "system_id": sid, "data": _truncate_payload(body), }, default=str, ) if act == "system_reset" and sid: rt = (reset_type or "ForceRestart").strip() if rt not in _RESET_TYPES: return json.dumps( { "success": False, "error": f"Invalid reset_type. Must be one of: {', '.join(sorted(_RESET_TYPES))}.", } ) url = f"{base}/redfish/v1/Systems/{quote(sid, safe='')}/Actions/ComputerSystem.Reset" code, body = await _redfish_json( client, "POST", url, json_body={"ResetType": rt}, ) return json.dumps( { "success": code in (200, 202, 204), "http_status": code, "action": act, "system_id": sid, "reset_type": rt, "data": _truncate_payload(body), }, default=str, ) return json.dumps({"success": False, "error": "Unhandled action."})
# --- Dell racadm ------------------------------------------------------------- _RACADM_ACTIONS = frozenset( { "getsysinfo", "getversion", "getsel", "getniccfg", "getraclog", "gettracelog", "getidrac", "getsysteminfo", "getversion_bios", "getversion_nic", "getversion_perc", "jobqueue_view", "jobqueue_view_all", "serveraction_powerstatus", "serveraction_powerup", "serveraction_powerdown", "serveraction_powercycle", "serveraction_graceshutdown", "serveraction_hardreset", "serveraction_powerreset", "storage_get_controllers", "storage_get_physicaldisks", "storage_get_virtualdisks", "storage_get_batteries", "storage_get_enclosures", "storage_get_smarts", "storage_get_summary", } ) def _racadm_tail(action: str) -> list[str] | None: """Map an allowlisted racadm action name to its fixed argv tail. Returns the exact, pre-vetted ``racadm`` subcommand argument list for a known action (e.g. ``"getsysinfo"`` -> ``["getsysinfo"]``, ``"getidrac"`` -> ``["get", "iDRAC"]``). Because only these constant tails are ever appended to argv, no user-controlled tokens can reach the subcommand, which is central to this tool's no-shell safety model. Called by :func:`run_idrac_racadm` to translate the requested action and to double-check (alongside ``_RACADM_ACTIONS``) that the action is allowed. Args: action: Action name; stripped of surrounding whitespace before lookup. Returns: list[str] | None: The fixed argv tail for the action, or ``None`` if the action is not in the allowlist. 
""" fixed: dict[str, list[str]] = { "getsysinfo": ["getsysinfo"], "getversion": ["getversion"], "getsel": ["getsel"], "getniccfg": ["getniccfg"], "getraclog": ["getraclog"], "gettracelog": ["gettracelog"], "getidrac": ["get", "iDRAC"], "getsysteminfo": ["get", "SystemInfo"], "getversion_bios": ["get", "BIOS"], "getversion_nic": ["get", "NIC"], "getversion_perc": ["get", "PERC"], "jobqueue_view": ["jobqueue", "view"], "jobqueue_view_all": ["jobqueue", "view", "-i", "all"], "serveraction_powerstatus": ["serveraction", "powerstatus"], "serveraction_powerup": ["serveraction", "powerup"], "serveraction_powerdown": ["serveraction", "powerdown"], "serveraction_powercycle": ["serveraction", "powercycle"], "serveraction_graceshutdown": ["serveraction", "graceshutdown"], "serveraction_hardreset": ["serveraction", "hardreset"], "serveraction_powerreset": ["serveraction", "powerreset"], "storage_get_controllers": ["storage", "get", "controllers"], "storage_get_physicaldisks": ["storage", "get", "pdisks"], "storage_get_virtualdisks": ["storage", "get", "vdisks"], "storage_get_batteries": ["storage", "get", "batteries"], "storage_get_enclosures": ["storage", "get", "enclosures"], "storage_get_smarts": ["storage", "get", "smarts"], "storage_get_summary": ["storage", "get", "summary"], } return fixed.get(action.strip()) def _racadm_available() -> bool: """Report whether the Dell ``racadm`` binary is on ``PATH``. Resolves the executable with :func:`shutil.which`. Called by :func:`run_idrac_racadm` to fail fast with a clear error when the host has no racadm installed. Returns: bool: ``True`` if ``racadm`` is found on ``PATH``, else ``False``. """ return shutil.which("racadm") is not None async def _run_subprocess( argv: list[str], timeout: float = 180.0, ) -> tuple[int, str, str]: """Run a subprocess from an argv list (no shell) and capture bounded output. 
Spawns the command with :func:`asyncio.create_subprocess_exec` (argv only, so no shell interpolation), waits up to ``timeout`` seconds, and kills the process on timeout. Captured stdout/stderr are decoded as UTF-8 with replacement and truncated to ``_MAX_STDOUT``/``_MAX_STDERR`` bytes. Called by :func:`run_idrac_racadm` (for the ``racadm`` argv) and :func:`run_smc_supermicro` (for the SMCIPMITool argv). This is a module-local helper; unrelated functions of the same name exist in other modules (``message_processor.admin_ops_commands``, ``tools.workflow_subagent_tools``) and are not this one. Args: argv: Full command vector; ``argv[0]`` is the executable. timeout: Wall-clock limit in seconds before the process is killed. Returns: tuple[int, str, str]: ``(exit_code, stdout, stderr)``. On timeout the code is ``-1`` and stderr is ``"Command timed out."``; a missing ``returncode`` is also reported as ``-1``. """ proc = await asyncio.create_subprocess_exec( *argv, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) except asyncio.TimeoutError: proc.kill() await proc.wait() return -1, "", "Command timed out." code = proc.returncode if proc.returncode is not None else -1 out = stdout.decode("utf-8", errors="replace")[:_MAX_STDOUT] err = stderr.decode("utf-8", errors="replace")[:_MAX_STDERR] return code, out, err
async def run_idrac_racadm(
    host: str,
    user: str,
    password: str,
    action: str,
    ctx: Any = None,
    *,
    credential_profile: str = "",
    timeout: float = 180.0,
) -> str:
    """Run one allowlisted Dell iDRAC ``racadm`` action against a remote BMC.

    Backing implementation of the ``idrac_racadm`` tool. The command line is
    ``racadm -r <host> -u <user> -p <password>`` followed by the constant
    subcommand tail that :func:`_racadm_tail` returns for ``action``; the
    action must also be a member of ``_RACADM_ACTIONS``. The argv list is
    executed through :func:`_run_subprocess`, so no shell is involved.

    Steps, in order: require ``ctx``; run the privilege check via
    :func:`_check_priv`; optionally load and merge a saved profile with
    ``_cred_load`` / ``_cred_merge``; confirm the binary with
    :func:`_racadm_available`; validate host and user with :func:`_host_ok`
    and :func:`_user_ok` and require a non-empty password; resolve the action;
    run the process.

    Args:
        host: iDRAC IP or hostname.
        user: iDRAC username.
        password: iDRAC password.
        action: Action name from ``_RACADM_ACTIONS``.
        ctx: Tool context for the privilege check and profile lookup; required.
        credential_profile: Optional saved profile supplying host, user,
            password, and timeout.
        timeout: Subprocess wall-clock timeout in seconds.

    Returns:
        str: JSON text with ``success`` (true when the exit code is 0),
        ``action``, ``exit_code``, ``stdout`` and ``stderr``, or
        ``{"success": false, "error": ...}`` when a precondition fails.
    """

    def _reject(message: Any) -> str:
        return json.dumps({"success": False, "error": message})

    if ctx is None:
        return _reject("No context.")
    denied = await _check_priv(ctx, "idrac_racadm")
    if denied:
        return denied

    if credential_profile and str(credential_profile).strip():
        stored = await _cred_load(
            _PREFIX_IDRAC_RACADM, credential_profile.strip(), ctx
        )
        if isinstance(stored, str):
            # A string result is the store's error message.
            return _reject(stored)
        overrides = {
            "host": host,
            "user": user,
            "password": password,
            "timeout": timeout,
        }
        combined = _cred_merge(stored, overrides)
        host = str(combined.get("host") or "")
        user = str(combined.get("user") or "")
        password = str(combined.get("password") or "")
        if "timeout" in combined:
            timeout = float(combined["timeout"])

    if not _racadm_available():
        return _reject("racadm not found in PATH.")
    if not _host_ok(host):
        return _reject("Invalid or empty host.")
    if not _user_ok(user):
        return _reject("Invalid or empty user.")
    if password is None or str(password) == "":
        return _reject("password is required.")

    requested = (action or "").strip()
    subcommand = _racadm_tail(requested)
    if subcommand is None or requested not in _RACADM_ACTIONS:
        return _reject(
            f"Invalid action. Must be one of: {', '.join(sorted(_RACADM_ACTIONS))}."
        )

    command = ["racadm", "-r", host.strip(), "-u", user.strip(), "-p", str(password)]
    command.extend(subcommand)
    exit_code, captured_out, captured_err = await _run_subprocess(
        command, timeout=timeout
    )
    return json.dumps(
        {
            "success": exit_code == 0,
            "action": requested,
            "exit_code": exit_code,
            "stdout": captured_out.rstrip(),
            "stderr": captured_err.rstrip(),
        }
    )
# --- Supermicro -------------------------------------------------------------- _SMC_ACTIONS = frozenset( { "ipmi_power_status", "ipmi_power_on", "ipmi_power_off", "ipmi_power_cycle", "ipmi_power_reset", "ipmi_sensor_list", "ipmi_fru_print", "ipmi_lan_print", "ipmi_sel_list", } ) def _smc_bin() -> str | None: """Locate the Supermicro SMCIPMITool executable on ``PATH``. Probes the common binary names (``smcipmitool``, ``SMCIPMITool``, ``smcipmitool.sh``) with :func:`shutil.which` and returns the first match. Called by :func:`run_smc_supermicro` to find the binary or fail with a clear "not found in PATH" error. Returns: str | None: The resolved absolute path to the tool, or ``None`` if none of the candidate names are found. """ for name in ("smcipmitool", "SMCIPMITool", "smcipmitool.sh"): p = shutil.which(name) if p: return p return None def _smc_tail(action: str) -> list[str] | None: """Map an allowlisted SMCIPMITool action name to its fixed argv tail. Returns the pre-vetted ``ipmi`` subcommand tokens for a known action (e.g. ``"ipmi_power_status"`` -> ``["ipmi", "power", "status"]``). As with :func:`_racadm_tail`, only these constant tails are appended to argv so no user input reaches the subcommand. Called by :func:`run_smc_supermicro` to translate the requested action and to confirm (with ``_SMC_ACTIONS``) that it is allowed. Args: action: Action name; surrounding whitespace is stripped before lookup. Returns: list[str] | None: The fixed argv tail, or ``None`` if the action is not allowlisted. 
""" m: dict[str, list[str]] = { "ipmi_power_status": ["ipmi", "power", "status"], "ipmi_power_on": ["ipmi", "power", "on"], "ipmi_power_off": ["ipmi", "power", "off"], "ipmi_power_cycle": ["ipmi", "power", "cycle"], "ipmi_power_reset": ["ipmi", "power", "reset"], "ipmi_sensor_list": ["ipmi", "sensor", "list"], "ipmi_fru_print": ["ipmi", "fru", "print"], "ipmi_lan_print": ["ipmi", "lan", "print"], "ipmi_sel_list": ["ipmi", "sel", "list"], } return m.get((action or "").strip())
async def run_smc_supermicro(
    host: str,
    user: str,
    password: str,
    action: str,
    ctx: Any = None,
    *,
    credential_profile: str = "",
    timeout: float = 180.0,
) -> str:
    """Run one allowlisted Supermicro SMCIPMITool action against a BMC.

    Backing implementation of the ``smc_supermicro`` tool. The command line is
    the binary found by :func:`_smc_bin` followed by host, user, password and
    the constant subcommand tail that :func:`_smc_tail` returns for
    ``action``; the action must also be a member of ``_SMC_ACTIONS``. The argv
    list is executed through :func:`_run_subprocess`, so no shell is involved.

    Steps, in order: require ``ctx``; run the privilege check via
    :func:`_check_priv`; optionally load and merge a saved profile with
    ``_cred_load`` / ``_cred_merge``; locate the binary; validate host and
    user with :func:`_host_ok` and :func:`_user_ok` and require a non-empty
    password; resolve the action; run the process.

    Args:
        host: BMC address.
        user: BMC username.
        password: BMC password.
        action: Action name from ``_SMC_ACTIONS``.
        ctx: Tool context for the privilege check and profile lookup; required.
        credential_profile: Optional saved profile supplying host, user,
            password, and timeout.
        timeout: Subprocess wall-clock timeout in seconds.

    Returns:
        str: JSON text with ``success`` (true when the exit code is 0),
        ``action``, ``exit_code``, ``stdout`` and ``stderr``, or
        ``{"success": false, "error": ...}`` when a precondition fails.
    """

    def _reject(message: Any) -> str:
        return json.dumps({"success": False, "error": message})

    if ctx is None:
        return _reject("No context.")
    denied = await _check_priv(ctx, "smc_supermicro")
    if denied:
        return denied

    if credential_profile and str(credential_profile).strip():
        stored = await _cred_load(
            _PREFIX_SMC_SUPERMICRO, credential_profile.strip(), ctx
        )
        if isinstance(stored, str):
            # A string result is the store's error message.
            return _reject(stored)
        overrides = {
            "host": host,
            "user": user,
            "password": password,
            "timeout": timeout,
        }
        combined = _cred_merge(stored, overrides)
        host = str(combined.get("host") or "")
        user = str(combined.get("user") or "")
        password = str(combined.get("password") or "")
        if "timeout" in combined:
            timeout = float(combined["timeout"])

    tool_path = _smc_bin()
    if not tool_path:
        return _reject("smcipmitool (SMCIPMITool) not found in PATH.")
    if not _host_ok(host):
        return _reject("Invalid or empty host.")
    if not _user_ok(user):
        return _reject("Invalid or empty user.")
    if password is None or str(password) == "":
        return _reject("password is required.")

    requested = (action or "").strip()
    subcommand = _smc_tail(requested)
    if subcommand is None or requested not in _SMC_ACTIONS:
        return _reject(
            f"Invalid action. Must be one of: {', '.join(sorted(_SMC_ACTIONS))}."
        )

    command = [tool_path, host.strip(), user.strip(), str(password)]
    command.extend(subcommand)
    exit_code, captured_out, captured_err = await _run_subprocess(
        command, timeout=timeout
    )
    return json.dumps(
        {
            "success": exit_code == 0,
            "action": requested,
            "exit_code": exit_code,
            "stdout": captured_out.rstrip(),
            "stderr": captured_err.rstrip(),
        }
    )
# --- Encrypted credential profiles -------------------------------------------
async def bmc_redfish_save_credentials(
    host: str,
    user: str,
    password: str,
    profile: str = "default",
    ctx: Any = None,
    *,
    verify_ssl: bool = False,
    port: int | None = None,
    timeout: float = 120.0,
) -> str:
    """Persist a reusable Redfish BMC credential profile.

    Collects host, user, password, ``verify_ssl``, port and timeout into one
    record and hands it to ``_cred_save`` under the ``bmc_redfish`` prefix, so
    :func:`run_bmc_redfish` can later load it by name through its
    ``credential_profile`` argument. The privilege check is performed with
    :func:`_check_priv` before anything is stored.

    Args:
        host: BMC hostname or IP; surrounding whitespace is stripped.
        user: BMC username to store.
        password: BMC password to store.
        profile: Profile name to save under.
        ctx: Tool context; required.
        verify_ssl: TLS-verification preference to remember (coerced to bool).
        port: Optional HTTPS port to remember.
        timeout: HTTP timeout in seconds to remember (coerced to float).

    Returns:
        str: The result string from ``_cred_save``, the privilege-check error,
        or a ``"No context."`` error envelope when ``ctx`` is ``None``.
    """
    if ctx is not None:
        denied = await _check_priv(ctx, "bmc_redfish")
        if denied:
            return denied
        record: dict[str, Any] = {
            "host": host.strip(),
            "user": user,
            "password": password,
            "verify_ssl": bool(verify_ssl),
            "port": port,
            "timeout": float(timeout),
        }
        return await _cred_save(_PREFIX_BMC_REDFISH, profile, record, ctx)
    return json.dumps({"success": False, "error": "No context."})
async def bmc_redfish_list_credentials(ctx: Any = None) -> str:
    """List the saved Redfish BMC credential profile names.

    After the privilege check via :func:`_check_priv`, delegates to
    ``_cred_list`` with the ``bmc_redfish`` prefix.

    Args:
        ctx: Tool context; required.

    Returns:
        str: The result string from ``_cred_list``, the privilege-check error,
        or a ``"No context."`` error envelope when ``ctx`` is ``None``.
    """
    if ctx is not None:
        denied = await _check_priv(ctx, "bmc_redfish")
        if denied:
            return denied
        return await _cred_list(_PREFIX_BMC_REDFISH, ctx)
    return json.dumps({"success": False, "error": "No context."})
async def bmc_redfish_delete_credentials(
    profile: str = "default", ctx: Any = None
) -> str:
    """Delete one saved Redfish BMC credential profile.

    After the privilege check via :func:`_check_priv`, delegates to
    ``_cred_delete`` with the ``bmc_redfish`` prefix and the given profile
    name.

    Args:
        profile: Name of the profile to delete.
        ctx: Tool context; required.

    Returns:
        str: The result string from ``_cred_delete``, the privilege-check
        error, or a ``"No context."`` error envelope when ``ctx`` is ``None``.
    """
    if ctx is not None:
        denied = await _check_priv(ctx, "bmc_redfish")
        if denied:
            return denied
        return await _cred_delete(_PREFIX_BMC_REDFISH, profile, ctx)
    return json.dumps({"success": False, "error": "No context."})
async def idrac_racadm_save_credentials(
    host: str,
    user: str,
    password: str,
    profile: str = "default",
    ctx: Any = None,
    *,
    timeout: float = 180.0,
) -> str:
    """Persist a reusable Dell iDRAC racadm credential profile.

    Collects host, user, password and subprocess timeout into one record and
    hands it to ``_cred_save`` under the ``idrac_racadm`` prefix, so
    :func:`run_idrac_racadm` can later load it by name through its
    ``credential_profile`` argument. The privilege check is performed with
    :func:`_check_priv` before anything is stored.

    Args:
        host: iDRAC IP or hostname; surrounding whitespace is stripped.
        user: iDRAC username.
        password: iDRAC password.
        profile: Profile name to save under.
        ctx: Tool context; required.
        timeout: Subprocess timeout in seconds to remember (coerced to float).

    Returns:
        str: The result string from ``_cred_save``, the privilege-check error,
        or a ``"No context."`` error envelope when ``ctx`` is ``None``.
    """
    if ctx is not None:
        denied = await _check_priv(ctx, "idrac_racadm")
        if denied:
            return denied
        record: dict[str, Any] = {
            "host": host.strip(),
            "user": user,
            "password": password,
            "timeout": float(timeout),
        }
        return await _cred_save(_PREFIX_IDRAC_RACADM, profile, record, ctx)
    return json.dumps({"success": False, "error": "No context."})
async def idrac_racadm_list_credentials(ctx: Any = None) -> str:
    """Report the names of the saved iDRAC racadm credential profiles.

    Requires a context and a clean ``idrac_racadm`` privilege check
    (:func:`_check_priv`); the lookup is then delegated to ``_cred_list``
    with the ``idrac_racadm`` storage prefix. Exposed through the ``TOOLS``
    registry as ``idrac_racadm_list_credentials``.

    Args:
        ctx: Tool context; must not be ``None``.

    Returns:
        str: Whatever ``_cred_list`` returns on the normal path, the value
        returned by ``_check_priv`` when it reports a problem, or the JSON
        envelope ``{"success": false, "error": "No context."}`` when ``ctx``
        is ``None``.
    """
    if ctx is None:
        no_ctx = {"success": False, "error": "No context."}
        return json.dumps(no_ctx)

    # A truthy result from the privilege check is already the reply to send.
    denial = await _check_priv(ctx, "idrac_racadm")
    return denial if denial else await _cred_list(_PREFIX_IDRAC_RACADM, ctx)
async def idrac_racadm_delete_credentials(
    profile: str = "default", ctx: Any = None
) -> str:
    """Remove one stored iDRAC racadm credential profile.

    Nothing is deleted unless a context is supplied and the
    ``idrac_racadm`` privilege check (:func:`_check_priv`) returns no error.
    The removal itself is delegated to ``_cred_delete`` using the
    ``idrac_racadm`` storage prefix. Exposed through the ``TOOLS`` registry
    as ``idrac_racadm_delete_credentials``.

    Args:
        profile: Name of the profile to remove. Defaults to ``"default"``.
        ctx: Tool context; must not be ``None``.

    Returns:
        str: Whatever ``_cred_delete`` returns on the normal path, the value
        returned by ``_check_priv`` when it reports a problem, or the JSON
        envelope ``{"success": false, "error": "No context."}`` when ``ctx``
        is ``None``.
    """
    if ctx is None:
        no_ctx = {"success": False, "error": "No context."}
        return json.dumps(no_ctx)

    # A truthy result from the privilege check is already the reply to send.
    denial = await _check_priv(ctx, "idrac_racadm")
    return denial if denial else await _cred_delete(_PREFIX_IDRAC_RACADM, profile, ctx)
async def smc_supermicro_save_credentials(
    host: str,
    user: str,
    password: str,
    profile: str = "default",
    ctx: Any = None,
    *,
    timeout: float = 180.0,
) -> str:
    """Store a named Supermicro SMCIPMITool credential profile for later reuse.

    The saved record holds the host, user, password and subprocess timeout.
    A context is mandatory and the ``smc_supermicro`` privilege check
    (:func:`_check_priv`) must come back clean before the record is built
    and handed to ``_cred_save`` under the ``smc_supermicro`` storage
    prefix. Exposed through the ``TOOLS`` registry, whose description states
    the data is stored encrypted per user.

    Args:
        host: BMC IP address; surrounding whitespace is stripped.
        user: BMC username, stored as given.
        password: BMC password, stored as given in the record passed on.
        profile: Profile name to save under. Defaults to ``"default"``.
        ctx: Tool context; must not be ``None``.
        timeout: Subprocess timeout in seconds, coerced with ``float()``.

    Returns:
        str: The result of ``_cred_save`` on the normal path, the value
        returned by ``_check_priv`` when it reports a problem, or the JSON
        envelope ``{"success": false, "error": "No context."}`` when ``ctx``
        is ``None``.
    """
    if ctx is None:
        no_ctx = {"success": False, "error": "No context."}
        return json.dumps(no_ctx)

    denial = await _check_priv(ctx, "smc_supermicro")
    if denial:
        return denial

    # Built only after authorization so bad input never masks a denial.
    payload: dict[str, Any] = dict(
        host=host.strip(),
        user=user,
        password=password,
        timeout=float(timeout),
    )
    return await _cred_save(_PREFIX_SMC_SUPERMICRO, profile, payload, ctx)
async def smc_supermicro_list_credentials(ctx: Any = None) -> str:
    """Report the names of the saved Supermicro SMCIPMITool credential profiles.

    Requires a context and a clean ``smc_supermicro`` privilege check
    (:func:`_check_priv`); the lookup is then delegated to ``_cred_list``
    with the ``smc_supermicro`` storage prefix. Exposed through the
    ``TOOLS`` registry as ``smc_supermicro_list_credentials``.

    Args:
        ctx: Tool context; must not be ``None``.

    Returns:
        str: Whatever ``_cred_list`` returns on the normal path, the value
        returned by ``_check_priv`` when it reports a problem, or the JSON
        envelope ``{"success": false, "error": "No context."}`` when ``ctx``
        is ``None``.
    """
    if ctx is None:
        no_ctx = {"success": False, "error": "No context."}
        return json.dumps(no_ctx)

    # A truthy result from the privilege check is already the reply to send.
    denial = await _check_priv(ctx, "smc_supermicro")
    return denial if denial else await _cred_list(_PREFIX_SMC_SUPERMICRO, ctx)
async def smc_supermicro_delete_credentials(
    profile: str = "default", ctx: Any = None
) -> str:
    """Remove one stored Supermicro SMCIPMITool credential profile.

    Nothing is deleted unless a context is supplied and the
    ``smc_supermicro`` privilege check (:func:`_check_priv`) returns no
    error. The removal itself is delegated to ``_cred_delete`` using the
    ``smc_supermicro`` storage prefix. Exposed through the ``TOOLS``
    registry as ``smc_supermicro_delete_credentials``.

    Args:
        profile: Name of the profile to remove. Defaults to ``"default"``.
        ctx: Tool context; must not be ``None``.

    Returns:
        str: Whatever ``_cred_delete`` returns on the normal path, the value
        returned by ``_check_priv`` when it reports a problem, or the JSON
        envelope ``{"success": false, "error": "No context."}`` when ``ctx``
        is ``None``.
    """
    if ctx is None:
        no_ctx = {"success": False, "error": "No context."}
        return json.dumps(no_ctx)

    # A truthy result from the privilege check is already the reply to send.
    denial = await _check_priv(ctx, "smc_supermicro")
    return denial if denial else await _cred_delete(_PREFIX_SMC_SUPERMICRO, profile, ctx)
# --- Tool descriptions -------------------------------------------------------
# Human-readable descriptions attached to the tool entries in ``TOOLS`` below.
# Each base text names the sibling tools to use instead (ipmi_control,
# idrac_racadm, bmc_redfish) and states the UNSANDBOXED_EXEC requirement.

_BMC_REDFISH_DESC = (
    "HTTPS Redfish to a BMC (HPE iLO, Dell iDRAC with Redfish, Lenovo XCC, many "
    "Supermicro/Cisco controllers). Enum actions only — no arbitrary URLs. Covers "
    "inventory, thermal/power readings, logs, firmware/software inventory views, "
    "virtual media listing, boot properties, boot override (PATCH), and "
    "ComputerSystem.Reset. Default verify_ssl=false (self-signed BMC certs); MITM "
    "risk if used on untrusted networks. Not for generic ipmitool or Dell racadm — "
    "use ipmi_control or idrac_racadm for those. Requires UNSANDBOXED_EXEC."
)
_IDRAC_DESC = (
    "Dell iDRAC remote management via the system racadm binary (-r -u -P). "
    "Allowlisted read and power actions including storage get (controllers, pdisks, "
    "vdisks, etc.). Does not expose firmware flash or destructive RAID. For "
    "standards-based IPMI use ipmi_control; for Redfish use bmc_redfish. Requires "
    "UNSANDBOXED_EXEC and racadm in PATH."
)
_SMC_DESC = (
    "Supermicro BMC via smcipmitool/SMCIPMITool (IP, user, password, then ipmi "
    "subcommands). Allowlisted power, sensor, FRU, LAN, SEL. For Redfish on "
    "Supermicro use bmc_redfish; for generic IPMI use ipmi_control. Requires "
    "UNSANDBOXED_EXEC and the SMC tool in PATH."
)

# Descriptions for the three main (non-credential) tools: the base text plus a
# sentence explaining what ``credential_profile`` loads.
_BMC_REDFISH_MAIN_DESC = (
    _BMC_REDFISH_DESC + " credential_profile loads saved host, user, password, TLS options."
)
_IDRAC_MAIN_DESC = _IDRAC_DESC + " credential_profile loads saved host, user, password."
_SMC_MAIN_DESC = _SMC_DESC + " credential_profile loads saved host, user, password."

# --- Parameter schemas -------------------------------------------------------
# JSON-Schema-style parameter descriptions for the three main tools. Only
# ``action`` is required; host/user/password are optional in the schema because
# a ``credential_profile`` may supply them instead.

# Parameters for the ``bmc_redfish`` tool. The ``enum`` lists are built with
# sorted() so the frozensets become stably ordered lists.
_BMC_REDFISH_PARAMS = {
    "type": "object",
    "properties": {
        "host": {
            "type": "string",
            "description": "BMC hostname or IP (optional https://). Omit if credential_profile supplies it.",
        },
        "user": {"type": "string", "description": "BMC username."},
        "password": {"type": "string", "description": "BMC password."},
        "credential_profile": {
            "type": "string",
            "default": "",
            "description": "Load host, user, password, verify_ssl, port, timeout from a saved encrypted profile.",
        },
        "action": {
            "type": "string",
            "enum": sorted(_REDFISH_ACTIONS),
            "description": "Redfish operation to perform.",
        },
        "system_id": {
            "type": "string",
            "description": "Redfish Systems member id (e.g. 1). Empty = auto-detect first.",
        },
        "chassis_id": {
            "type": "string",
            "description": "Chassis member id. Empty = auto-detect first.",
        },
        "manager_id": {
            "type": "string",
            "description": "Managers member id. Empty = auto-detect first.",
        },
        "reset_type": {
            "type": "string",
            "enum": sorted(_RESET_TYPES),
            "description": "For system_reset only (ComputerSystem.Reset).",
        },
        "boot_override_enabled": {
            "type": "string",
            "description": "For boot_override: e.g. Once, Continuous, Disabled.",
        },
        "boot_override_target": {
            "type": "string",
            "description": "For boot_override: e.g. Pxe, Cd, Usb, Hdd.",
        },
        "verify_ssl": {
            "type": "boolean",
            "description": "Verify TLS certificates (default false for typical BMC certs).",
        },
        "port": {
            "type": "integer",
            "description": "Optional HTTPS port if not 443 and not in host URL.",
        },
        "timeout": {
            "type": "number",
            "description": "HTTP timeout seconds (default 120).",
        },
    },
    "required": ["action"],
}

# Parameters for the ``idrac_racadm`` tool. ``_RACADM_ACTIONS`` is defined
# elsewhere in this module (presumably the racadm action allowlist).
_IDRAC_PARAMS = {
    "type": "object",
    "properties": {
        "host": {
            "type": "string",
            "description": "iDRAC IP or hostname (optional if credential_profile set).",
        },
        "user": {"type": "string", "description": "iDRAC username."},
        "password": {"type": "string", "description": "iDRAC password."},
        "credential_profile": {
            "type": "string",
            "default": "",
            "description": "Load host, user, password, timeout from a saved encrypted profile.",
        },
        "action": {
            "type": "string",
            "enum": sorted(_RACADM_ACTIONS),
            "description": "Allowlisted racadm remote action.",
        },
        "timeout": {"type": "number", "description": "Subprocess timeout seconds."},
    },
    "required": ["action"],
}

# Parameters for the ``smc_supermicro`` tool. ``_SMC_ACTIONS`` is defined
# elsewhere in this module (presumably the SMCIPMITool action allowlist).
_SMC_PARAMS = {
    "type": "object",
    "properties": {
        "host": {
            "type": "string",
            "description": "BMC IP address (optional if credential_profile set).",
        },
        "user": {"type": "string", "description": "BMC username."},
        "password": {"type": "string", "description": "BMC password."},
        "credential_profile": {
            "type": "string",
            "default": "",
            "description": "Load host, user, password, timeout from a saved encrypted profile.",
        },
        "action": {
            "type": "string",
            "enum": sorted(_SMC_ACTIONS),
            "description": "Allowlisted smcipmitool ipmi action.",
        },
        "timeout": {"type": "number", "description": "Subprocess timeout seconds."},
    },
    "required": ["action"],
}

# --- Tool registry -----------------------------------------------------------
# One entry per exposed tool: public name, description, parameter schema and
# the async handler coroutine. Each vendor family (Redfish, iDRAC racadm,
# Supermicro) contributes save/list/delete credential tools plus its main tool.
#
# NOTE(review): the three *_delete_credentials schemas mark "profile" as
# required while also declaring a default of "default" (and the handlers
# default it as well) — confirm this is intentional, e.g. to force callers to
# name the profile explicitly on a destructive action.
TOOLS = [
    # Redfish ---------------------------------------------------------------
    {
        "name": "bmc_redfish_save_credentials",
        "description": (
            "Save Redfish BMC host, user, password, verify_ssl, port, and timeout encrypted "
            "per-user for bmc_redfish. Requires UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "host": {"type": "string"},
                "user": {"type": "string"},
                "password": {"type": "string"},
                "profile": {"type": "string", "default": "default"},
                "verify_ssl": {"type": "boolean", "default": False},
                "port": {"type": "integer", "description": "Optional HTTPS port."},
                "timeout": {"type": "number", "default": 120},
            },
            "required": ["host", "user", "password"],
        },
        "handler": bmc_redfish_save_credentials,
    },
    {
        "name": "bmc_redfish_list_credentials",
        "description": "List saved Redfish BMC profile names. Requires UNSANDBOXED_EXEC.",
        "parameters": {"type": "object", "properties": {}},
        "handler": bmc_redfish_list_credentials,
    },
    {
        "name": "bmc_redfish_delete_credentials",
        "description": "Delete a saved Redfish BMC profile. Requires UNSANDBOXED_EXEC.",
        "parameters": {
            "type": "object",
            "properties": {"profile": {"type": "string", "default": "default"}},
            "required": ["profile"],
        },
        "handler": bmc_redfish_delete_credentials,
    },
    {
        "name": "bmc_redfish",
        "description": _BMC_REDFISH_MAIN_DESC,
        "parameters": _BMC_REDFISH_PARAMS,
        "handler": run_bmc_redfish,
    },
    # Dell iDRAC racadm -----------------------------------------------------
    {
        "name": "idrac_racadm_save_credentials",
        "description": (
            "Save iDRAC racadm host, user, password, and timeout encrypted per-user. "
            "Requires UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "host": {"type": "string"},
                "user": {"type": "string"},
                "password": {"type": "string"},
                "profile": {"type": "string", "default": "default"},
                "timeout": {"type": "number", "default": 180},
            },
            "required": ["host", "user", "password"],
        },
        "handler": idrac_racadm_save_credentials,
    },
    {
        "name": "idrac_racadm_list_credentials",
        "description": "List saved iDRAC racadm profile names. Requires UNSANDBOXED_EXEC.",
        "parameters": {"type": "object", "properties": {}},
        "handler": idrac_racadm_list_credentials,
    },
    {
        "name": "idrac_racadm_delete_credentials",
        "description": "Delete a saved iDRAC racadm profile. Requires UNSANDBOXED_EXEC.",
        "parameters": {
            "type": "object",
            "properties": {"profile": {"type": "string", "default": "default"}},
            "required": ["profile"],
        },
        "handler": idrac_racadm_delete_credentials,
    },
    {
        "name": "idrac_racadm",
        "description": _IDRAC_MAIN_DESC,
        "parameters": _IDRAC_PARAMS,
        "handler": run_idrac_racadm,
    },
    # Supermicro SMCIPMITool ------------------------------------------------
    {
        "name": "smc_supermicro_save_credentials",
        "description": (
            "Save Supermicro SMCIPMITool host, user, password, and timeout encrypted per-user. "
            "Requires UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "host": {"type": "string"},
                "user": {"type": "string"},
                "password": {"type": "string"},
                "profile": {"type": "string", "default": "default"},
                "timeout": {"type": "number", "default": 180},
            },
            "required": ["host", "user", "password"],
        },
        "handler": smc_supermicro_save_credentials,
    },
    {
        "name": "smc_supermicro_list_credentials",
        "description": "List saved Supermicro SMC profile names. Requires UNSANDBOXED_EXEC.",
        "parameters": {"type": "object", "properties": {}},
        "handler": smc_supermicro_list_credentials,
    },
    {
        "name": "smc_supermicro_delete_credentials",
        "description": "Delete a saved Supermicro SMC profile. Requires UNSANDBOXED_EXEC.",
        "parameters": {
            "type": "object",
            "properties": {"profile": {"type": "string", "default": "default"}},
            "required": ["profile"],
        },
        "handler": smc_supermicro_delete_credentials,
    },
    {
        "name": "smc_supermicro",
        "description": _SMC_MAIN_DESC,
        "parameters": _SMC_PARAMS,
        "handler": run_smc_supermicro,
    },
]