"""IPMI / BMC control via the system ``ipmitool`` binary.
Uses ``-I lanplus`` over the network. Requires ``ipmitool`` installed on the bot host.
**Security:** requires ``UNSANDBOXED_EXEC`` — bare-metal power and sensor access.
Commands are built as argv lists (no shell) to avoid injection.
"""
from __future__ import annotations
import asyncio
import json
import logging
import shutil
from typing import Any
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Caps applied to the decoded stdout / stderr text returned from ipmitool.
_MAX_STDOUT = 200_000
_MAX_STDERR = 20_000

# Power actions mapped to the ipmitool sub-command words appended to argv.
_POWER_SUBCOMMANDS = {
    "power_status": ("power", "status"),
    "power_on": ("power", "on"),
    "power_off": ("power", "off"),
    "power_reset": ("power", "reset"),
    "power_cycle": ("power", "cycle"),
}

# Every action the tool accepts: all power actions plus the sensor listing.
_VALID_ACTIONS = frozenset({*_POWER_SUBCOMMANDS, "sensor_read"})
async def _check_priv(ctx: Any) -> str | None:
    """Check that the calling user holds the ``UNSANDBOXED_EXEC`` privilege.

    ``redis``, ``config`` and ``user_id`` are read from *ctx* with
    ``getattr`` (defaulting to ``None`` / ``""``) and handed to
    ``has_privilege``.

    Returns:
        ``None`` when the check passes. Otherwise a JSON string
        ``{"success": false, "error": ...}`` — either because the privilege
        is missing (also logged as a warning) or because the privilege
        module could not be imported.
    """
    try:
        # Imported inside the function so that a missing privilege module is
        # reported as a JSON error rather than breaking import of this file.
        from tools.alter_privileges import PRIVILEGES, has_privilege

        redis = getattr(ctx, "redis", None)
        config = getattr(ctx, "config", None)
        user_id = getattr(ctx, "user_id", "") or ""

        granted = await has_privilege(
            redis, user_id, PRIVILEGES["UNSANDBOXED_EXEC"], config,
        )
        if granted:
            return None

        logger.warning(
            "SECURITY: User %s attempted ipmi_control without UNSANDBOXED_EXEC",
            user_id,
        )
        denial = {
            "success": False,
            "error": (
                "The user does not have the UNSANDBOXED_EXEC privilege. "
                "Ask an admin to grant it with the alter_privileges tool."
            ),
        }
        return json.dumps(denial)
    except ImportError:
        return json.dumps({"success": False, "error": "Privilege system unavailable."})
def _bad_token(s: str) -> bool:
    """Return ``True`` if *s* contains a newline, carriage return, or NUL.

    An empty (falsy) value is not treated as bad here and yields ``False``;
    emptiness is checked separately by the callers.
    """
    if not s:
        return False
    for forbidden in ("\n", "\r", "\x00"):
        if forbidden in s:
            return True
    return False
def _host_ok(host: str) -> bool:
    """Validate a BMC host string.

    After stripping surrounding whitespace the value must be non-empty, at
    most 512 characters long, and free of the characters rejected by
    ``_bad_token``.
    """
    candidate = (host or "").strip()
    return bool(candidate) and len(candidate) <= 512 and not _bad_token(candidate)
def _user_ok(user: str) -> bool:
    """Validate an IPMI username.

    After stripping surrounding whitespace the value must be non-empty, at
    most 256 characters long, and free of the characters rejected by
    ``_bad_token``.
    """
    candidate = (user or "").strip()
    return bool(candidate) and len(candidate) <= 256 and not _bad_token(candidate)
def _ipmitool_available() -> bool:
    """Return ``True`` when an ``ipmitool`` executable is found on ``PATH``."""
    located = shutil.which("ipmitool")
    return located is not None
def _build_cmd(host: str, user: str, password: str, action: str) -> list[str]:
    """Assemble the ``ipmitool`` argument vector for *action*.

    Args:
        host: BMC address; surrounding whitespace is stripped.
        user: IPMI username; surrounding whitespace is stripped.
        password: IPMI password, passed through unchanged.
        action: A key of ``_POWER_SUBCOMMANDS`` or ``"sensor_read"``.

    Returns:
        The argv list: ``ipmitool -I lanplus -H <host> -U <user> -P <password>``
        followed by the sub-command words for the action.

    Raises:
        ValueError: If *action* is neither a power action nor ``sensor_read``.
    """
    # NOTE(review): the password is placed on the command line via -P, which
    # other local users may be able to see in the process list. ipmitool also
    # offers -E (environment variable) and -f (password file) — worth
    # confirming whether one of those should be used instead.
    argv = ["ipmitool", "-I", "lanplus"]
    argv += ["-H", host.strip()]
    argv += ["-U", user.strip()]
    argv += ["-P", password]

    power_words = _POWER_SUBCOMMANDS.get(action)
    if power_words is not None:
        argv.extend(power_words)
    elif action == "sensor_read":
        argv.append("sensor")
    else:
        raise ValueError(f"unknown action: {action}")
    return argv
