"""IPMI / BMC control via the system ``ipmitool`` binary.
Uses ``-I lanplus`` over the network. Requires ``ipmitool`` installed on the bot host.
**Security:** requires ``UNSANDBOXED_EXEC`` — bare-metal power and sensor access.
Commands are built as argv lists (no shell) to avoid injection.
"""
from __future__ import annotations
import asyncio
import jsonutil as json
import logging
import shutil
from typing import Any
from tools._credential_profile_store import (
delete_profile as _cred_delete,
list_profile_names as _cred_list,
load_profile as _cred_load,
merge_profile as _cred_merge,
save_profile as _cred_save,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Prefix handed to the credential-profile store for every IPMI profile call.
CRED_PREFIX = "ipmi"

# Caps applied to the decoded ipmitool output before it is returned.
_MAX_STDOUT = 200_000
_MAX_STDERR = 20_000

# Action name -> the ipmitool subcommand words appended to the argv.
_POWER_SUBCOMMANDS = {
    "power_status": ("power", "status"),
    "power_on": ("power", "on"),
    "power_off": ("power", "off"),
    "power_reset": ("power", "reset"),
    "power_cycle": ("power", "cycle"),
}

# Every accepted action: all power verbs plus the sensor listing.
_VALID_ACTIONS = frozenset(_POWER_SUBCOMMANDS) | {"sensor_read"}
async def _check_priv(ctx: Any, tool_name: str = "ipmi_control") -> str | None:
"""Authorize the caller for bare-metal IPMI access, gating on ``UNSANDBOXED_EXEC``.
Every IPMI handler in this module begins by calling this guard: ipmitool can
power-cycle physical hosts, so access is restricted to users holding the
dangerous ``UNSANDBOXED_EXEC`` privilege bit.
It imports ``PRIVILEGES`` and :func:`has_privilege` from
``tools.alter_privileges`` and reads ``ctx.redis``, ``ctx.config``, and
``ctx.user_id`` off the :class:`ToolContext`; the privilege check itself reads
the user's stored privilege mask from Redis. On denial it logs a ``SECURITY``
warning naming the user and tool. This function performs no IPMI work — it only
returns a refusal payload or ``None``.
Called by :func:`run` (the ``ipmi_control`` handler) and by
:func:`_ipmi_save_credentials`, :func:`_ipmi_list_credentials`, and
:func:`_ipmi_delete_credentials`; no external callers were found. Tests patch
this symbol to bypass the privilege gate.
Args:
ctx: The :class:`ToolContext`, providing ``redis``, ``config``, and
``user_id`` used for the privilege lookup.
tool_name: Label used only in the denial log line to identify which IPMI
tool was attempted. Defaults to ``"ipmi_control"``.
Returns:
str | None: ``None`` when the caller is authorized, otherwise a JSON
string ``{"success": False, "error": ...}`` describing why access was
refused (missing privilege, or the privilege system being unavailable).
"""
try:
from tools.alter_privileges import PRIVILEGES, has_privilege
redis = getattr(ctx, "redis", None)
config = getattr(ctx, "config", None)
user_id = getattr(ctx, "user_id", "") or ""
if not await has_privilege(
redis,
user_id,
PRIVILEGES["UNSANDBOXED_EXEC"],
config,
):
logger.warning(
"SECURITY: User %s attempted %s without UNSANDBOXED_EXEC",
user_id,
tool_name,
)
return json.dumps(
{
"success": False,
"error": (
"The user does not have the UNSANDBOXED_EXEC privilege. "
"Ask an admin to grant it with the alter_privileges tool."
),
}
)
except ImportError:
return json.dumps({"success": False, "error": "Privilege system unavailable."})
return None
def _bad_token(s: str) -> bool:
    """Tell whether ``s`` contains a newline, carriage return, or NUL character.

    Args:
        s: Candidate token (a host or username in this module).

    Returns:
        ``True`` when any of the three characters occurs in ``s``; ``False``
        otherwise, including for an empty or falsy ``s``.
    """
    if s:
        for forbidden in ("\n", "\r", "\x00"):
            if forbidden in s:
                return True
    return False
def _host_ok(host: str) -> bool:
    """Check that a BMC host token is usable as an ipmitool argument.

    The value is stripped of surrounding whitespace first; it must then be
    non-empty, at most 512 characters long, and pass :func:`_bad_token`.

    Args:
        host: BMC IP address or hostname (``None``/empty is treated as empty).

    Returns:
        ``True`` if the stripped host satisfies all three conditions, else
        ``False``.
    """
    candidate = (host or "").strip()
    # Same evaluation order as the three guards: empty, too long, bad chars.
    return bool(candidate) and len(candidate) <= 512 and not _bad_token(candidate)
def _user_ok(user: str) -> bool:
    """Check that an IPMI username token is usable as an ipmitool argument.

    The value is stripped of surrounding whitespace first; it must then be
    non-empty, at most 256 characters long, and pass :func:`_bad_token`.

    Args:
        user: IPMI username (``None``/empty is treated as empty).

    Returns:
        ``True`` if the stripped username satisfies all three conditions, else
        ``False``.
    """
    name = (user or "").strip()
    if name and len(name) <= 256:
        # Length is fine; the only remaining reason to reject is a bad character.
        return not _bad_token(name)
    return False
def _ipmitool_available() -> bool:
    """Tell whether an ``ipmitool`` executable can be found on ``PATH``.

    Returns:
        ``True`` if :func:`shutil.which` resolves ``ipmitool``, else ``False``.
    """
    resolved = shutil.which("ipmitool")
    return resolved is not None
def _build_cmd(host: str, user: str, password: str, action: str) -> list[str]:
    """Build the ipmitool argument vector for one action.

    The vector always starts with ``ipmitool -I lanplus -H <host> -U <user>
    -P <password>`` and ends with the words for the requested action: the
    entry from :data:`_POWER_SUBCOMMANDS`, or ``sensor`` for ``sensor_read``.
    It is a plain list of arguments; no shell string is assembled.

    Args:
        host: BMC host/IP; surrounding whitespace is stripped.
        user: IPMI username; surrounding whitespace is stripped.
        password: IPMI password, used exactly as given.
        action: A key of :data:`_POWER_SUBCOMMANDS` or ``"sensor_read"``.

    Returns:
        The complete argument list.

    Raises:
        ValueError: If ``action`` is not one of the supported names.
    """
    argv = ["ipmitool", "-I", "lanplus"]
    argv += ["-H", host.strip(), "-U", user.strip(), "-P", password]

    power_words = _POWER_SUBCOMMANDS.get(action)
    if power_words is not None:
        argv.extend(power_words)
        return argv
    if action == "sensor_read":
        argv.append("sensor")
        return argv
    raise ValueError(f"unknown action: {action}")
async def _run_ipmitool(argv: list[str], timeout: int = 120) -> tuple[int, str, str]:
    """Run an argument vector as a subprocess and collect its result.

    The process is started with :func:`asyncio.create_subprocess_exec` (no
    shell) with stdout and stderr piped. Output is decoded as UTF-8 with
    replacement characters and cut to :data:`_MAX_STDOUT` /
    :data:`_MAX_STDERR` characters.

    Failures are reported through the return value rather than raised:

    * If the process cannot be started (``OSError`` such as a missing or
      non-executable binary, or ``ValueError`` such as an embedded NUL byte in
      an argument) the result is ``(-1, "", "failed to start ipmitool: ...")``.
    * If the process does not finish within ``timeout`` seconds it is killed
      and reaped, and the result is ``(-1, "", "ipmitool timed out.")``.

    If the awaiting task is cancelled, the child is killed and reaped before
    the cancellation propagates, so no process is left running.

    Args:
        argv: Full command vector; the first element is the program.
        timeout: Seconds to wait for completion. Defaults to ``120``.

    Returns:
        ``(exit_code, stdout, stderr)``. A ``None`` return code is reported
        as ``-1``.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            *argv,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except (OSError, ValueError) as exc:
        # The binary may vanish between the PATH probe and exec, or an
        # argument may be unrepresentable; surface that as a normal failure.
        return -1, "", f"failed to start ipmitool: {exc}"

    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        return -1, "", "ipmitool timed out."
    finally:
        # Reached on timeout, cancellation, or any other error while waiting:
        # never leave the child running behind us.
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                # The process exited between the check and the kill.
                pass
            await proc.wait()

    code = proc.returncode if proc.returncode is not None else -1
    out = stdout.decode("utf-8", errors="replace")[:_MAX_STDOUT]
    err = stderr.decode("utf-8", errors="replace")[:_MAX_STDERR]
    return code, out, err
# Registered name of the main tool exposed by this module.
TOOL_NAME = "ipmi_control"

# Human/LLM-facing description, split one clause per literal for readability.
TOOL_DESCRIPTION = (
    "Generic IPMI over the network: power status/on/off/reset/cycle "
    "and full sensor listing via the system ipmitool binary (-I lanplus). "
    "Use this for standards-based IPMI when you do not need Redfish/HTTPS, "
    "Dell racadm, or Supermicro SMCIPMITool. "
    "For HTTPS Redfish (HPE iLO, Dell iDRAC, Lenovo XCC, many BMCs), use bmc_redfish; "
    "for Dell racadm-only features, use idrac_racadm; "
    "for Supermicro SMCIPMITool, use smc_supermicro. "
    "Requires UNSANDBOXED_EXEC and ipmitool on the host."
)
# JSON-schema style parameter description for the ipmi_control tool.
# Only "action" is required; host/user/password may instead come from a
# saved profile named by "credential_profile".
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "host": {"type": "string", "description": "BMC IP address or hostname."},
        "user": {"type": "string", "description": "IPMI username."},
        "password": {"type": "string", "description": "IPMI password."},
        "credential_profile": {
            "type": "string",
            "default": "",
            "description": "Load host, user, password from a saved encrypted profile.",
        },
        "action": {
            "type": "string",
            "description": "power_status, power_on, power_off, power_reset, power_cycle, or sensor_read (full sensor listing).",
            # Sorted so the advertised enum order is stable.
            "enum": sorted(_VALID_ACTIONS),
        },
    },
    "required": ["action"],
}
async def _ipmi_save_credentials(
    host: str,
    user: str,
    password: str,
    profile: str = "default",
    ctx: Any = None,
) -> str:
    """Validate IPMI connection details and store them as a named profile.

    Handler for the ``ipmi_save_credentials`` tool. The call is refused
    without a context or when :func:`_check_priv` denies the caller. The host
    and user are validated with :func:`_host_ok` / :func:`_user_ok` and the
    password must be non-empty. The stripped host and user plus the password
    are then passed to ``_cred_save`` under :data:`CRED_PREFIX`.

    Args:
        host: BMC IP address or hostname to store.
        user: IPMI username to store.
        password: IPMI password to store; must not be ``None`` or empty.
        profile: Profile name to write. Defaults to ``"default"``.
        ctx: Tool context; ``None`` makes the call fail immediately.

    Returns:
        A JSON string: ``{"success": False, "error": ...}`` for a missing
        context, a denied privilege, or invalid input; otherwise whatever
        ``_cred_save`` returns.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})

    refusal = await _check_priv(ctx, "ipmi_save_credentials")
    if refusal:
        return refusal

    # Find the first failing input, checked in host -> user -> password order.
    problem = None
    if not _host_ok(host):
        problem = "Invalid or empty host."
    elif not _user_ok(user):
        problem = "Invalid or empty user."
    elif password is None or not str(password):
        problem = "password is required."
    if problem is not None:
        return json.dumps({"success": False, "error": problem})

    record = {
        "host": host.strip(),
        "user": user.strip(),
        "password": str(password),
    }
    return await _cred_save(CRED_PREFIX, profile, record, ctx)
