# Source code for tools.ipmi_tools

"""IPMI / BMC control via the system ``ipmitool`` binary.

Uses ``-I lanplus`` over the network. Requires ``ipmitool`` installed on the bot host.

**Security:** requires ``UNSANDBOXED_EXEC`` — bare-metal power and sensor access.

Commands are built as argv lists (no shell) to avoid injection.
"""

from __future__ import annotations

import asyncio
import jsonutil as json
import logging
import shutil
from typing import Any

from tools._credential_profile_store import (
    delete_profile as _cred_delete,
    list_profile_names as _cred_list,
    load_profile as _cred_load,
    merge_profile as _cred_merge,
    save_profile as _cred_save,
)

logger = logging.getLogger(__name__)

CRED_PREFIX = "ipmi"

_MAX_STDOUT = 200_000
_MAX_STDERR = 20_000

_VALID_ACTIONS = frozenset(
    {
        "power_status",
        "power_on",
        "power_off",
        "power_reset",
        "power_cycle",
        "sensor_read",
    }
)

_POWER_SUBCOMMANDS = {
    "power_status": ("power", "status"),
    "power_on": ("power", "on"),
    "power_off": ("power", "off"),
    "power_reset": ("power", "reset"),
    "power_cycle": ("power", "cycle"),
}


async def _check_priv(ctx: Any, tool_name: str = "ipmi_control") -> str | None:
    """Authorize the caller for bare-metal IPMI access, gating on ``UNSANDBOXED_EXEC``.

    Every IPMI handler in this module begins by calling this guard: ipmitool can
    power-cycle physical hosts, so access is restricted to users holding the
    dangerous ``UNSANDBOXED_EXEC`` privilege bit.

    It imports ``PRIVILEGES`` and :func:`has_privilege` from
    ``tools.alter_privileges`` and reads ``ctx.redis``, ``ctx.config``, and
    ``ctx.user_id`` off the :class:`ToolContext`; the privilege check itself reads
    the user's stored privilege mask from Redis. On denial it logs a ``SECURITY``
    warning naming the user and tool. This function performs no IPMI work — it only
    returns a refusal payload or ``None``.

    Called by :func:`run` (the ``ipmi_control`` handler) and by
    :func:`_ipmi_save_credentials`, :func:`_ipmi_list_credentials`, and
    :func:`_ipmi_delete_credentials`; no external callers were found. Tests patch
    this symbol to bypass the privilege gate.

    Args:
        ctx: The :class:`ToolContext`, providing ``redis``, ``config``, and
            ``user_id`` used for the privilege lookup.
        tool_name: Label used only in the denial log line to identify which IPMI
            tool was attempted. Defaults to ``"ipmi_control"``.

    Returns:
        str | None: ``None`` when the caller is authorized, otherwise a JSON
        string ``{"success": False, "error": ...}`` describing why access was
        refused (missing privilege, or the privilege system being unavailable).
    """
    try:
        from tools.alter_privileges import PRIVILEGES, has_privilege

        redis = getattr(ctx, "redis", None)
        config = getattr(ctx, "config", None)
        user_id = getattr(ctx, "user_id", "") or ""
        if not await has_privilege(
            redis,
            user_id,
            PRIVILEGES["UNSANDBOXED_EXEC"],
            config,
        ):
            logger.warning(
                "SECURITY: User %s attempted %s without UNSANDBOXED_EXEC",
                user_id,
                tool_name,
            )
            return json.dumps(
                {
                    "success": False,
                    "error": (
                        "The user does not have the UNSANDBOXED_EXEC privilege. "
                        "Ask an admin to grant it with the alter_privileges tool."
                    ),
                }
            )
    except ImportError:
        return json.dumps({"success": False, "error": "Privilege system unavailable."})
    return None


def _bad_token(s: str) -> bool:
    """Report whether a string contains control characters unsafe for an argv token.

    Used to reject hosts and usernames that embed newlines, carriage returns, or
    NUL bytes before they reach the ipmitool argv list. Commands are built as argv
    lists rather than shell strings, so this guards against malformed/spoofed
    arguments rather than shell injection.

    Called by :func:`_host_ok` and :func:`_user_ok` within this module; no
    external callers were found.

    Args:
        s: The candidate token to inspect (typically a host or username).

    Returns:
        bool: ``True`` if ``s`` contains a newline, carriage return, or NUL byte;
        ``False`` for an empty string or one free of those characters.
    """
    if not s:
        return False
    return any(c in s for c in "\n\r\x00")


