Source code for tools.ipmi_tools

"""IPMI / BMC control via the system ``ipmitool`` binary.

Uses ``-I lanplus`` over the network. Requires ``ipmitool`` installed on the bot host.

**Security:** requires ``UNSANDBOXED_EXEC`` — bare-metal power and sensor access.

Commands are built as argv lists (no shell) to avoid injection.
"""

from __future__ import annotations

import asyncio
import json
import logging
import shutil
from typing import Any

logger = logging.getLogger(__name__)

_MAX_STDOUT = 200_000
_MAX_STDERR = 20_000

_VALID_ACTIONS = frozenset({
    "power_status",
    "power_on",
    "power_off",
    "power_reset",
    "power_cycle",
    "sensor_read",
})

_POWER_SUBCOMMANDS = {
    "power_status": ("power", "status"),
    "power_on": ("power", "on"),
    "power_off": ("power", "off"),
    "power_reset": ("power", "reset"),
    "power_cycle": ("power", "cycle"),
}


async def _check_priv(ctx: Any) -> str | None:
    try:
        from tools.alter_privileges import PRIVILEGES, has_privilege

        redis = getattr(ctx, "redis", None)
        config = getattr(ctx, "config", None)
        user_id = getattr(ctx, "user_id", "") or ""
        if not await has_privilege(
            redis, user_id, PRIVILEGES["UNSANDBOXED_EXEC"], config,
        ):
            logger.warning(
                "SECURITY: User %s attempted ipmi_control without UNSANDBOXED_EXEC",
                user_id,
            )
            return json.dumps({
                "success": False,
                "error": (
                    "The user does not have the UNSANDBOXED_EXEC privilege. "
                    "Ask an admin to grant it with the alter_privileges tool."
                ),
            })
    except ImportError:
        return json.dumps({"success": False, "error": "Privilege system unavailable."})
    return None


def _bad_token(s: str) -> bool:
    if not s:
        return False
    return any(c in s for c in "\n\r\x00")


def _host_ok(host: str) -> bool:
    h = (host or "").strip()
    if not h or len(h) > 512 or _bad_token(h):
        return False
    return True


def _user_ok(user: str) -> bool:
    u = (user or "").strip()
    if not u or len(u) > 256 or _bad_token(u):
        return False
    return True


def _ipmitool_available() -> bool:
    return shutil.which("ipmitool") is not None


def _build_cmd(host: str, user: str, password: str, action: str) -> list[str]:
    base = [
        "ipmitool",
        "-I", "lanplus",
        "-H", host.strip(),
        "-U", user.strip(),
        "-P", password,
    ]
    if action in _POWER_SUBCOMMANDS:
        base.extend(_POWER_SUBCOMMANDS[action])
    elif action == "sensor_read":
        base.extend(["sensor"])
    else:
        raise ValueError(f"unknown action: {action}")
    return base


async def _run_ipmitool(argv: list[str], timeout: int = 120) -> tuple[int, str, str]:
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        return -1, "", "ipmitool timed out."
    code = proc.returncode if proc.returncode is not None else -1
    out = stdout.decode("utf-8", errors="replace")[:_MAX_STDOUT]
    err = stderr.decode("utf-8", errors="replace")[:_MAX_STDERR]
    return code, out, err


TOOL_NAME = "ipmi_control"
TOOL_DESCRIPTION = (
    "Control a machine via IPMI (BMC): power status/on/off/reset/cycle, or read "
    "sensors. Runs the system ipmitool with -I lanplus. Requires UNSANDBOXED_EXEC "
    "and ipmitool installed on the host."
)
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "host": {
            "type": "string",
            "description": "BMC IP address or hostname.",
        },
        "user": {
            "type": "string",
            "description": "IPMI username.",
        },
        "password": {
            "type": "string",
            "description": "IPMI password.",
        },
        "action": {
            "type": "string",
            "description": (
                "power_status, power_on, power_off, power_reset, power_cycle, or "
                "sensor_read (full sensor listing)."
            ),
            "enum": sorted(_VALID_ACTIONS),
        },
    },
    "required": ["host", "user", "password", "action"],
}


[docs] async def run( host: str, user: str, password: str, action: str, ctx: Any = None, ) -> str: """Run ipmitool and return JSON with stdout/stderr and exit code.""" if ctx is None: return json.dumps({"success": False, "error": "No context."}) auth_err = await _check_priv(ctx) if auth_err: return auth_err if not _ipmitool_available(): return json.dumps({ "success": False, "error": "ipmitool not found in PATH.", }) act = (action or "").strip() if act not in _VALID_ACTIONS: return json.dumps({ "success": False, "error": f"Invalid action. Must be one of: {', '.join(sorted(_VALID_ACTIONS))}.", }) if not _host_ok(host): return json.dumps({"success": False, "error": "Invalid or empty host."}) if not _user_ok(user): return json.dumps({"success": False, "error": "Invalid or empty user."}) if password is None or str(password) == "": return json.dumps({"success": False, "error": "password is required."}) try: argv = _build_cmd(host, user, str(password), act) except ValueError as exc: return json.dumps({"success": False, "error": str(exc)}) code, out, err = await _run_ipmitool(argv) payload: dict[str, Any] = { "success": code == 0, "action": act, "exit_code": code, "stdout": out.rstrip(), "stderr": err.rstrip(), } if act == "sensor_read" and code == 0: rows: list[dict[str, str]] = [] for line in out.splitlines(): line = line.rstrip() if not line or line.startswith("Locating") or line.startswith("Sensor"): continue parts = [p.strip() for p in line.split("|")] if len(parts) >= 3: rows.append({ "name": parts[0], "value": parts[1], "unit": parts[2], "status": parts[3] if len(parts) > 3 else "", }) else: rows.append({"raw": line}) payload["sensors"] = rows payload["sensor_count"] = len(rows) return json.dumps(payload, default=str)