async def _run_ipmitool(argv: list[str], timeout: int = 120) -> tuple[int, str, str]:
    """Execute *argv* as a subprocess (no shell) and capture its output.

    Args:
        argv: Complete argument vector; ``argv[0]`` is the program to run.
        timeout: Seconds to wait for the process before it is killed.

    Returns:
        ``(exit_code, stdout, stderr)``. Both streams are decoded as UTF-8
        (undecodable bytes replaced) and truncated to ``_MAX_STDOUT`` /
        ``_MAX_STDERR`` characters. ``exit_code`` is ``-1`` when the process
        could not be started, timed out, or reported no return code.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            *argv,
            # Never let the child read from (or block on) our own stdin.
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except OSError as exc:
        # Binary vanished, is not executable, etc. Report it through the
        # normal error tuple instead of letting the exception escape.
        return -1, "", f"Failed to start ipmitool: {exc}"

    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        return -1, "", "ipmitool timed out."
    finally:
        # Covers both the timeout above and cancellation of this task: a
        # child that is still running must not be left behind.
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                # The process exited between the check and the kill.
                pass
            await proc.wait()

    code = proc.returncode if proc.returncode is not None else -1
    out = stdout.decode("utf-8", errors="replace")[:_MAX_STDOUT]
    err = stderr.decode("utf-8", errors="replace")[:_MAX_STDERR]
    return code, out, err
# Tool metadata: the tool's name, its human-readable description, and a
# JSON-Schema-shaped description of the arguments accepted by ``run``.
TOOL_NAME = "ipmi_control"

TOOL_DESCRIPTION = (
    "Control a machine via IPMI (BMC): power status/on/off/reset/cycle, "
    "or read sensors. "
    "Runs the system ipmitool with -I lanplus. "
    "Requires UNSANDBOXED_EXEC and ipmitool installed on the host."
)

TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "host": {"type": "string", "description": "BMC IP address or hostname."},
        "user": {"type": "string", "description": "IPMI username."},
        "password": {"type": "string", "description": "IPMI password."},
        "action": {
            "type": "string",
            "description": (
                "power_status, power_on, power_off, power_reset, power_cycle, "
                "or sensor_read (full sensor listing)."
            ),
            # Sorted so the advertised enum order is stable.
            "enum": sorted(_VALID_ACTIONS),
        },
    },
    "required": ["host", "user", "password", "action"],
}
# Public entry point for the ipmi_control tool.
async def run(
    host: str,
    user: str,
    password: str,
    action: str,
    ctx: Any = None,
) -> str:
    """Run one IPMI action through ``ipmitool`` and return the result as JSON.

    Args:
        host: BMC IP address or hostname.
        user: IPMI username.
        password: IPMI password; must be non-empty.
        action: One of ``_VALID_ACTIONS`` (surrounding whitespace ignored).
        ctx: Caller context used for the privilege check; required.

    Returns:
        A JSON string. Failures before ipmitool runs (no context, missing
        privilege, ipmitool not installed, invalid arguments, spawn error)
        yield ``{"success": false, "error": ...}``. Otherwise the object
        holds ``success``, ``action``, ``exit_code``, ``stdout`` and
        ``stderr``; a successful ``sensor_read`` additionally carries the
        parsed ``sensors`` rows and ``sensor_count``.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})

    auth_err = await _check_priv(ctx)
    if auth_err:
        return auth_err

    if not _ipmitool_available():
        return json.dumps({
            "success": False,
            "error": "ipmitool not found in PATH.",
        })

    act = (action or "").strip()
    if act not in _VALID_ACTIONS:
        return json.dumps({
            "success": False,
            "error": f"Invalid action. Must be one of: {', '.join(sorted(_VALID_ACTIONS))}.",
        })
    if not _host_ok(host):
        return json.dumps({"success": False, "error": "Invalid or empty host."})
    if not _user_ok(user):
        return json.dumps({"success": False, "error": "Invalid or empty user."})
    if password is None or str(password) == "":
        return json.dumps({"success": False, "error": "password is required."})

    try:
        argv = _build_cmd(host, user, str(password), act)
    except ValueError as exc:
        return json.dumps({"success": False, "error": str(exc)})

    try:
        code, out, err = await _run_ipmitool(argv)
    except OSError as exc:
        # Starting the process can still fail after the PATH check (binary
        # removed, not executable, ...). Keep the JSON error contract.
        return json.dumps({"success": False, "error": f"Failed to run ipmitool: {exc}"})

    payload: dict[str, Any] = {
        "success": code == 0,
        "action": act,
        "exit_code": code,
        "stdout": out.rstrip(),
        "stderr": err.rstrip(),
    }
    if act == "sensor_read" and code == 0:
        rows = _parse_sensor_rows(out)
        payload["sensors"] = rows
        payload["sensor_count"] = len(rows)
    return json.dumps(payload, default=str)


def _parse_sensor_rows(output: str) -> list[dict[str, str]]:
    """Turn pipe-delimited sensor output into a list of row dictionaries.

    Blank lines and lines starting with ``Locating`` or ``Sensor`` are
    skipped. A line with at least three ``|``-separated fields becomes
    ``{"name", "value", "unit", "status"}`` (``status`` is ``""`` when there
    is no fourth field); any other line is kept as ``{"raw": line}``.
    """
    rows: list[dict[str, str]] = []
    for raw_line in output.splitlines():
        line = raw_line.rstrip()
        if not line or line.startswith(("Locating", "Sensor")):
            continue
        parts = [field.strip() for field in line.split("|")]
        if len(parts) >= 3:
            rows.append({
                "name": parts[0],
                "value": parts[1],
                "unit": parts[2],
                "status": parts[3] if len(parts) > 3 else "",
            })
        else:
            rows.append({"raw": line})
    return rows