def _host_ok(host: str) -> bool:
    """Validate a BMC host/IP token before it is placed in the ipmitool argv.

    Accepts a non-empty value up to 512 characters that contains no unsafe
    control characters, delegating the character check to :func:`_bad_token`.

    Called by :func:`run` and :func:`_ipmi_save_credentials` to reject bad hosts
    before invoking ipmitool or saving a credential profile; no external callers
    were found.

    Args:
        host: The BMC IP address or hostname to validate (leading/trailing
            whitespace is stripped before checking).

    Returns:
        bool: ``True`` if the trimmed host is non-empty, at most 512 characters,
        and free of control characters; ``False`` otherwise.
    """
    h = (host or "").strip()
    if not h or len(h) > 512 or _bad_token(h):
        return False
    return True


def _user_ok(user: str) -> bool:
    """Validate an IPMI username token before it is placed in the ipmitool argv.

    Accepts a non-empty value up to 256 characters that contains no unsafe
    control characters, delegating the character check to :func:`_bad_token`.

    Called by :func:`run` and :func:`_ipmi_save_credentials` to reject bad
    usernames before invoking ipmitool or saving a credential profile; no
    external callers were found.

    Args:
        user: The IPMI username to validate (leading/trailing whitespace is
            stripped before checking).

    Returns:
        bool: ``True`` if the trimmed username is non-empty, at most 256
        characters, and free of control characters; ``False`` otherwise.
    """
    u = (user or "").strip()
    if not u or len(u) > 256 or _bad_token(u):
        return False
    return True


def _ipmitool_available() -> bool:
    """Report whether the ``ipmitool`` binary is present on the host ``PATH``.

    Uses :func:`shutil.which` to probe for the executable so that :func:`run`
    can return a clean error instead of failing inside subprocess creation when
    the binary is not installed.

    Called by :func:`run` before building/executing any command; no external
    callers were found.

    Returns:
        bool: ``True`` if ``ipmitool`` is resolvable on ``PATH``, else ``False``.
    """
    return shutil.which("ipmitool") is not None


def _build_cmd(host: str, user: str, password: str, action: str) -> list[str]:
    """Build the ``ipmitool`` argument vector for one action.

    The vector always starts with ``ipmitool -I lanplus -H <host> -U <user>
    -P <password>``; host and user are whitespace-trimmed, the password is
    used exactly as given. The action then contributes either its words from
    :data:`_POWER_SUBCOMMANDS` or the single word ``sensor`` for
    ``"sensor_read"``. The result is a list meant for exec-style spawning, so
    no value is ever parsed by a shell.

    Used by :func:`run` once the action and credentials have been validated.

    Args:
        host: BMC host or IP address.
        user: IPMI username.
        password: IPMI password, passed verbatim after ``-P``.
        action: A key of :data:`_POWER_SUBCOMMANDS` or ``"sensor_read"``.

    Returns:
        The complete argument list, ready for :func:`_run_ipmitool`.

    Raises:
        ValueError: ``action`` is neither a power sub-command key nor
            ``"sensor_read"``.
    """
    argv = ["ipmitool", "-I", "lanplus"]
    argv += ["-H", host.strip()]
    argv += ["-U", user.strip()]
    argv += ["-P", password]

    power_words = _POWER_SUBCOMMANDS.get(action)
    if power_words is not None:
        argv.extend(power_words)
    elif action == "sensor_read":
        argv.append("sensor")
    else:
        raise ValueError(f"unknown action: {action}")
    return argv


async def _run_ipmitool(argv: list[str], timeout: int = 120) -> tuple[int, str, str]:
    """Execute an ipmitool argv asynchronously and capture its result.

    Spawns the subprocess with :func:`asyncio.create_subprocess_exec` (no shell),
    awaits completion under a timeout, and decodes stdout/stderr as UTF-8 with
    replacement, truncating them to :data:`_MAX_STDOUT` / :data:`_MAX_STDERR`. On
    timeout the process is killed and reaped. This is the only place in the module
    that touches the operating system / spawns a process — its side effect is the
    real BMC interaction (e.g. powering a host on or off).

    Called by :func:`run` with the argv produced by :func:`_build_cmd`; no
    external callers were found.

    Args:
        argv: The full command vector to execute (first element is ``ipmitool``).
        timeout: Seconds to wait for completion before killing the process.
            Defaults to ``120``.

    Returns:
        tuple[int, str, str]: ``(exit_code, stdout, stderr)``. On timeout the exit
        code is ``-1``, stdout is empty, and stderr is ``"ipmitool timed out."``;
        a ``None`` return code from the process is normalized to ``-1``.
    """
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        return -1, "", "ipmitool timed out."
    code = proc.returncode if proc.returncode is not None else -1
    out = stdout.decode("utf-8", errors="replace")[:_MAX_STDOUT]
    err = stderr.decode("utf-8", errors="replace")[:_MAX_STDERR]
    return code, out, err