async def _ipmi_list_credentials(ctx: Any = None) -> str:
    """List the caller's saved IPMI credential profiles.

    Handler for the ``ipmi_list_credentials`` tool. The call is refused
    without a context or when :func:`_check_priv` denies the caller;
    otherwise it is delegated to ``_cred_list`` under :data:`CRED_PREFIX`.

    Args:
        ctx: Tool context; ``None`` makes the call fail immediately.

    Returns:
        A JSON string: ``{"success": False, "error": ...}`` for a missing
        context or denied privilege; otherwise whatever ``_cred_list`` returns.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})

    refusal = await _check_priv(ctx, "ipmi_list_credentials")
    return refusal if refusal else await _cred_list(CRED_PREFIX, ctx)
async def _ipmi_delete_credentials(profile: str = "default", ctx: Any = None) -> str:
    """Delete one of the caller's saved IPMI credential profiles.

    Handler for the ``ipmi_delete_credentials`` tool. The call is refused
    without a context or when :func:`_check_priv` denies the caller;
    otherwise it is delegated to ``_cred_delete`` under :data:`CRED_PREFIX`.

    Args:
        profile: Name of the profile to remove. Defaults to ``"default"``.
        ctx: Tool context; ``None`` makes the call fail immediately.

    Returns:
        A JSON string: ``{"success": False, "error": ...}`` for a missing
        context or denied privilege; otherwise whatever ``_cred_delete``
        returns.
    """
    if ctx is not None:
        refusal = await _check_priv(ctx, "ipmi_delete_credentials")
        if refusal:
            return refusal
        return await _cred_delete(CRED_PREFIX, profile, ctx)
    return json.dumps({"success": False, "error": "No context."})
# Handler for the ``ipmi_control`` tool (registered in TOOLS below).
def _parse_sensor_rows(text: str) -> list[dict[str, str]]:
    """Turn pipe-delimited ``ipmitool sensor`` output into a list of row dicts.

    Blank lines and lines starting with ``Locating`` or ``Sensor`` are
    skipped. A line with at least three ``|``-separated cells becomes
    ``{"name", "value", "unit", "status"}`` (``status`` is ``""`` when there
    is no fourth cell); any other line is kept as ``{"raw": line}``.
    """
    rows: list[dict[str, str]] = []
    for raw_line in text.splitlines():
        line = raw_line.rstrip()
        if not line or line.startswith(("Locating", "Sensor")):
            continue
        cells = [cell.strip() for cell in line.split("|")]
        if len(cells) >= 3:
            rows.append(
                {
                    "name": cells[0],
                    "value": cells[1],
                    "unit": cells[2],
                    "status": cells[3] if len(cells) > 3 else "",
                }
            )
        else:
            rows.append({"raw": line})
    return rows


async def run(
    host: str = "",
    user: str = "",
    password: str = "",
    action: str = "",
    ctx: Any = None,
    *,
    credential_profile: str = "",
) -> str:
    """Run one IPMI power or sensor action against a BMC and report the result.

    Handler for the ``ipmi_control`` tool. Steps, in order:

    1. Refuse without a context or when :func:`_check_priv` denies the caller.
    2. If ``credential_profile`` is non-blank, load it with ``_cred_load`` (a
       string result is treated as an error message) and combine it with the
       explicit arguments through ``_cred_merge``; the merged ``host``,
       ``user`` and ``password`` replace the arguments.
       NOTE(review): presumably non-empty explicit arguments win over stored
       values — confirm against ``tools._credential_profile_store``.
    3. Check that ipmitool is installed, that the action is one of
       :data:`_VALID_ACTIONS`, and that host, user and password are valid.
    4. Build the argv with :func:`_build_cmd` and execute it with
       :func:`_run_ipmitool`.
    5. For a successful ``sensor_read``, additionally parse the sensor table.

    ``host``, ``user``, ``password`` and ``action`` default to ``""`` so the
    handler can be called with only the arguments the tool schema requires
    (``action``, optionally with ``credential_profile``); missing values are
    reported through the normal validation errors.

    Args:
        host: BMC IP address or hostname.
        user: IPMI username.
        password: IPMI password; must be non-empty and free of NUL characters.
        action: One of :data:`_VALID_ACTIONS`.
        ctx: Tool context; ``None`` makes the call fail immediately.
        credential_profile: Optional name of a saved profile to load.

    Returns:
        A JSON string. On execution: ``success`` (exit code was 0),
        ``action``, ``exit_code``, ``stdout``, ``stderr``, plus ``sensors`` and
        ``sensor_count`` for a successful ``sensor_read``. Otherwise
        ``{"success": False, "error": ...}``.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})
    auth_err = await _check_priv(ctx)
    if auth_err:
        return auth_err

    # Normalise once so a non-string profile value cannot break on .strip().
    profile_name = str(credential_profile or "").strip()
    if profile_name:
        loaded = await _cred_load(CRED_PREFIX, profile_name, ctx)
        if isinstance(loaded, str):
            return json.dumps({"success": False, "error": loaded})
        merged = _cred_merge(
            loaded,
            {"host": host, "user": user, "password": password},
        )
        host = str(merged.get("host") or "")
        user = str(merged.get("user") or "")
        password = str(merged.get("password") or "")

    if not _ipmitool_available():
        return json.dumps(
            {
                "success": False,
                "error": "ipmitool not found in PATH.",
            }
        )

    act = (action or "").strip()
    if act not in _VALID_ACTIONS:
        return json.dumps(
            {
                "success": False,
                "error": f"Invalid action. Must be one of: {', '.join(sorted(_VALID_ACTIONS))}.",
            }
        )
    if not _host_ok(host):
        return json.dumps({"success": False, "error": "Invalid or empty host."})
    if not _user_ok(user):
        return json.dumps({"success": False, "error": "Invalid or empty user."})
    if password is None or str(password) == "":
        return json.dumps({"success": False, "error": "password is required."})
    if "\x00" in str(password):
        # A NUL byte cannot be passed as a process argument; reject it here
        # instead of letting subprocess creation raise.
        return json.dumps({"success": False, "error": "Invalid password."})

    try:
        argv = _build_cmd(host, user, str(password), act)
    except ValueError as exc:
        return json.dumps({"success": False, "error": str(exc)})

    try:
        code, out, err = await _run_ipmitool(argv)
    except OSError as exc:
        # e.g. the binary disappeared or is not executable after the PATH probe.
        return json.dumps({"success": False, "error": f"Failed to run ipmitool: {exc}"})

    payload: dict[str, Any] = {
        "success": code == 0,
        "action": act,
        "exit_code": code,
        "stdout": out.rstrip(),
        "stderr": err.rstrip(),
    }
    if act == "sensor_read" and code == 0:
        rows = _parse_sensor_rows(out)
        payload["sensors"] = rows
        payload["sensor_count"] = len(rows)
    return json.dumps(payload, default=str)