# Identifier of the main tool; matches the ``ipmi_control`` entry in TOOLS.
# NOTE(review): not referenced elsewhere in this file — presumably read by the
# tool loader; confirm.
TOOL_NAME = "ipmi_control"
# Description text for ``ipmi_control``. Besides summarising the supported
# actions it points to the sibling tools (bmc_redfish, idrac_racadm,
# smc_supermicro) for non-IPMI workflows.
TOOL_DESCRIPTION = (
    "Generic IPMI over the network: power status/on/off/reset/cycle and full sensor "
    "listing via the system ipmitool binary (-I lanplus). Use this for standards-based "
    "IPMI when you do not need Redfish/HTTPS, Dell racadm, or Supermicro SMCIPMITool. "
    "For HTTPS Redfish (HPE iLO, Dell iDRAC, Lenovo XCC, many BMCs), use bmc_redfish; "
    "for Dell racadm-only features, use idrac_racadm; for Supermicro SMCIPMITool, use "
    "smc_supermicro. Requires UNSANDBOXED_EXEC and ipmitool on the host."
)
# Parameter schema for ``ipmi_control``. Only ``action`` is required:
# host/user/password may instead be loaded through ``credential_profile``.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "host": {
            "type": "string",
            "description": "BMC IP address or hostname.",
        },
        "user": {
            "type": "string",
            "description": "IPMI username.",
        },
        "password": {
            "type": "string",
            "description": "IPMI password.",
        },
        "credential_profile": {
            "type": "string",
            "default": "",
            "description": "Load host, user, password from a saved encrypted profile.",
        },
        "action": {
            "type": "string",
            "description": (
                "power_status, power_on, power_off, power_reset, power_cycle, or "
                "sensor_read (full sensor listing)."
            ),
            # Sorted so the advertised enum order is stable across runs.
            "enum": sorted(_VALID_ACTIONS),
        },
    },
    "required": ["action"],
}


async def _ipmi_save_credentials(
    host: str,
    user: str,
    password: str,
    profile: str = "default",
    ctx: Any = None,
) -> str:
    """Validate and persist IPMI host/user/password as an encrypted credential profile.

    Handler for the ``ipmi_save_credentials`` tool. After enforcing the
    ``UNSANDBOXED_EXEC`` privilege and validating the inputs, it stores the three
    fields under a named profile so later ``ipmi_control`` calls can supply only
    ``credential_profile``.

    It calls :func:`_check_priv` for authorization, validates with :func:`_host_ok`
    and :func:`_user_ok`, and delegates persistence to ``_cred_save``
    (``tools._credential_profile_store.save_profile``), which AES-GCM-encrypts the
    JSON with the per-user key and writes it to the Redis hash
    ``stargazer:ipmi_credentials:{user_id}`` under field ``profile``.

    Registered in :data:`TOOLS` as the ``ipmi_save_credentials`` handler and
    dispatched by the tool loader/executor; no direct internal callers were found.
    Tests invoke it after patching :func:`_check_priv`.

    Args:
        host: BMC IP address or hostname to store.
        user: IPMI username to store.
        password: IPMI password to store (required, must be non-empty).
        profile: Name of the profile slot to write. Defaults to ``"default"``.
        ctx: The :class:`ToolContext` (provides ``redis``/``user_id``/``config``);
            if ``None`` the call fails early.

    Returns:
        str: A JSON string. ``{"success": False, "error": ...}`` for a missing
        context, denied privilege, or invalid/empty host/user/password; otherwise
        the JSON result produced by the credential store's save path.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})
    auth_err = await _check_priv(ctx, "ipmi_save_credentials")
    if auth_err:
        return auth_err
    if not _host_ok(host):
        return json.dumps({"success": False, "error": "Invalid or empty host."})
    if not _user_ok(user):
        return json.dumps({"success": False, "error": "Invalid or empty user."})
    if password is None or str(password) == "":
        return json.dumps({"success": False, "error": "password is required."})
    data = {
        "host": host.strip(),
        "user": user.strip(),
        "password": str(password),
    }
    return await _cred_save(CRED_PREFIX, profile, data, ctx)


async def _ipmi_list_credentials(ctx: Any = None) -> str:
    """List the names of the caller's saved IPMI credential profiles.

    Handler for the ``ipmi_list_credentials`` tool. Returns only the profile
    names (never the decrypted secrets), after enforcing the ``UNSANDBOXED_EXEC``
    privilege.

    It calls :func:`_check_priv` for authorization and delegates to ``_cred_list``
    (``tools._credential_profile_store.list_profile_names``), which reads the
    field names of the Redis hash ``stargazer:ipmi_credentials:{user_id}``.

    Registered in :data:`TOOLS` as the ``ipmi_list_credentials`` handler and
    dispatched by the tool loader/executor; no direct internal callers were found.

    Args:
        ctx: The :class:`ToolContext` (provides ``redis``/``user_id``); if
            ``None`` the call fails early.

    Returns:
        str: A JSON string. ``{"success": False, "error": ...}`` for a missing
        context or denied privilege; otherwise the credential store's JSON listing
        of profile names and their count.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})
    auth_err = await _check_priv(ctx, "ipmi_list_credentials")
    if auth_err:
        return auth_err
    return await _cred_list(CRED_PREFIX, ctx)


async def _ipmi_delete_credentials(profile: str = "default", ctx: Any = None) -> str:
    """Delete one of the caller's saved IPMI credential profiles.

    Handler for the ``ipmi_delete_credentials`` tool. After enforcing the
    ``UNSANDBOXED_EXEC`` privilege, it removes the named profile slot.

    It calls :func:`_check_priv` for authorization and delegates to
    ``_cred_delete`` (``tools._credential_profile_store.delete_profile``), which
    issues an ``HDEL`` against the Redis hash
    ``stargazer:ipmi_credentials:{user_id}`` for field ``profile``.

    Registered in :data:`TOOLS` as the ``ipmi_delete_credentials`` handler and
    dispatched by the tool loader/executor; no direct internal callers were found.

    Args:
        profile: Name of the profile to delete. Defaults to ``"default"``.
        ctx: The :class:`ToolContext` (provides ``redis``/``user_id``); if
            ``None`` the call fails early.

    Returns:
        str: A JSON string. ``{"success": False, "error": ...}`` for a missing
        context or denied privilege; otherwise the credential store's JSON result
        indicating whether a profile was deleted.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})
    auth_err = await _check_priv(ctx, "ipmi_delete_credentials")
    if auth_err:
        return auth_err
    return await _cred_delete(CRED_PREFIX, profile, ctx)


def _parse_sensor_rows(output: str) -> list[dict[str, str]]:
    """Turn pipe-delimited ``ipmitool sensor`` output into a list of row dicts.

    Blank lines and lines starting with ``Locating`` or ``Sensor`` are
    skipped. A line with at least three ``|``-separated fields becomes
    ``{"name", "value", "unit", "status"}`` (``status`` is ``""`` when there
    is no fourth field); any other line is kept as ``{"raw": line}``.
    """
    rows: list[dict[str, str]] = []
    for raw_line in output.splitlines():
        line = raw_line.rstrip()
        if not line or line.startswith(("Locating", "Sensor")):
            continue
        parts = [p.strip() for p in line.split("|")]
        if len(parts) >= 3:
            rows.append(
                {
                    "name": parts[0],
                    "value": parts[1],
                    "unit": parts[2],
                    "status": parts[3] if len(parts) > 3 else "",
                }
            )
        else:
            rows.append({"raw": line})
    return rows


async def run(
    host: str = "",
    user: str = "",
    password: str = "",
    action: str = "",
    ctx: Any = None,
    *,
    credential_profile: str = "",
) -> str:
    """Execute an IPMI power or sensor action against a BMC and return JSON.

    Handler for the ``ipmi_control`` tool (registered in :data:`TOOLS`).

    Flow:

    1. Require a context and pass the :func:`_check_priv` gate.
    2. If ``credential_profile`` is non-blank, load it with ``_cred_load`` and
       combine it with the explicit arguments via ``_cred_merge``. A string
       returned by ``_cred_load`` is treated as an error message.
    3. Check that ipmitool is installed (:func:`_ipmitool_available`), that
       the action is one of :data:`_VALID_ACTIONS`, and that host, user and
       password are acceptable.
    4. Build the argv with :func:`_build_cmd` and execute it through
       :func:`_run_ipmitool` (no shell).
    5. For a successful ``sensor_read``, add the parsed sensor table.

    ``host``, ``user``, ``password`` and ``action`` default to ``""`` so a
    caller may pass only ``action`` plus ``credential_profile``, as the tool
    schema allows (only ``action`` is required there). An empty ``action`` is
    rejected as invalid.

    Args:
        host: BMC IP address or hostname (may come from the profile).
        user: IPMI username (may come from the profile).
        password: IPMI password (may come from the profile).
        action: One of :data:`_VALID_ACTIONS`.
        ctx: Tool context for the privilege check and the credential store;
            ``None`` fails the call.
        credential_profile: Optional saved profile name whose
            host/user/password are merged with the explicit arguments.

    Returns:
        A JSON string with ``success``, ``action``, ``exit_code``, ``stdout``
        and ``stderr`` (plus ``sensors`` / ``sensor_count`` for a successful
        ``sensor_read``), or ``{"success": False, "error": ...}`` when the
        call is refused before ipmitool runs.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})
    auth_err = await _check_priv(ctx)
    if auth_err:
        return auth_err

    # Normalise once so the truthiness test and the lookup see the same value.
    profile_name = str(credential_profile).strip() if credential_profile else ""
    if profile_name:
        loaded = await _cred_load(CRED_PREFIX, profile_name, ctx)
        if isinstance(loaded, str):
            return json.dumps({"success": False, "error": loaded})
        # NOTE(review): presumably blank explicit values do not override the
        # stored ones — the precedence rule lives in _cred_merge; confirm.
        merged = _cred_merge(
            loaded,
            {"host": host, "user": user, "password": password},
        )
        host = str(merged.get("host") or "")
        user = str(merged.get("user") or "")
        password = str(merged.get("password") or "")

    if not _ipmitool_available():
        return json.dumps(
            {
                "success": False,
                "error": "ipmitool not found in PATH.",
            }
        )

    act = (action or "").strip()
    if act not in _VALID_ACTIONS:
        return json.dumps(
            {
                "success": False,
                "error": f"Invalid action. Must be one of: {', '.join(sorted(_VALID_ACTIONS))}.",
            }
        )
    if not _host_ok(host):
        return json.dumps({"success": False, "error": "Invalid or empty host."})
    if not _user_ok(user):
        return json.dumps({"success": False, "error": "Invalid or empty user."})
    if password is None or str(password) == "":
        return json.dumps({"success": False, "error": "password is required."})

    try:
        argv = _build_cmd(host, user, str(password), act)
    except ValueError as exc:
        return json.dumps({"success": False, "error": str(exc)})

    code, out, err = await _run_ipmitool(argv)
    payload: dict[str, Any] = {
        "success": code == 0,
        "action": act,
        "exit_code": code,
        "stdout": out.rstrip(),
        "stderr": err.rstrip(),
    }
    if act == "sensor_read" and code == 0:
        rows = _parse_sensor_rows(out)
        payload["sensors"] = rows
        payload["sensor_count"] = len(rows)
    return json.dumps(payload, default=str)