# Description used for ipmi_control in the registry: the base description
# plus a note about the credential_profile parameter.
_TOOLS_IPMI_DESC = (
    f"{TOOL_DESCRIPTION} credential_profile on ipmi_control loads saved host/user/password."
)

# Tool registry for this module: each entry pairs a tool name, description and
# parameter schema with the coroutine that handles it.
TOOLS = [
    {
        "name": "ipmi_save_credentials",
        "description": "Save IPMI/BMC host, user, and password encrypted per-user. Requires UNSANDBOXED_EXEC.",
        "parameters": {
            "type": "object",
            "properties": {
                "host": {
                    "type": "string",
                },
                "user": {
                    "type": "string",
                },
                "password": {
                    "type": "string",
                },
                "profile": {
                    "type": "string",
                    "default": "default",
                },
            },
            "required": ["host", "user", "password"],
        },
        "handler": _ipmi_save_credentials,
    },
    {
        "name": "ipmi_list_credentials",
        "description": "List saved IPMI profile names.",
        "parameters": {
            "type": "object",
            "properties": {},
        },
        "handler": _ipmi_list_credentials,
    },
    {
        "name": "ipmi_delete_credentials",
        "description": "Delete a saved IPMI profile.",
        "parameters": {
            "type": "object",
            "properties": {
                "profile": {
                    "type": "string",
                    "default": "default",
                },
            },
            "required": ["profile"],
        },
        "handler": _ipmi_delete_credentials,
    },
    {
        "name": "ipmi_control",
        "description": _TOOLS_IPMI_DESC,
        "parameters": TOOL_PARAMETERS,
        "handler": run,
    },
]