# ``ipmi_control`` description used in the registry: the base description plus
# a hint about profile loading.
_TOOLS_IPMI_DESC = (
    TOOL_DESCRIPTION
    + " credential_profile on ipmi_control loads saved host/user/password."
)

# Tool registry for this module: three credential-profile tools followed by
# the main control tool, each mapped to its handler coroutine.
TOOLS = [
    {
        "name": "ipmi_save_credentials",
        "description": (
            "Save IPMI/BMC host, user, and password encrypted per-user. "
            "Requires UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "host": {"type": "string"},
                "user": {"type": "string"},
                "password": {"type": "string"},
                "profile": {"type": "string", "default": "default"},
            },
            "required": ["host", "user", "password"],
        },
        "handler": _ipmi_save_credentials,
    },
    {
        "name": "ipmi_list_credentials",
        "description": "List saved IPMI profile names.",
        "parameters": {"type": "object", "properties": {}},
        "handler": _ipmi_list_credentials,
    },
    {
        "name": "ipmi_delete_credentials",
        "description": "Delete a saved IPMI profile.",
        "parameters": {
            "type": "object",
            "properties": {
                "profile": {"type": "string", "default": "default"},
            },
            "required": ["profile"],
        },
        "handler": _ipmi_delete_credentials,
    },
    {
        "name": "ipmi_control",
        "description": _TOOLS_IPMI_DESC,
        "parameters": TOOL_PARAMETERS,
        "handler": run,
    },
]